source: trunk/src/concurrent.cpp@ 2454

Last change on this file since 2454 was 2446, checked in by KohjiNakamura, 13 years ago

fixed to be able to compile without omp.h

File size: 5.9 KB
Line 
1//
2// C++ Interface: concurrent
3//
4// Description:
5//
6//
7// Author: Kohji Nakamura <k.nakamura@nao.ac.jp>, (C) 2012
8//
9// Copyright: See COPYING file that comes with this distribution
10//
11//
12#include <stdio.h>
13#ifdef _OPENMP
14#include <omp.h>
15#endif
16#include <errno.h>
17#include <assert.h>
18#include <limits.h>
19#include "concurrent.h"
20
// Error-reporting hook used throughout this file: prints an integer error
// code to stderr. Swap in the no-op definition below to silence it.
//#define LOG(x) do {}while(false)
#define LOG(x) (fprintf(stderr, "Error: %d\n", (x)))
23
24namespace concurrent {
25/* ======================= Mutex ======================= */
26Mutex::Mutex() throw(int)
27{
28 int result = pthread_mutex_init(&mutex, NULL);
29 if (result != 0) {
30 LOG(result);
31 throw result;
32 }
33}
34
35Mutex::~Mutex()
36{
37 int result = pthread_mutex_destroy(&mutex);
38 if (result != 0) {
39 LOG(result);
40 }
41}
42
43void Mutex::lock() throw(int)
44{
45 int result = pthread_mutex_lock(&mutex);
46 if (result != 0) {
47 LOG(result);
48 throw result;
49 }
50}
51
52bool Mutex::try_lock() throw(int)
53{
54 int result = pthread_mutex_trylock(&mutex);
55 if (result == 0) {
56 return true;
57 }
58 if (result == EBUSY) {
59 return false;
60 }
61 LOG(result);
62 throw result;
63}
64
65void Mutex::unlock() throw(int)
66{
67 int result = pthread_mutex_unlock(&mutex);
68 if (result != 0) {
69 LOG(result);
70 throw result;
71 }
72}
73
74/* ======================= Semaphore ======================= */
75Semaphore::Semaphore(unsigned initial) throw(int)
76{
77 //mutex = PTHREAD_MUTEX_INITIALIZER;
78 //cond = PTHREAD_COND_INITIALIZER;
79 sem = initial;
80
81 int result = pthread_mutex_init(&mutex, NULL);
82 if (result != 0) {
83 LOG(result);
84 throw result;
85 }
86
87 result = pthread_cond_init(&cond, NULL);
88 if (result != 0) {
89 pthread_mutex_destroy(&mutex);
90 LOG(result);
91 throw result;
92 }
93}
94
95Semaphore::~Semaphore()
96{
97 int result = pthread_mutex_destroy(&mutex);
98 result = pthread_cond_destroy(&cond);
99}
100
101void Semaphore::up(unsigned amount) throw(int)
102{
103 assert(0 < amount && amount <= UINT_MAX - sem);
104 int result = pthread_mutex_lock(&mutex);
105 if (result == 0) {
106 sem += amount;
107 result = pthread_cond_signal(&cond);
108 int result2 = pthread_mutex_unlock(&mutex);
109 if (result == 0 && result2 == 0) {
110 return;
111 }
112 if (result != 0 && result2 != 0) {
113 LOG(result);
114 result = 0;
115 }
116 if (result == 0) {
117 result = result2;
118 }
119 }
120 LOG(result);
121 throw result;
122}
123
124void Semaphore::down(unsigned amount) throw(int)
125{
126 assert(0 < amount);
127 int result = pthread_mutex_lock(&mutex);
128 if (result == 0) {
129 while (sem < amount) {
130 result = pthread_cond_wait(&cond, &mutex);
131 if (result != 0) {
132 LOG(result);
133 break;
134 }
135 }
136 if (sem >= amount) {
137 sem -= amount;
138 }
139 int result2 = pthread_mutex_unlock(&mutex);
140 if (result == 0 && result2 == 0) {
141 return;
142 }
143 if (result != 0 && result2 != 0) {
144 LOG(result);
145 result = 0;
146 }
147 if (result == 0) {
148 result = result2;
149 }
150 }
151 LOG(result);
152 throw result;
153}
154
155/* ======================= Broker ======================= */
156Broker::Broker(bool (*producer)(void *context) throw(PCException),
157 void (*consumer)(void *context) throw(PCException))
158{
159 this->producer = producer;
160 this->consumer = consumer;
161}
162
163Broker::~Broker()
164{
165}
166
167void Broker::enableNested()
168{
169#ifdef _OPENMP
170 omp_set_nested(1);
171#endif
172}
173
174void Broker::disableNested()
175{
176#ifdef _OPENMP
177 omp_set_nested(0);
178#endif
179}
180
181void Broker::setNestedState(bool nested)
182{
183#ifdef _OPENMP
184 omp_set_nested(static_cast<int>(nested));
185#endif
186}
187
188bool Broker::getNestedState()
189{
190#ifdef _OPENMP
191 return omp_get_nested() ? true : false;
192#else
193 return false;
194#endif
195}
196
197void Broker::runProducerAsMasterThread(void *context, unsigned do_ahead)
198 throw(PCException)
199{
200 _run(context, do_ahead, ProdAsMaster);
201}
202
203void Broker::runConsumerAsMasterThread(void *context, unsigned do_ahead)
204 throw(PCException)
205{
206 _run(context, do_ahead, ConsAsMaster);
207}
208
209void Broker::run(void *context, unsigned do_ahead) throw(PCException)
210{
211 _run(context, do_ahead, Unspecified);
212}
213
214void Broker::_run(void *context, unsigned do_ahead, ThreadSpec threadSpec)
215 throw(PCException)
216{
217 assert(do_ahead > 0);
218#ifdef _OPENMP
219 PCException *prodEx = NULL;
220 PCException *consEx = NULL;
221 unsigned queuedJobs = 0;
222 int consumerTerminated = 0;
223 Semaphore semaphore_for_consumer;
224 Semaphore semaphore_for_producer(do_ahead);
225
226 #pragma omp parallel num_threads(2) \
227 shared(semaphore_for_consumer, semaphore_for_producer, \
228 consumerTerminated, queuedJobs)
229 {
230 //fprintf(stderr, "run: %p %d\n", context, omp_get_thread_num());
231 bool runProd = true;
232 if (threadSpec == Unspecified) {
233 #pragma omp single
234 {
235 runProd = false;
236 }
237 } else {
238 bool isMaster = false;
239 #pragma omp master
240 {
241 isMaster = true;
242 }
243 if (threadSpec == ProdAsMaster) {
244 runProd = isMaster;
245 } else { // ConsAsMaster
246 runProd = ! isMaster;
247 }
248 }
249
250 if (runProd) { // producer
251 for (;;) {
252 semaphore_for_producer.down();
253 int consumerDead = 0;
254 #pragma omp atomic
255 consumerDead += consumerTerminated;
256 if (consumerDead) {
257 break;
258 }
259 try {
260 bool produced = producer(context);
261 if (! produced) {
262 break;
263 }
264 } catch (PCException &e) {
265 prodEx = &e;
266 break;
267 }
268 #pragma omp atomic
269 queuedJobs++;
270 semaphore_for_consumer.up();
271 }
272 // additional 'up' to give consumer a chance to terminate.
273 semaphore_for_consumer.up();
274 } else { // consumer
275 for (;;) {
276 semaphore_for_consumer.down();
277 unsigned remainingJobs = 0U;
278 #pragma omp atomic
279 remainingJobs += queuedJobs;
280 if (remainingJobs == 0U) {
281 break;
282 }
283 #pragma omp atomic
284 queuedJobs--;
285 try {
286 consumer(context);
287 } catch (PCException &e) {
288 consEx = &e;
289 break;
290 }
291 semaphore_for_producer.up();
292 }
293 #pragma omp atomic
294 consumerTerminated++;
295 // additional 'up' to give producer a chance to terminate.
296 semaphore_for_producer.up();
297 }
298 }
299 if (prodEx) {
300 prodEx->raise();
301 } else if (consEx) {
302 consEx->raise();
303 }
304#else
305 runSequential(context);
306#endif
307}
308
309void Broker::runSequential(void *context) throw(PCException)
310{
311 for (;;) {
312 bool produced = producer(context);
313 if (! produced) {
314 break;
315 }
316 consumer(context);
317 }
318}
319
320} // namespace
Note: See TracBrowser for help on using the repository browser.