00001
00002
00003
00004
00005
00006
00007
00014 #include <stdio.h>
00015 #include <sys/time.h>
00016 #include "converse.h"
00017 #include <mpproto.h>
00018 #include <sys/systemcfg.h>
00019
/*
 * FLIPBIT(node, bitnumber): value of `node` with bit `bitnumber` toggled.
 * Both arguments are fully parenthesized so that compound expressions
 * (e.g. `a | b`) expand with the intended precedence.
 */
#define FLIPBIT(node,bitnumber) ((node) ^ (1 << (bitnumber)))
00021
00022 int _Cmi_mype;
00023 int _Cmi_numpes;
00024 int _Cmi_myrank;
00025 CpvDeclare(void*, CmiLocalQueue);
00026
/* Initial capacity (in slots) of the received-message queue. */
#define BLK_LEN 512

/*
 * Received-message queue: a circular buffer of message pointers that is
 * grown when it fills up (see recdQueueAddToBack).
 */
static void **recdQueue_blk;            /* backing array of slots        */
static unsigned int recdQueue_blk_len;  /* capacity of the backing array */
static unsigned int recdQueue_first;    /* index of the oldest element   */
static unsigned int recdQueue_len;      /* number of queued elements     */

static void recdQueueInit(void);
static void recdQueueAddToBack(void *element);
static void *recdQueueRemoveFromFront(void);
00036
/*
 * One outstanding asynchronous send, kept in a singly linked list until
 * the send is reported complete (see CmiReleaseSentMessages).
 */
typedef struct msg_list {
  int msgid;              /* id written by mpc_send() for this send   */
  char *msg;              /* buffer handed to mpc_send()              */
  struct msg_list *next;  /* next outstanding send, or 0 at the tail  */
} SMSG_LIST;
00042
00043 static int Cmi_dim;
00044 static double itime;
00045
00046 static SMSG_LIST *sent_msgs=0;
00047 static SMSG_LIST *end_sent=0;
00048
00049 static int allmsg, dontcare, msgtype;
00050
00051
00052
00053 void CmiTimerInit(void)
00054 {
00055 timebasestruct_t time;
00056 read_real_time(&time, TIMEBASE_SZ);
00057 time_base_to_time(&time, TIMEBASE_SZ);
00058 itime=(double)time.tb_high + 1.0e-9*((double) time.tb_low);
00059 }
00060
00061 double CmiTimer(void)
00062 {
00063 double t;
00064 timebasestruct_t time;
00065
00066 read_real_time(&time, TIMEBASE_SZ);
00067 time_base_to_time(&time, TIMEBASE_SZ);
00068 t=(double)time.tb_high + 1.0e-9*((double) time.tb_low);
00069 return (t-itime);
00070 }
00071
00072 double CmiWallTimer(void)
00073 {
00074 double t;
00075 timebasestruct_t time;
00076
00077 read_real_time(&time, TIMEBASE_SZ);
00078 time_base_to_time(&time, TIMEBASE_SZ);
00079 t=(double)time.tb_high + 1.0e-9*((double) time.tb_low);
00080 return (t-itime);
00081 }
00082
/*
 * CPU timer entry point. This layer has no separate CPU clock; it simply
 * forwards to CmiTimer().
 */
double CmiCpuTimer(void)
{
  double elapsed = CmiTimer();

  return elapsed;
}
00087
00088 static int CmiAllAsyncMsgsSent(void)
00089 {
00090 SMSG_LIST *msg_tmp = sent_msgs;
00091
00092 while(msg_tmp!=0) {
00093 if(mpc_status(msg_tmp->msgid)<0)
00094 return 0;
00095 msg_tmp = msg_tmp->next;
00096 }
00097 return 1;
00098 }
00099
00100 int CmiAsyncMsgSent(CmiCommHandle c) {
00101
00102 SMSG_LIST *msg_tmp = sent_msgs;
00103
00104 while ((msg_tmp) && ((CmiCommHandle)msg_tmp->msgid != c))
00105 msg_tmp = msg_tmp->next;
00106
00107 if ((msg_tmp) && (mpc_status(msg_tmp->msgid)<0))
00108 return 0;
00109 else
00110 return 1;
00111 }
00112
00113 void CmiReleaseCommHandle(CmiCommHandle c)
00114 {
00115 return;
00116 }
00117
00118
00119 static void CmiReleaseSentMessages(void)
00120 {
00121 SMSG_LIST *msg_tmp=sent_msgs;
00122 SMSG_LIST *prev=0;
00123 SMSG_LIST *temp;
00124
00125 while(msg_tmp!=0) {
00126 if(mpc_status(msg_tmp->msgid)>=0) {
00127
00128 temp = msg_tmp->next;
00129 if(prev==0)
00130 sent_msgs = temp;
00131 else
00132 prev->next = temp;
00133 CmiFree(msg_tmp->msg);
00134 CmiFree(msg_tmp);
00135 msg_tmp = temp;
00136 } else {
00137 prev = msg_tmp;
00138 msg_tmp = msg_tmp->next;
00139 }
00140 }
00141 end_sent = prev;
00142 }
00143
00144 static void PumpMsgs(void)
00145 {
00146 int src, type, mstat;
00147 size_t nbytes;
00148 char *msg;
00149
00150 while(1) {
00151 src = dontcare; type = msgtype;
00152 mpc_probe(&src, &type, &mstat);
00153 if(mstat<0)
00154 return;
00155 msg = (char *) CmiAlloc((size_t) mstat);
00156 mpc_brecv(msg, (size_t)mstat, &src, &type, &nbytes);
00157 recdQueueAddToBack(msg);
00158 }
00159 }
00160
00161
00162
/*
 * Fetch the next message received from another PE.
 *
 * Reclaims completed sends, pulls every pending incoming message into the
 * received queue, then returns the oldest queued message, or 0 when the
 * queue is empty.
 */
