00001
00009 #include "ComlibManager.h"
00010 #include "comlib.h"
00011 #include "ck.h"
00012 #include "envelope.h"
00013
00014
00015
00016 #undef ComlibManagerPrintf
00017
00018 #define ComlibManagerPrintf ComlibPrintf
00019
00020 #define getmax(a,b) ((a)>(b)?(a):(b))
00021
00022 CkpvExtern(int, RecvdummyHandle);
00023
00024 CkpvDeclare(CkGroupID, cmgrID);
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 CkpvDeclare(int, RecvmsgHandle);
00035
00036 void recv_array_msg(void *msg){
00037
00038
00039
00040 if(msg == NULL)
00041 return;
00042
00043 register envelope* env = (envelope *)msg;
00044 env->setUsed(0);
00045 env->getsetArrayHops()=1;
00046 CkUnpackMessage(&env);
00047
00048 int srcPe = env->getSrcPe();
00049 int sid = ((CmiMsgHeaderExt *) env)->stratid;
00050
00051
00052
00053 RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
00054
00055 CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
00056
00057 a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
00058
00059
00060 return;
00061 }
00062
00063
00064
/** Forward declaration of a file-local (static) helper; its definition is not shown in this chunk. */
static void periodicDebugPrintStatus(void *data, double wallTime);
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090 ComlibManager::ComlibManager(){
00091 init();
00092 }
00093
00094 void ComlibManager::init(){
00095
00096
00097
00098
00099 if(CkNumPes() == 1 ){
00100 ComlibPrintf("Doing nothing in ComlibManager::init() because we are running on 1 pe.\n");
00101 } else {
00102
00103 if (CkMyRank() == 0) {
00104 PUPable_reg(CharmMessageHolder);
00105 }
00106
00107 numStatsReceived = 0;
00108 curComlibController = 0;
00109 clibIteration = 0;
00110
00111 CkpvInitialize(comRectHashType *, com_rect_ptr);
00112 CkpvAccess(com_rect_ptr)= new comRectHashType;
00113
00114 CkpvInitialize(int, RecvmsgHandle);
00115 CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
00116
00117 bcast_pelist = new int [CkNumPes()];
00118 _MEMCHECK(bcast_pelist);
00119 for(int brcount = 0; brcount < CkNumPes(); brcount++)
00120 bcast_pelist[brcount] = brcount;
00121
00122 section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
00123
00124 CkpvInitialize(CkGroupID, cmgrID);
00125 CkpvAccess(cmgrID) = thisgroup;
00126
00127 dummyArrayIndex.nInts = 0;
00128
00129 CkAssert(CkpvInitialized(conv_com_object));
00130 converseManager = &CkpvAccess(conv_com_object);
00131
00132 setupComplete = 0;
00133
00134 CkpvInitialize(int, migrationDoneHandlerID);
00135 CkpvAccess(migrationDoneHandlerID) =
00136 CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
00137
00138 CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
00139 cgproxy[curComlibController].barrier();
00140 }
00141 }
00142
00143
00144
00145 void ComlibManager::barrier(){
00146 static int bcount = 0;
00147 ComlibPrintf("barrier %d\n", bcount);
00148 if(CkMyPe() == 0) {
00149 bcount ++;
00150 if(bcount == CkNumPes()){
00151 bcount = 0;
00152 CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
00153 cgproxy.resumeFromSetupBarrier();
00154 }
00155 }
00156 }
00157
00158
00159
00172 void ComlibManager::resumeFromSetupBarrier(){
00173 ComlibPrintf("[%d] resumeFromSetupBarrier Charm group ComlibManager setup finished\n", CkMyPe());
00174
00175 setupComplete = 1;
00176 ComlibDoneCreating();
00177 ComlibPrintf("[%d] resumeFromSetupBarrier calling ComlibDoneCreating to tell converse layer strategies to set themselves up\n", CkMyPe());
00178 sendBufferedMessagesAllStrategies();
00179
00180 }
00181
00182
00183
00184
00185
00186
00187 bool ComlibManager::shouldBufferMessagesNow(int instid){
00188 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00189 return (!setupComplete) || myEntry->getErrorMode() == ERROR_MODE || myEntry->getErrorMode() == CONFIRM_MODE || myEntry->bufferOutgoing;
00190 }
00191
00192
00193
00194
00195
00196 void ComlibManager::sendBufferedMessagesAllStrategies(){
00197 int nstrats = converseManager->getNumStrats();
00198 for(int i=0;i<nstrats;i++){
00199 sendBufferedMessages(i+1);
00200 }
00201 }
00202
00203
00204
00205
00206
00207
00208 void ComlibManager::sendBufferedMessages(int instid, int step){
00209 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00210
00211 if(shouldBufferMessagesNow(instid)){
00212 ComlibPrintf("[%d] sendBufferedMessages is not flushing buffered messages for strategy %d because shouldBufferMessagesNow()==true step %d\n", CkMyPe(), instid, step);
00213 } else if(delayMessageSendBuffer[instid].size() == 0){
00214 ComlibPrintf("[%d] sendBufferedMessages: no bufferedmessages to send for strategy %d step %d\n", CkMyPe(), instid, step);
00215 } else{
00216 ComlibPrintf("[%d] sendBufferedMessages Sending %d buffered messages for instid=%d step %d\n", CkMyPe(), delayMessageSendBuffer[instid].size(), instid, step);
00217
00218 for (std::set<CharmMessageHolder*>::iterator iter = delayMessageSendBuffer[instid].begin(); iter != delayMessageSendBuffer[instid].end(); ++iter) {
00219 CharmMessageHolder* cmsg = *iter;
00220
00221 switch(cmsg->type){
00222
00223 case CMH_ARRAYSEND:
00224 CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
00225 CkpvAccess(conv_com_object).doneInserting(instid);
00226 break;
00227
00228 case CMH_GROUPSEND:
00229 CkAbort("CMH_GROUPSEND unimplemented");
00230 break;
00231
00232 case CMH_ARRAYBROADCAST:
00233 case CMH_ARRAYSECTIONSEND:
00234 case CMH_GROUPBROADCAST:
00235
00236 cmsg->sec_id = cmsg->copy_of_sec_id;
00237 CkpvAccess(conv_com_object).insertMessage(cmsg, instid);
00238 CkpvAccess(conv_com_object).doneInserting(instid);
00239 break;
00240
00241 default:
00242 CkAbort("Unknown cmsg->type was found in buffer of delayed messages\n");
00243 }
00244
00245 }
00246
00247 delayMessageSendBuffer[instid].clear();
00248 }
00249
00250 }
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00265 void ComlibManager::beginIteration(int instid, int iteration){
00266 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00267
00268 ComlibManagerPrintf("[%d] beginIteration iter=%d lastKnownIteration=%d %s %s %s\n", CkMyPe(), iteration, myEntry->lastKnownIteration, myEntry->errorModeString(), myEntry->errorModeServerString(), myEntry->discoveryModeString() );
00269
00270 CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
00271 if(CkMyPe()==0)
00272 CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER || myEntry->getErrorModeServer() == ERROR_MODE_SERVER);
00273
00274 if(iteration > myEntry->lastKnownIteration){
00275 ComlibManagerPrintf("[%d] beginIteration Starting Next Iteration ( # %d )\n", CkMyPe(), iteration);
00276
00277 myEntry->lastKnownIteration = iteration;
00278 myEntry->nBeginItr = 1;
00279 myEntry->nEndItr = 0;
00280 myEntry->nProcSync = 0;
00281 myEntry->totalEndCounted = 0;
00282 myEntry->nEndSaved = 0;
00283
00284 } else if(iteration == myEntry->lastKnownIteration){
00285 ComlibManagerPrintf("[%d] beginIteration continuing iteration # %d\n", CkMyPe(), iteration);
00286 myEntry->nBeginItr++;
00287 } else {
00288 CkPrintf("[%d] ERROR: ComlibManager::beginIteration iteration=%d < myEntry->lastKnownIteration=%d", iteration, myEntry->lastKnownIteration);
00289 CkAbort("[%d] ERROR: ComlibManager::beginIteration iteration < myEntry->lastKnownIteration");
00290 }
00291
00292
00293
00294
00295
00296 if (myEntry->nBeginItr > myEntry->numElements) {
00297 ComlibManagerPrintf("[%d] beginIteration BUFFERING OUTGOING because nBeginItr=%d > numElements=%d\n",CkMyPe(), myEntry->nBeginItr, myEntry->numElements);
00298 myEntry->bufferOutgoing = 1;
00299 }
00300
00301 }
00302
00303
00314 void ComlibManager::endIteration(int instid, int step){
00315 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00316
00317
00318
00319
00320 CkAssert(myEntry->nEndItr <= myEntry->nBeginItr);
00321 CkAssert(step == myEntry->lastKnownIteration);
00322
00323 CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
00324 if(CkMyPe()==0)
00325 CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER || myEntry->getErrorModeServer() == ERROR_MODE_SERVER);
00326
00327 myEntry->nEndItr++;
00328
00329 ComlibManagerPrintf("[%d] endIteration called\n",CkMyPe());
00330
00331
00332 if (myEntry->bufferOutgoing) {
00333
00334 CkAssert(delayMessageSendBuffer[instid].size() > 0);
00335 CProxy_ComlibManager myProxy(thisgroup);
00336 myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
00337 }
00338 else if(myEntry->nEndItr == myEntry->numElements) {
00339
00340 CkAssert(converseManager->isReady(instid));
00341 converseManager->doneInserting(instid);
00342 }
00343
00344 }
00345
00346
00354 void ComlibManager::bracketedStartErrorRecoveryProcess(int instid, int step){
00355 CProxy_ComlibManager myProxy(thisgroup);
00356 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00357 CkAssert(step >= myEntry->lastKnownIteration);
00358
00359
00360 if(converseManager->isReady(instid)){
00361 ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess(instid=%d step=%d) %s %s %s\n", CkMyPe(), instid, step, myEntry->errorModeString(), myEntry->errorModeServerString(), myEntry->discoveryModeString() );
00362
00363 CkAssert(myEntry->strategy != NULL);
00364 CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
00365
00366 if (!myEntry->strategy->isBracketed()) {
00367 CkPrintf("[%d] endIteration called unecessarily for a non-bracketed strategy\n", CkMyPe());
00368 return;
00369 }
00370
00371 if (myEntry->getErrorMode() == NORMAL_MODE) {
00372 ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess()\n", CkMyPe());
00373 myEntry->nEndSaved = myEntry->nEndItr;
00374 myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
00375 myEntry->setErrorMode(ERROR_MODE);
00376 bracketedStartDiscovery(instid);
00377 } else {
00378
00379 int update = myEntry->nEndItr - myEntry->nEndSaved;
00380 if (update > 0) {
00381
00382 CProxy_ComlibManager myProxy(thisgroup);
00383 myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
00384 myEntry->nEndSaved = myEntry->nEndItr;
00385 }
00386
00387 }
00388 } else {
00389 ComlibManagerPrintf("[%d] bracketedStartErrorRecoveryProcess() REENQUEUE\n", CkMyPe() );
00390
00391 myProxy[CkMyPe()].bracketedStartErrorRecoveryProcess(instid, step);
00392 }
00393 }
00394
00395
00397 void ComlibManager::bracketedErrorDetected(int instid, int step) {
00398 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00399
00400 bracketedCatchUpPE(instid,step);
00401 CkAssert(step == myEntry->lastKnownIteration);
00402 CkAssert(myEntry->getErrorMode() == NORMAL_MODE || myEntry->getErrorMode() == ERROR_MODE);
00403
00404 ComlibManagerPrintf("[%d] bracketedErrorDetected()\n", CkMyPe());
00405
00406 if (myEntry->getErrorMode() == NORMAL_MODE) {
00407
00408 myEntry->nEndSaved = myEntry->nEndItr;
00409 CProxy_ComlibManager myProxy(thisgroup);
00410 myProxy[0].bracketedReceiveCount(instid, CkMyPe(), myEntry->nEndSaved, 1, step);
00411 bracketedStartDiscovery(instid);
00412 myEntry->setErrorMode(ERROR_MODE);
00413
00414 } else {
00415
00416 int update = myEntry->nEndItr - myEntry->nEndSaved;
00417 ComlibManagerPrintf("bracketedErrorDetected update=%d\n", update);
00418 if (update > 0) {
00419 ComlibManagerPrintf("bracketedErrorDetected sending update to bracketedReceiveCount\n");
00420 CProxy_ComlibManager myProxy(thisgroup);
00421 myProxy[0].bracketedReceiveCount(instid, CkMyPe(), update, 0, step);
00422 myEntry->nEndSaved = myEntry->nEndItr;
00423 }
00424 }
00425
00426 }
00427
00430 void ComlibManager::bracketedConfirmCount(int instid, int step) {
00431 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00432 CkAssert(myEntry->getErrorMode() == ERROR_MODE);
00433 myEntry->setErrorMode(CONFIRM_MODE);
00434 CProxy_ComlibManager myProxy(thisgroup);
00435 ComlibManagerPrintf("[%d] bracketedConfirmCount\n", CkMyPe());
00436 myProxy[0].bracketedCountConfirmed(instid, myEntry->nEndSaved, step);
00437 }
00438
00442 void ComlibManager::bracketedCountConfirmed(int instid, int count, int step) {
00443 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00444 CkAssert(CkMyPe() == 0);
00445
00446 bracketedCatchUpPE(instid, step);
00447 CkAssert(myEntry->getErrorModeServer() == CONFIRM_MODE_SERVER);
00448 CkAssert(step == myEntry->lastKnownIteration);
00449
00450 myEntry->total += count;
00451
00452 ComlibManagerPrintf("[%d] bracketedCountConfirmed\n", CkMyPe());
00453
00454 if (++myEntry->peConfirmCounter == CkNumPes()) {
00455 myEntry->peConfirmCounter = 0;
00456
00457 CkAssert(myEntry->total == myEntry->totalEndCounted);
00458
00459 CProxy_ComlibManager(thisgroup).bracketedReceiveNewCount(instid, step);
00460 myEntry->setErrorModeServer(ERROR_FIXED_MODE_SERVER);
00461
00462 myEntry->total = 0;
00463 }
00464
00465 }
00466
00467
00468
00474 void ComlibManager::bracketedCatchUpPE(int instid, int step){
00475 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00476 CkAssert(step >= myEntry->lastKnownIteration);
00477 if(step > myEntry->lastKnownIteration){
00478
00479 myEntry->total = 0;
00480 myEntry->lastKnownIteration = step;
00481 myEntry->nBeginItr = 0;
00482 myEntry->nEndItr = 0;
00483 myEntry->nProcSync = 0;
00484 myEntry->totalEndCounted = 0;
00485 myEntry->nEndSaved = 0;
00486 myEntry->setErrorMode(NORMAL_MODE);
00487 myEntry->setDiscoveryMode(NORMAL_DISCOVERY_MODE);
00488
00489 if(CkMyPe()==0){
00490 CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER || myEntry->getErrorModeServer() == ERROR_FIXED_MODE_SERVER);
00491 myEntry->setErrorModeServer(NORMAL_MODE_SERVER);
00492 }
00493
00494 }
00495 }
00496
00506 void ComlibManager::bracketedReceiveCount(int instid, int pe, int count, int isFirst, int step) {
00507 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00508 ComlibPrintf("[%d] bracketedReceiveCount begins step=%d lastKnownIteration=%d totalEndCounted=%d count=%d\n", CkMyPe(), step, myEntry->lastKnownIteration, myEntry->totalEndCounted, count);
00509 CkAssert(step >= myEntry->lastKnownIteration);
00510
00511 CkAssert(CkMyPe() == 0);
00512 CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER || myEntry->getErrorModeServer() == ERROR_MODE_SERVER );
00513
00514
00515
00516 bracketedCatchUpPE(instid, step);
00517
00518
00519 CkAssert(step == myEntry->lastKnownIteration);
00520
00521 myEntry->totalEndCounted += count;
00522
00523
00524
00525 myEntry->nProcSync += isFirst;
00526 CkAssert(myEntry->nProcSync <= CkNumPes());
00527
00528 if (myEntry->getErrorModeServer() == NORMAL_MODE_SERVER) {
00529
00530 CkAssert(myEntry->nProcSync == 1);
00531 CProxy_ComlibManager(thisgroup).bracketedErrorDetected(instid, step);
00532 myEntry->setErrorModeServer(ERROR_MODE_SERVER);
00533 ComlibManagerPrintf("[%d] bracketedReceiveCount first time\n", CkMyPe());
00534 }
00535
00536
00537
00538 CharmStrategy* s = dynamic_cast<CharmStrategy*>(myEntry->strategy);
00539 ComlibArrayInfo ainfo = s->ainfo;
00540 int totalsrc = ainfo.getTotalSrc() ;
00541
00542 if(myEntry->nProcSync == CkNumPes() && myEntry->totalEndCounted == totalsrc) {
00543
00544 myEntry->setErrorModeServer(CONFIRM_MODE_SERVER);
00545 ComlibManagerPrintf("[%d] bracketedReceiveCount errorModeServer is now CONFIRM_MODE calling bracketedConfirmCount totalsrc=%d\n", CkMyPe(), (int)totalsrc);
00546 CProxy_ComlibManager(thisgroup).bracketedConfirmCount(instid, step);
00547
00548 }
00549
00550 }
00551
00552
00557 void ComlibManager::bracketedReceiveNewCount(int instid, int step) {
00558 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00559 CkAssert(myEntry->getErrorMode() == CONFIRM_MODE);
00560
00561 myEntry->setErrorMode(ERROR_FIXED_MODE);
00562
00563 myEntry->nEndItr -= myEntry->nEndSaved;
00564 myEntry->nBeginItr -= myEntry->nEndSaved;
00565
00566 myEntry->nEndSaved = 0;
00567
00568 bracketedFinalBarrier(instid, step);
00569 }
00570
00571
00572
00573
00580 void ComlibManager::bracketedReceiveNewPeList(int instid, int step, int *count) {
00581 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00582 CkAssert(myEntry->getDiscoveryMode() == STARTED_DISCOVERY_MODE);
00583 myEntry->setDiscoveryMode(FINISHED_DISCOVERY_MODE);
00584
00585 myEntry->strategy->bracketedUpdatePeKnowledge(count);
00586
00587 ComlibManagerPrintf("[%d] bracketedReceiveNewPeList Updating numElements\n", CkMyPe());
00588 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
00589 CkAssert((unsigned long)myInfo > 0x1000);
00590 myEntry->numElements = myInfo->getLocalSrc();
00591
00592 ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n",CkMyPe(), instid, delayMessageSendBuffer[instid].size() );
00593 ComlibManagerPrintf("[%d] delayMessageSendBuffer[%d].size()=%d\n", CkMyPe(), instid, delayMessageSendBuffer[instid].size());
00594
00595 bracketedFinalBarrier(instid, step);
00596 }
00597
00598
00606 void ComlibManager::bracketedFinalBarrier(int instid, int step) {
00607 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00608 ComlibManagerPrintf("[%d] ComlibManager::bracketedFinalBarrier %s %s %s\n", CkMyPe(), myEntry->errorModeString(), myEntry->errorModeServerString(), myEntry->discoveryModeString() );
00609
00610
00611 if (myEntry->getDiscoveryMode() == FINISHED_DISCOVERY_MODE && myEntry->getErrorMode() == ERROR_FIXED_MODE) {
00612 myEntry->setDiscoveryMode(NORMAL_DISCOVERY_MODE);
00613 myEntry->setErrorMode(NORMAL_MODE);
00614
00615
00616 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
00617 myInfo->useNewSourceAndDestinations();
00618
00619 CProxy_ComlibManager myProxy(thisgroup);
00620 myProxy[0].bracketedReleaseCount(instid, step);
00621 }
00622 }
00623
00624
00628 void ComlibManager::bracketedReleaseCount(int instid, int step) {
00629
00630 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00631 ComlibPrintf("[%d] ComlibManager::bracketedReleaseCount myEntry->numBufferReleaseReady was %d\n", CkMyPe(), myEntry->numBufferReleaseReady);
00632
00633 CkAssert(CkMyPe() == 0);
00634 CkAssert(myEntry->getErrorModeServer() == ERROR_FIXED_MODE_SERVER);
00635
00636 myEntry->numBufferReleaseReady++;
00637 if(myEntry->numBufferReleaseReady == CkNumPes()) {
00638 myEntry->setErrorModeServer(NORMAL_MODE_SERVER);
00639 CProxy_ComlibManager(thisgroup).bracketedReleaseBufferedMessages(instid, step);
00640 myEntry->numBufferReleaseReady = 0;
00641 }
00642 }
00643
00647 void ComlibManager::bracketedReleaseBufferedMessages(int instid, int step) {
00648 ComlibManagerPrintf("[%d] ComlibManager::bracketedReleaseBufferedMessages step=%d\n", CkMyPe(), step);
00649 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00650
00651 CkAssert(myEntry->getErrorModeServer() == NORMAL_MODE_SERVER);
00652 CkAssert(myEntry->getErrorMode() == NORMAL_MODE);
00653 CkAssert(myEntry->getDiscoveryMode() == NORMAL_DISCOVERY_MODE);
00654
00655 myEntry->bufferOutgoing = 0;
00656 sendBufferedMessages(instid, step);
00657
00658 converseManager->doneInserting(instid);
00659 }
00660
00661
00662
00663
00664
00665
00681 void ComlibManager::bracketedStartDiscovery(int instid) {
00682 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00683 CkAssert(myEntry->getDiscoveryMode() == NORMAL_DISCOVERY_MODE);
00684 myEntry->setDiscoveryMode(STARTED_DISCOVERY_MODE);
00685 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
00686 const CProxy_ComlibManager myProxy(thisgroup);
00687
00688 ComlibManagerPrintf("[%d] bracketedStartDiscovery\n", CkMyPe());
00689
00690 int countSrc = 0;
00691 int countDest = 0;
00692
00693 if (myInfo->isSourceArray()) {
00694
00695 const CkVec<CkArrayIndex> & srcElements = myInfo->getSourceElements();
00696 const int nelem = srcElements.size();
00697 const CkArrayID aid = myInfo->getSourceArrayID();
00698 const CkArray *a = (CkArray*)_localBranch(aid);
00699
00700 for (int i=0; i<nelem; ++i) {
00701 int pe = a->lastKnown(srcElements[i]);
00702 if (pe == CkMyPe()) {
00703 countSrc++;
00704 myInfo->addNewLocalSource(srcElements[i]);
00705 }
00706 else {
00707 myProxy[pe].bracketedDiscover(instid, aid, srcElements[i], true);
00708 }
00709 }
00710
00711 }
00712
00713 if (myInfo->isDestinationArray()) {
00714
00715
00716 const CkVec<CkArrayIndex> & destElements = myInfo->getDestinationElements();
00717 const int nelem = destElements.size();
00718 const CkArrayID aid = myInfo->getDestinationArrayID();
00719 const CkArray *a = (CkArray*)_localBranch(aid);
00720
00721 for (int i=0; i<nelem; ++i) {
00722 int pe = a->lastKnown(destElements[i]);
00723 if (pe == CkMyPe()) {
00724 countDest++;
00725 myInfo->addNewLocalDestination(destElements[i]);
00726 }
00727 else {
00728 ComlibPrintf("[%d] destination element %d is no longer local\n", CkMyPe(), (int)destElements[i].data()[0]);
00729 myProxy[pe].bracketedDiscover(instid, aid, destElements[i], false);
00730 }
00731 }
00732 }
00733
00734
00735
00736 if (countSrc > 0 || countDest > 0) {
00737 myProxy[0].bracketedContributeDiscovery(instid, CkMyPe(), countSrc, countDest, myEntry->lastKnownIteration);
00738 }
00739
00740 }
00741
00742
00743
00755 void ComlibManager::bracketedDiscover(int instid, CkArrayID aid, CkArrayIndex &idx, int isSrc) {
00756 ComlibManagerPrintf("[%d] bracketedDiscover\n", CkMyPe());
00757 CkArray *a = (CkArray *)_localBranch(aid);
00758 int pe = a->lastKnown(idx);
00759 CProxy_ComlibManager myProxy(thisgroup);
00760 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00761
00762 if (pe == CkMyPe()) {
00763
00764
00765
00766 myProxy[0].bracketedContributeDiscovery(instid, pe, isSrc?1:0, isSrc?0:1, myEntry->lastKnownIteration);
00767
00768
00769
00770
00771
00772 CkAssert((unsigned long)myEntry->strategy > 0x1000);
00773 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
00774 CkAssert((unsigned long)myInfo > 0x1000);
00775
00776 if (isSrc) {
00777
00778 ComlibManagerPrintf("[%d] bracketedDiscover addSource\n", CkMyPe());
00779 CkAssert((unsigned long)myInfo > 0x1000);
00780 myInfo->addNewLocalSource(idx);
00781
00782 ComlibManagerPrintf("[%d] bracketedDiscover updating numElements\n", CkMyPe());
00783 myEntry->numElements = myInfo->getLocalSrc();
00784 }
00785 else {
00786
00787 ComlibManagerPrintf("[%d] bracketedDiscover addNewDestination\n", CkMyPe());
00788 myInfo->addNewLocalDestination(idx);
00789 }
00790 } else {
00791 ComlibManagerPrintf("Keep On Forwarding*********************\n");
00792
00793 myProxy[pe].bracketedDiscover(instid, aid, idx, isSrc);
00794 }
00795 }
00796
00797
00798
00804 void ComlibManager::bracketedContributeDiscovery(int instid, int pe, int nsrc, int ndest, int step) {
00805 CkAssert(CkMyPe() == 0);
00806 ComlibManagerPrintf("[%d] bracketedContributeDiscovery pe=%d nsrc=%d ndest=%d step=%d\n", CkMyPe(), pe, nsrc, ndest, step);
00807 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00808 if (myEntry->peList == 0) {
00809 myEntry->peList = new int[CkNumPes()+2];
00810
00811
00812 for (int i=0; i<CkNumPes()+2; ++i) myEntry->peList[i]=0;
00813 ComlibManagerPrintf("[%d] bracketedContributeDiscovery zeroing new peList\n", CkMyPe());
00814 }
00815 myEntry->peList[CkNumPes()] += nsrc;
00816 myEntry->peList[CkNumPes()+1] += ndest;
00817
00818
00819
00820
00821
00822
00823 if (nsrc > 0) myEntry->peList[pe] |= 1;
00824 if (ndest > 0) myEntry->peList[pe] |= 2;
00825
00826 ComlibArrayInfo *myInfo = &dynamic_cast<CharmStrategy*>(myEntry->strategy)->ainfo;
00827 CkAssert((unsigned long)myInfo > 0x1000);
00828
00829
00830
00831
00832
00833 if (myEntry->peList[CkNumPes()] == myInfo->getTotalSrc() &&
00834 myEntry->peList[CkNumPes()+1] == myInfo->getTotalDest()) {
00835
00836
00837 CProxy_ComlibManager myProxy(thisgroup);
00838
00839 ComlibManagerPrintf("[%d] bracketedContributeDiscovery calling bracketedReceiveNewPeList %d/%d, %d/%d\n",
00840 CkMyPe(),
00841 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(),
00842 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
00843
00844 printPeList("bracketedContributeDiscovery peList=", myEntry->peList);
00845 myProxy.bracketedReceiveNewPeList(instid, step, myEntry->peList);
00846 delete myEntry->peList;
00847 myEntry->peList = NULL;
00848 } else {
00849 ComlibManagerPrintf("[%d] bracketedContributeDiscovery NOT calling bracketedReceiveNewPeList %d/%d, %d/%d\n",
00850 CkMyPe(),
00851 myEntry->peList[CkNumPes()], myInfo->getTotalSrc(),
00852 myEntry->peList[CkNumPes()+1], myInfo->getTotalDest() );
00853 }
00854
00855 }
00856
00857
00858 void ComlibManager::printPeList(const char* note, int *count)
00859 {
00860 char *buf = (char*)malloc(1024*64);
00861 sprintf(buf, "[%d] %s ", CkMyPe(), note);
00862 for(int i=0;i<CkNumPes();i++){
00863 switch (count[i]){
00864 case 0:
00865 sprintf(buf+strlen(buf), " %d:no ", i);
00866 break;
00867 case 1:
00868 sprintf(buf+strlen(buf), " %d:source ", i);
00869 break;
00870 case 2:
00871 sprintf(buf+strlen(buf), " %d:dest ", i);
00872 break;
00873 case 3:
00874 sprintf(buf+strlen(buf), " %d:both ", i);
00875 break;
00876 }
00877 }
00878
00879 sprintf(buf+strlen(buf), ", all source objects discovered = %d, all destination objects discovered = %d", count[CkNumPes()] , count[CkNumPes()+1] );
00880
00881 ComlibPrintf("%s\n", buf);
00882 free(buf);
00883 }
00884
00885
00886
00887
00888
00889
00890
00891
00892
00893 extern int _charmHandlerIdx;
00894 void msg_prepareSend_noinline(CkArrayMessage *msg, int ep,CkArrayID aid);
00895
00896
00904 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg,
00905 const CkArrayIndex &idx, CkArrayID a){
00906
00907 CkAssert(pd != NULL);
00908 ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
00909 int instid = ci->getID();
00910 CkAssert(instid != 0);
00911
00912
00913 CkArray *amgr = CkArrayID::CkLocalBranch(a);
00914 int dest_proc = amgr->lastKnown(idx);
00915
00916 register envelope * env = UsrToEnv(msg);
00917 msg_prepareSend_noinline((CkArrayMessage*)msg, ep, a);
00918
00919 env->getsetArrayIndex()=idx;
00920 env->setUsed(0);
00921 ((CmiMsgHeaderExt *)env)->stratid = instid;
00922
00923 CkPackMessage(&env);
00924 CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
00925
00926 CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_ARRAYSEND);
00927
00928 if(shouldBufferMessagesNow(instid)){
00929 delayMessageSendBuffer[instid].insert(cmsg);
00930 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00931 int step = myEntry->lastKnownIteration;
00932 ComlibManagerPrintf("[%d] ComlibManager::ArraySend BUFFERED OUTGOING: now buffer contains %d messages step=%d\n",CkMyPe(), delayMessageSendBuffer[instid].size(), step);
00933 } else {
00934 ComlibPrintf("ComlibManager::ArraySend NOT BUFFERING inserting message into strategy %d\n",instid);
00935
00936 if(dest_proc == CkMyPe()){
00937
00938 amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
00939 return;
00940 } else {
00941
00942 converseManager->insertMessage(cmsg, instid);
00943 }
00944
00945 }
00946
00947 }
00948
00949
00950 #include "qd.h"
00951
00952
00953 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
00954
00955 CkAssert(pd != NULL);
00956 ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
00957 int instid = ci->getID();
00958
00959 int dest_proc = onPE;
00960
00961 ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc,
00962 UsrToEnv(msg)->getTotalsize());
00963
00964 register envelope * env = UsrToEnv(msg);
00965 if(dest_proc == CkMyPe()){
00966 _SET_USED(env, 0);
00967 CkSendMsgBranch(ep, msg, dest_proc, gid);
00968 return;
00969 }
00970
00971 ((CmiMsgHeaderExt *)env)->stratid = instid;
00972 CpvAccess(_qd)->create(1);
00973
00974 env->setMsgtype(ForBocMsg);
00975 env->setEpIdx(ep);
00976 env->setGroupNum(gid);
00977 env->setSrcPe(CkMyPe());
00978 env->setUsed(0);
00979
00980 CkPackMessage(&env);
00981 CmiSetHandler(env, _charmHandlerIdx);
00982
00983 CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc, CMH_GROUPSEND);
00984
00985 if(shouldBufferMessagesNow(instid)){
00986 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
00987 int step = myEntry->lastKnownIteration;
00988 ComlibPrintf("ComlibManager::GroupSend Buffering message for %d step=%d\n",instid, step);
00989 delayMessageSendBuffer[instid].insert(cmsg);
00990 } else {
00991 ComlibPrintf("ComlibManager::GroupSend inserting message into strategy %d\n",instid);
00992 converseManager->insertMessage(cmsg, instid);
00993 }
00994
00995
00996 }
00997
00998 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
00999 ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
01000
01001 CkAssert(pd != NULL);
01002 ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
01003 int instid = ci->getID();
01004
01005 register envelope * env = UsrToEnv(m);
01006 msg_prepareSend_noinline((CkArrayMessage*)m, ep, a);
01007
01008 env->getsetArrayIndex()= dummyArrayIndex;
01009 ((CmiMsgHeaderExt *)env)->stratid = instid;
01010
01011 CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
01012
01013 CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_BROADCAST, CMH_ARRAYBROADCAST);
01014 cmsg->npes = 0;
01015 cmsg->sec_id = NULL;
01016 cmsg->array_id = a;
01017
01018 multicast(cmsg, instid);
01019 }
01020
01021 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m,
01022 int nsid, CkSectionID *s, int opts) {
01023
01024 CkAssert(nsid == 1);
01025 CkAssert(pd != NULL);
01026 ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
01027 int instid = ci->getID();
01028
01029 ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
01030
01031
01032
01033 CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m, IS_SECTION_MULTICAST, CMH_ARRAYSECTIONSEND);
01034 cmsg->npes = 0;
01035 cmsg->sec_id = s;
01036 cmsg->array_id = s->_cookie.get_aid();
01037
01038
01039 msg_prepareSend_noinline((CkArrayMessage*)m, ep, s->_cookie.get_aid());
01040
01041 register envelope * env = UsrToEnv(m);
01042 env->getsetArrayIndex()= dummyArrayIndex;
01043 ((CmiMsgHeaderExt *)env)->stratid = instid;
01044
01045 CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
01046
01047 env->setUsed(0);
01048 CkPackMessage(&env);
01049
01050
01051 CkSectionInfo minfo;
01052 minfo.get_type() = COMLIB_MULTICAST_MESSAGE;
01053 minfo.info.sInfo.cInfo.instId = ci->getID();
01054
01055 minfo.info.sInfo.cInfo.id = 0;
01056 minfo.get_pe() = CkMyPe();
01057 ((CkMcastBaseMsg *)env)->_cookie = minfo;
01058
01059 multicast(cmsg, instid);
01060 }
01061
01062 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
01063
01064 CkAssert(pd != NULL);
01065 ComlibDelegateData *ci = static_cast<ComlibDelegateData *>(pd);
01066 int instid = ci->getID();
01067 CkAssert(instid!=0);
01068
01069 register envelope * env = UsrToEnv(m);
01070
01071 CpvAccess(_qd)->create(1);
01072
01073 env->setMsgtype(ForBocMsg);
01074 env->setEpIdx(ep);
01075 env->setGroupNum(g);
01076 env->setSrcPe(CkMyPe());
01077 env->setUsed(0);
01078 ((CmiMsgHeaderExt *)env)->stratid = instid;
01079
01080 CkPackMessage(&env);
01081 CmiSetHandler(env, _charmHandlerIdx);
01082
01083
01084 CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST, CMH_GROUPBROADCAST);
01085
01086 cmsg->npes = 0;
01087
01088
01089 multicast(cmsg, instid);
01090 }
01091
01092
01097 void ComlibManager::multicast(CharmMessageHolder *cmsg, int instid) {
01098 CkAssert(instid != 0);
01099 register envelope * env = UsrToEnv(cmsg->getCharmMessage());
01100
01101 #if DEBUG
01102 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
01103 ComlibPrintf("[%d] multicast setupComplete=%d %s %s\n", CkMyPe(), setupComplete, myEntry->getErrorModeString(), myEntry->getErrorModeServerString());
01104 #endif
01105
01106 env->setUsed(0);
01107 CkPackMessage(&env);
01108
01109 if(shouldBufferMessagesNow(instid)){
01110 cmsg->saveCopyOf_sec_id();
01111 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
01112 int step = myEntry->lastKnownIteration;
01113 ComlibPrintf("[%d] ComlibManager::multicast Buffering message for %d lastKnownIteration=%d\n", CkMyPe(),instid, step);
01114 delayMessageSendBuffer[instid].insert(cmsg);
01115 } else {
01116 converseManager->insertMessage(cmsg, instid);
01117 }
01118
01119 }
01120
01121
01122 void ComlibManager::printDiagnostics(int instid){
01123 CkPrintf("[%d] delayMessageSendBuffer.size()=%d\n", CkMyPe(), delayMessageSendBuffer[instid].size());
01124 }
01125
01126
01127 void ComlibManager::printDiagnostics(){
01128
01129
01130 std::map<ComlibInstanceHandle, std::set<CharmMessageHolder*> >::iterator iter;
01131 for(iter = delayMessageSendBuffer.begin(); iter != delayMessageSendBuffer.end(); ++iter){
01132 int instid = iter->first;
01133 int size = iter->second.size();
01134
01135 if(size>0 || true){
01136 CkPrintf("[%d] delayMessageSendBuffer[instid=%d] contains %d messages\n", CkMyPe(), instid, size);
01137
01138 if(! shouldBufferMessagesNow(instid)){
01139 CkPrintf("[%d] printDiagnostics: No messages should be still in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
01140 } else {
01141 CkPrintf("[%d] printDiagnostics: Messages still ought to be delayed in delayMessageSendBuffer[instid=%d]\n", CkMyPe(), instid);
01142 }
01143
01144 StrategyTableEntry *myEntry = converseManager->getStrategyTable(instid);
01145 CkPrintf("[%d] printDiagnostics[instid=%d] setupComplete=%d %s %s %s bufferOutgoing=%d\n", (int)CkMyPe(), (int)instid, (int)setupComplete, myEntry->errorModeString(), myEntry->errorModeServerString(), myEntry->discoveryModeString(), (int)myEntry->bufferOutgoing);
01146
01147
01148
01149 }
01150 }
01151
01152 CkpvAccess(conv_com_object).printDiagnostics();
01153
01154 }
01155
01156
01157
01158 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
01159
01160 data->ref();
01161 return data;
01162
01163 }
01164
01165 CkDelegateData* ComlibManager::DelegatePointerPup(PUP::er &p,
01166 CkDelegateData *pd) {
01167 if (!p.isUnpacking() && pd == NULL) CkAbort("Tryed to pup a null ComlibDelegateData!\n");
01168
01169 ComlibDelegateData *inst = static_cast<ComlibDelegateData *>(pd);
01170
01171
01172 if (p.isUnpacking()) inst = new ComlibDelegateData((CkMigrateMessage*)0);
01173 inst->pup(p);
01174
01175
01176
01177
01178
01179
01180
01181 return inst;
01182 }
01183
01184
01185
01186
01187 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe) {
01188 }
01189
01190
01192 void ComlibManager::AtSync() {
01193
01194 }
01195
01196
01197
01198
01199
01200
01201
01202
01206 void ComlibAssociateProxy(ComlibInstanceHandle cinst, CProxy &proxy) {
01207 if(CkNumPes() > 1){
01208 CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
01209 proxy.ckDelegate(cgproxy.ckLocalBranch(), new ComlibDelegateData(cinst));
01210 } else {
01211 ComlibPrintf("Doing nothing in ComlibAssociateProxy because we have only 1 pe\n");
01212 }
01213 }
01214
01218 void ComlibAssociateProxy(Strategy *strat, CProxy &proxy) {
01219 ComlibAssociateProxy(strat->getHandle(), proxy);
01220 }
01221
01222
01223
01224
01225
01226
01230 ComlibInstanceHandle ComlibRegister(Strategy *strat) {
01231 return strat->getHandle();
01232 }
01233
01237 void ComlibBegin(CProxy &proxy, int iteration) {
01238 if(CkNumPes() > 1){
01239 ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
01240 if(cinst==NULL)
01241 return;
01242 CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
01243 (cgproxy.ckLocalBranch())->beginIteration(cinst->getID(), iteration);
01244 } else {
01245 ComlibPrintf("Doing nothing in ComlibBegin because we have only 1 pe");
01246 }
01247 }
01248
01252 void ComlibEnd(CProxy &proxy, int iteration) {
01253 if(CkNumPes() > 1){
01254 ComlibDelegateData *cinst = static_cast<ComlibDelegateData *>(proxy.ckDelegatedPtr());
01255 if(cinst==NULL)
01256 return;
01257 CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
01258 (cgproxy.ckLocalBranch())->endIteration(cinst->getID(), iteration);
01259 } else {
01260 ComlibPrintf("Doing nothing in ComlibEnd because we have only 1 pe");
01261 }
01262 }
01263
// Value of the "+strategy" command-line option, read on PE 0 by
// ComlibManagerMain; NULL unless that option set it.
char *routerName = NULL;
// Value of the "+spanning_factor" command-line option, read on PE 0 by
// ComlibManagerMain; 0 unless that option set it.
int sfactor = 0;
01266
01267
01270 class ComlibManagerMain : public CBase_ComlibManagerMain {
01271 public:
01272 ComlibManagerMain(CkArgMsg *msg) {
01273
01274 if(CkMyPe() == 0 && msg != NULL)
01275 CmiGetArgString(msg->argv, "+strategy", &routerName);
01276
01277 if(CkMyPe() == 0 && msg != NULL)
01278 CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
01279
01280 CProxy_ComlibManager::ckNew();
01281 }
01282 };
01283
01284
01285
01286
01287
01288
01289
01290 ComlibDelegateData::ComlibDelegateData(int instid) : CkDelegateData(), _instid(instid) {
01291 ComlibManagerPrintf("[%d] Constructing ComlibDelegateData\n", CkMyPe());
01292 ref();
01293 }
01294
01295
01296 void ComlibInitSectionID(CkSectionID &sid){
01297
01298 sid._cookie.get_type() = COMLIB_MULTICAST_MESSAGE;
01299 sid._cookie.get_pe() = CkMyPe();
01300
01301 sid._cookie.info.sInfo.cInfo.id = 0;
01302 sid.npes = 0;
01303 sid.pelist = NULL;
01304 }
01305
01306
01309 void _registercommlib(void)
01310 {
01311 static int _done = 0;
01312 if(_done)
01313 return;
01314 _done = 1;
01315 _registercomlib();
01316 }
01317
01318
01319 void ComlibNotifyMigrationDoneHandler(void *msg) {
01320 CmiFree(msg);
01321 CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
01322 ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
01323 if(cmgr_ptr)
01324 cmgr_ptr->AtSync();
01325 }
01326
01327
01328
01329
01330
01331
01332 static void periodicDebugPrintStatus(void* ptr, double currWallTime){
01333 CkPrintf("[%d] periodicDebugPrintStatus()\n", CkMyPe());
01334
01335 ComlibManager *mgr = (ComlibManager*)ptr;
01336 mgr->printDiagnostics();
01337
01338 CcdCallFnAfterOnPE((CcdVoidFn)periodicDebugPrintStatus, ptr, 4000, CkMyPe());
01339
01340 }
01341
01342
01343
01344
01345
01346 #include "comlib.def.h"
01347