source: trunk/src/concurrent.cpp @ 3068

Last change on this file since 3068 was 3068, checked in by Takeshi Nakazato, 8 years ago

New Development: No

JIRA Issue: No

Ready for Test: Yes

Interface Changes: Yes/No?

What Interface Changed: Please list interface changes

Test Programs: List test programs

Put in Release Notes: Yes/No?

Module(s): Module Names change impacts.

Description: Describe your changes here...


Make concurrent module warning free.

File size: 6.0 KB
Line 
1//
2// C++ Interface: concurrent
3//
4// Description:
5//
6//
7// Author: Kohji Nakamura <k.nakamura@nao.ac.jp>, (C) 2012
8//
9// Copyright: See COPYING file that comes with this distribution
10//
11//
12#include <stdio.h>
13#ifdef _OPENMP
14#include <omp.h>
15#endif
16#include <errno.h>
17#include <assert.h>
18#include <limits.h>
19#include "concurrent.h"
20
21//#define LOG(x) do {}while(false)
22#define LOG(x) fprintf(stderr, "Error: %d\n", (x))
23
24namespace concurrent {
25/* ======================= Mutex ======================= */
26Mutex::Mutex() throw(int)
27{
28        int result = pthread_mutex_init(&mutex, NULL);
29        if (result != 0) {
30                LOG(result);
31                throw result;
32        }
33}
34
35Mutex::~Mutex()
36{
37        int result = pthread_mutex_destroy(&mutex);
38        if (result != 0) {
39                LOG(result);
40        }
41}
42
43void Mutex::lock() throw(int)
44{
45        int result = pthread_mutex_lock(&mutex);
46        if (result != 0) {
47                LOG(result);
48                throw result;
49        }
50}
51
52bool Mutex::try_lock() throw(int)
53{
54        int result = pthread_mutex_trylock(&mutex);
55        if (result == 0) {
56                return true;
57        }
58        if (result == EBUSY) {
59                return false;
60        }
61        LOG(result);
62        throw result;
63}
64
65void Mutex::unlock() throw(int)
66{
67        int result = pthread_mutex_unlock(&mutex);
68        if (result != 0) {
69                LOG(result);
70                throw result;
71        }
72}
73
74/* ======================= Semaphore ======================= */
75Semaphore::Semaphore(unsigned initial) throw(int)
76{
77        //mutex = PTHREAD_MUTEX_INITIALIZER;
78        //cond = PTHREAD_COND_INITIALIZER;
79        sem = initial;
80
81        int result = pthread_mutex_init(&mutex, NULL);
82        if (result != 0) {
83                LOG(result);
84                throw result;
85        }
86
87        result = pthread_cond_init(&cond, NULL);
88        if (result != 0) {
89                pthread_mutex_destroy(&mutex);
90                LOG(result);
91                throw result;
92        }
93}
94
95Semaphore::~Semaphore()
96{
97        int result = pthread_mutex_destroy(&mutex);
98  if (result != 0) {
99    LOG(result);
100  }
101        result = pthread_cond_destroy(&cond);
102  if (result != 0) {
103    LOG(result);
104  }
105}
106
107void Semaphore::up(unsigned amount) throw(int)
108{
109        assert(0 < amount && amount <= UINT_MAX - sem);
110        int result = pthread_mutex_lock(&mutex);
111        if (result == 0) {
112                sem += amount;
113                result = pthread_cond_signal(&cond);
114                int result2 = pthread_mutex_unlock(&mutex);
115                if (result == 0 && result2 == 0) {
116                        return;
117                }
118                if (result != 0 && result2 != 0) {
119                        LOG(result);
120                        result = 0;
121                }
122                if (result == 0) {
123                        result = result2;
124                }
125        }
126        LOG(result);
127        throw result;
128}
129
130void Semaphore::down(unsigned amount) throw(int)
131{
132        assert(0 < amount);
133        int result = pthread_mutex_lock(&mutex);
134        if (result == 0) {
135                while (sem < amount) {
136                        result = pthread_cond_wait(&cond, &mutex);
137                        if (result != 0) {
138                                LOG(result);
139                                break;
140                        }
141                }
142                if (sem >= amount) {
143                        sem -= amount;
144                }
145                int result2 = pthread_mutex_unlock(&mutex);
146                if (result == 0 && result2 == 0) {
147                        return;
148                }
149                if (result != 0 && result2 != 0) {
150                        LOG(result);
151                        result = 0;
152                }
153                if (result == 0) {
154                        result = result2;
155                }
156        }
157        LOG(result);
158        throw result;
159}
160
161/* ======================= Broker ======================= */
162Broker::Broker(bool (*producer)(void *context) throw(PCException),
163                           void (*consumer)(void *context) throw(PCException))
164{
165        this->producer = producer;
166        this->consumer = consumer;
167}
168
169Broker::~Broker()
170{
171}
172
173void Broker::enableNested()
174{
175#ifdef _OPENMP
176        omp_set_nested(1);
177#endif
178}
179
180void Broker::disableNested()
181{
182#ifdef _OPENMP
183        omp_set_nested(0);
184#endif
185}
186
187void Broker::setNestedState(bool nested)
188{
189#ifdef _OPENMP
190        omp_set_nested(static_cast<int>(nested));
191#endif
192}
193
194bool Broker::getNestedState()
195{
196#ifdef _OPENMP
197        return omp_get_nested() ? true : false;
198#else
199        return false;
200#endif
201}
202
203void Broker::runProducerAsMasterThread(void *context, unsigned do_ahead)
204        throw(PCException)
205{
206        _run(context, do_ahead, ProdAsMaster);
207}
208
209void Broker::runConsumerAsMasterThread(void *context, unsigned do_ahead)
210        throw(PCException)
211{
212        _run(context, do_ahead, ConsAsMaster);
213}
214
215void Broker::run(void *context, unsigned do_ahead) throw(PCException)
216{
217        _run(context, do_ahead, Unspecified);
218}
219
220void Broker::_run(void *context, unsigned do_ahead, ThreadSpec threadSpec)
221        throw(PCException)
222{
223        assert(do_ahead > 0);
224#ifdef _OPENMP
225        PCException *prodEx = NULL;
226        PCException *consEx = NULL;
227        unsigned queuedJobs = 0;
228        int consumerTerminated = 0;
229        Semaphore semaphore_for_consumer;
230        Semaphore semaphore_for_producer(do_ahead);
231
232        #pragma omp parallel num_threads(2) \
233                shared(semaphore_for_consumer, semaphore_for_producer, \
234                           consumerTerminated, queuedJobs)
235        {
236                //fprintf(stderr, "run: %p %d\n", context, omp_get_thread_num());
237                bool runProd = true;
238                if (threadSpec == Unspecified) {
239                        #pragma omp single
240                        {
241                                runProd = false;
242                        }
243                } else {
244                        bool isMaster = false;
245                        #pragma omp master
246                        {
247                                isMaster = true;
248                        }
249                        if (threadSpec == ProdAsMaster) {
250                                runProd = isMaster;
251                        } else { // ConsAsMaster
252                                runProd = ! isMaster;
253                        }
254                }
255
256                if (runProd) { // producer
257                        for (;;) {
258                                semaphore_for_producer.down();
259                                int consumerDead = 0;
260                                #pragma omp atomic
261                                consumerDead += consumerTerminated;
262                                if (consumerDead) {
263                                        break;
264                                }
265                                try {
266                                        bool produced = producer(context);
267                                        if (! produced) {
268                                                break;
269                                        }
270                                } catch (PCException &e) {
271                                        prodEx = &e;
272                                        break;
273                                }
274                                #pragma omp atomic
275                                queuedJobs++;
276                                semaphore_for_consumer.up();
277                        }
278                        // additional 'up' to give consumer a chance to terminate.
279                        semaphore_for_consumer.up();
280                } else { // consumer
281                        for (;;) {
282                                semaphore_for_consumer.down();
283                                unsigned remainingJobs = 0U;
284                                #pragma omp atomic
285                                remainingJobs += queuedJobs;
286                                if (remainingJobs == 0U) {
287                                        break;
288                                }
289                                #pragma omp atomic
290                                queuedJobs--;
291                                try {
292                                        consumer(context);
293                                } catch (PCException &e) {
294                                        consEx = &e;
295                                        break;
296                                }
297                                semaphore_for_producer.up();
298                        }
299                        #pragma omp atomic
300                        consumerTerminated++;
301                        // additional 'up' to give producer a chance to terminate.
302                        semaphore_for_producer.up();
303                }
304        }
305        if (prodEx) {
306                prodEx->raise();
307        } else if (consEx) {
308                consEx->raise();
309        }
310#else
311        runSequential(context);
312#endif
313}
314
315void Broker::runSequential(void *context) throw(PCException)
316{
317        for (;;) {
318                bool produced = producer(context);
319                if (! produced) {
320                        break;
321                }
322                consumer(context);
323        }
324}
325
326} // namespace
Note: See TracBrowser for help on using the repository browser.