void *CmiGetNonLocal(void)
{
  void *next_msg;

  CmiReleaseSentMessages();
  PumpMsgs();
  next_msg = recdQueueRemoveFromFront();
  return next_msg;
}
00169
/*
 * Idle hook: use the idle time to make communication progress.
 */
void CmiNotifyIdle(void)
{
  /* Free buffers of sends that have completed. */
  CmiReleaseSentMessages();

  /* Move any pending incoming messages into the received queue. */
  PumpMsgs();
}
00175
00176
00177
00178 void CmiSyncSendFn(int destPE, int size, char *msg)
00179 {
00180 char *dupmsg = (char *) CmiAlloc(size);
00181 memcpy(dupmsg, msg, size);
00182 if (_Cmi_mype==destPE) {
00183 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
00184 CQdCreate(CpvAccess(cQdState), 1);
00185 }
00186 else
00187 CmiAsyncSendFn(destPE, size, dupmsg);
00188 }
00189
00190
00191 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
00192 {
00193 SMSG_LIST *msg_tmp;
00194 int msgid;
00195
00196 mpc_send(msg, size, destPE, msgtype, &msgid);
00197 msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
00198 msg_tmp->msgid = msgid;
00199 msg_tmp->msg = msg;
00200 msg_tmp->next = 0;
00201 if(sent_msgs==0)
00202 sent_msgs = msg_tmp;
00203 else
00204 end_sent->next = msg_tmp;
00205 end_sent = msg_tmp;
00206 CQdCreate(CpvAccess(cQdState), 1);
00207 return (CmiCommHandle) msgid;
00208 }
00209
00210 void CmiFreeSendFn(int destPE, int size, char *msg)
00211 {
00212 if (_Cmi_mype==destPE) {
00213 CQdCreate(CpvAccess(cQdState), 1);
00214 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00215 } else {
00216 CmiAsyncSendFn(destPE, size, msg);
00217 }
00218 }
00219
00220
00221
00222
00223 void CmiSyncBroadcastFn(int size, char *msg)
00224 {
00225 int i ;
00226
00227 for ( i=_Cmi_mype+1; i<_Cmi_numpes; i++ )
00228 CmiSyncSendFn(i, size,msg) ;
00229 for ( i=0; i<_Cmi_mype; i++ )
00230 CmiSyncSendFn(i, size,msg) ;
00231 }
00232
00233
00234 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00235 {
00236 int i ;
00237
00238 for ( i=_Cmi_mype+1; i<_Cmi_numpes; i++ )
00239 CmiAsyncSendFn(i,size,msg) ;
00240 for ( i=0; i<_Cmi_mype; i++ )
00241 CmiAsyncSendFn(i,size,msg) ;
00242 return (CmiCommHandle) (CmiAllAsyncMsgsSent());
00243 }
00244
/*
 * Broadcast `msg` to every PE except this one, then release the caller's
 * buffer with CmiFree().
 */
