00001
00002
00003
00004
00005
00006 #if CMK_C_INLINE
00007 #define INLINE_KEYWORD inline
00008 #else
00009 #define INLINE_KEYWORD
00010 #endif
00011
00012
00013 #ifndef CMK_BROADCAST_SPANNING_TREE
00014 #define CMK_BROADCAST_SPANNING_TREE 1
00015 #endif
00016
00017 #ifndef CMK_BROADCAST_HYPERCUBE
00018 #define CMK_BROADCAST_HYPERCUBE 0
00019 #endif
00020
00021 #define BROADCAST_SPANNING_FACTOR 4
00022
00023 #define BROADCAST_SPANNING_INTRA_FACTOR 8
00024
00025
00026
00027
00028
00029
00030 #define CMI_BROADCAST_ROOT(msg) ((CmiMsgHeaderBasic *)msg)->root
00031 #define CMI_SET_BROADCAST_ROOT(msg, root) CMI_BROADCAST_ROOT(msg) = (root);
00032
00042 #ifndef CMK_OFFLOAD_BCAST_PROCESS
00043 #define CMK_OFFLOAD_BCAST_PROCESS 0
00044 #endif
00045
00046 #if CMK_OFFLOAD_BCAST_PROCESS
00047 CsvDeclare(PCQueue, procBcastQ);
00048 #if CMK_NODE_QUEUE_AVAILABLE
00049 CsvDeclare(PCQueue, nodeBcastQ);
00050 #endif
00051 #endif
00052
00053 #if CMK_WITH_STATS
00054 static int MSG_STATISTIC = 0;
00055 int msg_histogram[22];
00056 static int _cmi_log2(int size)
00057 {
00058 int ret = 1;
00059 size = size-1;
00060 while( (size=size>>1)>0) ret++;
00061 return ret;
00062 }
00063 #endif
00064
00065 #if CMK_BROADCAST_HYPERCUBE
00066
00067 static int CmiNodesDim;
00068 #endif
00069
00070
00071
00072 static void handleOneBcastMsg(int size, char *msg);
00073 static void processBcastQs();
00074
00075
00076
00077
00078
00079 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg);
00080 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg);
00081 static void SendSpanningChildrenProc(int size, char *msg);
00082 static void SendHyperCubeProc(int size, char *msg);
00083 #if CMK_NODE_QUEUE_AVAILABLE
00084 static void SendSpanningChildrenNode(int size, char *msg);
00085 static void SendHyperCubeNode(int size, char *msg);
00086 #endif
00087
00088 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
00089 static void SendHyperCube(int size, char *msg, int rankToAssign, int startNode);
00090
00091 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
00092 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
00093 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
00094 #endif
00095 #endif
00096
00097
00098 void CmiSyncBroadcastFn(int size, char *msg);
00099 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
00100 void CmiFreeBroadcastFn(int size, char *msg);
00101
00102 void CmiSyncBroadcastAllFn(int size, char *msg);
00103 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
00104 void CmiFreeBroadcastAllFn(int size, char *msg);
00105
00106 #if CMK_NODE_QUEUE_AVAILABLE
00107 void CmiSyncNodeBroadcastFn(int size, char *msg);
00108 CmiCommHandle CmiAsyncNodeeroadcastFn(int size, char *msg);
00109 void CmiFreeNodeBroadcastFn(int size, char *msg);
00110
00111 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
00112 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
00113 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
00114 #endif
00115
00116
00117
00118 #define CMI_DEST_RANK(msg) ((CmiMsgHeaderBasic *)msg)->rank
00119
00120 #ifndef CMK_HAS_SIZE_IN_MSGHDR
00121 #define CMK_HAS_SIZE_IN_MSGHDR 1
00122 #endif
00123 #if CMK_HAS_SIZE_IN_MSGHDR
00124 #define CMI_MSG_SIZE(msg) ((CmiMsgHeaderBasic *)msg)->size
00125 #else
00126 #define CMI_MSG_SIZE(msg) (CmiAbort("Has no msg size in header"))
00127 #endif
00128
00129 #if CMK_NODE_QUEUE_AVAILABLE
00130
00131
00132
00133
00134 #define DGRAM_NODEMESSAGE (0x1FFB)
00135 #endif
00136
00137
00138 int _Cmi_mynode;
00139 int _Cmi_mynodesize;
00140 int _Cmi_numnodes;
00141 int _Cmi_numpes;
00142
00143 CpvDeclare(void*, CmiLocalQueue);
00144
00145
00146 #define P2P_SYNC 0x1
00147 #define P2P_ASYNC 0x2
00148 #define BCAST_SYNC 0x4
00149 #define BCAST_ASYNC 0x8
00150 #define OUT_OF_BAND 0x10
00151
00152 enum MACHINE_SMP_MODE {
00153 INVALID_MODE,
00154 #if CMK_BLUEGENEQ
00155 COMM_THREAD_SEND_RECV = 1,
00156 #else
00157 COMM_THREAD_SEND_RECV = 0,
00158 #endif
00159 COMM_THREAD_ONLY_RECV,
00160 COMM_WORK_THREADS_SEND_RECV,
00161 COMM_THREAD_NOT_EXIST
00162 };
00163
00164 static enum MACHINE_SMP_MODE Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV;
00165
00166
00167 #if CMK_SMP
00168 static volatile int commThdExit = 0;
00169 static CmiNodeLock commThdExitLock = 0;
00170
00180 #ifndef CMK_SMP_NO_COMMTHD
00181 #define CMK_SMP_NO_COMMTHD 0
00182 #endif
00183
00184 #if CMK_SMP_NO_COMMTHD
00185 int Cmi_commthread = 0;
00186 #else
00187 int Cmi_commthread = 1;
00188 #endif
00189
00190 #endif
00191
00192
00193 static int Cmi_nodestart;
00194
00195
00196
00197
00198
00199 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
00200 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
00201 #endif
00202
00203 CpvDeclare(unsigned , networkProgressCount);
00204 int networkProgressPeriod;
00205
00206
00207
00208 void CmiAbort(const char *message);
00209 static void PerrorExit(const char *msg);
00210
00211
00212 static void handleOneRecvedMsg(int size, char *msg);
00213
00214
00215
00216
00217
00218 static void SendToPeers(int size, char *msg);
00219
00220
00221 void CmiPushPE(int rank, void *msg);
00222
00223 #if CMK_NODE_QUEUE_AVAILABLE
00224 void CmiPushNode(void *msg);
00225 #endif
00226
00227
00228
00229
00230 #ifndef USE_COMMON_SYNC_P2P
00231 #define USE_COMMON_SYNC_P2P 1
00232 #endif
00233 #ifndef USE_COMMON_ASYNC_P2P
00234 #define USE_COMMON_ASYNC_P2P 1
00235 #endif
00236 #ifndef USE_COMMON_SYNC_BCAST
00237 #define USE_COMMON_SYNC_BCAST 1
00238 #endif
00239 #ifndef USE_COMMON_ASYNC_BCAST
00240 #define USE_COMMON_ASYNC_BCAST 1
00241 #endif
00242
00243 static void CmiSendSelf(char *msg);
00244
00245 void CmiSyncSendFn(int destPE, int size, char *msg);
00246 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
00247 void CmiFreeSendFn(int destPE, int size, char *msg);
00248
00249 #if CMK_NODE_QUEUE_AVAILABLE
00250 static void CmiSendNodeSelf(char *msg);
00251
00252 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
00253 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
00254 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
00255
00256 #endif
00257
00258
00259 static char **Cmi_argv;
00260 static char **Cmi_argvcopy;
00261 static CmiStartFn Cmi_startfn;
00262 static int Cmi_usrsched;
00263 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
00264 static void ConverseRunPE(int everReturn);
00265
00266
00267 static void AdvanceCommunication(int whenidle);
00268 static void CommunicationServer(int sleepTime);
00269 static void CommunicationServerThread(int sleepTime);
00270 void ConverseExit(void);
00271
00272
00273 void *CmiGetNonLocal(void);
00274 #if CMK_NODE_QUEUE_AVAILABLE
00275 void *CmiGetNonLocalNodeQ(void);
00276 #endif
00277
00278 static char *CopyMsg(char *msg, int len);
00279
00280
00281
00282 #include "machine-smp.c"
00283
00284
00285 typedef struct {
00286 int sleepMs;
00287 int nIdles;
00288 CmiState cs;
00289 } CmiIdleState;
00290
00291 static CmiIdleState *CmiNotifyGetState(void);
00292
00305 static void CmiNotifyBeginIdle(CmiIdleState *s);
00306 static void CmiNotifyStillIdle(CmiIdleState *s);
00307 void CmiNotifyIdle(void);
00308
00309
00310
00311 #if !CMK_SMP
00312
00313 static struct CmiStateStruct Cmi_state;
00314 int _Cmi_mype;
00315 int _Cmi_myrank;
00316
00317 void CmiMemLock() {}
00318 void CmiMemUnlock() {}
00319
00320 #define CmiGetState() (&Cmi_state)
00321 #define CmiGetStateN(n) (&Cmi_state)
00322
00323 void CmiYield(void) {
00324 sleep(0);
00325 }
00326
00327 static void CmiStartThreads(char **argv) {
00328 CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
00329 _Cmi_mype = Cmi_nodestart;
00330 _Cmi_myrank = 0;
00331 }
00332 #else
00333
00334 INLINE_KEYWORD int CmiMyPe() {
00335 return CmiGetState()->pe;
00336 }
00337 INLINE_KEYWORD int CmiMyRank() {
00338 return CmiGetState()->rank;
00339 }
00340 INLINE_KEYWORD int CmiNodeFirst(int node) {
00341 return node*_Cmi_mynodesize;
00342 }
00343 INLINE_KEYWORD int CmiNodeSize(int node) {
00344 return _Cmi_mynodesize;
00345 }
00346 INLINE_KEYWORD int CmiNodeOf(int pe) {
00347 return (pe/_Cmi_mynodesize);
00348 }
00349 INLINE_KEYWORD int CmiRankOf(int pe) {
00350 return pe%_Cmi_mynodesize;
00351 }
00352 #endif
00353 CsvDeclare(CmiNodeState, NodeState);
00354
00355
00356 #include "machine-broadcast.c"
00357 #include "immediate.c"
00358 #include "machine-commthd-util.c"
00359
00360
00361 static void PerrorExit(const char *msg) {
00362 perror(msg);
00363 exit(1);
00364 }
00365
00366
00367
00368 void CmiPushPE(int rank,void *msg) {
00369 CmiState cs = CmiGetStateN(rank);
00370 MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
00371 #if CMK_IMMEDIATE_MSG
00372 if (CmiIsImmediate(msg)) {
00373 MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
00374 CMI_DEST_RANK(msg) = rank;
00375 CmiPushImmediateMsg(msg);
00376 MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
00377 return;
00378 }
00379 #endif
00380
00381 PCQueuePush(cs->recv,(char*)msg);
00382
00383 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
00384 if (_Cmi_noprocforcommthread)
00385 #endif
00386 CmiIdleLock_addMessage(&cs->idle);
00387 MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
00388 }
00389
00390 #if CMK_NODE_QUEUE_AVAILABLE
00391
00392 void CmiPushNode(void *msg) {
00393 MACHSTATE(3,"Pushing message into NodeRecv queue");
00394 #if CMK_IMMEDIATE_MSG
00395 if (CmiIsImmediate(msg)) {
00396 CMI_DEST_RANK(msg) = 0;
00397 CmiPushImmediateMsg(msg);
00398 return;
00399 }
00400 #endif
00401 CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
00402 PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
00403 CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
00404
00405 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
00406 if (_Cmi_noprocforcommthread)
00407 #endif
00408 {
00409 CmiState cs=CmiGetStateN(0);
00410 CmiIdleLock_addMessage(&cs->idle);
00411 }
00412 }
00413 #endif
00414
00415
00416 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
00417 int isBcastMsg = 0;
00418 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
00419 isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
00420 #endif
00421
00422 if (isBcastMsg) {
00423 handleOneBcastMsg(size, msg);
00424 return;
00425 }
00426
00427 #if CMK_NODE_QUEUE_AVAILABLE
00428 if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE){
00429 CmiPushNode(msg);
00430 return;
00431 }
00432 #endif
00433 CmiPushPE(CMI_DEST_RANK(msg), msg);
00434
00435 }
00436
00437
00438 static void SendToPeers(int size, char *msg) {
00439
00440
00441
00442 int exceptRank = CMI_DEST_RANK(msg);
00443 int i;
00444 for (i=0; i<exceptRank; i++) {
00445 CmiPushPE(i, CopyMsg(msg, size));
00446 }
00447 for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
00448 CmiPushPE(i, CopyMsg(msg, size));
00449 }
00450 }
00451
00452
00453
00454 static void CmiSendSelf(char *msg) {
00455 #if CMK_IMMEDIATE_MSG
00456 if (CmiIsImmediate(msg)) {
00457
00458 CmiPushImmediateMsg(msg);
00459 CmiHandleImmediate();
00460 return;
00461 }
00462 #endif
00463 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00464 }
00465
00466
00467 #if USE_COMMON_SYNC_P2P
00468 void CmiSyncSendFn(int destPE, int size, char *msg) {
00469 char *dupmsg = CopyMsg(msg, size);
00470 CmiFreeSendFn(destPE, size, dupmsg);
00471 }
00472
00473 #if CMK_USE_PXSHM
00474 #include "machine-pxshm.c"
00475 #endif
00476 #if CMK_USE_XPMEM
00477 #include "machine-xpmem.c"
00478 #endif
00479
00480 static int refcount = 0;
00481
00482 #if CMK_USE_OOB
00483 CpvExtern(int, _urgentSend);
00484 #endif
00485
00486
00487 #if CMK_C_INLINE
00488 inline
00489 #endif
00490 CmiCommHandle CmiSendNetworkFunc(int destNode, int size, char *msg, int mode)
00491 {
00492 int rank;
00493 #if CMK_USE_PXSHM
00494 if (CmiValidPxshm(destNode, size)) {
00495 CmiSendMessagePxshm(msg, size, destNode, &refcount);
00496
00497 return 0;
00498 }
00499 #endif
00500 #if CMK_USE_XPMEM
00501 if (CmiValidXpmem(destNode, size)) {
00502 CmiSendMessageXpmem(msg, size, destNode, &refcount);
00503
00504 return 0;
00505 }
00506 #endif
00507 #if CMK_PERSISTENT_COMM
00508 if (CpvAccess(phs)) {
00509 if (size > PERSIST_MIN_SIZE) {
00510 CmiAssert(CpvAccess(curphs) < CpvAccess(phsSize));
00511 LrtsSendPersistentMsg(CpvAccess(phs)[CpvAccess(curphs)], destNode, size, msg);
00512 return 0;
00513 }
00514 }
00515 #endif
00516
00517 #if CMK_WITH_STATS
00518 if (MSG_STATISTIC)
00519 {
00520 int ret_log = _cmi_log2(size);
00521 if(ret_log >21) ret_log = 21;
00522 msg_histogram[ret_log]++;
00523 }
00524 #endif
00525 #if CMK_USE_OOB
00526 if (CpvAccess(_urgentSend)) mode |= OUT_OF_BAND;
00527 #endif
00528 return LrtsSendFunc(destNode, size, msg, mode);
00529 }
00530
00531 void CmiFreeSendFn(int destPE, int size, char *msg) {
00532 CMI_SET_BROADCAST_ROOT(msg, 0);
00533 CQdCreate(CpvAccess(cQdState), 1);
00534 if (CmiMyPe()==destPE) {
00535 CmiSendSelf(msg);
00536 #if CMK_PERSISTENT_COMM
00537 if (CpvAccess(phs)) CpvAccess(curphs)++;
00538 #endif
00539 }
00540 else {
00541 int destNode = CmiNodeOf(destPE);
00542 int destRank = CmiRankOf(destPE);
00543 #if CMK_SMP
00544 if (CmiMyNode()==destNode) {
00545 CmiPushPE(destRank, msg);
00546 #if CMK_PERSISTENT_COMM
00547 if (CpvAccess(phs)) CpvAccess(curphs)++;
00548 #endif
00549 return;
00550 }
00551 #endif
00552 CMI_DEST_RANK(msg) = destRank;
00553 CmiSendNetworkFunc(destNode, size, msg, P2P_SYNC);
00554 #if CMK_PERSISTENT_COMM
00555 if (CpvAccess(phs)) CpvAccess(curphs)++;
00556 #endif
00557 }
00558 }
00559 #endif
00560
00561 #if USE_COMMON_ASYNC_P2P
00562 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
00563 int destNode = CmiNodeOf(destPE);
00564 if (destNode == CmiMyNode()) {
00565 CmiSyncSendFn(destPE,size,msg);
00566 return 0;
00567 } else {
00568 #if CMK_WITH_STATS
00569 if ( MSG_STATISTIC)
00570 {
00571 int ret_log = _cmi_log2(size);
00572 if(ret_log >21) ret_log = 21;
00573 msg_histogram[ret_log]++;
00574 }
00575 #endif
00576 return CmiSendNetworkFunc(destPE, size, msg, P2P_ASYNC);
00577 }
00578 }
00579 #endif
00580
00581 #if CMK_NODE_QUEUE_AVAILABLE
00582 static void CmiSendNodeSelf(char *msg) {
00583 #if CMK_IMMEDIATE_MSG
00584 if (CmiIsImmediate(msg)) {
00585 CmiPushImmediateMsg(msg);
00586 if (!_immRunning) CmiHandleImmediate();
00587 return;
00588 }
00589 #endif
00590 CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
00591 PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
00592 CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
00593 }
00594
00595 #if USE_COMMON_ASYNC_P2P
00596 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
00597 char *dupmsg = CopyMsg(msg, size);
00598 CmiFreeNodeSendFn(destNode, size, dupmsg);
00599 }
00600
00601 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
00602 CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
00603 CQdCreate(CpvAccess(cQdState), 1);
00604 CMI_SET_BROADCAST_ROOT(msg, 0);
00605 if (destNode == CmiMyNode()) {
00606 CmiSendNodeSelf(msg);
00607 } else {
00608 #if CMK_WITH_STATS
00609 if ( MSG_STATISTIC)
00610 {
00611 int ret_log = _cmi_log2(size);
00612 if(ret_log >21) ret_log = 21;
00613 msg_histogram[ret_log]++;
00614 }
00615 #endif
00616 CmiSendNetworkFunc(destNode, size, msg, P2P_SYNC);
00617 }
00618 #if CMK_PERSISTENT_COMM
00619 if (CpvAccess(phs)) CpvAccess(curphs)++;
00620 #endif
00621 }
00622 #endif
00623
00624 #if USE_COMMON_ASYNC_P2P
00625 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
00626 if (destNode == CmiMyNode()) {
00627 CmiSyncNodeSendFn(destNode, size, msg);
00628 return 0;
00629 } else {
00630 #if CMK_WITH_STATS
00631 if ( MSG_STATISTIC)
00632 {
00633 int ret_log = _cmi_log2(size);
00634 if(ret_log >21) ret_log = 21;
00635 msg_histogram[ret_log]++;
00636 }
00637 #endif
00638 return CmiSendNetworkFunc(destNode, size, msg, P2P_ASYNC);
00639 }
00640 }
00641 #endif
00642 #endif
00643
00644
00645 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
00646 int _ii;
00647 int tmp;
00648 #if CMK_WITH_STATS
00649 MSG_STATISTIC = CmiGetArgFlag(argv, "+msgstatistic");
00650 #endif
00651
00652 _Cmi_mynodesize = 1;
00653 if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
00654 CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
00655 #if ! CMK_SMP
00656 if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
00657 CmiAbort("+ppn cannot be used in non SMP version!\n");
00658 #endif
00659
00660
00661
00662 networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
00663 CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
00664
00665
00666
00667
00668
00669
00670 #if CMK_WITH_STATS
00671 if ( MSG_STATISTIC)
00672 {
00673 for(_ii=0; _ii<22; _ii++)
00674 msg_histogram[_ii] = 0;
00675 }
00676 #endif
00677
00678 LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
00679
00680 if (_Cmi_mynode==0) {
00681 #if !CMK_SMP
00682 printf("Charm++> Running on non-SMP mode\n");
00683 #else
00684 printf("Charm++> Running on SMP mode, %d worker threads per process\n", _Cmi_mynodesize);
00685 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
00686 printf("Charm++> The comm. thread both sends and receives messages\n");
00687 } else if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
00688 printf("Charm++> The comm. thread only receives messages, while work threads send messages\n");
00689 } else if (Cmi_smp_mode_setting == COMM_WORK_THREADS_SEND_RECV) {
00690 printf("Charm++> Both comm. thread and worker thread send and messages\n");
00691 } else if (Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST) {
00692 printf("Charm++> There's no comm. thread. Work threads both send and receive messages\n");
00693 } else {
00694 CmiAbort("Charm++> Invalid SMP mode setting\n");
00695 }
00696 #endif
00697 }
00698
00699 _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
00700 Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
00701 Cmi_argvcopy = CmiCopyArgs(argv);
00702 Cmi_argv = argv;
00703 Cmi_startfn = fn;
00704 Cmi_usrsched = usched;
00705
00706 #if CMK_USE_PXSHM
00707 CmiInitPxshm(argv);
00708 #endif
00709 #if CMK_USE_XPMEM
00710 CmiInitXpmem(argv);
00711 #endif
00712
00713
00714 #if CMK_BROADCAST_HYPERCUBE
00715
00716 tmp = CmiNumNodes()-1;
00717 CmiNodesDim = 0;
00718 while (tmp>0) {
00719 CmiNodesDim++;
00720 tmp = tmp >> 1;
00721 }
00722 if (CmiNumNodes()==1) CmiNodesDim=1;
00723 #endif
00724
00725 CsvInitialize(CmiNodeState, NodeState);
00726 CmiNodeStateInit(&CsvAccess(NodeState));
00727 #if CMK_SMP
00728 commThdExitLock = CmiCreateLock();
00729 #endif
00730
00731 #if CMK_OFFLOAD_BCAST_PROCESS
00732
00733 CsvInitialize(PCQueue, procBcastQ);
00734 #if CMK_NODE_QUEUE_AVAILABLE
00735 CsvInitialize(PCQueue, nodeBcastQ);
00736 #endif
00737 #endif
00738
00739 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
00740 CsvInitialize(PCQueue, notifyCommThdMsgBuffer);
00741 #endif
00742
00743 CmiStartThreads(argv);
00744
00745 ConverseRunPE(initret);
00746 }
00747
00748 extern void ConverseCommonInit(char **argv);
00749 extern void CthInit(char **argv);
00750
00751 static void ConverseRunPE(int everReturn) {
00752 CmiState cs;
00753 char** CmiMyArgv;
00754
00755 LrtsPreCommonInit(everReturn);
00756
00757 #if CMK_OFFLOAD_BCAST_PROCESS
00758 int createQueue = 1;
00759 #if CMK_SMP
00760 #if CMK_SMP_NO_COMMTHD
00761
00762 if (CmiMyRank()) createQueue = 0;
00763 #else
00764 if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
00765 #endif
00766 #endif
00767
00768 if (createQueue) {
00769 CsvAccess(procBcastQ) = PCQueueCreate();
00770 #if CMK_NODE_QUEUE_AVAILABLE
00771 CsvAccess(nodeBcastQ) = PCQueueCreate();
00772 #endif
00773 }
00774 #endif
00775
00776 CmiNodeAllBarrier();
00777
00778 cs = CmiGetState();
00779 CpvInitialize(void *,CmiLocalQueue);
00780 CpvAccess(CmiLocalQueue) = cs->localqueue;
00781
00782 if (CmiMyRank())
00783 CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
00784 else
00785 CmiMyArgv=Cmi_argv;
00786
00787 CthInit(CmiMyArgv);
00788
00789
00790
00791
00792 CpvInitialize(unsigned , networkProgressCount);
00793 CpvAccess(networkProgressCount) = 0;
00794
00795 ConverseCommonInit(CmiMyArgv);
00796
00797 LrtsPostCommonInit(everReturn);
00798
00799 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
00800 CmiInitNotifyCommThdScheme();
00801 #endif
00802
00803
00804 _immediateReady = 1;
00805
00806 if(CharmLibInterOperate) {
00807
00808
00809 Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
00810 CsdScheduler(-1);
00811 } else {
00812
00813 if (CmiMyRank() == CmiMyNodeSize()) {
00814 Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
00815 while (1) CommunicationServerThread(5);
00816 } else {
00817 if (!everReturn) {
00818 Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
00819 if (Cmi_usrsched==0) CsdScheduler(-1);
00820 ConverseExit();
00821 }
00822 }
00823 }
00824 }
00825
00826
00827
00828 static INLINE_KEYWORD void AdvanceCommunication(int whenidle) {
00829 int doProcessBcast = 1;
00830
00831 #if CMK_USE_PXSHM
00832 CommunicationServerPxshm();
00833 #endif
00834 #if CMK_USE_XPMEM
00835 CommunicationServerXpmem();
00836 #endif
00837
00838 LrtsAdvanceCommunication(whenidle);
00839
00840 #if CMK_OFFLOAD_BCAST_PROCESS
00841 #if CMK_SMP_NO_COMMTHD
00842
00843 if (CmiMyRank()) doProcessBcast = 0;
00844 #endif
00845 if (doProcessBcast) processBcastQs();
00846 #endif
00847
00848 #if CMK_IMMEDIATE_MSG
00849 #if !CMK_SMP
00850 CmiHandleImmediate();
00851 #endif
00852 #if CMK_SMP && CMK_SMP_NO_COMMTHD
00853 if (CmiMyRank()==0) CmiHandleImmediate();
00854 #endif
00855 #endif
00856 }
00857
00858 extern void ConverseCommonExit();
00859
00860 static void CommunicationServer(int sleepTime) {
00861 #if CMK_SMP
00862 AdvanceCommunication(1);
00863
00864 if (commThdExit == CmiMyNodeSize()) {
00865 MACHSTATE(2, "CommunicationServer exiting {");
00866 LrtsDrainResources();
00867 MACHSTATE(2, "} CommunicationServer EXIT");
00868
00869 ConverseCommonExit();
00870
00871 #if CMK_USE_PXSHM
00872 CmiExitPxshm();
00873 #endif
00874 #if CMK_USE_XPMEM
00875 CmiExitXpmem();
00876 #endif
00877 LrtsExit();
00878 }
00879 #endif
00880 }
00881
00882 static void CommunicationServerThread(int sleepTime) {
00883 CommunicationServer(sleepTime);
00884 #if CMK_IMMEDIATE_MSG
00885 CmiHandleImmediate();
00886 #endif
00887 }
00888
00889 void ConverseExit(void) {
00890 int i;
00891 #if !CMK_SMP
00892 LrtsDrainResources();
00893 #else
00894 if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV
00895 || Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST)
00896 LrtsDrainResources();
00897 #endif
00898
00899 ConverseCommonExit();
00900
00901 #if CMK_WITH_STATS
00902 if (MSG_STATISTIC)
00903 {
00904 for(i=0; i<22; i++)
00905 {
00906 CmiPrintf("[MSG PE:%d]", CmiMyPe());
00907 if(msg_histogram[i] >0)
00908 CmiPrintf("(%d:%d) ", 1<<i, msg_histogram[i]);
00909 }
00910 CmiPrintf("\n");
00911 }
00912 #endif
00913
00914 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
00915 if (CmiMyPe() == 0) CmiPrintf("End of program\n");
00916 #endif
00917
00918 #if !CMK_SMP
00919 #if CMK_USE_PXSHM
00920 CmiExitPxshm();
00921 #endif
00922 #if CMK_USE_XPMEM
00923 CmiExitXpmem();
00924 #endif
00925 LrtsExit();
00926 #else
00927
00928
00929 CmiLock(commThdExitLock);
00930 commThdExit++;
00931 CmiUnlock(commThdExitLock);
00932 while (1) CmiYield();
00933 #endif
00934 }
00935
00936
00937 void CmiAbort(const char *message) {
00938 #if CMK_USE_PXSHM
00939 CmiExitPxshm();
00940 #endif
00941 #if CMK_USE_XPMEM
00942 CmiExitXpmem();
00943 #endif
00944 LrtsAbort(message);
00945 }
00946
00947
00948 void *CmiGetNonLocal(void) {
00949 CmiState cs = CmiGetState();
00950 void *msg = NULL;
00951
00952 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
00953
00961 if (CmiNumPes() == 1) return NULL;
00962 #endif
00963
00964 MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
00965 CmiIdleLock_checkMessage(&cs->idle);
00966
00967
00968 msg = PCQueuePop(cs->recv);
00969 #if !CMK_SMP
00970 if (!msg) {
00971 AdvanceCommunication(0);
00972 msg = PCQueuePop(cs->recv);
00973 }
00974 #else
00975
00976 #endif
00977
00978 MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
00979
00980 return msg;
00981 }
00982
00983 #if CMK_NODE_QUEUE_AVAILABLE
00984 void *CmiGetNonLocalNodeQ(void) {
00985 CmiState cs = CmiGetState();
00986 char *result = 0;
00987 CmiIdleLock_checkMessage(&cs->idle);
00988 if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
00989 MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
00990 CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
00991 result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
00992 CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
00993 MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
00994 }
00995
00996 return result;
00997 }
00998 #endif
00999
01000
01001 static CmiIdleState *CmiNotifyGetState(void) {
01002 CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
01003 s->sleepMs=0;
01004 s->nIdles=0;
01005 s->cs=CmiGetState();
01006 return s;
01007 }
01008
01009 static void CmiNotifyBeginIdle(CmiIdleState *s) {
01010 s->sleepMs=0;
01011 s->nIdles=0;
01012 }
01013
01014
01015 #define SPINS_BEFORE_SLEEP 20
01016 static void CmiNotifyStillIdle(CmiIdleState *s) {
01017 MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
01018 #if !CMK_SMP
01019 AdvanceCommunication(1);
01020 #else
01021 LrtsPostNonLocal();
01022
01023 if (_Cmi_noprocforcommthread) {
01024 s->nIdles++;
01025 if (s->nIdles>SPINS_BEFORE_SLEEP) {
01026 s->sleepMs+=2;
01027 if (s->sleepMs>10) s->sleepMs=10;
01028 }
01029
01030 if (s->sleepMs>0) {
01031 MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
01032 CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
01033 MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
01034 }
01035 }
01036 #endif
01037
01038 MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
01039 }
01040
01041
01042 void CmiNotifyIdle(void) {
01043 AdvanceCommunication(1);
01044 CmiYield();
01045 }
01046
01047
01048 static char *CopyMsg(char *msg, int len) {
01049 char *copy = (char *)CmiAlloc(len);
01050 #if CMK_ERROR_CHECKING
01051 if (!copy) {
01052 CmiAbort("Error: out of memory in machine layer\n");
01053 }
01054 #endif
01055 memcpy(copy, msg, len);
01056 return copy;
01057 }
01058
01059
01060