00001
00002
00003
00004
00005
00006
00007
#include <sys/types.h>
#include <sys/sysmp.h>
#include <sys/sysinfo.h>
#include <sys/prctl.h>
#include <sys/pda.h>
#include <ulocks.h>
#include <math.h>
#include <stdlib.h>
#include <limits.h>
#include "converse.h"
#include <time.h>
00024
/* Shared arena created by usinit("/dev/zero") in ConverseInit; g_malloc and
   g_free below allocate from / release into it. */
00025 usptr_t *arena;
/* Barrier allocated from the arena in ConverseInit. NOTE(review): not
   otherwise referenced in this part of the file - confirm it is still used. */
00026 static barrier_t *barr;
00027
/* Initial capacity (number of element pointers) of a McQueue ring buffer. */
00028 #define BLK_LEN 512
/* Ring buffer of message pointers living in the shared arena; every update
   of blk/first/len happens between uspsema/usvsema on sema. */
00029 typedef struct {
00030 usema_t *sema;        /* guards the fields below */
00031 void **blk;           /* ring storage, blk_len slots */
00032 unsigned int blk_len; /* capacity; tripled when the ring fills */
00033 unsigned int first;   /* index of the oldest (next to remove) element */
00034 unsigned int len;     /* number of queued elements */
00035 unsigned int maxlen;  /* high-water mark of len */
00036 } McQueue;
00037
00038 static McQueue *McQueueCreate(void);
00039 static void McQueueAddToBack(McQueue *queue, void *element);
00040 static void *McQueueRemoveFromFront(McQueue *queue);
/* One incoming-message queue per PE, indexed by destination PE number;
   the array itself is allocated from the arena in ConverseInit. */
00041 static McQueue **MsgQueue;
/* Allocate / free memory in the shared arena. */
00042 #define g_malloc(s) usmalloc(s,arena)
00043 #define g_free(p) usfree(p,arena)
00044
/* Set to 1 for the duration of McQueueAddToBack. NOTE(review): presumably
   read by host-communication code elsewhere - confirm the consumer. */
00045 #if NODE_0_IS_CONVHOST
00046 int inside_comm = 0;
00047 #endif
00048
/* Per-PE FIFO of messages delivered locally without copying (see
   CmiFreeSendFn / CmiFreeBroadcastAllFn). */
00049 CpvDeclare(void*, CmiLocalQueue);
/* PE identity, filled in by threadInit in each process. */
00050 int _Cmi_mype;
00051 int _Cmi_numpes;
00052 int _Cmi_myrank;
00053
/* Set to requested_npe in ConverseInit; not read in this part of the file. */
00054 static int nthreads;
/* Number of PEs from +p / ++p (default 1). */
00055 static int requested_npe;
/* Shared arena size in megabytes from +memsize (default 128). */
00056 static int arena_size_meg;
00057
00058 static void threadInit(void *arg);
00059
/* Flag raised while a caller holds the "memory lock". No real mutual
   exclusion is performed: the two functions below only set and clear it. */
int membusy;

void CmiMemLock()
{
  membusy = 1;
}

void CmiMemUnlock()
{
  membusy = 0;
}
00064
00065
00066
00067
00068
00069
00070
/*
 * CmiAbort: report a fatal error and terminate this process with status 1.
 *
 * message: text to report; printed verbatim.
 */