void CmiFreeBroadcastFn(int size, char *msg)
{
  /* Every destination receives its own copy ... */
  CmiSyncBroadcastFn(size, msg);

  /* ... so the original can be released right away. */
  CmiFree(msg);
}
00250
00251 void CmiSyncBroadcastAllFn(int size, char *msg)
00252 {
00253 int i ;
00254
00255 for ( i=0; i<_Cmi_numpes; i++ )
00256 CmiSyncSendFn(i,size,msg) ;
00257 }
00258
00259 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00260 {
00261 int i ;
00262
00263 for ( i=1; i<_Cmi_numpes; i++ )
00264 CmiAsyncSendFn(i,size,msg) ;
00265 return (CmiCommHandle) (CmiAllAsyncMsgsSent());
00266 }
00267
00268 void CmiFreeBroadcastAllFn(int size, char *msg)
00269 {
00270 int i ;
00271
00272 for ( i=0; i<_Cmi_numpes; i++ )
00273 CmiSyncSendFn(i,size,msg) ;
00274 CmiFree(msg) ;
00275 }
00276
00277
00278
00279 void ConverseExit(void)
00280 {
00281 int msgid = allmsg;
00282 size_t nbytes;
00283 ConverseCommonExit();
00284 mpc_wait(&msgid, &nbytes);
00285 exit(0);
00286 }
00287
00288 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
00289 {
00290 int n ;
00291 int nbuf[4];
00292
00293 _Cmi_myrank = 0;
00294 mpc_environ(&_Cmi_numpes, &_Cmi_mype);
00295 mpc_task_query(nbuf, 4, 3);
00296 dontcare = nbuf[0];
00297 allmsg = nbuf[1];
00298 mpc_task_query(nbuf, 2, 2);
00299 msgtype = nbuf[0];
00300
00301
00302 for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
00303 Cmi_dim++ ;
00304
00305 CmiTimerInit();
00306 CpvInitialize(void *, CmiLocalQueue);
00307 CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00308 recdQueueInit();
00309 CthInit(argv);
00310 ConverseCommonInit(argv);
00311 if (initret==0) {
00312 fn(argc, argv);
00313 if (usched==0) CsdScheduler(-1);
00314 ConverseExit();
00315 }
00316 }
00317
00318
00319
00320
00321
00322
00323
/*
 * Report `message` through CmiError() and terminate the process with
 * exit status 1.
 *
 * The message is passed as a "%s" argument rather than as the format
 * string itself, so '%' characters in a caller-supplied message are
 * printed literally instead of being interpreted as conversions.
 * NOTE(review): this relies on CmiError being printf-style, as the
 * Converse manual describes it -- confirm against converse.h.
 */
void CmiAbort(const char *message)
{
  CmiError("%s", message);
  exit(1);
}
00329
00330
00331
00332
00333
/*
 * Allocate an array of `len` pointer slots with CmiAlloc().
 * If the allocation yields a null pointer, an error is reported and
 * mpc_stopall(1) is called; the (null) pointer is returned should that
 * call come back.
 */
static void ** AllocBlock(unsigned int len)
{
  void **blk = (void **) CmiAlloc(len * sizeof(void *));

  if (blk != (void **) 0) {
    return blk;
  }

  CmiError("Cannot Allocate Memory!\n");
  mpc_stopall(1);
  return blk;
}
00345
/*
 * Copy a completely full circular buffer into `destblk` in logical order.
 *
 *  srcblk   source array holding `len` elements, oldest at index `first`
 *  destblk  destination array with room for at least `len` elements
 *  first    index of the oldest element in `srcblk`
 *  len      number of elements in `srcblk`
 *
 * Elements first..len-1 land at the start of `destblk`, followed by the
 * wrapped-around elements 0..first-1.
 */
static void
SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
{
  unsigned int tail = len - first;   /* elements from `first` to the end */

  memcpy(destblk, srcblk + first, tail * sizeof(void *));
  memcpy(destblk + tail, srcblk, first * sizeof(void *));
}
00352
00353 void recdQueueInit(void)
00354 {
00355 recdQueue_blk = AllocBlock(BLK_LEN);
00356 recdQueue_blk_len = BLK_LEN;
00357 recdQueue_first = 0;
00358 recdQueue_len = 0;
00359 }
00360
00361 void recdQueueAddToBack(void *element)
00362 {
00363 if(recdQueue_len==recdQueue_blk_len) {
00364 void **blk;
00365 recdQueue_blk_len *= 3;
00366 blk = AllocBlock(recdQueue_blk_len);
00367 SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
00368 CmiFree(recdQueue_blk);
00369 recdQueue_blk = blk;
00370 recdQueue_first = 0;
00371 }
00372 recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
00373 }
00374
00375
00376 void * recdQueueRemoveFromFront(void)
00377 {
00378 if(recdQueue_len) {
00379 void *element;
00380 element = recdQueue_blk[recdQueue_first++];
00381 recdQueue_first %= recdQueue_blk_len;
00382 recdQueue_len--;
00383 return element;
00384 }
00385 return 0;
00386 }
00387