00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #define CPTHREAD_IS_HERE
00023 #define SUPPRESS_PTHREADS
00024 #include "cpthreads.h"
00025 #include <stdlib.h>
00026 #include <errno.h>
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #define PT_MAGIC 0x8173292a
00038 #define ATTR_MAGIC 0x783a2004
00039 #define KEY_MAGIC 0x99934315
00040 #define FKEY_MAGIC 0x99934315
00041 #define MATTR_MAGIC 0x12673434
00042 #define CATTR_MAGIC 0xA865B812
00043
00044 #undef MUTEX_MAGIC
00045 #undef COND_MAGIC
00046 #define MUTEX_MAGIC 0x13237770
00047 #define COND_MAGIC 0x99431664
00048
00049
00050
00051
00052
00053
00054
00055 typedef void *(*voidfn)(void *);
00056
00057 struct Cpthread_s
00058 {
00059 int magic;
00060 voidfn startfn;
00061 void *startarg;
00062 int detached;
00063 void *joinstatus;
00064 Cpthread_cleanup_t cleanups;
00065 CthThread waiting;
00066 CthThread thread;
00067 };
00068
00069 #define errcode(n) { Cpthread_errno=(n); return -1; }
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087 struct Cpthread_key_s
00088 {
00089 int magic;
00090 int offset;
00091 void (*destructo)(void *);
00092 Cpthread_key_t next;
00093 };
00094
00095 Cpthread_key_t keys_active = 0;
00096 Cpthread_key_t keys_inactive = 0;
00097
00098 int Cpthread_key_create(Cpthread_key_t *keyp, void (*destructo)(void *))
00099 {
00100 Cpthread_key_t key;
00101 key = keys_inactive;
00102 if (key) {
00103 keys_inactive = key->next;
00104 } else {
00105 key = (Cpthread_key_t)malloc(sizeof(struct Cpthread_key_s));
00106 _MEMCHECK(key);
00107 key->offset = CthRegister(sizeof(void *));
00108 }
00109 key->magic = KEY_MAGIC;
00110 key->destructo = destructo;
00111 key->next = keys_active;
00112 keys_active = key;
00113 *keyp = key;
00114 return 0;
00115 }
00116
00117 int Cpthread_key_delete(Cpthread_key_t key)
00118 {
00119 Cpthread_key_t active = keys_active;
00120 if (key->magic != KEY_MAGIC) errcode(EINVAL);
00121 if (active==key) {
00122 keys_active = key->next;
00123 } else {
00124 while (active) {
00125 if (active->next == key) {
00126 active->next = key->next;
00127 goto deleted;
00128 }
00129 active = active->next;
00130 }
00131 return -1;
00132 }
00133 deleted:
00134 key->magic = FKEY_MAGIC;
00135 key->next = keys_inactive;
00136 keys_inactive = key;
00137 return 0;
00138 }
00139
00140 int Cpthread_setspecific(Cpthread_key_t key, void *val)
00141 {
00142 char *data;
00143 data = CthCpvAccess(CthData);
00144 if (key->magic != KEY_MAGIC) errcode(EINVAL);
00145 *((void **)(data+(key->offset))) = val;
00146 return 0;
00147 }
00148
00149 void *Cpthread_getspecific(Cpthread_key_t key)
00150 {
00151 char *data = CthCpvAccess(CthData);
00152 if (key->magic != KEY_MAGIC) return 0;
00153 return *((void **)(data+(key->offset)));
00154 }
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165 struct Cpthread_cleanup_s
00166 {
00167 void (*routine)(void *);
00168 void *argument;
00169 Cpthread_cleanup_t next;
00170 };
00171
00172 void Cpthread_cleanup_push(void (*routine)(void*), void *arg)
00173 {
00174 Cpthread_t pt = CtvAccess(Cpthread_current);
00175 Cpthread_cleanup_t c =
00176 (Cpthread_cleanup_t)malloc(sizeof(struct Cpthread_cleanup_s));
00177 _MEMCHECK(c);
00178 c->routine = routine;
00179 c->argument = arg;
00180 c->next = pt->cleanups;
00181 pt->cleanups = c;
00182 }
00183
00184 void Cpthread_cleanup_pop(int execute)
00185 {
00186 Cpthread_t pt = CtvAccess(Cpthread_current);
00187 Cpthread_cleanup_t c = pt->cleanups;
00188 if (c) {
00189 pt->cleanups = c->next;
00190 if (execute) (c->routine)(c->argument);
00191 free(c);
00192 }
00193 }
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209 int Cpthread_attr_init(Cpthread_attr_t *attr)
00210 {
00211 attr->magic = ATTR_MAGIC;
00212 attr->detached = 0;
00213 attr->stacksize = 0;
00214 return 0;
00215 }
00216
00217 int Cpthread_attr_destroy(Cpthread_attr_t *attr)
00218 {
00219 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00220 attr->magic = 0;
00221 return 0;
00222 }
00223
00224 int Cpthread_attr_getstacksize(Cpthread_attr_t *attr, size_t *size)
00225 {
00226 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00227 *size = attr->stacksize;
00228 return 0;
00229 }
00230
00231 int Cpthread_attr_setstacksize(Cpthread_attr_t *attr, size_t size)
00232 {
00233 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00234 attr->stacksize = size;
00235 return 0;
00236 }
00237
00238 int Cpthread_attr_getdetachstate(Cpthread_attr_t *attr, int *state)
00239 {
00240 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00241 *state = attr->detached;
00242 return 0;
00243 }
00244
00245 int Cpthread_attr_setdetachstate(Cpthread_attr_t *attr, int state)
00246 {
00247 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00248 attr->detached = state;
00249 return 0;
00250 }
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263 void Cpthread_top(Cpthread_t pt)
00264 {
00265 Cpthread_key_t k; char *data; void *result;
00266
00267 data = CthCpvAccess(CthData);
00268 for (k=keys_active; k; k=k->next)
00269 *(void **)(data+(k->offset)) = 0;
00270 CtvAccess(Cpthread_errcode) = 0;
00271 CtvAccess(Cpthread_current) = pt;
00272 result = pt->startfn(pt->startarg);
00273 Cpthread_exit(result);
00274 }
00275
00276 int Cpthread_create(Cpthread_t *thread, Cpthread_attr_t *attr,
00277 voidfn fn, void *arg)
00278 {
00279 Cpthread_t pt;
00280 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00281 pt = (Cpthread_t)malloc(sizeof(struct Cpthread_s));
00282 _MEMCHECK(pt);
00283 pt->magic = PT_MAGIC;
00284 pt->startfn = fn;
00285 pt->startarg = arg;
00286 pt->detached = attr->detached;
00287 pt->joinstatus = 0;
00288 pt->cleanups = 0;
00289 pt->waiting = 0;
00290 pt->thread = CthCreate((CthVoidFn)Cpthread_top, (void *)pt, attr->stacksize);
00291 CthSetStrategyDefault(pt->thread);
00292 CthAwaken(pt->thread);
00293 *thread = pt;
00294 return 0;
00295 }
00296
00297 void Cpthread_exit(void *status)
00298 {
00299 Cpthread_t pt; Cpthread_cleanup_t c, cn; Cpthread_key_t k;
00300 void *priv; char *data; CthThread t;
00301
00302 pt = CtvAccess(Cpthread_current);
00303 t = pt->thread;
00304 c = pt->cleanups;
00305 data = CthCpvAccess(CthData);
00306
00307
00308 while (c) {
00309 (c->routine)(c->argument);
00310 cn = c->next;
00311 free(c); c=cn;
00312 }
00313
00314 k = keys_active;
00315 while (k) {
00316 if (k->destructo) {
00317 priv = *(void **)(data+(k->offset));
00318 if (priv) (k->destructo)(priv);
00319 }
00320 k=k->next;
00321 }
00322
00323 if (pt->detached) {
00324 pt->magic = 0;
00325 free(pt);
00326 } else {
00327 pt->joinstatus = status;
00328 pt->thread = 0;
00329 if (pt->waiting) CthAwaken(pt->waiting);
00330 }
00331 CthFree(t);
00332 CthSuspend();
00333 }
00334
00335 int Cpthread_equal(Cpthread_t t1, Cpthread_t t2)
00336 {
00337 return (t1==t2);
00338 }
00339
00340 int Cpthread_detach(Cpthread_t pt)
00341 {
00342 if (pt->magic != PT_MAGIC) errcode(EINVAL);
00343 if (pt->thread==0) {
00344 pt->magic = 0;
00345 free(pt);
00346 } else {
00347 pt->detached = 1;
00348 }
00349 return 0;
00350 }
00351
00352 int Cpthread_join(Cpthread_t pt, void **status)
00353 {
00354 if (pt->magic != PT_MAGIC) errcode(EINVAL);
00355 if (pt->thread) {
00356 pt->waiting = CthSelf();
00357 CthSuspend();
00358 }
00359 *status = pt->joinstatus;
00360 free(pt);
00361 return 0;
00362 }
00363
00364 int Cpthread_once(Cpthread_once_t *once, void (*fn)(void))
00365 {
00366 int rank = CmiMyRank();
00367 if (rank>=32) {
00368 CmiPrintf("error: cpthreads current implementation limited to 32 PE's per node.\n");
00369 exit(1);
00370 }
00371 if (once->flag[rank]) return 0;
00372 once->flag[rank] = 1;
00373 fn();
00374 return 1;
00375 }
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386 static void errspan(void)
00387 {
00388 CmiPrintf("Error: Cpthreads sync primitives do not work across processor boundaries.\n");
00389 exit(1);
00390 }
00391
00392 int Cpthread_mutexattr_init(Cpthread_mutexattr_t *mattr)
00393 {
00394 mattr->magic = MATTR_MAGIC;
00395 return 0;
00396 }
00397
00398 int Cpthread_mutexattr_destroy(Cpthread_mutexattr_t *mattr)
00399 {
00400 if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
00401 mattr->magic = 0;
00402 return 0;
00403 }
00404
00405 int Cpthread_mutexattr_getpshared(Cpthread_mutexattr_t *mattr, int *pshared)
00406 {
00407 if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
00408 *pshared = mattr->pshared;
00409 return 0;
00410 }
00411
00412 int Cpthread_mutexattr_setpshared(Cpthread_mutexattr_t *mattr, int pshared)
00413 {
00414 if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
00415 mattr->pshared = pshared;
00416 return 0;
00417 }
00418
00419 int Cpthread_mutex_init(Cpthread_mutex_t *mutex, Cpthread_mutexattr_t *mattr)
00420 {
00421 if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
00422 mutex->magic = MUTEX_MAGIC;
00423 mutex->onpe = CmiMyPe();
00424 mutex->users = CdsFifo_Create();
00425 return 0;
00426 }
00427
00428 int Cpthread_mutex_destroy(Cpthread_mutex_t *mutex)
00429 {
00430 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00431 if (mutex->onpe != CmiMyPe()) errspan();
00432 if (!CdsFifo_Empty(mutex->users)) errcode(EBUSY);
00433 mutex->magic = 0;
00434 CdsFifo_Destroy(mutex->users);
00435 return 0;
00436 }
00437
00438 int Cpthread_mutex_lock(Cpthread_mutex_t *mutex)
00439 {
00440 CthThread self = CthSelf();
00441 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00442 if (mutex->onpe != CmiMyPe()) errspan();
00443 CdsFifo_Enqueue(mutex->users, self);
00444 if (CdsFifo_Peek(mutex->users) != self) CthSuspend();
00445 return 0;
00446 }
00447
00448 int Cpthread_mutex_trylock(Cpthread_mutex_t *mutex)
00449 {
00450 CthThread self = CthSelf();
00451 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00452 if (mutex->onpe != CmiMyPe()) errspan();
00453 if (!CdsFifo_Empty(mutex->users)) errcode(EBUSY);
00454 CdsFifo_Enqueue(mutex->users, self);
00455 return 0;
00456 }
00457
00458 int Cpthread_mutex_unlock(Cpthread_mutex_t *mutex)
00459 {
00460 CthThread self = CthSelf();
00461 CthThread sleeper;
00462 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00463 if (mutex->onpe != CmiMyPe()) errspan();
00464 if (CdsFifo_Peek(mutex->users) != self) errcode(EPERM);
00465 CdsFifo_Pop(mutex->users);
00466 sleeper = (CthThread)CdsFifo_Peek(mutex->users);
00467 if (sleeper) CthAwaken(sleeper);
00468 return 0;
00469 }
00470
00471
00472
00473
00474
00475
00476
00477 int Cpthread_condattr_init(Cpthread_condattr_t *cattr)
00478 {
00479 cattr->magic = CATTR_MAGIC;
00480 return 0;
00481 }
00482
00483 int Cpthread_condattr_destroy(Cpthread_condattr_t *cattr)
00484 {
00485 if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
00486 return 0;
00487 }
00488
00489 int Cpthread_condattr_getpshared(Cpthread_condattr_t *cattr, int *pshared)
00490 {
00491 if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
00492 *pshared = cattr->pshared;
00493 return 0;
00494 }
00495
00496 int Cpthread_condattr_setpshared(Cpthread_condattr_t *cattr, int pshared)
00497 {
00498 if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
00499 cattr->pshared = pshared;
00500 return 0;
00501 }
00502
00503 int Cpthread_cond_init(Cpthread_cond_t *cond, Cpthread_condattr_t *cattr)
00504 {
00505 if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
00506 cond->magic = COND_MAGIC;
00507 cond->onpe = CmiMyPe();
00508 cond->users = CdsFifo_Create();
00509 return 0;
00510 }
00511
00512 int Cpthread_cond_destroy(Cpthread_cond_t *cond)
00513 {
00514 if (cond->magic != COND_MAGIC) errcode(EINVAL);
00515 if (cond->onpe != CmiMyPe()) errspan();
00516 cond->magic = 0;
00517 CdsFifo_Destroy(cond->users);
00518 return 0;
00519 }
00520
00521 int Cpthread_cond_wait(Cpthread_cond_t *cond, Cpthread_mutex_t *mutex)
00522 {
00523 CthThread self = CthSelf();
00524 CthThread sleeper;
00525
00526 if (cond->magic != COND_MAGIC) errcode(EINVAL);
00527 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00528 if (cond->onpe != CmiMyPe()) errspan();
00529 if (mutex->onpe != CmiMyPe()) errspan();
00530
00531 if (CdsFifo_Peek(mutex->users) != self) errcode(EPERM);
00532 CdsFifo_Pop(mutex->users);
00533 sleeper = (CthThread)CdsFifo_Peek(mutex->users);
00534 if (sleeper) CthAwaken(sleeper);
00535 CdsFifo_Enqueue(cond->users, self);
00536 CthSuspend();
00537 CdsFifo_Enqueue(mutex->users, self);
00538 if (CdsFifo_Peek(mutex->users) != self) CthSuspend();
00539 return 0;
00540 }
00541
00542 int Cpthread_cond_signal(Cpthread_cond_t *cond)
00543 {
00544 CthThread sleeper;
00545 if (cond->magic != COND_MAGIC) errcode(EINVAL);
00546 if (cond->onpe != CmiMyPe()) errspan();
00547 sleeper = (CthThread)CdsFifo_Dequeue(cond->users);
00548 if (sleeper) CthAwaken(sleeper);
00549 return 0;
00550 }
00551
00552 int Cpthread_cond_broadcast(Cpthread_cond_t *cond)
00553 {
00554 CthThread sleeper;
00555 if (cond->magic != COND_MAGIC) errcode(EINVAL);
00556 if (cond->onpe != CmiMyPe()) errspan();
00557 while (1) {
00558 sleeper = (CthThread)CdsFifo_Dequeue(cond->users);
00559 if (sleeper==0) break;
00560 CthAwaken(sleeper);
00561 }
00562 return 0;
00563 }
00564
00565
00566
00567
00568
00569
00570
00571 typedef struct CmiMainFnArg_s
00572 {
00573 CmiStartFn fn;
00574 char **argv;
00575 int argc;
00576 } CmiMainFnArg;
00577
00578 static void Cpthread_main_wrapper(CmiMainFnArg *arg)
00579 {
00580 arg->fn(arg->argc, arg->argv);
00581 free(arg);
00582 }
00583
00584 int Cpthread_init(void)
00585 {
00586 return 0;
00587 }
00588
00589 extern "C" void CpthreadModuleInit(void)
00590 {
00591 CtvInitialize(Cpthread_t, Cpthread_current);
00592 CtvInitialize(int, Cpthread_errcode);
00593 }
00594
00595 void Cpthread_start_main(CmiStartFn fn, int argc, char **argv)
00596 {
00597 Cpthread_t pt;
00598 Cpthread_attr_t attrib;
00599 CmiIntPtr pargc = argc;
00600 if (CmiMyRank()==0) {
00601 CmiMainFnArg * arg = (CmiMainFnArg *)malloc(sizeof(CmiMainFnArg));
00602
00603 Cpthread_attr_init(&attrib);
00604 Cpthread_attr_setdetachstate(&attrib, 1);
00605
00606 arg->fn = fn;
00607 arg->argc = argc;
00608 arg->argv = argv;
00609
00610 Cpthread_create(&pt, &attrib, (voidfn)Cpthread_main_wrapper, arg);
00611 }
00612 }