// source: trunk/src/concurrent.h@2593
//
// Last change on this file since 2593 was 2395, checked in by KohjiNakamura, 13 years ago
//
// comments changed
//
// File size: 5.5 KB
Line 
//
// C++ Interface: concurrent
//
// Description: pthread-based Mutex and Semaphore wrappers, a fixed-capacity
//              FIFO, and a producer/consumer Broker.
//
// Author: Kohji Nakamura <k.nakamura@nao.ac.jp>, (C) 2012
//
// Copyright: See COPYING file that comes with this distribution
//
//
12#ifndef CONCURRENT_H
13#define CONCURRENT_H
14
15#include <assert.h>
16#include <pthread.h>
17
18namespace concurrent {
/**
 * Exception type named in the exception specifications of Broker's
 * producer/consumer callbacks and run methods.
 */
class PCException {
public:
  /**
   * Throws a copy of this object (static type PCException).
   * The method is virtual, presumably so that a derived exception can
   * override it and throw its own type instead of a sliced base copy.
   */
  virtual void raise() /*throw(PCException)*/ { throw *this; }

  virtual ~PCException() {}
};
24
//typedef void (*produce_t)(void *context) throw(PCException);
//typedef void (*consume_t)(void *context) throw(PCException);
27
28 class Mutex {
29 pthread_mutex_t mutex;
30 public:
31 Mutex() throw(int);
32 virtual ~Mutex();
33 void lock() throw(int);
34 /**
35 * Returns true if this thread could lock.
36 * Returns false if already locked.
37 */
38 bool try_lock() throw(int);
39 void unlock() throw(int);
40 private:
41 Mutex(Mutex const &other);
42 Mutex &operator =(Mutex const &other);
43 };
44
45 class Semaphore {
46 pthread_mutex_t mutex;
47 pthread_cond_t cond;
48 unsigned sem;
49 public:
50 explicit Semaphore(unsigned initial = 0U) throw(int);
51 void up(unsigned amount = 1U) throw(int);
52 void down(unsigned amount = 1U) throw(int);
53 virtual ~Semaphore();
54 private:
55 Semaphore(Semaphore const &other);
56 Semaphore &operator =(Semaphore const &other);
57 };
58
/**
 * FIFO-related exception type.  It is not thrown by any code visible in
 * this header, and EmptyException / FullException do not derive from it.
 */
class FIFOException {
public:
  /** Throws a copy of this object (static type FIFOException). */
  virtual void raise() { throw *this; }

  virtual ~FIFOException() {}
};
64
/**
 * Thrown by FIFO::get() when the FIFO holds no elements.
 */
class EmptyException {
public:
  /** Throws a copy of this object (static type EmptyException). */
  virtual void raise() { throw *this; }

  virtual ~EmptyException() {}
};
70
/**
 * Thrown by FIFO::put() when the FIFO already holds its full capacity.
 */
class FullException {
public:
  /** Throws a copy of this object (static type FullException). */
  virtual void raise() { throw *this; }

  virtual ~FullException() {}
};
76
77 template <class T, size_t N>
78 class FIFO {
79 T elements[N + 1]; // +1 to make an implementation simple.
80 Mutex mutex;
81 size_t head;
82 size_t tail;
83
84 size_t wrap(size_t n) {
85 return n % (N + 1);
86 }
87
88 void reset() {
89 head = tail = 0;
90 }
91
92 public:
93 FIFO() {
94 assert(N > 0);
95 reset();
96 }
97 virtual ~FIFO() {
98 mutex.lock(); // wait until current lock is released.
99 mutex.unlock();
100 }
101
102 virtual void lock() {
103 mutex.lock();
104 }
105
106 /**
107 * Returns true if this thread could lock.
108 * Returns false if already locked.
109 */
110 virtual bool try_lock() {
111 return mutex.try_lock();
112 }
113
114 virtual void unlock() {
115 mutex.unlock();
116 }
117
118 virtual void clear() {
119 reset();
120 }
121
122 virtual void put(T const &value) throw(FullException) {
123 size_t new_tail = wrap(tail + 1);
124 if (head == new_tail) {
125 throw FullException();
126 }
127 elements[tail] = value;
128 tail = new_tail;
129 }
130
131 virtual T get() throw(EmptyException) {
132 if (head == tail) {
133 throw EmptyException();
134 }
135 T result = elements[head];
136 head = wrap(head + 1);
137 return result;
138 }
139
140 /**
141 * Returns capacity size.
142 */
143 virtual size_t size() const {
144 return N;
145 }
146
147 /**
148 * Returns number of elements in this FIFO.
149 */
150 virtual size_t length() const {
151 size_t result = tail - head;
152 if (head > tail) {
153 result = N + 1 - (head - tail);
154 }
155 return result;
156 }
157 private:
158 FIFO(FIFO const &other);
159 FIFO &operator =(FIFO const &other);
160 };
161
162 class Broker {
163 protected:
164 enum ThreadSpec {
165 ProdAsMaster, ConsAsMaster, Unspecified
166 };
167 bool (*producer)(void *context) throw(PCException);
168 void (*consumer)(void *context) throw(PCException);
169 virtual void _run(void *context, unsigned do_ahead, ThreadSpec threadSpec) throw(PCException);
170 public:
171 Broker(bool (*producer)(void *context) throw(PCException),
172 void (*consumer)(void *context) throw(PCException));
173 virtual ~Broker();
174 static void enableNested();
175 static void disableNested();
176 static void setNestedState(bool nested);
177 static bool getNestedState();
178
179 virtual void run(void *context, unsigned do_ahead=1) throw(PCException);
180 virtual void runProducerAsMasterThread(void *context, unsigned do_ahead=1) throw(PCException);
181 virtual void runConsumerAsMasterThread(void *context, unsigned do_ahead=1) throw(PCException);
182 virtual void runSequential(void *context) throw(PCException);
183 };
184
185#if 0
/**
 * Interface for an object that produces a Product from a Context.
 * (Currently compiled out by the surrounding "#if 0".)
 */
template <class Context, class Product>
class Producer {
public:
  /**
   * Produces one Product using ctx.
   * Implementations may throw PCException; the dynamic exception
   * specification is commented out, as already done for
   * PCException::raise(), because such specifications were removed in
   * C++17.
   */
  virtual Product produce(Context &ctx) /*throw(PCException)*/ = 0;

  virtual ~Producer() {}
};
192
/**
 * Interface for an object that consumes a Product in a given Context.
 * (Currently compiled out by the surrounding "#if 0".)
 */
template <class Context, class Product>
class Consumer {
public:
  /**
   * Consumes one product using ctx.
   * Implementations may throw PCException; the dynamic exception
   * specification is commented out, as already done for
   * PCException::raise(), because such specifications were removed in
   * C++17.
   */
  virtual void consume(Context &ctx, Product const &product) /*throw(PCException)*/ = 0;

  virtual ~Consumer() {}
};
199
/**
 * Abstract producer/consumer coordination interface.
 * (Currently compiled out by the surrounding "#if 0".)
 *
 * Implementations may throw PCException from the run/produce/consume
 * methods; the dynamic exception specifications are commented out, as
 * already done for PCException::raise(), because such specifications were
 * removed in C++17.
 */
class ProdCons {
public:
  virtual ~ProdCons() {}

  virtual void runProducerAsMasterThread(void *context) /*throw(PCException)*/ = 0;
  virtual void runConsumerAsMasterThread(void *context) /*throw(PCException)*/ = 0;
  virtual void produce(void *context) /*throw(PCException)*/ = 0;
  virtual void consume(void *context) /*throw(PCException)*/ = 0;

  /**
   * <src>produce()</src> should call this method to
   * let the consumer know it is ready.
   */
  virtual void ready() = 0;

  /**
   * Returns true if ready;
   * otherwise (finished, or there was an error) returns false.
   *
   * <src>consume()</src> should call this method
   * to check whether it is ready or not.
   * If false is returned, <src>isError()</src> and/or
   * <src>isFinished()</src> should be called to see the reason.
   */
  virtual bool waitForReady() = 0;

  /**
   * Returns true if <src>produce()</src> was finished,
   * otherwise returns false.
   */
  virtual bool isFinished() = 0;

  /**
   * <src>produce()</src>/<src>consume()</src> should call this method
   * to report an error to <src>consume()</src>/<src>produce()</src>.
   */
  virtual void reportError(void *errorInfo) = 0;

  /**
   * Returns true and sets errorInfo if <src>reportError()</src> was called,
   * otherwise returns false and sets errorInfo to NULL.
   *
   * errorInfo is a reference to the caller's pointer so that it can be
   * set (a pointer to a reference, "void &*", is not a valid type).
   */
  virtual bool isError(void *&errorInfo) = 0;
};
244#endif
245
246}
247
248
249#endif
// Note: See TracBrowser for help on using the repository browser.