source: trunk/src/concurrent.cpp@ 2426

Last change on this file since 2426 was 2392, checked in by KohjiNakamura, 13 years ago

concurrent module added

File size: 5.8 KB
Line 
1//
2// C++ Interface: concurrent
3//
4// Description:
5//
6//
7// Author: Kohji Nakamura <k.nakamura@nao.ac.jp>, (C) 2012
8//
9// Copyright: See COPYING file that comes with this distribution
10//
11//
12#include <stdio.h>
13#include <omp.h>
14#include <errno.h>
15#include <assert.h>
16#include <limits.h>
17#include "concurrent.h"
18
19//#define LOG(x) do {}while(false)
20#define LOG(x) fprintf(stderr, "Error: %d\n", (x))
21
22namespace concurrent {
23/* ======================= Mutex ======================= */
24Mutex::Mutex() throw(int)
25{
26 int result = pthread_mutex_init(&mutex, NULL);
27 if (result != 0) {
28 LOG(result);
29 throw result;
30 }
31}
32
33Mutex::~Mutex()
34{
35 int result = pthread_mutex_destroy(&mutex);
36 if (result != 0) {
37 LOG(result);
38 }
39}
40
41void Mutex::lock() throw(int)
42{
43 int result = pthread_mutex_lock(&mutex);
44 if (result != 0) {
45 LOG(result);
46 throw result;
47 }
48}
49
50bool Mutex::try_lock() throw(int)
51{
52 int result = pthread_mutex_trylock(&mutex);
53 if (result == 0) {
54 return true;
55 }
56 if (result == EBUSY) {
57 return false;
58 }
59 LOG(result);
60 throw result;
61}
62
63void Mutex::unlock() throw(int)
64{
65 int result = pthread_mutex_unlock(&mutex);
66 if (result != 0) {
67 LOG(result);
68 throw result;
69 }
70}
71
72/* ======================= Semaphore ======================= */
73Semaphore::Semaphore(unsigned initial) throw(int)
74{
75 //mutex = PTHREAD_MUTEX_INITIALIZER;
76 //cond = PTHREAD_COND_INITIALIZER;
77 sem = initial;
78
79 int result = pthread_mutex_init(&mutex, NULL);
80 if (result != 0) {
81 LOG(result);
82 throw result;
83 }
84
85 result = pthread_cond_init(&cond, NULL);
86 if (result != 0) {
87 pthread_mutex_destroy(&mutex);
88 LOG(result);
89 throw result;
90 }
91}
92
93Semaphore::~Semaphore()
94{
95 int result = pthread_mutex_destroy(&mutex);
96 result = pthread_cond_destroy(&cond);
97}
98
99void Semaphore::up(unsigned amount) throw(int)
100{
101 assert(0 < amount && amount <= UINT_MAX - sem);
102 int result = pthread_mutex_lock(&mutex);
103 if (result == 0) {
104 sem += amount;
105 result = pthread_cond_signal(&cond);
106 int result2 = pthread_mutex_unlock(&mutex);
107 if (result == 0 && result2 == 0) {
108 return;
109 }
110 if (result != 0 && result2 != 0) {
111 LOG(result);
112 result = 0;
113 }
114 if (result == 0) {
115 result = result2;
116 }
117 }
118 LOG(result);
119 throw result;
120}
121
122void Semaphore::down(unsigned amount) throw(int)
123{
124 assert(0 < amount);
125 int result = pthread_mutex_lock(&mutex);
126 if (result == 0) {
127 while (sem < amount) {
128 result = pthread_cond_wait(&cond, &mutex);
129 if (result != 0) {
130 LOG(result);
131 break;
132 }
133 }
134 if (sem >= amount) {
135 sem -= amount;
136 }
137 int result2 = pthread_mutex_unlock(&mutex);
138 if (result == 0 && result2 == 0) {
139 return;
140 }
141 if (result != 0 && result2 != 0) {
142 LOG(result);
143 result = 0;
144 }
145 if (result == 0) {
146 result = result2;
147 }
148 }
149 LOG(result);
150 throw result;
151}
152
153/* ======================= Broker ======================= */
154Broker::Broker(bool (*producer)(void *context) throw(PCException),
155 void (*consumer)(void *context) throw(PCException))
156{
157 this->producer = producer;
158 this->consumer = consumer;
159}
160
161Broker::~Broker()
162{
163}
164
165void Broker::enableNested()
166{
167 omp_set_nested(1);
168}
169
170void Broker::disableNested()
171{
172 omp_set_nested(0);
173}
174
175void Broker::setNestedState(bool nested)
176{
177 omp_set_nested(static_cast<int>(nested));
178}
179
180bool Broker::getNestedState()
181{
182 return omp_get_nested() ? true : false;
183}
184
185void Broker::runProducerAsMasterThread(void *context, unsigned do_ahead)
186 throw(PCException)
187{
188 _run(context, do_ahead, ProdAsMaster);
189}
190
191void Broker::runConsumerAsMasterThread(void *context, unsigned do_ahead)
192 throw(PCException)
193{
194 _run(context, do_ahead, ConsAsMaster);
195}
196
197void Broker::run(void *context, unsigned do_ahead) throw(PCException)
198{
199 _run(context, do_ahead, Unspecified);
200}
201
202void Broker::_run(void *context, unsigned do_ahead, ThreadSpec threadSpec)
203 throw(PCException)
204{
205 assert(do_ahead > 0);
206#ifdef _OPENMP
207 PCException *prodEx = NULL;
208 PCException *consEx = NULL;
209 unsigned queuedJobs = 0;
210 int consumerTerminated = 0;
211 Semaphore semaphore_for_consumer;
212 Semaphore semaphore_for_producer(do_ahead);
213
214 #pragma omp parallel num_threads(2) \
215 shared(semaphore_for_consumer, semaphore_for_producer, \
216 consumerTerminated, queuedJobs)
217 {
218 //fprintf(stderr, "run: %p %d\n", context, omp_get_thread_num());
219 bool runProd = true;
220 if (threadSpec == Unspecified) {
221 #pragma omp single
222 {
223 runProd = false;
224 }
225 } else {
226 bool isMaster = false;
227 #pragma omp master
228 {
229 isMaster = true;
230 }
231 if (threadSpec == ProdAsMaster) {
232 runProd = isMaster;
233 } else { // ConsAsMaster
234 runProd = ! isMaster;
235 }
236 }
237
238 if (runProd) { // producer
239 for (;;) {
240 semaphore_for_producer.down();
241 int consumerDead = 0;
242 #pragma omp atomic
243 consumerDead += consumerTerminated;
244 if (consumerDead) {
245 break;
246 }
247 try {
248 bool produced = producer(context);
249 if (! produced) {
250 break;
251 }
252 } catch (PCException &e) {
253 prodEx = &e;
254 break;
255 }
256 #pragma omp atomic
257 queuedJobs++;
258 semaphore_for_consumer.up();
259 }
260 // additional 'up' to give consumer a chance to terminate.
261 semaphore_for_consumer.up();
262 } else { // consumer
263 for (;;) {
264 semaphore_for_consumer.down();
265 unsigned remainingJobs = 0U;
266 #pragma omp atomic
267 remainingJobs += queuedJobs;
268 if (remainingJobs == 0U) {
269 break;
270 }
271 #pragma omp atomic
272 queuedJobs--;
273 try {
274 consumer(context);
275 } catch (PCException &e) {
276 consEx = &e;
277 break;
278 }
279 semaphore_for_producer.up();
280 }
281 #pragma omp atomic
282 consumerTerminated++;
283 // additional 'up' to give producer a chance to terminate.
284 semaphore_for_producer.up();
285 }
286 }
287 if (prodEx) {
288 prodEx->raise();
289 } else if (consEx) {
290 consEx->raise();
291 }
292#else
293 runSequential(context);
294#endif
295}
296
297void Broker::runSequential(void *context) throw(PCException)
298{
299 for (;;) {
300 bool produced = producer(context);
301 if (! produced) {
302 break;
303 }
304 consumer(context);
305 }
306}
307
308} // namespace
Note: See TracBrowser for help on using the repository browser.