source: trunk/src/concurrent.h

Last change on this file was 2395, checked in by KohjiNakamura, 12 years ago

comments changed

File size: 5.5 KB
Line 
1//
2// C++ Interface: concurrent
3//
4// Description:
5//
6//
7// Author: Kohji Nakamura <k.nakamura@nao.ac.jp>, (C) 2012
8//
9// Copyright: See COPYING file that comes with this distribution
10//
11//
12#ifndef CONCURRENT_H
13#define CONCURRENT_H
14
15#include <assert.h>
16#include <pthread.h>
17
18namespace concurrent {
19        class PCException {
20        public:
21                virtual void raise() /*throw(PCException)*/ { throw *this; }
22                virtual ~PCException() {}
23        };
24
25        //typedef void (*produce_t)(void *context) throw(PCException);
26        //typedef void (*consume_t)(void *context) throw(PCException);
27
28        class Mutex {
29                pthread_mutex_t mutex;
30        public:
31                Mutex() throw(int);
32                virtual ~Mutex();
33                void lock() throw(int);
34                /**
35                 * Returns true if this thread could lock.
36                 * Returns false if already locked.
37                 */
38                bool try_lock() throw(int);
39                void unlock() throw(int);
40        private:
41                Mutex(Mutex const &other);
42                Mutex &operator =(Mutex const &other);
43        };
44
45        class Semaphore {
46                pthread_mutex_t mutex;
47                pthread_cond_t cond;
48                unsigned sem;
49        public:
50                explicit Semaphore(unsigned initial = 0U) throw(int);
51                void up(unsigned amount = 1U) throw(int);
52                void down(unsigned amount = 1U) throw(int);
53                virtual ~Semaphore();
54        private:
55                Semaphore(Semaphore const &other);
56                Semaphore &operator =(Semaphore const &other);
57        };
58
59        class FIFOException {
60        public:
61                virtual void raise() { throw *this; }
62                virtual ~FIFOException() {}
63        };
64
65        class EmptyException {
66        public:
67                virtual void raise() { throw *this; }
68                virtual ~EmptyException() {}
69        };
70
71        class FullException {
72        public:
73                virtual void raise() { throw *this; }
74                virtual ~FullException() {}
75        };
76
77        template <class T, size_t N>
78        class FIFO {
79                T elements[N + 1]; // +1 to make an implementation simple.
80                Mutex mutex;
81                size_t head;
82                size_t tail;
83
84                size_t wrap(size_t n) {
85                        return n % (N + 1);
86                }
87
88                void reset() {
89                        head = tail = 0;
90                }
91
92        public:
93                FIFO() {
94                        assert(N > 0);
95                        reset();
96                }
97                virtual ~FIFO() {
98                        mutex.lock(); // wait until current lock is released.
99                        mutex.unlock();
100                }
101
102                virtual void lock() {
103                        mutex.lock();
104                }
105
106                /**
107                 * Returns true if this thread could lock.
108                 * Returns false if already locked.
109                 */
110                virtual bool try_lock() {
111                        return mutex.try_lock();
112                }
113
114                virtual void unlock() {
115                        mutex.unlock();
116                }
117
118                virtual void clear() {
119                        reset();
120                }
121
122                virtual void put(T const &value) throw(FullException) {
123                        size_t new_tail = wrap(tail + 1);
124                        if (head == new_tail) {
125                                throw FullException();
126                        }
127                        elements[tail] = value;
128                        tail = new_tail;
129                }
130
131                virtual T get() throw(EmptyException) {
132                        if (head == tail) {
133                                throw EmptyException();
134                        }
135                        T result = elements[head];
136                        head = wrap(head + 1);
137                        return result;
138                }
139
140                /**
141                 * Returns capacity size.
142                 */
143                virtual size_t size() const {
144                        return N;
145                }
146
147                /**
148                 * Returns number of elements in this FIFO.
149                 */
150                virtual size_t length() const {
151                        size_t result = tail - head;
152                        if (head > tail) {
153                                result = N + 1 - (head - tail);
154                        }
155                        return result;
156                }
157        private:
158                FIFO(FIFO const &other);
159                FIFO &operator =(FIFO const &other);
160        };
161
162        class Broker {
163        protected:
164                enum ThreadSpec {
165                        ProdAsMaster, ConsAsMaster, Unspecified
166                };
167                bool (*producer)(void *context) throw(PCException);
168                void (*consumer)(void *context) throw(PCException);
169                virtual void _run(void *context, unsigned do_ahead, ThreadSpec threadSpec) throw(PCException);
170        public:
171                Broker(bool (*producer)(void *context) throw(PCException),
172                           void (*consumer)(void *context) throw(PCException));
173                virtual ~Broker();
174                static void enableNested();
175                static void disableNested();
176                static void setNestedState(bool nested);
177                static bool getNestedState();
178
179                virtual void run(void *context, unsigned do_ahead=1) throw(PCException);
180                virtual void runProducerAsMasterThread(void *context, unsigned do_ahead=1) throw(PCException);
181                virtual void runConsumerAsMasterThread(void *context, unsigned do_ahead=1) throw(PCException);
182                virtual void runSequential(void *context) throw(PCException);
183        };
184
185#if 0
186        template <class Context, class Product>
187                class Producer {
188        public:
189                virtual Product produce(Context &ctx) throw(PCException) = 0;
190                virtual ~Producer() {}
191        };
192
193        template <class Context, class Product>
194                class Consumer {
195        public:
196                virtual void consume(Context &ctx, Product const&product) throw(PCException) = 0;
197                virtual ~Consumer() {}
198        };
199
200class ProdCons {
201 public:
202        virtual ~ProdCons() {}
203
204        virtual void runProducerAsMasterThread(void *context) throw(PCException) = 0;
205        virtual void runConsumerAsMasterThread(void *context) throw(PCException) = 0;
206        virtual void produce(void *context) throw(PCException) = 0;
207        virtual void consume(void *context) throw(PCException) = 0;
208
209        /**
210         * <src>produce()</src> should  call this method to
211         * let consumer to know it is ready.
212         */
213        virtual void ready() = 0;
214
215        /**
216         * Returns true if ready,
217         * otherwise, finished or there was an error, returns false.
218         *
219         * <src>consume()</src> should call this method
220         * to check if it is ready or not.
221         * If false is returned, <src>isError()</src> or/and
222         * <src>isFinished()</src> should be called to see the reason.
223         */
224        virtual bool waitForReady() = 0;
225
226        /**
227         * Returns true if <src>produce()</src> was finished,
228         * otherwise returns false.
229         */
230        virtual bool isFinished() = 0;
231
232        /**
233         * <src>produce()</src>/<src>consume()</src> should call this method
234         * to report error to <src>consume()</src>/<src>produce()</src>.
235         */
236        virtual void reportError(void *errorInfo) = 0;
237
238        /**
239         * Returns true and set errorInfo if <src>reportError()</src> is called,
240         * otherwise returns false and set errorInfo to NULL.
241         */
242        virtual bool isError(void &*errorInfo) = 0;
243};
244#endif
245
246}
247
248
249#endif
Note: See TracBrowser for help on using the repository browser.