00001
00002 #include "pose.h"
00003 #include "srtable.h"
00004 #include "gvt.def.h"
00005 #include "qd.h"
00006
// Group ID of the PVT group; code in this file builds proxies with CProxy_PVT(ThePVT).
00007 CkGroupID ThePVT;
// Group ID of the GVT group; code in this file builds proxies with CProxy_GVT(TheGVT).
00008 CkGroupID TheGVT;
// Processor-private variables declared extern here; the PVT constructor in this
// file calls CpvInitialize on both and resets them (stateRecovery to 0,
// theEventID to a default-constructed eventID).
00009 CpvExtern(int, stateRecovery);
00010 CpvExtern(eventID, theEventID);
00011
00012 static void staticDoneLB(void *data)
00013 {
00014 ((PVT*)data)->doneLB();
00015 }
00016
00018 PVT::PVT()
00019 {
00020 #ifdef VERBOSE_DEBUG
00021 CkPrintf("[%d] constructing PVT\n",CkMyPe());
00022 #endif
00023 CpvInitialize(int, stateRecovery);
00024 CpvAccess(stateRecovery) = 0;
00025 CpvInitialize(eventID, theEventID);
00026 CpvAccess(theEventID)=eventID();
00027
00028
00029 optGVT = POSE_UnsetTS; conGVT = POSE_UnsetTS;
00030 rdone=0;
00031 SRs=NULL;
00032 #ifdef POSE_COMM_ON
00033
00034 #endif
00035 #ifndef CMK_OPTIMIZE
00036 localStats = (localStat *)CkLocalBranch(theLocalStats);
00037 if (pose_config.stats) {
00038 localStats->TimerStart(GVT_TIMER);
00039 }
00040 #endif
00041 #ifdef MEM_TEMPORAL
00042 localTimePool = (TimePool *)CkLocalBranch(TempMemID);
00043 CkPrintf("NOTE: Temporal memory manager is ON!\n");
00044 #endif
00045 optPVT = conPVT = estGVT = POSE_UnsetTS;
00046 startPhaseActive = gvtTurn = simdone = 0;
00047 SendsAndRecvs = new SRtable();
00048 SendsAndRecvs->Initialize();
00049 specEventCount = eventCount = waitForFirst = 0;
00050 iterMin = POSE_UnsetTS;
00051 int P=CkNumPes(), N=CkMyPe();
00052 reportReduceTo = -1;
00053 if ((N < P-2) && (N%2 == 1)) {
00054 reportTo = N-1;
00055 reportsExpected = reportEnd = 0;
00056 }
00057 else if (N < P-2) {
00058 reportTo = N;
00059 reportsExpected = 2;
00060 if (N == P-3)
00061 reportsExpected = 1;
00062 reportEnd = 0;
00063 if (N < (P-2)/2)
00064 reportReduceTo = P-2;
00065 else reportReduceTo = P-1;
00066 }
00067 if (N == P-2) {
00068 reportTo = N;
00069 reportEnd = 1;
00070 reportsExpected = 1 + (P-2)/4 + ((P-2)%4)/2;
00071 }
00072 else if (N == P-1) {
00073 reportTo = N;
00074 reportEnd = 1;
00075 if (P==1) reportsExpected = 1;
00076 else reportsExpected = 1 + (P-2)/4 + (P-2)%2;
00077 }
00078
00079
00080 parCheckpointInProgress = 0;
00081 parLastCheckpointGVT = 0;
00082 parLastCheckpointTime = parStartTime = 0.0;
00083 parLBInProgress = 0;
00084 parLastLBGVT = 0;
00085
00086 #ifndef CMK_OPTIMIZE
00087 if(pose_config.stats)
00088 localStats->TimerStop();
00089 #endif
00090
00091 LBDatabase::Object()->AddMigrationDoneFn(staticDoneLB, this);
00092 }
00093
00095 void PVT::pup(PUP::er &p) {
00096 p|optPVT; p|conPVT; p|estGVT; p|repPVT;
00097 p|simdone; p|iterMin; p|waitForFirst;
00098 p|reportTo; p|reportsExpected; p|reportReduceTo; p|reportEnd;
00099 p|gvtTurn; p|specEventCount; p|eventCount;
00100 p|startPhaseActive; p|parStartTime; p|parCheckpointInProgress;
00101 p|parLastCheckpointGVT; p|parLastCheckpointTime;
00102 p|parLBInProgress; p|parLastLBGVT;
00103 p|optGVT; p|conGVT; p|rdone;
00104
00105 if (p.isUnpacking()) {
00106 parStartTime = parLastCheckpointTime;
00107 #ifndef CMK_OPTIMIZE
00108 localStats = (localStat *)CkLocalBranch(theLocalStats);
00109 #endif
00110 #ifdef MEM_TEMPORAL
00111 localTimePool = (TimePool *)CkLocalBranch(TempMemID);
00112 #endif
00113 SendsAndRecvs = new SRtable();
00114 }
00115
00116 SendsAndRecvs->pup(p);
00117
00118 int nullFlag;
00119 if (SRs == NULL) {
00120 nullFlag = 1;
00121 } else {
00122 nullFlag = 0;
00123 }
00124 p|nullFlag;
00125 if (p.isUnpacking()) {
00126 if (nullFlag) {
00127 SRs = NULL;
00128 } else {
00129 SRs = new SRentry();
00130 SRs->pup(p);
00131 }
00132 } else {
00133 if (!nullFlag) {
00134 SRs->pup(p);
00135 }
00136 }
00137 }
00138
00139 void PVT::startPhaseExp(prioBcMsg *m) {
00140 startPhase(m);
00141 }
00142
00144 void PVT::startPhase(prioBcMsg *m)
00145 {
00146 CProxy_GVT g(TheGVT);
00147 CProxy_PVT p(ThePVT);
00148 register int i;
00149
00150 if (startPhaseActive) return;
00151 #ifndef CMK_OPTIMIZE
00152 if(pose_config.stats)
00153 localStats->TimerStart(GVT_TIMER);
00154 #endif
00155 startPhaseActive = 1;
00156 if (m->bc) {
00157 prioBcMsg *startMsg = new (8*sizeof(POSE_TimeType)) prioBcMsg;
00158 startMsg->bc = 0;
00159 *((POSE_TimeType *)CkPriorityPtr(startMsg)) = 1-POSE_TimeMax;
00160 CkSetQueueing(startMsg, CK_QUEUEING_IFIFO);
00161 p.startPhaseExp(startMsg);
00162 }
00163
00164 objs.Wake();
00165
00166
00167 optPVT = conPVT = POSE_UnsetTS;
00168 int end = objs.getNumSpaces();
00169 for (i=0; i<end; i++)
00170 if (objs.objs[i].isPresent()) {
00171 if (objs.objs[i].isOptimistic()) {
00172 if ((optPVT < 0) || ((objs.objs[i].getOVT() < optPVT) &&
00173 (objs.objs[i].getOVT() > POSE_UnsetTS))) {
00174 optPVT = objs.objs[i].getOVT();
00175 CkAssert(simdone>0 || ((objs.objs[i].getOVT() >= estGVT) ||
00176 (objs.objs[i].getOVT() == POSE_UnsetTS)));
00177 }
00178 }
00179 else if (objs.objs[i].isConservative()) {
00180 if ((conPVT < 0) || ((objs.objs[i].getOVT() < conPVT) &&
00181 (objs.objs[i].getOVT() > POSE_UnsetTS)))
00182 conPVT = objs.objs[i].getOVT();
00183 }
00184 CkAssert(simdone>0 || (optPVT >= estGVT)||(optPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
00185 CkAssert(simdone>0 || (conPVT >= estGVT)||(conPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
00186 }
00187
00188
00189 POSE_TimeType pvt = optPVT;
00190 if ((conPVT < pvt) && (conPVT > POSE_UnsetTS)) pvt = conPVT;
00191 if ((iterMin < pvt) && (iterMin > POSE_UnsetTS)) pvt = iterMin;
00192 if (waitForFirst) {
00193 waitForFirst = 0;
00194 if (pvt == POSE_UnsetTS)
00195 SendsAndRecvs->Restructure(estGVT, estGVT, POSE_UnsetTS);
00196 else
00197 SendsAndRecvs->Restructure(estGVT, pvt, POSE_UnsetTS);
00198 }
00199
00200
00201
00202
00203
00204 POSE_TimeType xt;
00205 if (pvt == POSE_UnsetTS) {
00206 POSE_TimeType maxOVT = POSE_UnsetTS;
00207 for (i=0; i<end; i++)
00208 if (objs.objs[i].isPresent()) {
00209 xt = objs.objs[i].getOVT2();
00210 if (xt > maxOVT)
00211 maxOVT = xt;
00212 }
00213 if (maxOVT > estGVT)
00214 pvt = maxOVT;
00215 }
00216
00217
00218 POSE_TimeType maxSR;
00219 UpdateMsg *um = SendsAndRecvs->PackTable(pvt, &maxSR);
00220
00221 um->optPVT = pvt;
00222 um->conPVT = conPVT;
00223 um->maxSR = maxSR;
00224 um->runGVTflag = 0;
00225
00226 if (um->numEntries > 0) {
00227
00228 }
00229
00230 p[reportTo].reportReduce(um);
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240 objs.SetIdle();
00241 iterMin = POSE_UnsetTS;
00242 #ifndef CMK_OPTIMIZE
00243 if(pose_config.stats)
00244 localStats->TimerStop();
00245 #endif
00246 }
00247
00249 void PVT::setGVT(GVTMsg *m)
00250 {
00251 #ifndef CMK_OPTIMIZE
00252 if(pose_config.stats)
00253 localStats->TimerStart(GVT_TIMER);
00254 #endif
00255 CProxy_PVT p(ThePVT);
00256 CkAssert(m->estGVT >= estGVT);
00257 estGVT = m->estGVT;
00258 int i, end = objs.getNumSpaces();
00259 #ifdef POSE_COMM_ON
00260
00261
00262
00263 #endif
00264 simdone = m->done;
00265 CkFreeMsg(m);
00266 waitForFirst = 1;
00267 objs.Commit();
00268 objs.StratCalcs();
00269 #ifdef MEM_TEMPORAL
00270 localTimePool->set_min_time(estGVT);
00271 #endif
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290 if ((CkMyPe() == 0) && (parCheckpointInProgress == 0) && (estGVT > 0) &&
00291 (((pose_config.checkpoint_gvt_interval > 0) && (estGVT >= (parLastCheckpointGVT + pose_config.checkpoint_gvt_interval))) ||
00292 ((pose_config.checkpoint_time_interval > 0) &&
00293 ((CmiWallTimer() + parStartTime) >= (parLastCheckpointTime + (double)pose_config.checkpoint_time_interval))))) {
00294
00295 objs.CheckpointCommit();
00296
00297 eventMsg *dummyMsg = new eventMsg();
00298 CkCallback cb(CkIndex_PVT::beginCheckpoint(dummyMsg), CkMyPe(), ThePVT);
00299 parCheckpointInProgress = 1;
00300 parLastCheckpointTime = CmiWallTimer() + parStartTime;
00301 CkStartQD(cb);
00302 } else if ((CkMyPe() == 0) && (parLBInProgress == 0) &&
00303 (((pose_config.lb_gvt_interval > 0) && (estGVT >= (parLastLBGVT + pose_config.lb_gvt_interval))))) {
00304
00305 eventMsg *dummyMsg = new eventMsg();
00306 CkCallback cb(CkIndex_PVT::beginLoadbalancing(dummyMsg), CkMyPe(), ThePVT);
00307 parLBInProgress = 1;
00308 CkStartQD(cb);
00309 } else {
00310
00311 eventMsg *dummyMsg = new eventMsg();
00312 p[CkMyPe()].resumeAfterCheckpoint(dummyMsg);
00313 }
00314 #ifndef CMK_OPTIMIZE
00315 if(pose_config.stats)
00316 localStats->TimerStop();
00317 #endif
00318 }
00319
00321 void PVT::beginCheckpoint(eventMsg *m) {
00322 #ifndef CMK_OPTIMIZE
00323 if(pose_config.stats)
00324 localStats->TimerStart(GVT_TIMER);
00325 #endif
00326 CkFreeMsg(m);
00327 if (parCheckpointInProgress) {
00328 CkPrintf("POSE: quiescence detected\n");
00329 CkPrintf("POSE: beginning checkpoint on processor %d at GVT=%lld sim time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() + parStartTime);
00330 eventMsg *dummyMsg = new eventMsg();
00331 CkCallback cb(CkIndex_PVT::resumeAfterCheckpoint(dummyMsg), CkMyPe(), ThePVT);
00332 CkStartCheckpoint(POSE_CHECKPOINT_DIRECTORY, cb);
00333 }
00334 #ifndef CMK_OPTIMIZE
00335 if(pose_config.stats)
00336 localStats->TimerStop();
00337 #endif
00338 }
00339
00340 void PVT::beginLoadbalancing(eventMsg *m) {
00341 #ifndef CMK_OPTIMIZE
00342 if(pose_config.stats)
00343 localStats->TimerStart(GVT_TIMER);
00344 #endif
00345 CkFreeMsg(m);
00346 if (parLBInProgress) {
00347 CProxy_PVT p(ThePVT);
00348 p.callAtSync();
00349 }
00350 #ifndef CMK_OPTIMIZE
00351 if(pose_config.stats)
00352 localStats->TimerStop();
00353 #endif
00354 }
00355
00356 void PVT::callAtSync() {
00357 #ifndef CMK_OPTIMIZE
00358 if(pose_config.stats)
00359 localStats->TimerStart(GVT_TIMER);
00360 #endif
00361 objs.callAtSync();
00362 #ifndef CMK_OPTIMIZE
00363 if(pose_config.stats)
00364 localStats->TimerStop();
00365 #endif
00366 }
00367
00368 void PVT::doneLB() {
00369 eventMsg *dummyMsg = new eventMsg();
00370 CProxy_PVT p(ThePVT);
00371 p[0].resumeAfterLB(dummyMsg);
00372 }
00373
00375 void PVT::resumeAfterCheckpoint(eventMsg *m) {
00376 #ifndef CMK_OPTIMIZE
00377 if(pose_config.stats)
00378 localStats->TimerStart(GVT_TIMER);
00379 #endif
00380 if (parCheckpointInProgress) {
00381 CkPrintf("POSE: checkpoint/restart complete on processor %d at GVT=%lld sim time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() + parStartTime);
00382 parCheckpointInProgress = 0;
00383 parLastCheckpointGVT = estGVT;
00384 }
00385 CkFreeMsg(m);
00386 CProxy_PVT p(ThePVT);
00387 startPhaseActive = 0;
00388 prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
00389 startMsg->bc = 1;
00390 *((int *)CkPriorityPtr(startMsg)) = 0;
00391 CkSetQueueing(startMsg, CK_QUEUEING_IFIFO);
00392 p[CkMyPe()].startPhase(startMsg);
00393 #ifndef CMK_OPTIMIZE
00394 if(pose_config.stats)
00395 localStats->TimerStop();
00396 #endif
00397 }
00398
00399
00400 void PVT::resumeAfterLB(eventMsg *m) {
00401 static int count = 0;
00402 count ++;
00403 if (count != CkNumPes()) {
00404 CkFreeMsg(m);
00405 return;
00406 }
00407 count = 0;
00408 #ifndef CMK_OPTIMIZE
00409 if(pose_config.stats)
00410 localStats->TimerStart(GVT_TIMER);
00411 #endif
00412 if (parLBInProgress) {
00413 CkPrintf("POSE: load balancing complete on processor %d at GVT=%lld sim time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() + parStartTime);
00414 parLBInProgress = 0;
00415 parLastLBGVT = estGVT;
00416 }
00417 CkFreeMsg(m);
00418 CProxy_PVT p(ThePVT);
00419 startPhaseActive = 0;
00420 prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
00421 startMsg->bc = 1;
00422 *((int *)CkPriorityPtr(startMsg)) = 0;
00423 CkSetQueueing(startMsg, CK_QUEUEING_IFIFO);
00424 p[CkMyPe()].startPhase(startMsg);
00425 #ifndef CMK_OPTIMIZE
00426 if(pose_config.stats)
00427 localStats->TimerStop();
00428 #endif
00429 }
00430
00432 int PVT::objRegister(int arrIdx, POSE_TimeType safeTime, int sync, sim *myPtr)
00433 {
00434 int i = objs.Insert(arrIdx, POSE_UnsetTS, sync, myPtr);
00435 return(i*1000 + CkMyPe());
00436 }
00437
00438
00439 void PVT::objRemove(int pvtIdx)
00440 {
00441 int idx = (pvtIdx-CkMyPe())/1000;
00442 objs.Delete(idx);
00443 }
00444
00446 void PVT::objUpdate(POSE_TimeType timestamp, int sr)
00447 {
00448 #ifndef CMK_OPTIMIZE
00449 int tstat = localStats->TimerRunning();
00450 if(pose_config.stats){
00451 if (tstat)
00452 localStats->SwitchTimer(GVT_TIMER);
00453 else
00454 localStats->TimerStart(GVT_TIMER);
00455 }
00456 #endif
00457
00458
00459
00460 CkAssert(simdone>0 || (timestamp >= estGVT) || (estGVT == POSE_UnsetTS));
00461 CkAssert((sr == SEND) || (sr == RECV));
00462 if ((estGVT > POSE_UnsetTS) &&
00463 ((timestamp < iterMin) || (iterMin == POSE_UnsetTS)))
00464 iterMin = timestamp;
00465 if (waitForFirst) {
00466 waitForFirst = 0;
00467 SendsAndRecvs->Restructure(estGVT, timestamp, sr);
00468 }
00469 else SendsAndRecvs->Insert(timestamp, sr);
00470 #ifndef CMK_OPTIMIZE
00471 if(pose_config.stats){
00472 if (tstat)
00473 localStats->SwitchTimer(tstat);
00474 else
00475 localStats->TimerStop();
00476 }
00477 #endif
00478
00479 }
00480
00482 void PVT::objUpdateOVT(int pvtIdx, POSE_TimeType safeTime, POSE_TimeType ovt)
00483 {
00484 int index = (pvtIdx-CkMyPe())/1000;
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495 CkAssert(simdone>0 || (safeTime >= estGVT) || (safeTime == POSE_UnsetTS));
00496 if ((safeTime == POSE_UnsetTS) && (objs.objs[index].getOVT2() < ovt)) {
00497 objs.objs[index].setOVT2(ovt);
00498 } else if ((safeTime > POSE_UnsetTS) &&
00499 ((objs.objs[index].getOVT() > safeTime) || (objs.objs[index].getOVT() == POSE_UnsetTS))) {
00500 objs.objs[index].setOVT(safeTime);
00501 }
00502 }
00503
00505 void PVT::reportReduce(UpdateMsg *m)
00506 {
00507 #ifndef CMK_OPTIMIZE
00508 if(pose_config.stats)
00509 localStats->TimerStart(GVT_TIMER);
00510 #endif
00511 CProxy_PVT p(ThePVT);
00512 CProxy_GVT g(TheGVT);
00513 POSE_TimeType lastGVT = 0, maxSR=0;
00514
00515
00516 if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
00517 optGVT = m->optPVT;
00518 if (m->maxSR > 0)
00519 maxSR = m->maxSR;
00520 addSR(&SRs, m->SRs, optGVT, m->numEntries);
00521 rdone++;
00522 CkFreeMsg(m);
00523
00524 if (rdone == reportsExpected) {
00525 UpdateMsg *um;
00526 int entryCount = 0;
00527
00528 SRentry *tmp = SRs;
00529
00530 while (tmp) {
00531 if (((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS)) && (tmp->sends != tmp->recvs)) {
00532 entryCount++;
00533 }
00534 tmp = tmp->next;
00535 }
00536 um = new (entryCount * sizeof(SRentry), 0) UpdateMsg;
00537 tmp = SRs;
00538 int i=0;
00539 while (tmp) {
00540 if (((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS)) && (tmp->sends != tmp->recvs)) {
00541 um->SRs[i] = *tmp;
00542 i++;
00543 }
00544 tmp = tmp->next;
00545 }
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564 um->numEntries = entryCount;
00565 um->optPVT = optGVT;
00566 um->conPVT = conGVT;
00567 um->maxSR = maxSR;
00568 um->runGVTflag = 0;
00569
00570 if (reportEnd) {
00571 if (simdone>0)
00572 g[0].computeGVT(um);
00573 else {
00574 g[gvtTurn].computeGVT(um);
00575 gvtTurn = (gvtTurn + 1) % CkNumPes();
00576 }
00577 }
00578 else {
00579 p[reportReduceTo].reportReduce(um);
00580 }
00581
00582
00583 optGVT = conGVT = POSE_UnsetTS;
00584 SRentry *cur = SRs;
00585 SRs = NULL;
00586 while (cur) {
00587 tmp = cur->next;
00588 delete cur;
00589 cur = tmp;
00590 }
00591 rdone = 0;
00592 }
00593 #ifndef CMK_OPTIMIZE
00594 if(pose_config.stats)
00595 localStats->TimerStop();
00596 #endif
00597 }
00598
00600 GVT::GVT()
00601 {
00602 #ifdef VERBOSE_DEBUG
00603 CkPrintf("[%d] constructing GVT\n",CkMyPe());
00604 #endif
00605
00606 optGVT = POSE_UnsetTS, conGVT = POSE_UnsetTS;
00607 done=0;
00608 SRs = NULL;
00609 startOffset = 0;
00610 gvtIterationCount = 0;
00611
00612 #ifndef CMK_OPTIMIZE
00613 localStats = (localStat *)CkLocalBranch(theLocalStats);
00614 #endif
00615 #ifndef SEQUENTIAL_POSE
00616 if(pose_config.lb_on)
00617 nextLBstart = pose_config.lb_skip - 1;
00618 #endif
00619 estGVT = lastEarliest = inactiveTime = POSE_UnsetTS;
00620 lastSends = lastRecvs = inactive = 0;
00621 reportsExpected = 1;
00622 if (CkNumPes() >= 2) reportsExpected = 2;
00623
00624
00625 if (CkMyPe() == 0) {
00626 CProxy_PVT p(ThePVT);
00627 prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
00628 startMsg->bc = 1;
00629 *((int *)CkPriorityPtr(startMsg)) = 0;
00630 CkSetQueueing(startMsg, CK_QUEUEING_IFIFO);
00631 p.startPhase(startMsg);
00632 }
00633 }
00634
00636 void GVT::pup(PUP::er &p) {
00637 p|estGVT; p|inactive; p|inactiveTime; p|nextLBstart;
00638 p|lastEarliest; p|lastSends; p|lastRecvs; p|reportsExpected;
00639 p|optGVT; p|conGVT; p|done; p|startOffset;
00640 p|gvtIterationCount;
00641
00642 if (p.isUnpacking()) {
00643 #ifndef CMK_OPTIMIZE
00644 localStats = (localStat *)CkLocalBranch(theLocalStats);
00645 #endif
00646 }
00647
00648 int nullFlag;
00649 if (SRs == NULL) {
00650 nullFlag = 1;
00651 } else {
00652 nullFlag = 0;
00653 }
00654 p|nullFlag;
00655 if (p.isUnpacking()) {
00656 if (nullFlag) {
00657 SRs = NULL;
00658 } else {
00659 SRs = new SRentry();
00660 SRs->pup(p);
00661 }
00662 } else {
00663 if (!nullFlag) {
00664 SRs->pup(p);
00665 }
00666 }
00667 }
00668
00669
00670
00671
00672
00673
00674
00675
00677 void GVT::runGVT(UpdateMsg *m)
00678 {
00679 #ifndef CMK_OPTIMIZE
00680 if(pose_config.stats)
00681 localStats->TimerStart(GVT_TIMER);
00682 #endif
00683 estGVT = m->optPVT;
00684 inactive = m->inactive;
00685 inactiveTime = m->inactiveTime;
00686 nextLBstart = m->nextLB;
00687 CProxy_GVT g(TheGVT);
00688 m->runGVTflag = 1;
00689 g[CkMyPe()].computeGVT(m);
00690 #ifndef CMK_OPTIMIZE
00691 if(pose_config.stats)
00692 localStats->TimerStop();
00693 #endif
00694 }
00695
00697 void GVT::computeGVT(UpdateMsg *m)
00698 {
00699 #ifndef CMK_OPTIMIZE
00700 if(pose_config.stats)
00701 localStats->TimerStart(GVT_TIMER);
00702 #endif
00703 CProxy_PVT p(ThePVT);
00704 CProxy_GVT g(TheGVT);
00705 GVTMsg *gmsg = new GVTMsg;
00706 POSE_TimeType lastGVT = 0, earliestMsg = POSE_UnsetTS,
00707 earlyAny = POSE_UnsetTS;
00708 SRentry *tmpSRs = SRs;
00709
00710 if (CkMyPe() != 0) startOffset = 1;
00711 if (m->runGVTflag == 1) done++;
00712 else {
00713
00714 if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
00715 optGVT = m->optPVT;
00716 if ((conGVT < 0) || ((m->conPVT > POSE_UnsetTS) && (m->conPVT < conGVT)))
00717 conGVT = m->conPVT;
00718 if (m->maxSR > earlyAny)
00719 earlyAny = m->maxSR;
00720
00721
00722
00723
00724 addSR(&SRs, m->SRs, optGVT, m->numEntries);
00725 done++;
00726 }
00727 CkFreeMsg(m);
00728
00729 if (done == reportsExpected+startOffset) {
00730 #ifndef CMK_OPTIMIZE
00731 if(pose_config.stats)
00732 localStats->GvtInc();
00733 #endif
00734 gvtIterationCount++;
00735 done = 0;
00736 startOffset = 1;
00737 lastGVT = estGVT;
00738 if (lastGVT < 0) lastGVT = 0;
00739 estGVT = POSE_UnsetTS;
00740
00741
00742 estGVT = optGVT;
00743 if ((conGVT > POSE_UnsetTS) && (estGVT > POSE_UnsetTS) && (conGVT < estGVT)) estGVT = conGVT;
00744
00745
00746
00747
00748 SRentry *tmp = SRs;
00749 POSE_TimeType lastSR = POSE_UnsetTS;
00750 while (tmp && ((tmp->timestamp <= estGVT) || (estGVT == POSE_UnsetTS))) {
00751 lastSR = tmp->timestamp;
00752 if (tmp->sends != tmp->recvs) {
00753 earliestMsg = tmp->timestamp;
00754 break;
00755 }
00756 tmp = tmp->next;
00757 }
00758
00759
00760 if (((earliestMsg < estGVT) && (earliestMsg != POSE_UnsetTS)) ||
00761 (estGVT == POSE_UnsetTS))
00762 estGVT = earliestMsg;
00763 if ((lastSR != POSE_UnsetTS) && (estGVT == POSE_UnsetTS) &&
00764 (lastSR > lastGVT))
00765 estGVT = lastSR;
00766
00767
00768 if ((optGVT == POSE_UnsetTS) && (earliestMsg == POSE_UnsetTS)) {
00769 inactive++;
00770
00771
00772
00773
00774
00775
00776
00777 estGVT = lastGVT;
00778 if (inactive == 1) inactiveTime = lastGVT;
00779 }
00780 else if (estGVT < 0) {
00781 estGVT = lastGVT;
00782 inactive = 0;
00783 }
00784 else inactive = 0;
00785
00786
00787
00788 CmiAssert(estGVT >= lastGVT);
00789
00790
00791
00792
00793
00794 int term = 0;
00795 if ((estGVT >= POSE_endtime) && (POSE_endtime > POSE_UnsetTS)) {
00796 #if USE_LONG_TIMESTAMPS
00797 CkPrintf("At endtime: %lld\n", POSE_endtime);
00798 #else
00799 CkPrintf("At endtime: %d\n", POSE_endtime);
00800 #endif
00801 term = 1;
00802 }
00803 else if (inactive > 2) {
00804 #if USE_LONG_TIMESTAMPS
00805 CkPrintf("Simulation inactive at time: %lld\n", inactiveTime);
00806 #else
00807 CkPrintf("Simulation inactive at time: %d\n", inactiveTime);
00808 #endif
00809 term = 1;
00810 }
00811
00812
00813 gmsg->estGVT = estGVT;
00814 gmsg->done = term;
00815 if (term) {
00816
00817
00818 #if USE_LONG_TIMESTAMPS
00819 CkPrintf("Final GVT = %lld\n", gmsg->estGVT);
00820 #else
00821 CkPrintf("Final GVT = %d\n", gmsg->estGVT);
00822 #endif
00823 p.setGVT(gmsg);
00824 POSE_stop();
00825 }
00826 else {
00827 p.setGVT(gmsg);
00828
00829 if(pose_config.lb_on)
00830 {
00831
00832 #ifndef CMK_OPTIMIZE
00833 if(pose_config.stats)
00834 localStats->SwitchTimer(LB_TIMER);
00835 #endif
00836
00837 if (CkNumPes() > 1) {
00838 nextLBstart++;
00839 if (pose_config.lb_skip == nextLBstart) {
00840 TheLBG.calculateLocalLoad();
00841 nextLBstart = 0;
00842 }
00843 }
00844 #ifndef CMK_OPTIMIZE
00845 if(pose_config.stats)
00846 localStats->SwitchTimer(GVT_TIMER);
00847 #endif
00848 }
00849
00850
00851 UpdateMsg *umsg = new UpdateMsg;
00852 umsg->maxSR=0;
00853 umsg->optPVT = estGVT;
00854 umsg->inactive = inactive;
00855 umsg->inactiveTime = inactiveTime;
00856 umsg->nextLB = nextLBstart;
00857 umsg->runGVTflag = 0;
00858 g[(CkMyPe()+1) % CkNumPes()].runGVT(umsg);
00859 }
00860
00861
00862 optGVT = conGVT = POSE_UnsetTS;
00863 SRentry *cur = SRs;
00864 SRs = NULL;
00865 while (cur) {
00866 tmp = cur->next;
00867 delete cur;
00868 cur = tmp;
00869 }
00870 }
00871 #ifndef CMK_OPTIMIZE
00872 if(pose_config.stats)
00873 localStats->TimerStop();
00874 #endif
00875 }
00876
00877 void GVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
00878 {
00879 register int i;
00880 SRentry *tab = (*SRs);
00881 SRentry *tmp = tab;
00882
00883 for (i=0; i<ne; i++) {
00884 if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
00885 if (!tmp) {
00886 tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
00887 tab->sends = e[i].sends;
00888 tab->recvs = e[i].recvs;
00889 tmp = tab;
00890 *SRs = tmp;
00891 }
00892 else {
00893 if (e[i].timestamp < tmp->timestamp) {
00894 CkAssert(tmp == *SRs);
00895 tab = new SRentry(e[i].timestamp, tmp);
00896 tab->sends = e[i].sends;
00897 tab->recvs = e[i].recvs;
00898 tmp = tab;
00899 *SRs = tmp;
00900 }
00901 else if (e[i].timestamp == tmp->timestamp) {
00902 tmp->sends = tmp->sends + e[i].sends;
00903 tmp->recvs = tmp->recvs + e[i].recvs;
00904 }
00905 else {
00906 while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
00907 tmp = tmp->next;
00908 if (!tmp->next) {
00909 tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
00910 tmp->next->sends = tmp->next->sends + e[i].sends;
00911 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00912 tmp = tmp->next;
00913 }
00914 else if (e[i].timestamp == tmp->next->timestamp) {
00915 tmp->next->sends = tmp->next->sends + e[i].sends;
00916 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00917 tmp = tmp->next;
00918 }
00919 else {
00920 tmp->next = new SRentry(e[i].timestamp, tmp->next);
00921 tmp->next->sends = tmp->next->sends + e[i].sends;
00922 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00923 tmp = tmp->next;
00924 }
00925 }
00926 }
00927 }
00928 else break;
00929 }
00930 }
00931
00932 void PVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
00933 {
00934 register int i;
00935 SRentry *tab = (*SRs);
00936 SRentry *tmp = tab;
00937
00938 for (i=0; i<ne; i++) {
00939 if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
00940 if (!tmp) {
00941 tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
00942 tab->sends = e[i].sends;
00943 tab->recvs = e[i].recvs;
00944 tmp = tab;
00945 *SRs = tmp;
00946 }
00947 else {
00948 if (e[i].timestamp < tmp->timestamp) {
00949 CkAssert(tmp == *SRs);
00950 tab = new SRentry(e[i].timestamp, tmp);
00951 tab->sends = e[i].sends;
00952 tab->recvs = e[i].recvs;
00953 tmp = tab;
00954 *SRs = tmp;
00955 }
00956 else if (e[i].timestamp == tmp->timestamp) {
00957 tmp->sends = tmp->sends + e[i].sends;
00958 tmp->recvs = tmp->recvs + e[i].recvs;
00959 }
00960 else {
00961 while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
00962 tmp = tmp->next;
00963 if (!tmp->next) {
00964 tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
00965 tmp->next->sends = tmp->next->sends + e[i].sends;
00966 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00967 tmp = tmp->next;
00968 }
00969 else if (e[i].timestamp == tmp->next->timestamp) {
00970 tmp->next->sends = tmp->next->sends + e[i].sends;
00971 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00972 tmp = tmp->next;
00973 }
00974 else {
00975 tmp->next = new SRentry(e[i].timestamp, tmp->next);
00976 tmp->next->sends = tmp->next->sends + e[i].sends;
00977 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00978 tmp = tmp->next;
00979 }
00980 }
00981 }
00982 }
00983 else break;
00984 }
00985 }