void CmiAbort(const char *message)
{
  /* CmiError is printf-style, so the message must be an argument rather than
     the format: a '%' inside it would otherwise be read as a conversion
     specifier (undefined behaviour / format-string attack). */
  CmiError("%s", message);
  exit(1);
}
00076
00077 int CmiAsyncMsgSent(CmiCommHandle msgid)
00078 {
00079 return 1;
00080 }
00081
00082
/* Start-up parameters handed from ConverseInit to threadInit for each PE. */
00083 typedef struct {
00084 void *argv;        /* argument vector copied with CmiCopyArgs */
00085 CmiStartFn fn;     /* user start function */
00086 int argc;
00087 int npe;           /* total number of PEs */
00088 int mype;          /* PE number of the process being started */
00089 int usched;        /* 0: threadInit runs CsdScheduler(-1) after fn */
00090 int initret;       /* nonzero: PE 0 returns without calling fn */
00091 } USER_PARAMETERS;
00092
/* Filled in by ConverseInit; mype is rewritten before each sproc call. */
00093 USER_PARAMETERS usrparam;
00094
00095 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
00096 {
00097 int i;
00098 requested_npe = 1;
00099 CmiGetArgInt(argv,"+p",&requested_npe);
00100 CmiGetArgInt(argv,"++p",&requested_npe);
00101 arena_size_meg = 128;
00102 CmiGetArgInt(argv,"+memsize",&arena_size_meg);
00103
00104 if (requested_npe <= 0)
00105 {
00106 CmiError("Error: requested number of processors is invalid %d\n",
00107 requested_npe);
00108 abort();
00109 }
00110
00111 usconfig(CONF_INITUSERS, requested_npe+1);
00112 usconfig(CONF_ARENATYPE, US_SHAREDONLY);
00113
00114 if(usconfig(CONF_INITSIZE, (arena_size_meg * 1<<20))==(-1)) {
00115 CmiPrintf("Cannot set size of arena\n");
00116 abort();
00117 }
00118 arena = usinit("/dev/zero");
00119 if(arena == (usptr_t *) NULL) {
00120 CmiError("Cannot allocate arena\n");
00121 abort();
00122 }
00123 barr = new_barrier(arena);
00124 init_barrier(barr);
00125 nthreads = requested_npe;
00126
00127 usrparam.argc = argc;
00128 usrparam.argv = CmiCopyArgs(argv);
00129 usrparam.npe = requested_npe;
00130 usrparam.fn = fn;
00131 usrparam.initret = initret;
00132 usrparam.usched = usched;
00133
00134 MsgQueue=(McQueue **)g_malloc(requested_npe*sizeof(McQueue *));
00135 if (MsgQueue == (McQueue **)0) {
00136 CmiError("Cannot Allocate Memory...\n");
00137 abort();
00138 }
00139 for(i=0; i<requested_npe; i++)
00140 MsgQueue[i] = McQueueCreate();
00141
00142 for(i=1; i<requested_npe; i++) {
00143 usrparam.mype = i;
00144 sproc(threadInit, PR_SFDS, (void *)&usrparam);
00145 }
00146 usrparam.mype = 0;
00147 threadInit(&usrparam);
00148 }
00149
00150
00151
00152 void CmiTimerInit(void);
00153
00154 static void threadInit(void *arg)
00155 {
00156 USER_PARAMETERS *usrparam;
00157 usrparam = (USER_PARAMETERS *) arg;
00158
00159
00160 CpvInitialize(void*, CmiLocalQueue);
00161 _Cmi_mype = usrparam->mype;
00162 _Cmi_myrank = 0;
00163 _Cmi_numpes = usrparam->npe;
00164 #ifdef DEBUG
00165 printf("thread %d/%d started \n", CmiMyPe(), CmiNumPes());
00166 #endif
00167 prctl(PR_SETEXITSIG, SIGHUP,0);
00168
00169 CthInit(usrparam->argv);
00170 CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00171 CmiTimerInit();
00172
00173 usadd(arena);
00174 ConverseCommonInit(usrparam->argv);
00175 if (usrparam->initret==0 || usrparam->mype) {
00176 usrparam->fn(CmiGetArgc(usrparam->argv), usrparam->argv);
00177 if (usrparam->usched==0) {
00178 CsdScheduler(-1);
00179 }
00180 ConverseExit();
00181 }
00182 }
00183
00184
00185 void ConverseExit(void)
00186 {
00187 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
00188 if (CmiMyPe() == 0){
00189 CmiPrintf("End of program\n");
00190 }
00191 #endif
00192 prctl(PR_SETEXITSIG,0,0);
00193 ConverseCommonExit();
00194
00195 exit(0);
00196 }
00197
00198
/* CmiDeclareArgs: intentionally empty - this layer declares nothing here. */
void CmiDeclareArgs(void) { /* nothing to declare */ }
00202
00203
/* CmiNotifyIdle: idle hook; this layer takes no action. */
void CmiNotifyIdle() { /* no-op */ }
00207
00208 void *CmiGetNonLocal()
00209 {
00210 return McQueueRemoveFromFront(MsgQueue[CmiMyPe()]);
00211 }
00212
00213
00214 void CmiSyncSendFn(int destPE, int size, char *msg)
00215 {
00216 char *buf;
00217
00218 buf=(void *)g_malloc(size+8);
00219 if(buf==(void *)0) {
00220 CmiError("Cannot allocate memory of size %d!\n", size);
00221 abort();
00222 }
00223 ((int *)buf)[0]=size;
00224 buf += 8;
00225
00226 #ifdef DEBUG
00227 printf("Message of size %d allocated\n", CmiSize(buf));
00228 #endif
00229 memcpy(buf,msg,size);
00230 McQueueAddToBack(MsgQueue[destPE],buf);
00231 CQdCreate(CpvAccess(cQdState), 1);
00232 }
00233
00234
00235 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
00236 {
00237 CmiSyncSendFn(destPE, size, msg);
00238 return 0;
00239 }
00240
00241
00242 void CmiFreeSendFn(int destPE, int size, char *msg)
00243 {
00244 if (_Cmi_mype==destPE) {
00245 CQdCreate(CpvAccess(cQdState), 1);
00246 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00247 } else {
00248 CmiSyncSendFn(destPE, size, msg);
00249 CmiFree(msg);
00250 }
00251 }
00252
/* CmiSyncBroadcastFn: send a copy of msg to every PE except this one. */
void CmiSyncBroadcastFn(int size, char *msg)
{
  int pe;

  for (pe = 0; pe < CmiNumPes(); pe++) {
    if (pe == CmiMyPe())
      continue;
    CmiSyncSendFn(pe, size, msg);
  }
}
00259
00260 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00261 {
00262 CmiSyncBroadcastFn(size, msg);
00263 return 0;
00264 }
00265
/* CmiFreeBroadcastFn: copy msg to every other PE, then release msg. */
void CmiFreeBroadcastFn(int size, char *msg)
{
  CmiSyncBroadcastFn(size, msg); /* copies are made here ... */
  CmiFree(msg);                  /* ... so the original can go */
}
00271
/* CmiSyncBroadcastAllFn: send a copy of msg to every PE, this one included. */
void CmiSyncBroadcastAllFn(int size, char *msg)
{
  int pe = 0;

  while (pe < CmiNumPes()) {
    CmiSyncSendFn(pe, size, msg);
    pe++;
  }
}
00278
00279
00280 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00281 {
00282 CmiSyncBroadcastAllFn(size, msg);
00283 return 0;
00284 }
00285
00286
00287 void CmiFreeBroadcastAllFn(int size, char *msg)
00288 {
00289 int i;
00290 for(i=0; i<CmiNumPes(); i++) {
00291 if(CmiMyPe() != i) {
00292 CmiSyncSendFn(i,size,msg);
00293 }
00294 }
00295 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00296 CQdCreate(CpvAccess(cQdState), 1);
00297 }
00298
00299
00300
00301
00302
00303
/*
 * AllocBlock: allocate storage for `len` element pointers in the shared
 * arena. Aborts the program if the arena is exhausted; never returns NULL.
 */
