source: trunk/src/concurrent.cpp @ 2446

Last change on this file since 2446 was 2446, checked in by KohjiNakamura, 12 years ago

fixed to be able to compile without omp.h

File size: 5.9 KB
Line 
1//
2// C++ Interface: concurrent
3//
4// Description:
5//
6//
7// Author: Kohji Nakamura <k.nakamura@nao.ac.jp>, (C) 2012
8//
9// Copyright: See COPYING file that comes with this distribution
10//
11//
12#include <stdio.h>
13#ifdef _OPENMP
14#include <omp.h>
15#endif
16#include <errno.h>
17#include <assert.h>
18#include <limits.h>
19#include "concurrent.h"
20
21//#define LOG(x) do {}while(false)
22#define LOG(x) fprintf(stderr, "Error: %d\n", (x))
23
24namespace concurrent {
25/* ======================= Mutex ======================= */
26Mutex::Mutex() throw(int)
27{
28        int result = pthread_mutex_init(&mutex, NULL);
29        if (result != 0) {
30                LOG(result);
31                throw result;
32        }
33}
34
35Mutex::~Mutex()
36{
37        int result = pthread_mutex_destroy(&mutex);
38        if (result != 0) {
39                LOG(result);
40        }
41}
42
43void Mutex::lock() throw(int)
44{
45        int result = pthread_mutex_lock(&mutex);
46        if (result != 0) {
47                LOG(result);
48                throw result;
49        }
50}
51
52bool Mutex::try_lock() throw(int)
53{
54        int result = pthread_mutex_trylock(&mutex);
55        if (result == 0) {
56                return true;
57        }
58        if (result == EBUSY) {
59                return false;
60        }
61        LOG(result);
62        throw result;
63}
64
65void Mutex::unlock() throw(int)
66{
67        int result = pthread_mutex_unlock(&mutex);
68        if (result != 0) {
69                LOG(result);
70                throw result;
71        }
72}
73
74/* ======================= Semaphore ======================= */
75Semaphore::Semaphore(unsigned initial) throw(int)
76{
77        //mutex = PTHREAD_MUTEX_INITIALIZER;
78        //cond = PTHREAD_COND_INITIALIZER;
79        sem = initial;
80
81        int result = pthread_mutex_init(&mutex, NULL);
82        if (result != 0) {
83                LOG(result);
84                throw result;
85        }
86
87        result = pthread_cond_init(&cond, NULL);
88        if (result != 0) {
89                pthread_mutex_destroy(&mutex);
90                LOG(result);
91                throw result;
92        }
93}
94
95Semaphore::~Semaphore()
96{
97        int result = pthread_mutex_destroy(&mutex);
98        result = pthread_cond_destroy(&cond);
99}
100
101void Semaphore::up(unsigned amount) throw(int)
102{
103        assert(0 < amount && amount <= UINT_MAX - sem);
104        int result = pthread_mutex_lock(&mutex);
105        if (result == 0) {
106                sem += amount;
107                result = pthread_cond_signal(&cond);
108                int result2 = pthread_mutex_unlock(&mutex);
109                if (result == 0 && result2 == 0) {
110                        return;
111                }
112                if (result != 0 && result2 != 0) {
113                        LOG(result);
114                        result = 0;
115                }
116                if (result == 0) {
117                        result = result2;
118                }
119        }
120        LOG(result);
121        throw result;
122}
123
124void Semaphore::down(unsigned amount) throw(int)
125{
126        assert(0 < amount);
127        int result = pthread_mutex_lock(&mutex);
128        if (result == 0) {
129                while (sem < amount) {
130                        result = pthread_cond_wait(&cond, &mutex);
131                        if (result != 0) {
132                                LOG(result);
133                                break;
134                        }
135                }
136                if (sem >= amount) {
137                        sem -= amount;
138                }
139                int result2 = pthread_mutex_unlock(&mutex);
140                if (result == 0 && result2 == 0) {
141                        return;
142                }
143                if (result != 0 && result2 != 0) {
144                        LOG(result);
145                        result = 0;
146                }
147                if (result == 0) {
148                        result = result2;
149                }
150        }
151        LOG(result);
152        throw result;
153}
154
155/* ======================= Broker ======================= */
156Broker::Broker(bool (*producer)(void *context) throw(PCException),
157                           void (*consumer)(void *context) throw(PCException))
158{
159        this->producer = producer;
160        this->consumer = consumer;
161}
162
163Broker::~Broker()
164{
165}
166
167void Broker::enableNested()
168{
169#ifdef _OPENMP
170        omp_set_nested(1);
171#endif
172}
173
174void Broker::disableNested()
175{
176#ifdef _OPENMP
177        omp_set_nested(0);
178#endif
179}
180
181void Broker::setNestedState(bool nested)
182{
183#ifdef _OPENMP
184        omp_set_nested(static_cast<int>(nested));
185#endif
186}
187
188bool Broker::getNestedState()
189{
190#ifdef _OPENMP
191        return omp_get_nested() ? true : false;
192#else
193        return false;
194#endif
195}
196
197void Broker::runProducerAsMasterThread(void *context, unsigned do_ahead)
198        throw(PCException)
199{
200        _run(context, do_ahead, ProdAsMaster);
201}
202
203void Broker::runConsumerAsMasterThread(void *context, unsigned do_ahead)
204        throw(PCException)
205{
206        _run(context, do_ahead, ConsAsMaster);
207}
208
209void Broker::run(void *context, unsigned do_ahead) throw(PCException)
210{
211        _run(context, do_ahead, Unspecified);
212}
213
214void Broker::_run(void *context, unsigned do_ahead, ThreadSpec threadSpec)
215        throw(PCException)
216{
217        assert(do_ahead > 0);
218#ifdef _OPENMP
219        PCException *prodEx = NULL;
220        PCException *consEx = NULL;
221        unsigned queuedJobs = 0;
222        int consumerTerminated = 0;
223        Semaphore semaphore_for_consumer;
224        Semaphore semaphore_for_producer(do_ahead);
225
226        #pragma omp parallel num_threads(2) \
227                shared(semaphore_for_consumer, semaphore_for_producer, \
228                           consumerTerminated, queuedJobs)
229        {
230                //fprintf(stderr, "run: %p %d\n", context, omp_get_thread_num());
231                bool runProd = true;
232                if (threadSpec == Unspecified) {
233                        #pragma omp single
234                        {
235                                runProd = false;
236                        }
237                } else {
238                        bool isMaster = false;
239                        #pragma omp master
240                        {
241                                isMaster = true;
242                        }
243                        if (threadSpec == ProdAsMaster) {
244                                runProd = isMaster;
245                        } else { // ConsAsMaster
246                                runProd = ! isMaster;
247                        }
248                }
249
250                if (runProd) { // producer
251                        for (;;) {
252                                semaphore_for_producer.down();
253                                int consumerDead = 0;
254                                #pragma omp atomic
255                                consumerDead += consumerTerminated;
256                                if (consumerDead) {
257                                        break;
258                                }
259                                try {
260                                        bool produced = producer(context);
261                                        if (! produced) {
262                                                break;
263                                        }
264                                } catch (PCException &e) {
265                                        prodEx = &e;
266                                        break;
267                                }
268                                #pragma omp atomic
269                                queuedJobs++;
270                                semaphore_for_consumer.up();
271                        }
272                        // additional 'up' to give consumer a chance to terminate.
273                        semaphore_for_consumer.up();
274                } else { // consumer
275                        for (;;) {
276                                semaphore_for_consumer.down();
277                                unsigned remainingJobs = 0U;
278                                #pragma omp atomic
279                                remainingJobs += queuedJobs;
280                                if (remainingJobs == 0U) {
281                                        break;
282                                }
283                                #pragma omp atomic
284                                queuedJobs--;
285                                try {
286                                        consumer(context);
287                                } catch (PCException &e) {
288                                        consEx = &e;
289                                        break;
290                                }
291                                semaphore_for_producer.up();
292                        }
293                        #pragma omp atomic
294                        consumerTerminated++;
295                        // additional 'up' to give producer a chance to terminate.
296                        semaphore_for_producer.up();
297                }
298        }
299        if (prodEx) {
300                prodEx->raise();
301        } else if (consEx) {
302                consEx->raise();
303        }
304#else
305        runSequential(context);
306#endif
307}
308
309void Broker::runSequential(void *context) throw(PCException)
310{
311        for (;;) {
312                bool produced = producer(context);
313                if (! produced) {
314                        break;
315                }
316                consumer(context);
317        }
318}
319
320} // namespace
Note: See TracBrowser for help on using the repository browser.