00001
00002 #include "pose.h"
00003 #include "srtable.h"
00004 #include "gvt.def.h"
00005 #include "qd.h"
00006
// Group IDs used in this file to construct CProxy_PVT / CProxy_GVT proxies.
00007 CkGroupID ThePVT;
00008 CkGroupID TheGVT;
// Cpv variables declared here via CpvExtern; PVT::PVT() calls CpvInitialize
// on both and assigns their starting values.
00009 CpvExtern(int, stateRecovery);
00010 CpvExtern(eventID, theEventID);
00011
00012 static void staticDoneLB(void *data)
00013 {
00014 ((PVT*)data)->doneLB();
00015 }
00016
00018 PVT::PVT()
00019 {
00020 #ifdef VERBOSE_DEBUG
00021 CkPrintf("[%d] constructing PVT\n",CkMyPe());
00022 #endif
00023 CpvInitialize(int, stateRecovery);
00024 CpvAccess(stateRecovery) = 0;
00025 CpvInitialize(eventID, theEventID);
00026 CpvAccess(theEventID)=eventID();
00027
00028
00029 optGVT = POSE_UnsetTS; conGVT = POSE_UnsetTS;
00030 rdone=0;
00031 SRs=NULL;
00032 #if !CMK_TRACE_DISABLED
00033 localStats = (localStat *)CkLocalBranch(theLocalStats);
00034 if (pose_config.stats) {
00035 localStats->TimerStart(GVT_TIMER);
00036 }
00037 #endif
00038 #ifdef MEM_TEMPORAL
00039 localTimePool = (TimePool *)CkLocalBranch(TempMemID);
00040 CkPrintf("NOTE: Temporal memory manager is ON!\n");
00041 #endif
00042 optPVT = conPVT = estGVT = POSE_UnsetTS;
00043 startPhaseActive = gvtTurn = simdone = 0;
00044 SendsAndRecvs = new SRtable();
00045 SendsAndRecvs->Initialize();
00046 specEventCount = eventCount = waitForFirst = 0;
00047 iterMin = POSE_UnsetTS;
00048 int P=CkNumPes(), N=CkMyPe();
00049 reportReduceTo = -1;
00050 if ((N < P-2) && (N%2 == 1)) {
00051 reportTo = N-1;
00052 reportsExpected = reportEnd = 0;
00053 }
00054 else if (N < P-2) {
00055 reportTo = N;
00056 reportsExpected = 2;
00057 if (N == P-3)
00058 reportsExpected = 1;
00059 reportEnd = 0;
00060 if (N < (P-2)/2)
00061 reportReduceTo = P-2;
00062 else reportReduceTo = P-1;
00063 }
00064 if (N == P-2) {
00065 reportTo = N;
00066 reportEnd = 1;
00067 reportsExpected = 1 + (P-2)/4 + ((P-2)%4)/2;
00068 }
00069 else if (N == P-1) {
00070 reportTo = N;
00071 reportEnd = 1;
00072 if (P==1) reportsExpected = 1;
00073 else reportsExpected = 1 + (P-2)/4 + (P-2)%2;
00074 }
00075
00076
00077 parCheckpointInProgress = 0;
00078 parLastCheckpointGVT = 0;
00079 parLastCheckpointTime = parStartTime = 0.0;
00080 parLBInProgress = 0;
00081 parLastLBGVT = 0;
00082
00083 #if !CMK_TRACE_DISABLED
00084 if(pose_config.stats)
00085 localStats->TimerStop();
00086 #endif
00087
00088 LBDatabase::Object()->AddMigrationDoneFn(staticDoneLB, this);
00089 }
00090
00092 void PVT::pup(PUP::er &p) {
00093 p|optPVT; p|conPVT; p|estGVT; p|repPVT;
00094 p|simdone; p|iterMin; p|waitForFirst;
00095 p|reportTo; p|reportsExpected; p|reportReduceTo; p|reportEnd;
00096 p|gvtTurn; p|specEventCount; p|eventCount;
00097 p|startPhaseActive; p|parStartTime; p|parCheckpointInProgress;
00098 p|parLastCheckpointGVT; p|parLastCheckpointTime;
00099 p|parLBInProgress; p|parLastLBGVT;
00100 p|optGVT; p|conGVT; p|rdone;
00101
00102 if (p.isUnpacking()) {
00103 parStartTime = parLastCheckpointTime;
00104 #if !CMK_TRACE_DISABLED
00105 localStats = (localStat *)CkLocalBranch(theLocalStats);
00106 #endif
00107 #ifdef MEM_TEMPORAL
00108 localTimePool = (TimePool *)CkLocalBranch(TempMemID);
00109 #endif
00110 SendsAndRecvs = new SRtable();
00111 }
00112
00113 SendsAndRecvs->pup(p);
00114
00115 int nullFlag;
00116 if (SRs == NULL) {
00117 nullFlag = 1;
00118 } else {
00119 nullFlag = 0;
00120 }
00121 p|nullFlag;
00122 if (p.isUnpacking()) {
00123 if (nullFlag) {
00124 SRs = NULL;
00125 } else {
00126 SRs = new SRentry();
00127 SRs->pup(p);
00128 }
00129 } else {
00130 if (!nullFlag) {
00131 SRs->pup(p);
00132 }
00133 }
00134 }
00135
00136 void PVT::startPhaseExp(prioBcMsg *m) {
00137 startPhase(m);
00138 }
00139
00141 void PVT::startPhase(prioBcMsg *m)
00142 {
00143 CProxy_GVT g(TheGVT);
00144 CProxy_PVT p(ThePVT);
00145 int i;
00146
00147 if (startPhaseActive) return;
00148 #if !CMK_TRACE_DISABLED
00149 if(pose_config.stats)
00150 localStats->TimerStart(GVT_TIMER);
00151 #endif
00152 startPhaseActive = 1;
00153 if (m->bc) {
00154 prioBcMsg *startMsg = new (8*sizeof(POSE_TimeType)) prioBcMsg;
00155 startMsg->bc = 0;
00156 *((POSE_TimeType *)CkPriorityPtr(startMsg)) = 1-POSE_TimeMax;
00157 CkSetQueueing(startMsg, CK_QUEUEING_IFIFO);
00158 p.startPhaseExp(startMsg);
00159 }
00160
00161 objs.Wake();
00162
00163
00164 optPVT = conPVT = POSE_UnsetTS;
00165 int end = objs.getNumSpaces();
00166 for (i=0; i<end; i++)
00167 if (objs.objs[i].isPresent()) {
00168 if (objs.objs[i].isOptimistic()) {
00169 if ((optPVT < 0) || ((objs.objs[i].getOVT() < optPVT) &&
00170 (objs.objs[i].getOVT() > POSE_UnsetTS))) {
00171 optPVT = objs.objs[i].getOVT();
00172 CkAssert(simdone>0 || ((objs.objs[i].getOVT() >= estGVT) ||
00173 (objs.objs[i].getOVT() == POSE_UnsetTS)));
00174 }
00175 }
00176 else if (objs.objs[i].isConservative()) {
00177 if ((conPVT < 0) || ((objs.objs[i].getOVT() < conPVT) &&
00178 (objs.objs[i].getOVT() > POSE_UnsetTS)))
00179 conPVT = objs.objs[i].getOVT();
00180 }
00181 CkAssert(simdone>0 || (optPVT >= estGVT)||(optPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
00182 CkAssert(simdone>0 || (conPVT >= estGVT)||(conPVT == POSE_UnsetTS)||(estGVT == POSE_UnsetTS));
00183 }
00184
00185
00186 POSE_TimeType pvt = optPVT;
00187 if ((conPVT < pvt) && (conPVT > POSE_UnsetTS)) pvt = conPVT;
00188 if ((iterMin < pvt) && (iterMin > POSE_UnsetTS)) pvt = iterMin;
00189 if (waitForFirst) {
00190 waitForFirst = 0;
00191 if (pvt == POSE_UnsetTS)
00192 SendsAndRecvs->Restructure(estGVT, estGVT, POSE_UnsetTS);
00193 else
00194 SendsAndRecvs->Restructure(estGVT, pvt, POSE_UnsetTS);
00195 }
00196
00197
00198
00199
00200
00201 POSE_TimeType xt;
00202 if (pvt == POSE_UnsetTS) {
00203 POSE_TimeType maxOVT = POSE_UnsetTS;
00204 for (i=0; i<end; i++)
00205 if (objs.objs[i].isPresent()) {
00206 xt = objs.objs[i].getOVT2();
00207 if (xt > maxOVT)
00208 maxOVT = xt;
00209 }
00210 if (maxOVT > estGVT)
00211 pvt = maxOVT;
00212 }
00213
00214
00215 POSE_TimeType maxSR;
00216 UpdateMsg *um = SendsAndRecvs->PackTable(pvt, &maxSR);
00217
00218 um->optPVT = pvt;
00219 um->conPVT = conPVT;
00220 um->maxSR = maxSR;
00221 um->runGVTflag = 0;
00222
00223 if (um->numEntries > 0) {
00224
00225 }
00226
00227 p[reportTo].reportReduce(um);
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237 objs.SetIdle();
00238 iterMin = POSE_UnsetTS;
00239 #if !CMK_TRACE_DISABLED
00240 if(pose_config.stats)
00241 localStats->TimerStop();
00242 #endif
00243 }
00244
00246 void PVT::setGVT(GVTMsg *m)
00247 {
00248 #if !CMK_TRACE_DISABLED
00249 if(pose_config.stats)
00250 localStats->TimerStart(GVT_TIMER);
00251 #endif
00252 CProxy_PVT p(ThePVT);
00253 CkAssert(m->estGVT >= estGVT);
00254 estGVT = m->estGVT;
00255 int i, end = objs.getNumSpaces();
00256 simdone = m->done;
00257 CkFreeMsg(m);
00258 waitForFirst = 1;
00259 objs.Commit();
00260 objs.StratCalcs();
00261 #ifdef MEM_TEMPORAL
00262 localTimePool->set_min_time(estGVT);
00263 #endif
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282 if ((CkMyPe() == 0) && (parCheckpointInProgress == 0) && (estGVT > 0) &&
00283 (((pose_config.checkpoint_gvt_interval > 0) && (estGVT >= (parLastCheckpointGVT + pose_config.checkpoint_gvt_interval))) ||
00284 ((pose_config.checkpoint_time_interval > 0) &&
00285 ((CmiWallTimer() + parStartTime) >= (parLastCheckpointTime + (double)pose_config.checkpoint_time_interval))))) {
00286
00287 objs.CheckpointCommit();
00288
00289 eventMsg *dummyMsg = new eventMsg();
00290 CkCallback cb(CkIndex_PVT::beginCheckpoint(dummyMsg), CkMyPe(), ThePVT);
00291 parCheckpointInProgress = 1;
00292 parLastCheckpointTime = CmiWallTimer() + parStartTime;
00293 CkStartQD(cb);
00294 } else if ((CkMyPe() == 0) && (parLBInProgress == 0) &&
00295 (((pose_config.lb_gvt_interval > 0) && (estGVT >= (parLastLBGVT + pose_config.lb_gvt_interval))))) {
00296
00297 eventMsg *dummyMsg = new eventMsg();
00298 CkCallback cb(CkIndex_PVT::beginLoadbalancing(dummyMsg), CkMyPe(), ThePVT);
00299 parLBInProgress = 1;
00300 CkStartQD(cb);
00301 } else {
00302
00303 eventMsg *dummyMsg = new eventMsg();
00304 p[CkMyPe()].resumeAfterCheckpoint(dummyMsg);
00305 }
00306 #if !CMK_TRACE_DISABLED
00307 if(pose_config.stats)
00308 localStats->TimerStop();
00309 #endif
00310 }
00311
00313 void PVT::beginCheckpoint(eventMsg *m) {
00314 #if !CMK_TRACE_DISABLED
00315 if(pose_config.stats)
00316 localStats->TimerStart(GVT_TIMER);
00317 #endif
00318 CkFreeMsg(m);
00319 if (parCheckpointInProgress) {
00320 CkPrintf("POSE: quiescence detected\n");
00321 CkPrintf("POSE: beginning checkpoint on processor %d at GVT=%lld sim time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() + parStartTime);
00322 eventMsg *dummyMsg = new eventMsg();
00323 CkCallback cb(CkIndex_PVT::resumeAfterCheckpoint(dummyMsg), CkMyPe(), ThePVT);
00324 CkStartCheckpoint(POSE_CHECKPOINT_DIRECTORY, cb);
00325 }
00326 #if !CMK_TRACE_DISABLED
00327 if(pose_config.stats)
00328 localStats->TimerStop();
00329 #endif
00330 }
00331
00332 void PVT::beginLoadbalancing(eventMsg *m) {
00333 #if !CMK_TRACE_DISABLED
00334 if(pose_config.stats)
00335 localStats->TimerStart(GVT_TIMER);
00336 #endif
00337 CkFreeMsg(m);
00338 if (parLBInProgress) {
00339 CProxy_PVT p(ThePVT);
00340 p.callAtSync();
00341 }
00342 #if !CMK_TRACE_DISABLED
00343 if(pose_config.stats)
00344 localStats->TimerStop();
00345 #endif
00346 }
00347
00348 void PVT::callAtSync() {
00349 #if !CMK_TRACE_DISABLED
00350 if(pose_config.stats)
00351 localStats->TimerStart(GVT_TIMER);
00352 #endif
00353 objs.callAtSync();
00354 #if !CMK_TRACE_DISABLED
00355 if(pose_config.stats)
00356 localStats->TimerStop();
00357 #endif
00358 }
00359
00360 void PVT::doneLB() {
00361 eventMsg *dummyMsg = new eventMsg();
00362 CProxy_PVT p(ThePVT);
00363 p[0].resumeAfterLB(dummyMsg);
00364 }
00365
00367 void PVT::resumeAfterCheckpoint(eventMsg *m) {
00368 #if !CMK_TRACE_DISABLED
00369 if(pose_config.stats)
00370 localStats->TimerStart(GVT_TIMER);
00371 #endif
00372 if (parCheckpointInProgress) {
00373 CkPrintf("POSE: checkpoint/restart complete on processor %d at GVT=%lld sim time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() + parStartTime);
00374 parCheckpointInProgress = 0;
00375 parLastCheckpointGVT = estGVT;
00376 }
00377 CkFreeMsg(m);
00378 CProxy_PVT p(ThePVT);
00379 startPhaseActive = 0;
00380 prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
00381 startMsg->bc = 1;
00382 *((int *)CkPriorityPtr(startMsg)) = 0;
00383 CkSetQueueing(startMsg, CK_QUEUEING_IFIFO);
00384 p[CkMyPe()].startPhase(startMsg);
00385 #if !CMK_TRACE_DISABLED
00386 if(pose_config.stats)
00387 localStats->TimerStop();
00388 #endif
00389 }
00390
00391
00392 void PVT::resumeAfterLB(eventMsg *m) {
00393 static int count = 0;
00394 count ++;
00395 if (count != CkNumPes()) {
00396 CkFreeMsg(m);
00397 return;
00398 }
00399 count = 0;
00400 #if !CMK_TRACE_DISABLED
00401 if(pose_config.stats)
00402 localStats->TimerStart(GVT_TIMER);
00403 #endif
00404 if (parLBInProgress) {
00405 CkPrintf("POSE: load balancing complete on processor %d at GVT=%lld sim time=%.1f sec\n", CkMyPe(), estGVT, CmiWallTimer() + parStartTime);
00406 parLBInProgress = 0;
00407 parLastLBGVT = estGVT;
00408 }
00409 CkFreeMsg(m);
00410 CProxy_PVT p(ThePVT);
00411 startPhaseActive = 0;
00412 prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
00413 startMsg->bc = 1;
00414 *((int *)CkPriorityPtr(startMsg)) = 0;
00415 CkSetQueueing(startMsg, CK_QUEUEING_IFIFO);
00416 p[CkMyPe()].startPhase(startMsg);
00417 #if !CMK_TRACE_DISABLED
00418 if(pose_config.stats)
00419 localStats->TimerStop();
00420 #endif
00421 }
00422
00424 int PVT::objRegister(int arrIdx, POSE_TimeType safeTime, int sync, sim *myPtr)
00425 {
00426 int i = objs.Insert(arrIdx, POSE_UnsetTS, sync, myPtr);
00427 return(i*1000 + CkMyPe());
00428 }
00429
00430
00431 void PVT::objRemove(int pvtIdx)
00432 {
00433 int idx = (pvtIdx-CkMyPe())/1000;
00434 objs.Delete(idx);
00435 }
00436
00438 void PVT::objUpdate(POSE_TimeType timestamp, int sr)
00439 {
00440 #if !CMK_TRACE_DISABLED
00441 int tstat = localStats->TimerRunning();
00442 if(pose_config.stats){
00443 if (tstat)
00444 localStats->SwitchTimer(GVT_TIMER);
00445 else
00446 localStats->TimerStart(GVT_TIMER);
00447 }
00448 #endif
00449
00450
00451
00452 CkAssert(simdone>0 || (timestamp >= estGVT) || (estGVT == POSE_UnsetTS));
00453 CkAssert((sr == SEND) || (sr == RECV));
00454 if ((estGVT > POSE_UnsetTS) &&
00455 ((timestamp < iterMin) || (iterMin == POSE_UnsetTS)))
00456 iterMin = timestamp;
00457 if (waitForFirst) {
00458 waitForFirst = 0;
00459 SendsAndRecvs->Restructure(estGVT, timestamp, sr);
00460 }
00461 else SendsAndRecvs->Insert(timestamp, sr);
00462 #if !CMK_TRACE_DISABLED
00463 if(pose_config.stats){
00464 if (tstat)
00465 localStats->SwitchTimer(tstat);
00466 else
00467 localStats->TimerStop();
00468 }
00469 #endif
00470
00471 }
00472
00474 void PVT::objUpdateOVT(int pvtIdx, POSE_TimeType safeTime, POSE_TimeType ovt)
00475 {
00476 int index = (pvtIdx-CkMyPe())/1000;
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487 CkAssert(simdone>0 || (safeTime >= estGVT) || (safeTime == POSE_UnsetTS));
00488 if ((safeTime == POSE_UnsetTS) && (objs.objs[index].getOVT2() < ovt)) {
00489 objs.objs[index].setOVT2(ovt);
00490 } else if ((safeTime > POSE_UnsetTS) &&
00491 ((objs.objs[index].getOVT() > safeTime) || (objs.objs[index].getOVT() == POSE_UnsetTS))) {
00492 objs.objs[index].setOVT(safeTime);
00493 }
00494 }
00495
00497 void PVT::reportReduce(UpdateMsg *m)
00498 {
00499 #if !CMK_TRACE_DISABLED
00500 if(pose_config.stats)
00501 localStats->TimerStart(GVT_TIMER);
00502 #endif
00503 CProxy_PVT p(ThePVT);
00504 CProxy_GVT g(TheGVT);
00505 POSE_TimeType lastGVT = 0, maxSR=0;
00506
00507
00508 if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
00509 optGVT = m->optPVT;
00510 if (m->maxSR > 0)
00511 maxSR = m->maxSR;
00512 addSR(&SRs, m->SRs, optGVT, m->numEntries);
00513 rdone++;
00514 CkFreeMsg(m);
00515
00516 if (rdone == reportsExpected) {
00517 UpdateMsg *um;
00518 int entryCount = 0;
00519
00520 SRentry *tmp = SRs;
00521
00522 while (tmp) {
00523 if (((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS)) && (tmp->sends != tmp->recvs)) {
00524 entryCount++;
00525 }
00526 tmp = tmp->next;
00527 }
00528 um = new (entryCount * sizeof(SRentry), 0) UpdateMsg;
00529 tmp = SRs;
00530 int i=0;
00531 while (tmp) {
00532 if (((tmp->timestamp <= optGVT) || (optGVT == POSE_UnsetTS)) && (tmp->sends != tmp->recvs)) {
00533 um->SRs[i] = *tmp;
00534 i++;
00535 }
00536 tmp = tmp->next;
00537 }
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556 um->numEntries = entryCount;
00557 um->optPVT = optGVT;
00558 um->conPVT = conGVT;
00559 um->maxSR = maxSR;
00560 um->runGVTflag = 0;
00561
00562 if (reportEnd) {
00563 if (simdone>0)
00564 g[0].computeGVT(um);
00565 else {
00566 g[gvtTurn].computeGVT(um);
00567 gvtTurn = (gvtTurn + 1) % CkNumPes();
00568 }
00569 }
00570 else {
00571 p[reportReduceTo].reportReduce(um);
00572 }
00573
00574
00575 optGVT = conGVT = POSE_UnsetTS;
00576 SRentry *cur = SRs;
00577 SRs = NULL;
00578 while (cur) {
00579 tmp = cur->next;
00580 delete cur;
00581 cur = tmp;
00582 }
00583 rdone = 0;
00584 }
00585 #if !CMK_TRACE_DISABLED
00586 if(pose_config.stats)
00587 localStats->TimerStop();
00588 #endif
00589 }
00590
00592 GVT::GVT()
00593 {
00594 #ifdef VERBOSE_DEBUG
00595 CkPrintf("[%d] constructing GVT\n",CkMyPe());
00596 #endif
00597
00598 optGVT = POSE_UnsetTS, conGVT = POSE_UnsetTS;
00599 done=0;
00600 SRs = NULL;
00601 startOffset = 0;
00602 gvtIterationCount = 0;
00603
00604 #if !CMK_TRACE_DISABLED
00605 localStats = (localStat *)CkLocalBranch(theLocalStats);
00606 #endif
00607 #ifndef SEQUENTIAL_POSE
00608 if(pose_config.lb_on)
00609 nextLBstart = pose_config.lb_skip - 1;
00610 #endif
00611 estGVT = lastEarliest = inactiveTime = POSE_UnsetTS;
00612 lastSends = lastRecvs = inactive = 0;
00613 reportsExpected = 1;
00614 if (CkNumPes() >= 2) reportsExpected = 2;
00615
00616
00617 if (CkMyPe() == 0) {
00618 CProxy_PVT p(ThePVT);
00619 prioBcMsg *startMsg = new (8*sizeof(int)) prioBcMsg;
00620 startMsg->bc = 1;
00621 *((int *)CkPriorityPtr(startMsg)) = 0;
00622 CkSetQueueing(startMsg, CK_QUEUEING_IFIFO);
00623 p.startPhase(startMsg);
00624 }
00625 }
00626
00628 void GVT::pup(PUP::er &p) {
00629 p|estGVT; p|inactive; p|inactiveTime; p|nextLBstart;
00630 p|lastEarliest; p|lastSends; p|lastRecvs; p|reportsExpected;
00631 p|optGVT; p|conGVT; p|done; p|startOffset;
00632 p|gvtIterationCount;
00633
00634 if (p.isUnpacking()) {
00635 #if !CMK_TRACE_DISABLED
00636 localStats = (localStat *)CkLocalBranch(theLocalStats);
00637 #endif
00638 }
00639
00640 int nullFlag;
00641 if (SRs == NULL) {
00642 nullFlag = 1;
00643 } else {
00644 nullFlag = 0;
00645 }
00646 p|nullFlag;
00647 if (p.isUnpacking()) {
00648 if (nullFlag) {
00649 SRs = NULL;
00650 } else {
00651 SRs = new SRentry();
00652 SRs->pup(p);
00653 }
00654 } else {
00655 if (!nullFlag) {
00656 SRs->pup(p);
00657 }
00658 }
00659 }
00660
00661
00662
00663
00664
00665
00666
00667
00669 void GVT::runGVT(UpdateMsg *m)
00670 {
00671 #if !CMK_TRACE_DISABLED
00672 if(pose_config.stats)
00673 localStats->TimerStart(GVT_TIMER);
00674 #endif
00675 estGVT = m->optPVT;
00676 inactive = m->inactive;
00677 inactiveTime = m->inactiveTime;
00678 nextLBstart = m->nextLB;
00679 CProxy_GVT g(TheGVT);
00680 m->runGVTflag = 1;
00681 g[CkMyPe()].computeGVT(m);
00682 #if !CMK_TRACE_DISABLED
00683 if(pose_config.stats)
00684 localStats->TimerStop();
00685 #endif
00686 }
00687
00689 void GVT::computeGVT(UpdateMsg *m)
00690 {
00691 #if !CMK_TRACE_DISABLED
00692 if(pose_config.stats)
00693 localStats->TimerStart(GVT_TIMER);
00694 #endif
00695 CProxy_PVT p(ThePVT);
00696 CProxy_GVT g(TheGVT);
00697 GVTMsg *gmsg = new GVTMsg;
00698 POSE_TimeType lastGVT = 0, earliestMsg = POSE_UnsetTS,
00699 earlyAny = POSE_UnsetTS;
00700 SRentry *tmpSRs = SRs;
00701
00702 if (CkMyPe() != 0) startOffset = 1;
00703 if (m->runGVTflag == 1) done++;
00704 else {
00705
00706 if ((optGVT < 0) || ((m->optPVT > POSE_UnsetTS) && (m->optPVT < optGVT)))
00707 optGVT = m->optPVT;
00708 if ((conGVT < 0) || ((m->conPVT > POSE_UnsetTS) && (m->conPVT < conGVT)))
00709 conGVT = m->conPVT;
00710 if (m->maxSR > earlyAny)
00711 earlyAny = m->maxSR;
00712
00713
00714
00715
00716 addSR(&SRs, m->SRs, optGVT, m->numEntries);
00717 done++;
00718 }
00719 CkFreeMsg(m);
00720
00721 if (done == reportsExpected+startOffset) {
00722 #if !CMK_TRACE_DISABLED
00723 if(pose_config.stats)
00724 localStats->GvtInc();
00725 #endif
00726 gvtIterationCount++;
00727 done = 0;
00728 startOffset = 1;
00729 lastGVT = estGVT;
00730 if (lastGVT < 0) lastGVT = 0;
00731 estGVT = POSE_UnsetTS;
00732
00733
00734 estGVT = optGVT;
00735 if ((conGVT > POSE_UnsetTS) && (estGVT > POSE_UnsetTS) && (conGVT < estGVT)) estGVT = conGVT;
00736
00737
00738
00739
00740 SRentry *tmp = SRs;
00741 POSE_TimeType lastSR = POSE_UnsetTS;
00742 while (tmp && ((tmp->timestamp <= estGVT) || (estGVT == POSE_UnsetTS))) {
00743 lastSR = tmp->timestamp;
00744 if (tmp->sends != tmp->recvs) {
00745 earliestMsg = tmp->timestamp;
00746 break;
00747 }
00748 tmp = tmp->next;
00749 }
00750
00751
00752 if (((earliestMsg < estGVT) && (earliestMsg != POSE_UnsetTS)) ||
00753 (estGVT == POSE_UnsetTS))
00754 estGVT = earliestMsg;
00755 if ((lastSR != POSE_UnsetTS) && (estGVT == POSE_UnsetTS) &&
00756 (lastSR > lastGVT))
00757 estGVT = lastSR;
00758
00759
00760 if ((optGVT == POSE_UnsetTS) && (earliestMsg == POSE_UnsetTS)) {
00761 inactive++;
00762
00763
00764
00765
00766
00767
00768
00769 estGVT = lastGVT;
00770 if (inactive == 1) inactiveTime = lastGVT;
00771 }
00772 else if (estGVT < 0) {
00773 estGVT = lastGVT;
00774 inactive = 0;
00775 }
00776 else inactive = 0;
00777
00778
00779
00780 CmiAssert(estGVT >= lastGVT);
00781
00782
00783
00784
00785
00786 int term = 0;
00787 if ((estGVT >= POSE_endtime) && (POSE_endtime > POSE_UnsetTS)) {
00788 #if USE_LONG_TIMESTAMPS
00789 CkPrintf("At endtime: %lld\n", POSE_endtime);
00790 #else
00791 CkPrintf("At endtime: %d\n", POSE_endtime);
00792 #endif
00793 term = 1;
00794 }
00795 else if (inactive > 2) {
00796 #if USE_LONG_TIMESTAMPS
00797 CkPrintf("Simulation inactive at time: %lld\n", inactiveTime);
00798 #else
00799 CkPrintf("Simulation inactive at time: %d\n", inactiveTime);
00800 #endif
00801 term = 1;
00802 }
00803
00804
00805 gmsg->estGVT = estGVT;
00806 gmsg->done = term;
00807 if (term) {
00808
00809
00810 #if USE_LONG_TIMESTAMPS
00811 CkPrintf("Final GVT = %lld\n", gmsg->estGVT);
00812 #else
00813 CkPrintf("Final GVT = %d\n", gmsg->estGVT);
00814 #endif
00815 p.setGVT(gmsg);
00816 POSE_stop();
00817 }
00818 else {
00819 p.setGVT(gmsg);
00820
00821 if(pose_config.lb_on)
00822 {
00823
00824 #if !CMK_TRACE_DISABLED
00825 if(pose_config.stats)
00826 localStats->SwitchTimer(LB_TIMER);
00827 #endif
00828
00829 if (CkNumPes() > 1) {
00830 nextLBstart++;
00831 if (pose_config.lb_skip == nextLBstart) {
00832 TheLBG.calculateLocalLoad();
00833 nextLBstart = 0;
00834 }
00835 }
00836 #if !CMK_TRACE_DISABLED
00837 if(pose_config.stats)
00838 localStats->SwitchTimer(GVT_TIMER);
00839 #endif
00840 }
00841
00842
00843 UpdateMsg *umsg = new UpdateMsg;
00844 umsg->maxSR=0;
00845 umsg->optPVT = estGVT;
00846 umsg->inactive = inactive;
00847 umsg->inactiveTime = inactiveTime;
00848 umsg->nextLB = nextLBstart;
00849 umsg->runGVTflag = 0;
00850 g[(CkMyPe()+1) % CkNumPes()].runGVT(umsg);
00851 }
00852
00853
00854 optGVT = conGVT = POSE_UnsetTS;
00855 SRentry *cur = SRs;
00856 SRs = NULL;
00857 while (cur) {
00858 tmp = cur->next;
00859 delete cur;
00860 cur = tmp;
00861 }
00862 }
00863 #if !CMK_TRACE_DISABLED
00864 if(pose_config.stats)
00865 localStats->TimerStop();
00866 #endif
00867 }
00868
00869 void GVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
00870 {
00871 int i;
00872 SRentry *tab = (*SRs);
00873 SRentry *tmp = tab;
00874
00875 for (i=0; i<ne; i++) {
00876 if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
00877 if (!tmp) {
00878 tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
00879 tab->sends = e[i].sends;
00880 tab->recvs = e[i].recvs;
00881 tmp = tab;
00882 *SRs = tmp;
00883 }
00884 else {
00885 if (e[i].timestamp < tmp->timestamp) {
00886 CkAssert(tmp == *SRs);
00887 tab = new SRentry(e[i].timestamp, tmp);
00888 tab->sends = e[i].sends;
00889 tab->recvs = e[i].recvs;
00890 tmp = tab;
00891 *SRs = tmp;
00892 }
00893 else if (e[i].timestamp == tmp->timestamp) {
00894 tmp->sends = tmp->sends + e[i].sends;
00895 tmp->recvs = tmp->recvs + e[i].recvs;
00896 }
00897 else {
00898 while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
00899 tmp = tmp->next;
00900 if (!tmp->next) {
00901 tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
00902 tmp->next->sends = tmp->next->sends + e[i].sends;
00903 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00904 tmp = tmp->next;
00905 }
00906 else if (e[i].timestamp == tmp->next->timestamp) {
00907 tmp->next->sends = tmp->next->sends + e[i].sends;
00908 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00909 tmp = tmp->next;
00910 }
00911 else {
00912 tmp->next = new SRentry(e[i].timestamp, tmp->next);
00913 tmp->next->sends = tmp->next->sends + e[i].sends;
00914 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00915 tmp = tmp->next;
00916 }
00917 }
00918 }
00919 }
00920 else break;
00921 }
00922 }
00923
00924 void PVT::addSR(SRentry **SRs, SRentry *e, POSE_TimeType og, int ne)
00925 {
00926 int i;
00927 SRentry *tab = (*SRs);
00928 SRentry *tmp = tab;
00929
00930 for (i=0; i<ne; i++) {
00931 if ((e[i].timestamp < og) || (og == POSE_UnsetTS)) {
00932 if (!tmp) {
00933 tab = new SRentry(e[i].timestamp, (SRentry *)NULL);
00934 tab->sends = e[i].sends;
00935 tab->recvs = e[i].recvs;
00936 tmp = tab;
00937 *SRs = tmp;
00938 }
00939 else {
00940 if (e[i].timestamp < tmp->timestamp) {
00941 CkAssert(tmp == *SRs);
00942 tab = new SRentry(e[i].timestamp, tmp);
00943 tab->sends = e[i].sends;
00944 tab->recvs = e[i].recvs;
00945 tmp = tab;
00946 *SRs = tmp;
00947 }
00948 else if (e[i].timestamp == tmp->timestamp) {
00949 tmp->sends = tmp->sends + e[i].sends;
00950 tmp->recvs = tmp->recvs + e[i].recvs;
00951 }
00952 else {
00953 while (tmp->next && (e[i].timestamp > tmp->next->timestamp))
00954 tmp = tmp->next;
00955 if (!tmp->next) {
00956 tmp->next = new SRentry(e[i].timestamp, (SRentry *)NULL);
00957 tmp->next->sends = tmp->next->sends + e[i].sends;
00958 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00959 tmp = tmp->next;
00960 }
00961 else if (e[i].timestamp == tmp->next->timestamp) {
00962 tmp->next->sends = tmp->next->sends + e[i].sends;
00963 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00964 tmp = tmp->next;
00965 }
00966 else {
00967 tmp->next = new SRentry(e[i].timestamp, tmp->next);
00968 tmp->next->sends = tmp->next->sends + e[i].sends;
00969 tmp->next->recvs = tmp->next->recvs + e[i].recvs;
00970 tmp = tmp->next;
00971 }
00972 }
00973 }
00974 }
00975 else break;
00976 }
00977 }