static void ** AllocBlock(unsigned int len)
{
  void **slots = (void **) g_malloc(len * sizeof(void *));

  if (!slots) {
    CmiError("Cannot Allocate Memory!\n");
    abort();
  }
  return slots;
}
00315
/*
 * SpillBlock: unroll a ring buffer into linear order.
 *
 * Copies srcblk[first .. len) to the start of destblk, followed by
 * srcblk[0 .. first), i.e. `len` pointers in total; destblk must have room
 * for at least that many.
 */
static void
SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
{
  unsigned int tail = len - first; /* elements from `first` to the end */

  memcpy(destblk, srcblk + first, tail * sizeof(void *));
  memcpy(destblk + tail, srcblk, first * sizeof(void *));
}
00322
00323 McQueue * McQueueCreate(void)
00324 {
00325 McQueue *queue;
00326
00327 queue = (McQueue *) g_malloc(sizeof(McQueue));
00328 if(queue==(McQueue *)0) {
00329 CmiError("Cannot Allocate Memory!\n");
00330 abort();
00331 }
00332 queue->sema = usnewsema(arena, 1);
00333 usinitsema(queue->sema, 1);
00334 queue->blk = AllocBlock(BLK_LEN);
00335 queue->blk_len = BLK_LEN;
00336 queue->first = 0;
00337 queue->len = 0;
00338 queue->maxlen = 0;
00339 return queue;
00340 }
00341
00342 void McQueueAddToBack(McQueue *queue, void *element)
00343 {
00344 #if NODE_0_IS_CONVHOST
00345 inside_comm = 1;
00346 #endif
00347 #ifdef DEBUG
00348 printf("[%d] Waiting for lock\n",CmiMyPe());
00349 #endif
00350 uspsema(queue->sema);
00351 #ifdef DEBUG
00352 printf("[%d] Acquired lock\n",CmiMyPe());
00353 #endif
00354 if(queue->len==queue->blk_len) {
00355 void **blk;
00356
00357 queue->blk_len *= 3;
00358 blk = AllocBlock(queue->blk_len);
00359 SpillBlock(queue->blk, blk, queue->first, queue->len);
00360 g_free(queue->blk);
00361 queue->blk = blk;
00362 queue->first = 0;
00363 }
00364 queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
00365 if(queue->len>queue->maxlen)
00366 queue->maxlen = queue->len;
00367 usvsema(queue->sema);
00368 #ifdef DEBUG
00369 printf("[%d] released lock\n",CmiMyPe());
00370 #endif
00371 #if NODE_0_IS_CONVHOST
00372 inside_comm = 0;
00373 #endif
00374 }
00375
00376
00377 void * McQueueRemoveFromFront(McQueue *queue)
00378 {
00379 void *element = (void *) 0;
00380 void *localmsg = (void *) 0;
00381 if(queue->len) {
00382 uspsema(queue->sema);
00383 element = queue->blk[queue->first++];
00384 queue->first = (queue->first+queue->blk_len)%queue->blk_len;
00385 queue->len--;
00386 usvsema(queue->sema);
00387 }
00388 if(element) {
00389 #ifdef DEBUG
00390 printf("[%d] received message of size %d\n", CmiMyPe(), CmiSize(element));
00391 #endif
00392 localmsg = CmiAlloc(CmiSize(element));
00393 memcpy(localmsg, element, CmiSize(element));
00394 g_free((char *)element-8);
00395 }
00396 return localmsg;
00397 }
00398
00399
00400
00401
00402 CpvStaticDeclare(double,inittime_wallclock);
00403 CpvStaticDeclare(double,inittime_virtual);
00404
00405 void CmiTimerInit(void)
00406 {
00407 struct timespec temp;
00408 CpvInitialize(double, inittime_wallclock);
00409 CpvInitialize(double, inittime_virtual);
00410 clock_gettime(CLOCK_SGI_CYCLE, &temp);
00411 CpvAccess(inittime_wallclock) = (double) temp.tv_sec +
00412 1e-9 * temp.tv_nsec;
00413 CpvAccess(inittime_virtual) = CpvAccess(inittime_wallclock);
00414 }
00415
00416 double CmiWallTimer(void)
00417 {
00418 struct timespec temp;
00419 double currenttime;
00420
00421 clock_gettime(CLOCK_SGI_CYCLE, &temp);
00422 currenttime = (double) temp.tv_sec +
00423 1e-9 * temp.tv_nsec;
00424 return (currenttime - CpvAccess(inittime_wallclock));
00425 }
00426
00427 double CmiCpuTimer(void)
00428 {
00429 struct timespec temp;
00430 double currenttime;
00431
00432 clock_gettime(CLOCK_SGI_CYCLE, &temp);
00433 currenttime = (double) temp.tv_sec +
00434 1e-9 * temp.tv_nsec;
00435 return (currenttime - CpvAccess(inittime_virtual));
00436 }
00437
/* CmiTimer: alias for CmiCpuTimer (seconds since CmiTimerInit). */
double CmiTimer(void)
{
  double elapsed = CmiCpuTimer();

  return elapsed;
}
00442