00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #define CPTHREAD_IS_HERE
00023 #define SUPPRESS_PTHREADS
00024 #include "cpthreads.h"
00025 #include <stdlib.h>
00026 #include <errno.h>
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #define PT_MAGIC 0x8173292a
00038 #define ATTR_MAGIC 0x783a2004
00039 #define KEY_MAGIC 0x99934315
00040 #define FKEY_MAGIC 0x99934315
00041 #define MATTR_MAGIC 0x12673434
00042 #define CATTR_MAGIC 0xA865B812
00043
00044 #undef MUTEX_MAGIC
00045 #undef COND_MAGIC
00046 #define MUTEX_MAGIC 0x13237770
00047 #define COND_MAGIC 0x99431664
00048
00049
00050
00051
00052
00053
00054
00055 typedef void *(*voidfn)();
00056
00057 struct Cpthread_s
00058 {
00059 int magic;
00060 voidfn startfn;
00061 void *startarg1;
00062 void *startarg2;
00063 void *startarg3;
00064 int detached;
00065 void *joinstatus;
00066 Cpthread_cleanup_t cleanups;
00067 CthThread waiting;
00068 CthThread thread;
00069 };
00070
00071 #define errcode(n) { Cpthread_errno=(n); return -1; }
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089 struct Cpthread_key_s
00090 {
00091 int magic;
00092 int offset;
00093 void (*destructo)(void *);
00094 Cpthread_key_t next;
00095 };
00096
00097 Cpthread_key_t keys_active = 0;
00098 Cpthread_key_t keys_inactive = 0;
00099
00100 int Cpthread_key_create(Cpthread_key_t *keyp, void (*destructo)(void *))
00101 {
00102 Cpthread_key_t key;
00103 key = keys_inactive;
00104 if (key) {
00105 keys_inactive = key->next;
00106 } else {
00107 key = (Cpthread_key_t)malloc(sizeof(struct Cpthread_key_s));
00108 _MEMCHECK(key);
00109 key->offset = CthRegister(sizeof(void *));
00110 }
00111 key->magic = KEY_MAGIC;
00112 key->destructo = destructo;
00113 key->next = keys_active;
00114 keys_active = key;
00115 *keyp = key;
00116 return 0;
00117 }
00118
00119 int Cpthread_key_delete(Cpthread_key_t key)
00120 {
00121 Cpthread_key_t active = keys_active;
00122 if (key->magic != KEY_MAGIC) errcode(EINVAL);
00123 if (active==key) {
00124 keys_active = key->next;
00125 } else {
00126 while (active) {
00127 if (active->next == key) {
00128 active->next = key->next;
00129 goto deleted;
00130 }
00131 active = active->next;
00132 }
00133 return -1;
00134 }
00135 deleted:
00136 key->magic = FKEY_MAGIC;
00137 key->next = keys_inactive;
00138 keys_inactive = key;
00139 return 0;
00140 }
00141
00142 int Cpthread_setspecific(Cpthread_key_t key, void *val)
00143 {
00144 char *data;
00145 data = CthCpvAccess(CthData);
00146 if (key->magic != KEY_MAGIC) errcode(EINVAL);
00147 *((void **)(data+(key->offset))) = val;
00148 return 0;
00149 }
00150
00151 void *Cpthread_getspecific(Cpthread_key_t key)
00152 {
00153 char *data = CthCpvAccess(CthData);
00154 if (key->magic != KEY_MAGIC) return 0;
00155 return *((void **)(data+(key->offset)));
00156 }
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167 struct Cpthread_cleanup_s
00168 {
00169 void (*routine)(void *);
00170 void *argument;
00171 Cpthread_cleanup_t next;
00172 };
00173
00174 void Cpthread_cleanup_push(void (*routine)(void*), void *arg)
00175 {
00176 Cpthread_t pt = CtvAccess(Cpthread_current);
00177 Cpthread_cleanup_t c =
00178 (Cpthread_cleanup_t)malloc(sizeof(struct Cpthread_cleanup_s));
00179 _MEMCHECK(c);
00180 c->routine = routine;
00181 c->argument = arg;
00182 c->next = pt->cleanups;
00183 pt->cleanups = c;
00184 }
00185
00186 void Cpthread_cleanup_pop(int execute)
00187 {
00188 Cpthread_t pt = CtvAccess(Cpthread_current);
00189 Cpthread_cleanup_t c = pt->cleanups;
00190 if (c) {
00191 pt->cleanups = c->next;
00192 if (execute) (c->routine)(c->argument);
00193 free(c);
00194 }
00195 }
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211 int Cpthread_attr_init(Cpthread_attr_t *attr)
00212 {
00213 attr->magic = ATTR_MAGIC;
00214 attr->detached = 0;
00215 attr->stacksize = 0;
00216 return 0;
00217 }
00218
00219 int Cpthread_attr_destroy(Cpthread_attr_t *attr)
00220 {
00221 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00222 attr->magic = 0;
00223 return 0;
00224 }
00225
00226 int Cpthread_attr_getstacksize(Cpthread_attr_t *attr, size_t *size)
00227 {
00228 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00229 *size = attr->stacksize;
00230 return 0;
00231 }
00232
00233 int Cpthread_attr_setstacksize(Cpthread_attr_t *attr, size_t size)
00234 {
00235 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00236 attr->stacksize = size;
00237 return 0;
00238 }
00239
00240 int Cpthread_attr_getdetachstate(Cpthread_attr_t *attr, int *state)
00241 {
00242 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00243 *state = attr->detached;
00244 return 0;
00245 }
00246
00247 int Cpthread_attr_setdetachstate(Cpthread_attr_t *attr, int state)
00248 {
00249 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00250 attr->detached = state;
00251 return 0;
00252 }
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265 void Cpthread_top(Cpthread_t pt)
00266 {
00267 Cpthread_key_t k; char *data; void *result;
00268
00269 data = CthCpvAccess(CthData);
00270 for (k=keys_active; k; k=k->next)
00271 *(void **)(data+(k->offset)) = 0;
00272 CtvAccess(Cpthread_errcode) = 0;
00273 CtvAccess(Cpthread_current) = pt;
00274 result = (pt->startfn)(pt->startarg1, pt->startarg2, pt->startarg3);
00275 Cpthread_exit(result);
00276 }
00277
00278 int Cpthread_create3(Cpthread_t *thread, Cpthread_attr_t *attr,
00279 voidfn fn, void *a1, void *a2, void *a3)
00280 {
00281 Cpthread_t pt;
00282 if (attr->magic != ATTR_MAGIC) errcode(EINVAL);
00283 pt = (Cpthread_t)malloc(sizeof(struct Cpthread_s));
00284 _MEMCHECK(pt);
00285 pt->magic = PT_MAGIC;
00286 pt->startfn = fn;
00287 pt->startarg1 = a1;
00288 pt->startarg2 = a2;
00289 pt->startarg3 = a3;
00290 pt->detached = attr->detached;
00291 pt->joinstatus = 0;
00292 pt->cleanups = 0;
00293 pt->waiting = 0;
00294 pt->thread = CthCreate((CthVoidFn)Cpthread_top, (void *)pt, attr->stacksize);
00295 CthSetStrategyDefault(pt->thread);
00296 CthAwaken(pt->thread);
00297 *thread = pt;
00298 return 0;
00299 }
00300
00301 int Cpthread_create(Cpthread_t *thread, Cpthread_attr_t *attr,
00302 voidfn fn, void *arg)
00303 {
00304 return Cpthread_create3(thread, attr, fn, arg, 0, 0);
00305 }
00306
00307 void Cpthread_exit(void *status)
00308 {
00309 Cpthread_t pt; Cpthread_cleanup_t c, cn; Cpthread_key_t k;
00310 void *priv; char *data; CthThread t;
00311
00312 pt = CtvAccess(Cpthread_current);
00313 t = pt->thread;
00314 c = pt->cleanups;
00315 data = CthCpvAccess(CthData);
00316
00317
00318 while (c) {
00319 (c->routine)(c->argument);
00320 cn = c->next;
00321 free(c); c=cn;
00322 }
00323
00324 k = keys_active;
00325 while (k) {
00326 if (k->destructo) {
00327 priv = *(void **)(data+(k->offset));
00328 if (priv) (k->destructo)(priv);
00329 }
00330 k=k->next;
00331 }
00332
00333 if (pt->detached) {
00334 pt->magic = 0;
00335 free(pt);
00336 } else {
00337 pt->joinstatus = status;
00338 pt->thread = 0;
00339 if (pt->waiting) CthAwaken(pt->waiting);
00340 }
00341 CthFree(t);
00342 CthSuspend();
00343 }
00344
00345 int Cpthread_equal(Cpthread_t t1, Cpthread_t t2)
00346 {
00347 return (t1==t2);
00348 }
00349
00350 int Cpthread_detach(Cpthread_t pt)
00351 {
00352 if (pt->magic != PT_MAGIC) errcode(EINVAL);
00353 if (pt->thread==0) {
00354 pt->magic = 0;
00355 free(pt);
00356 } else {
00357 pt->detached = 1;
00358 }
00359 return 0;
00360 }
00361
00362 int Cpthread_join(Cpthread_t pt, void **status)
00363 {
00364 if (pt->magic != PT_MAGIC) errcode(EINVAL);
00365 if (pt->thread) {
00366 pt->waiting = CthSelf();
00367 CthSuspend();
00368 }
00369 *status = pt->joinstatus;
00370 free(pt);
00371 return 0;
00372 }
00373
00374 int Cpthread_once(Cpthread_once_t *once, void (*fn)(void))
00375 {
00376 int rank = CmiMyRank();
00377 if (rank>=32) {
00378 CmiPrintf("error: cpthreads current implementation limited to 32 PE's per node.\n");
00379 exit(1);
00380 }
00381 if (once->flag[rank]) return 0;
00382 once->flag[rank] = 1;
00383 fn();
00384 return 1;
00385 }
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396 static void errspan()
00397 {
00398 CmiPrintf("Error: Cpthreads sync primitives do not work across processor boundaries.\n");
00399 exit(1);
00400 }
00401
00402 int Cpthread_mutexattr_init(Cpthread_mutexattr_t *mattr)
00403 {
00404 mattr->magic = MATTR_MAGIC;
00405 return 0;
00406 }
00407
00408 int Cpthread_mutexattr_destroy(Cpthread_mutexattr_t *mattr)
00409 {
00410 if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
00411 mattr->magic = 0;
00412 return 0;
00413 }
00414
00415 int Cpthread_mutexattr_getpshared(Cpthread_mutexattr_t *mattr, int *pshared)
00416 {
00417 if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
00418 *pshared = mattr->pshared;
00419 return 0;
00420 }
00421
00422 int Cpthread_mutexattr_setpshared(Cpthread_mutexattr_t *mattr, int pshared)
00423 {
00424 if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
00425 mattr->pshared = pshared;
00426 return 0;
00427 }
00428
00429 int Cpthread_mutex_init(Cpthread_mutex_t *mutex, Cpthread_mutexattr_t *mattr)
00430 {
00431 if (mattr->magic != MATTR_MAGIC) errcode(EINVAL);
00432 mutex->magic = MUTEX_MAGIC;
00433 mutex->onpe = CmiMyPe();
00434 mutex->users = CdsFifo_Create();
00435 return 0;
00436 }
00437
00438 int Cpthread_mutex_destroy(Cpthread_mutex_t *mutex)
00439 {
00440 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00441 if (mutex->onpe != CmiMyPe()) errspan();
00442 if (!CdsFifo_Empty(mutex->users)) errcode(EBUSY);
00443 mutex->magic = 0;
00444 CdsFifo_Destroy(mutex->users);
00445 return 0;
00446 }
00447
00448 int Cpthread_mutex_lock(Cpthread_mutex_t *mutex)
00449 {
00450 CthThread self = CthSelf();
00451 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00452 if (mutex->onpe != CmiMyPe()) errspan();
00453 CdsFifo_Enqueue(mutex->users, self);
00454 if (CdsFifo_Peek(mutex->users) != self) CthSuspend();
00455 return 0;
00456 }
00457
00458 int Cpthread_mutex_trylock(Cpthread_mutex_t *mutex)
00459 {
00460 CthThread self = CthSelf();
00461 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00462 if (mutex->onpe != CmiMyPe()) errspan();
00463 if (!CdsFifo_Empty(mutex->users)) errcode(EBUSY);
00464 CdsFifo_Enqueue(mutex->users, self);
00465 return 0;
00466 }
00467
00468 int Cpthread_mutex_unlock(Cpthread_mutex_t *mutex)
00469 {
00470 CthThread self = CthSelf();
00471 CthThread sleeper;
00472 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00473 if (mutex->onpe != CmiMyPe()) errspan();
00474 if (CdsFifo_Peek(mutex->users) != self) errcode(EPERM);
00475 CdsFifo_Pop(mutex->users);
00476 sleeper = CdsFifo_Peek(mutex->users);
00477 if (sleeper) CthAwaken(sleeper);
00478 return 0;
00479 }
00480
00481
00482
00483
00484
00485
00486
00487 int Cpthread_condattr_init(Cpthread_condattr_t *cattr)
00488 {
00489 cattr->magic = CATTR_MAGIC;
00490 return 0;
00491 }
00492
00493 int Cpthread_condattr_destroy(Cpthread_condattr_t *cattr)
00494 {
00495 if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
00496 return 0;
00497 }
00498
00499 int Cpthread_condattr_getpshared(Cpthread_condattr_t *cattr, int *pshared)
00500 {
00501 if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
00502 *pshared = cattr->pshared;
00503 return 0;
00504 }
00505
00506 int Cpthread_condattr_setpshared(Cpthread_condattr_t *cattr, int pshared)
00507 {
00508 if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
00509 cattr->pshared = pshared;
00510 return 0;
00511 }
00512
00513 int Cpthread_cond_init(Cpthread_cond_t *cond, Cpthread_condattr_t *cattr)
00514 {
00515 if (cattr->magic != CATTR_MAGIC) errcode(EINVAL);
00516 cond->magic = COND_MAGIC;
00517 cond->onpe = CmiMyPe();
00518 cond->users = CdsFifo_Create();
00519 return 0;
00520 }
00521
00522 int Cpthread_cond_destroy(Cpthread_cond_t *cond)
00523 {
00524 if (cond->magic != COND_MAGIC) errcode(EINVAL);
00525 if (cond->onpe != CmiMyPe()) errspan();
00526 cond->magic = 0;
00527 CdsFifo_Destroy(cond->users);
00528 return 0;
00529 }
00530
00531 int Cpthread_cond_wait(Cpthread_cond_t *cond, Cpthread_mutex_t *mutex)
00532 {
00533 CthThread self = CthSelf();
00534 CthThread sleeper;
00535
00536 if (cond->magic != COND_MAGIC) errcode(EINVAL);
00537 if (mutex->magic != MUTEX_MAGIC) errcode(EINVAL);
00538 if (cond->onpe != CmiMyPe()) errspan();
00539 if (mutex->onpe != CmiMyPe()) errspan();
00540
00541 if (CdsFifo_Peek(mutex->users) != self) errcode(EPERM);
00542 CdsFifo_Pop(mutex->users);
00543 sleeper = CdsFifo_Peek(mutex->users);
00544 if (sleeper) CthAwaken(sleeper);
00545 CdsFifo_Enqueue(cond->users, self);
00546 CthSuspend();
00547 CdsFifo_Enqueue(mutex->users, self);
00548 if (CdsFifo_Peek(mutex->users) != self) CthSuspend();
00549 return 0;
00550 }
00551
00552 int Cpthread_cond_signal(Cpthread_cond_t *cond)
00553 {
00554 CthThread sleeper;
00555 if (cond->magic != COND_MAGIC) errcode(EINVAL);
00556 if (cond->onpe != CmiMyPe()) errspan();
00557 sleeper = CdsFifo_Dequeue(cond->users);
00558 if (sleeper) CthAwaken(sleeper);
00559 return 0;
00560 }
00561
00562 int Cpthread_cond_broadcast(Cpthread_cond_t *cond)
00563 {
00564 CthThread sleeper;
00565 if (cond->magic != COND_MAGIC) errcode(EINVAL);
00566 if (cond->onpe != CmiMyPe()) errspan();
00567 while (1) {
00568 sleeper = CdsFifo_Dequeue(cond->users);
00569 if (sleeper==0) break;
00570 CthAwaken(sleeper);
00571 }
00572 return 0;
00573 }
00574
00575
00576
00577
00578
00579
00580
00581 typedef void (*mainfn)(int argc, char **argv);
00582
00583 int Cpthread_init()
00584 {
00585 return 0;
00586 }
00587
00588 void CpthreadModuleInit()
00589 {
00590 CtvInitialize(Cpthread_t, Cpthread_current);
00591 CtvInitialize(int, Cpthread_errcode);
00592 }
00593
00594 void Cpthread_start_main(mainfn fn, int argc, char **argv)
00595 {
00596 Cpthread_t pt;
00597 Cpthread_attr_t attrib;
00598 CmiIntPtr pargc = argc;
00599 if (CmiMyRank()==0) {
00600 Cpthread_attr_init(&attrib);
00601 Cpthread_attr_setdetachstate(&attrib, 1);
00602 Cpthread_create3(&pt, &attrib, (voidfn)fn, (void *)pargc, argv, 0);
00603 }
00604 }