source: trunk/src/concurrent.cpp @ 2392

Last change on this file since 2392 was 2392, checked in by KohjiNakamura, 12 years ago

concurrent module added

File size: 5.8 KB
Line 
1//
2// C++ Interface: concurrent
3//
4// Description:
5//
6//
7// Author: Kohji Nakamura <k.nakamura@nao.ac.jp>, (C) 2012
8//
9// Copyright: See COPYING file that comes with this distribution
10//
11//
12#include <stdio.h>
13#include <omp.h>
14#include <errno.h>
15#include <assert.h>
16#include <limits.h>
17#include "concurrent.h"
18
19//#define LOG(x) do {}while(false)
20#define LOG(x) fprintf(stderr, "Error: %d\n", (x))
21
22namespace concurrent {
23/* ======================= Mutex ======================= */
24Mutex::Mutex() throw(int)
25{
26        int result = pthread_mutex_init(&mutex, NULL);
27        if (result != 0) {
28                LOG(result);
29                throw result;
30        }
31}
32
33Mutex::~Mutex()
34{
35        int result = pthread_mutex_destroy(&mutex);
36        if (result != 0) {
37                LOG(result);
38        }
39}
40
41void Mutex::lock() throw(int)
42{
43        int result = pthread_mutex_lock(&mutex);
44        if (result != 0) {
45                LOG(result);
46                throw result;
47        }
48}
49
50bool Mutex::try_lock() throw(int)
51{
52        int result = pthread_mutex_trylock(&mutex);
53        if (result == 0) {
54                return true;
55        }
56        if (result == EBUSY) {
57                return false;
58        }
59        LOG(result);
60        throw result;
61}
62
63void Mutex::unlock() throw(int)
64{
65        int result = pthread_mutex_unlock(&mutex);
66        if (result != 0) {
67                LOG(result);
68                throw result;
69        }
70}
71
72/* ======================= Semaphore ======================= */
73Semaphore::Semaphore(unsigned initial) throw(int)
74{
75        //mutex = PTHREAD_MUTEX_INITIALIZER;
76        //cond = PTHREAD_COND_INITIALIZER;
77        sem = initial;
78
79        int result = pthread_mutex_init(&mutex, NULL);
80        if (result != 0) {
81                LOG(result);
82                throw result;
83        }
84
85        result = pthread_cond_init(&cond, NULL);
86        if (result != 0) {
87                pthread_mutex_destroy(&mutex);
88                LOG(result);
89                throw result;
90        }
91}
92
93Semaphore::~Semaphore()
94{
95        int result = pthread_mutex_destroy(&mutex);
96        result = pthread_cond_destroy(&cond);
97}
98
99void Semaphore::up(unsigned amount) throw(int)
100{
101        assert(0 < amount && amount <= UINT_MAX - sem);
102        int result = pthread_mutex_lock(&mutex);
103        if (result == 0) {
104                sem += amount;
105                result = pthread_cond_signal(&cond);
106                int result2 = pthread_mutex_unlock(&mutex);
107                if (result == 0 && result2 == 0) {
108                        return;
109                }
110                if (result != 0 && result2 != 0) {
111                        LOG(result);
112                        result = 0;
113                }
114                if (result == 0) {
115                        result = result2;
116                }
117        }
118        LOG(result);
119        throw result;
120}
121
122void Semaphore::down(unsigned amount) throw(int)
123{
124        assert(0 < amount);
125        int result = pthread_mutex_lock(&mutex);
126        if (result == 0) {
127                while (sem < amount) {
128                        result = pthread_cond_wait(&cond, &mutex);
129                        if (result != 0) {
130                                LOG(result);
131                                break;
132                        }
133                }
134                if (sem >= amount) {
135                        sem -= amount;
136                }
137                int result2 = pthread_mutex_unlock(&mutex);
138                if (result == 0 && result2 == 0) {
139                        return;
140                }
141                if (result != 0 && result2 != 0) {
142                        LOG(result);
143                        result = 0;
144                }
145                if (result == 0) {
146                        result = result2;
147                }
148        }
149        LOG(result);
150        throw result;
151}
152
153/* ======================= Broker ======================= */
154Broker::Broker(bool (*producer)(void *context) throw(PCException),
155                           void (*consumer)(void *context) throw(PCException))
156{
157        this->producer = producer;
158        this->consumer = consumer;
159}
160
161Broker::~Broker()
162{
163}
164
165void Broker::enableNested()
166{
167        omp_set_nested(1);
168}
169
170void Broker::disableNested()
171{
172        omp_set_nested(0);
173}
174
175void Broker::setNestedState(bool nested)
176{
177        omp_set_nested(static_cast<int>(nested));
178}
179
180bool Broker::getNestedState()
181{
182        return omp_get_nested() ? true : false;
183}
184
185void Broker::runProducerAsMasterThread(void *context, unsigned do_ahead)
186        throw(PCException)
187{
188        _run(context, do_ahead, ProdAsMaster);
189}
190
191void Broker::runConsumerAsMasterThread(void *context, unsigned do_ahead)
192        throw(PCException)
193{
194        _run(context, do_ahead, ConsAsMaster);
195}
196
197void Broker::run(void *context, unsigned do_ahead) throw(PCException)
198{
199        _run(context, do_ahead, Unspecified);
200}
201
202void Broker::_run(void *context, unsigned do_ahead, ThreadSpec threadSpec)
203        throw(PCException)
204{
205        assert(do_ahead > 0);
206#ifdef _OPENMP
207        PCException *prodEx = NULL;
208        PCException *consEx = NULL;
209        unsigned queuedJobs = 0;
210        int consumerTerminated = 0;
211        Semaphore semaphore_for_consumer;
212        Semaphore semaphore_for_producer(do_ahead);
213
214        #pragma omp parallel num_threads(2) \
215                shared(semaphore_for_consumer, semaphore_for_producer, \
216                           consumerTerminated, queuedJobs)
217        {
218                //fprintf(stderr, "run: %p %d\n", context, omp_get_thread_num());
219                bool runProd = true;
220                if (threadSpec == Unspecified) {
221                        #pragma omp single
222                        {
223                                runProd = false;
224                        }
225                } else {
226                        bool isMaster = false;
227                        #pragma omp master
228                        {
229                                isMaster = true;
230                        }
231                        if (threadSpec == ProdAsMaster) {
232                                runProd = isMaster;
233                        } else { // ConsAsMaster
234                                runProd = ! isMaster;
235                        }
236                }
237
238                if (runProd) { // producer
239                        for (;;) {
240                                semaphore_for_producer.down();
241                                int consumerDead = 0;
242                                #pragma omp atomic
243                                consumerDead += consumerTerminated;
244                                if (consumerDead) {
245                                        break;
246                                }
247                                try {
248                                        bool produced = producer(context);
249                                        if (! produced) {
250                                                break;
251                                        }
252                                } catch (PCException &e) {
253                                        prodEx = &e;
254                                        break;
255                                }
256                                #pragma omp atomic
257                                queuedJobs++;
258                                semaphore_for_consumer.up();
259                        }
260                        // additional 'up' to give consumer a chance to terminate.
261                        semaphore_for_consumer.up();
262                } else { // consumer
263                        for (;;) {
264                                semaphore_for_consumer.down();
265                                unsigned remainingJobs = 0U;
266                                #pragma omp atomic
267                                remainingJobs += queuedJobs;
268                                if (remainingJobs == 0U) {
269                                        break;
270                                }
271                                #pragma omp atomic
272                                queuedJobs--;
273                                try {
274                                        consumer(context);
275                                } catch (PCException &e) {
276                                        consEx = &e;
277                                        break;
278                                }
279                                semaphore_for_producer.up();
280                        }
281                        #pragma omp atomic
282                        consumerTerminated++;
283                        // additional 'up' to give producer a chance to terminate.
284                        semaphore_for_producer.up();
285                }
286        }
287        if (prodEx) {
288                prodEx->raise();
289        } else if (consEx) {
290                consEx->raise();
291        }
292#else
293        runSequential(context);
294#endif
295}
296
297void Broker::runSequential(void *context) throw(PCException)
298{
299        for (;;) {
300                bool produced = producer(context);
301                if (! produced) {
302                        break;
303                }
304                consumer(context);
305        }
306}
307
308} // namespace
Note: See TracBrowser for help on using the repository browser.