source: trunk/src/concurrent.cpp@ 3123

Last change on this file since 3123 was 3068, checked in by Takeshi Nakazato, 9 years ago

New Development: No

JIRA Issue: No

Ready for Test: Yes

Interface Changes: Yes/No

What Interface Changed: Please list interface changes

Test Programs: List test programs

Put in Release Notes: Yes/No

Module(s): Module Names change impacts.

Description: Describe your changes here...


Make concurrent module warning free.

File size: 6.0 KB
Line 
1//
2// C++ Interface: concurrent
3//
4// Description:
5//
6//
7// Author: Kohji Nakamura <k.nakamura@nao.ac.jp>, (C) 2012
8//
9// Copyright: See COPYING file that comes with this distribution
10//
11//
12#include <stdio.h>
13#ifdef _OPENMP
14#include <omp.h>
15#endif
16#include <errno.h>
17#include <assert.h>
18#include <limits.h>
19#include "concurrent.h"
20
21//#define LOG(x) do {}while(false)
22#define LOG(x) fprintf(stderr, "Error: %d\n", (x))
23
24namespace concurrent {
25/* ======================= Mutex ======================= */
26Mutex::Mutex() throw(int)
27{
28 int result = pthread_mutex_init(&mutex, NULL);
29 if (result != 0) {
30 LOG(result);
31 throw result;
32 }
33}
34
35Mutex::~Mutex()
36{
37 int result = pthread_mutex_destroy(&mutex);
38 if (result != 0) {
39 LOG(result);
40 }
41}
42
43void Mutex::lock() throw(int)
44{
45 int result = pthread_mutex_lock(&mutex);
46 if (result != 0) {
47 LOG(result);
48 throw result;
49 }
50}
51
52bool Mutex::try_lock() throw(int)
53{
54 int result = pthread_mutex_trylock(&mutex);
55 if (result == 0) {
56 return true;
57 }
58 if (result == EBUSY) {
59 return false;
60 }
61 LOG(result);
62 throw result;
63}
64
65void Mutex::unlock() throw(int)
66{
67 int result = pthread_mutex_unlock(&mutex);
68 if (result != 0) {
69 LOG(result);
70 throw result;
71 }
72}
73
74/* ======================= Semaphore ======================= */
75Semaphore::Semaphore(unsigned initial) throw(int)
76{
77 //mutex = PTHREAD_MUTEX_INITIALIZER;
78 //cond = PTHREAD_COND_INITIALIZER;
79 sem = initial;
80
81 int result = pthread_mutex_init(&mutex, NULL);
82 if (result != 0) {
83 LOG(result);
84 throw result;
85 }
86
87 result = pthread_cond_init(&cond, NULL);
88 if (result != 0) {
89 pthread_mutex_destroy(&mutex);
90 LOG(result);
91 throw result;
92 }
93}
94
95Semaphore::~Semaphore()
96{
97 int result = pthread_mutex_destroy(&mutex);
98 if (result != 0) {
99 LOG(result);
100 }
101 result = pthread_cond_destroy(&cond);
102 if (result != 0) {
103 LOG(result);
104 }
105}
106
107void Semaphore::up(unsigned amount) throw(int)
108{
109 assert(0 < amount && amount <= UINT_MAX - sem);
110 int result = pthread_mutex_lock(&mutex);
111 if (result == 0) {
112 sem += amount;
113 result = pthread_cond_signal(&cond);
114 int result2 = pthread_mutex_unlock(&mutex);
115 if (result == 0 && result2 == 0) {
116 return;
117 }
118 if (result != 0 && result2 != 0) {
119 LOG(result);
120 result = 0;
121 }
122 if (result == 0) {
123 result = result2;
124 }
125 }
126 LOG(result);
127 throw result;
128}
129
130void Semaphore::down(unsigned amount) throw(int)
131{
132 assert(0 < amount);
133 int result = pthread_mutex_lock(&mutex);
134 if (result == 0) {
135 while (sem < amount) {
136 result = pthread_cond_wait(&cond, &mutex);
137 if (result != 0) {
138 LOG(result);
139 break;
140 }
141 }
142 if (sem >= amount) {
143 sem -= amount;
144 }
145 int result2 = pthread_mutex_unlock(&mutex);
146 if (result == 0 && result2 == 0) {
147 return;
148 }
149 if (result != 0 && result2 != 0) {
150 LOG(result);
151 result = 0;
152 }
153 if (result == 0) {
154 result = result2;
155 }
156 }
157 LOG(result);
158 throw result;
159}
160
161/* ======================= Broker ======================= */
162Broker::Broker(bool (*producer)(void *context) throw(PCException),
163 void (*consumer)(void *context) throw(PCException))
164{
165 this->producer = producer;
166 this->consumer = consumer;
167}
168
169Broker::~Broker()
170{
171}
172
173void Broker::enableNested()
174{
175#ifdef _OPENMP
176 omp_set_nested(1);
177#endif
178}
179
180void Broker::disableNested()
181{
182#ifdef _OPENMP
183 omp_set_nested(0);
184#endif
185}
186
187void Broker::setNestedState(bool nested)
188{
189#ifdef _OPENMP
190 omp_set_nested(static_cast<int>(nested));
191#endif
192}
193
194bool Broker::getNestedState()
195{
196#ifdef _OPENMP
197 return omp_get_nested() ? true : false;
198#else
199 return false;
200#endif
201}
202
203void Broker::runProducerAsMasterThread(void *context, unsigned do_ahead)
204 throw(PCException)
205{
206 _run(context, do_ahead, ProdAsMaster);
207}
208
209void Broker::runConsumerAsMasterThread(void *context, unsigned do_ahead)
210 throw(PCException)
211{
212 _run(context, do_ahead, ConsAsMaster);
213}
214
215void Broker::run(void *context, unsigned do_ahead) throw(PCException)
216{
217 _run(context, do_ahead, Unspecified);
218}
219
220void Broker::_run(void *context, unsigned do_ahead, ThreadSpec threadSpec)
221 throw(PCException)
222{
223 assert(do_ahead > 0);
224#ifdef _OPENMP
225 PCException *prodEx = NULL;
226 PCException *consEx = NULL;
227 unsigned queuedJobs = 0;
228 int consumerTerminated = 0;
229 Semaphore semaphore_for_consumer;
230 Semaphore semaphore_for_producer(do_ahead);
231
232 #pragma omp parallel num_threads(2) \
233 shared(semaphore_for_consumer, semaphore_for_producer, \
234 consumerTerminated, queuedJobs)
235 {
236 //fprintf(stderr, "run: %p %d\n", context, omp_get_thread_num());
237 bool runProd = true;
238 if (threadSpec == Unspecified) {
239 #pragma omp single
240 {
241 runProd = false;
242 }
243 } else {
244 bool isMaster = false;
245 #pragma omp master
246 {
247 isMaster = true;
248 }
249 if (threadSpec == ProdAsMaster) {
250 runProd = isMaster;
251 } else { // ConsAsMaster
252 runProd = ! isMaster;
253 }
254 }
255
256 if (runProd) { // producer
257 for (;;) {
258 semaphore_for_producer.down();
259 int consumerDead = 0;
260 #pragma omp atomic
261 consumerDead += consumerTerminated;
262 if (consumerDead) {
263 break;
264 }
265 try {
266 bool produced = producer(context);
267 if (! produced) {
268 break;
269 }
270 } catch (PCException &e) {
271 prodEx = &e;
272 break;
273 }
274 #pragma omp atomic
275 queuedJobs++;
276 semaphore_for_consumer.up();
277 }
278 // additional 'up' to give consumer a chance to terminate.
279 semaphore_for_consumer.up();
280 } else { // consumer
281 for (;;) {
282 semaphore_for_consumer.down();
283 unsigned remainingJobs = 0U;
284 #pragma omp atomic
285 remainingJobs += queuedJobs;
286 if (remainingJobs == 0U) {
287 break;
288 }
289 #pragma omp atomic
290 queuedJobs--;
291 try {
292 consumer(context);
293 } catch (PCException &e) {
294 consEx = &e;
295 break;
296 }
297 semaphore_for_producer.up();
298 }
299 #pragma omp atomic
300 consumerTerminated++;
301 // additional 'up' to give producer a chance to terminate.
302 semaphore_for_producer.up();
303 }
304 }
305 if (prodEx) {
306 prodEx->raise();
307 } else if (consEx) {
308 consEx->raise();
309 }
310#else
311 runSequential(context);
312#endif
313}
314
315void Broker::runSequential(void *context) throw(PCException)
316{
317 for (;;) {
318 bool produced = producer(context);
319 if (! produced) {
320 break;
321 }
322 consumer(context);
323 }
324}
325
326} // namespace
Note: See TracBrowser for help on using the repository browser.