00001
00002
00003
00004
00005
00006
00007
00014 #include <errno.h>
00015 #include <pthread.h>
00016 #include <sched.h>
00017 #include <time.h>
00018
00019 #include <stdio.h>
00020 #include <sys/types.h>
00021 #include <sys/time.h>
00022 #include <limits.h>
00023 #include <unistd.h>
00024
00025 #include "converse.h"
00026
00027 #define BLK_LEN 512
00028
00029 typedef struct {
00030 pthread_mutex_t mutex;
00031 pthread_cond_t cond;
00032 int waiting;
00033 CdsFifo q;
00034 } McQueue;
00035
00036 static McQueue *McQueueCreate(void);
00037 static void McQueueAddToBack(McQueue *queue, void *element);
00038 static void *McQueueRemoveFromFront(McQueue *queue);
00039 static McQueue **MsgQueue;
00040
00041 CpvDeclare(void*, CmiLocalQueue);
00042
00043 int Cmi_argc;
00044 int _Cmi_numpes;
00045 int Cmi_usched;
00046 int Cmi_initret;
00047 CmiStartFn Cmi_startFn;
00048
00049 pthread_key_t perThreadKey;
00050
00051 static void *threadInit(void *arg);
00052
00053 pthread_mutex_t memory_mutex;
00054
00055 void CmiMemLock() {pthread_mutex_lock(&memory_mutex);}
00056 void CmiMemUnlock() {pthread_mutex_unlock(&memory_mutex);}
00057
00058 int barrier;
00059 pthread_cond_t barrier_cond;
00060 pthread_mutex_t barrier_mutex;
00061
00062 void CmiNodeBarrier(void)
00063 {
00064 pthread_mutex_lock(&barrier_mutex);
00065 barrier++;
00066 if(barrier!=CmiNumPes())
00067 pthread_cond_wait(&barrier_cond, &barrier_mutex);
00068 else {
00069 barrier = 0;
00070 pthread_cond_broadcast(&barrier_cond);
00071 }
00072 pthread_mutex_unlock(&barrier_mutex);
00073 }
00074
00075 void CmiNodeAllBarrier(void)
00076 {
00077 pthread_mutex_lock(&barrier_mutex);
00078 barrier++;
00079 if(barrier!=CmiNumPes()+1)
00080 pthread_cond_wait(&barrier_cond, &barrier_mutex);
00081 else {
00082 barrier = 0;
00083 pthread_cond_broadcast(&barrier_cond);
00084 }
00085 pthread_mutex_unlock(&barrier_mutex);
00086 }
00087
00088 CmiNodeLock CmiCreateLock(void)
00089 {
00090 pthread_mutex_t *lock;
00091 lock = (pthread_mutex_t *) CmiAlloc(sizeof(pthread_mutex_t));
00092 pthread_mutex_init(lock, (pthread_mutexattr_t *) 0);
00093 return lock;
00094 }
00095
00096 void CmiLock(CmiNodeLock lock)
00097 {
00098 pthread_mutex_lock(lock);
00099 }
00100
00101 void CmiUnlock(CmiNodeLock lock)
00102 {
00103 pthread_mutex_unlock(lock);
00104 }
00105
00106 int CmiTryLock(CmiNodeLock lock)
00107 {
00108 return pthread_mutex_trylock(lock);
00109 }
00110
00111 void CmiDestroyLock(CmiNodeLock lock)
00112 {
00113 pthread_mutex_destroy(lock);
00114 }
00115
00116 int CmiMyPe()
00117 {
00118 int mype = (size_t) pthread_getspecific(perThreadKey);
00119 return mype;
00120 }
00121
00122
00123
00124
00125
00126
00127
00128 void CmiAbort(const char *message)
00129 {
00130 CmiError(message);
00131 abort();
00132 }
00133
00134 int CmiAsyncMsgSent(CmiCommHandle msgid)
00135 {
00136 return 1;
00137 }
00138
00139
00140 typedef struct {
00141 char **argv;
00142 int mype;
00143 } USER_PARAMETERS;
00144
00145 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
00146 {
00147 int i;
00148 USER_PARAMETERS *usrparam;
00149 pthread_t *aThread;
00150
00151 _Cmi_numpes = 0;
00152 Cmi_usched = usched;
00153 Cmi_initret = initret;
00154 Cmi_startFn = fn;
00155
00156 CmiGetArgInt(argv,"+p",&_Cmi_numpes);
00157 if (_Cmi_numpes <= 0)
00158 {
00159 CmiError("Error: requested number of processors is invalid %d\n",
00160 _Cmi_numpes);
00161 abort();
00162 }
00163
00164
00165 pthread_mutex_init(&memory_mutex, (pthread_mutexattr_t *) 0);
00166
00167 MsgQueue=(McQueue **)CmiAlloc(_Cmi_numpes*sizeof(McQueue *));
00168 for(i=0; i<_Cmi_numpes; i++)
00169 MsgQueue[i] = McQueueCreate();
00170
00171 pthread_key_create(&perThreadKey, (void *) 0);
00172 barrier = 0;
00173 pthread_cond_init(&barrier_cond, (pthread_condattr_t *) 0);
00174 pthread_mutex_init(&barrier_mutex, (pthread_mutexattr_t *) 0);
00175
00176
00177 pthread_setconcurrency(_Cmi_numpes);
00178
00179 Cmi_argc = CmiGetArgc(argv);
00180 aThread = (pthread_t *) CmiAlloc(sizeof(pthread_t) * _Cmi_numpes);
00181 for(i=1; i<_Cmi_numpes; i++) {
00182 usrparam = (USER_PARAMETERS *) CmiAlloc(sizeof(USER_PARAMETERS));
00183 usrparam->argv = CmiCopyArgs(argv);
00184 usrparam->mype = i;
00185
00186 pthread_create(&aThread[i],(pthread_attr_t *)0,threadInit,(void *)usrparam);
00187 }
00188 usrparam = (USER_PARAMETERS *) CmiAlloc(sizeof(USER_PARAMETERS));
00189 usrparam->argv = CmiCopyArgs(argv);
00190 usrparam->mype = 0;
00191 threadInit(usrparam);
00192 }
00193
00194 void CmiTimerInit(void);
00195
00196 static void *threadInit(void *arg)
00197 {
00198 USER_PARAMETERS *usrparam;
00199 usrparam = (USER_PARAMETERS *) arg;
00200
00201
00202 pthread_setspecific(perThreadKey, (void *) usrparam->mype);
00203
00204 CthInit(usrparam->argv);
00205 ConverseCommonInit(usrparam->argv);
00206 CpvInitialize(void*, CmiLocalQueue);
00207 CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00208 CmiTimerInit();
00209 if (Cmi_initret==0) {
00210 Cmi_startFn(Cmi_argc, usrparam->argv);
00211 if (Cmi_usched==0) CsdScheduler(-1);
00212 ConverseExit();
00213 }
00214 return (void *) 0;
00215 }
00216
00217
00218 void ConverseExit(void)
00219 {
00220 ConverseCommonExit();
00221 CmiNodeBarrier();
00222 }
00223
00224
00225 void CmiDeclareArgs(void)
00226 {
00227 }
00228
00229
00230 void CmiNotifyIdle()
00231 {
00232 McQueue *queue = MsgQueue[CmiMyPe()];
00233 struct timespec ts;
00234 pthread_mutex_lock(&(queue->mutex));
00235 if(CdsFifo_Empty(queue->q)){
00236 queue->waiting++;
00237 ts.tv_sec = (time_t) 0;
00238 ts.tv_nsec = 10000000L;
00239 pthread_cond_timedwait(&(queue->cond), &(queue->mutex), &ts);
00240 queue->waiting--;
00241 }
00242 pthread_mutex_unlock(&(queue->mutex));
00243 return;
00244 }
00245
00246 void *CmiGetNonLocal()
00247 {
00248 return McQueueRemoveFromFront(MsgQueue[CmiMyPe()]);
00249 }
00250
00251
00252 void CmiSyncSendFn(int destPE, int size, char *msg)
00253 {
00254 char *buf;
00255
00256 buf=(void *)CmiAlloc(size);
00257 memcpy(buf,msg,size);
00258 McQueueAddToBack(MsgQueue[destPE],buf);
00259 CQdCreate(CpvAccess(cQdState), 1);
00260 }
00261
00262
00263 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
00264 {
00265 CmiSyncSendFn(destPE, size, msg);
00266 return 0;
00267 }
00268
00269
00270 void CmiFreeSendFn(int destPE, int size, char *msg)
00271 {
00272 if (CmiMyPe()==destPE) {
00273 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00274 } else {
00275 McQueueAddToBack(MsgQueue[destPE],msg);
00276 }
00277 CQdCreate(CpvAccess(cQdState), 1);
00278 }
00279
00280 void CmiSyncBroadcastFn(int size, char *msg)
00281 {
00282 int i;
00283 for(i=0; i<_Cmi_numpes; i++)
00284 if (CmiMyPe() != i) CmiSyncSendFn(i,size,msg);
00285 }
00286
00287 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00288 {
00289 CmiSyncBroadcastFn(size, msg);
00290 return 0;
00291 }
00292
00293 void CmiFreeBroadcastFn(int size, char *msg)
00294 {
00295 CmiSyncBroadcastFn(size,msg);
00296 CmiFree(msg);
00297 }
00298
00299 void CmiSyncBroadcastAllFn(int size, char *msg)
00300 {
00301 int i;
00302 for(i=0; i<CmiNumPes(); i++)
00303 CmiSyncSendFn(i,size,msg);
00304 }
00305
00306
00307 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00308 {
00309 CmiSyncBroadcastAllFn(size, msg);
00310 return 0;
00311 }
00312
00313
00314 void CmiFreeBroadcastAllFn(int size, char *msg)
00315 {
00316 int i;
00317 for(i=0; i<CmiNumPes(); i++) {
00318 if(CmiMyPe() != i) {
00319 CmiSyncSendFn(i,size,msg);
00320 }
00321 }
00322 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00323 CQdCreate(CpvAccess(cQdState), 1);
00324 }
00325
00326
00327
00328
00329
00330
00331 static void ** AllocBlock(unsigned int len)
00332 {
00333 void **blk;
00334
00335 blk=(void **)CmiAlloc(len*sizeof(void *));
00336 return blk;
00337 }
00338
00339 static void
00340 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
00341 {
00342 memcpy(destblk, &(srcblk[first]), (len-first)*sizeof(void *));
00343 memcpy(&(destblk[len-first]),srcblk,first*sizeof(void *));
00344 }
00345
00346 McQueue * McQueueCreate(void)
00347 {
00348 McQueue *queue;
00349
00350 queue = (McQueue *) CmiAlloc(sizeof(McQueue));
00351 pthread_mutex_init(&(queue->mutex), (pthread_mutexattr_t *) 0);
00352 pthread_cond_init(&(queue->cond), (pthread_condattr_t *) 0);
00353 queue->waiting = 0;
00354 queue->q = CdsFifo_Create_len(BLK_LEN);
00355 return queue;
00356 }
00357
00358 void McQueueAddToBack(McQueue *queue, void *element)
00359 {
00360 pthread_mutex_lock(&(queue->mutex));
00361 CdsFifo_Enqueue(queue->q, element);
00362 if(queue->waiting) {
00363 pthread_cond_broadcast(&(queue->cond));
00364 }
00365 pthread_mutex_unlock(&(queue->mutex));
00366 }
00367
00368
00369 void * McQueueRemoveFromFront(McQueue *queue)
00370 {
00371 void *element = 0;
00372 pthread_mutex_lock(&(queue->mutex));
00373 element = CdsFifo_Dequeue(queue->q);
00374 pthread_mutex_unlock(&(queue->mutex));
00375 return element;
00376 }
00377
00378
00379
00380
00381 CpvStaticDeclare(double,inittime_wallclock);
00382 CpvStaticDeclare(double,inittime_virtual);
00383
00384 void CmiTimerInit(void)
00385 {
00386 struct timespec temp;
00387 CpvInitialize(double, inittime_wallclock);
00388 CpvInitialize(double, inittime_virtual);
00389 clock_gettime(CLOCK_SGI_CYCLE, &temp);
00390 CpvAccess(inittime_wallclock) = (double) temp.tv_sec +
00391 1e-9 * temp.tv_nsec;
00392 CpvAccess(inittime_virtual) = CpvAccess(inittime_wallclock);
00393 }
00394
00395 double CmiWallTimer(void)
00396 {
00397 struct timespec temp;
00398 double currenttime;
00399
00400 clock_gettime(CLOCK_SGI_CYCLE, &temp);
00401 currenttime = (double) temp.tv_sec +
00402 1e-9 * temp.tv_nsec;
00403 return (currenttime - CpvAccess(inittime_wallclock));
00404 }
00405
00406 double CmiCpuTimer(void)
00407 {
00408 struct timespec temp;
00409 double currenttime;
00410
00411 clock_gettime(CLOCK_SGI_CYCLE, &temp);
00412 currenttime = (double) temp.tv_sec +
00413 1e-9 * temp.tv_nsec;
00414 return (currenttime - CpvAccess(inittime_virtual));
00415 }
00416
00417 double CmiTimer(void)
00418 {
00419 return CmiCpuTimer();
00420 }
00421