00001
00002 #include "pose.h"
00003
00005 eventQueue::eventQueue()
00006 {
00007 lastLoggedVT = 0;
00008 Event *e;
00009 eqh = new EqHeap();
00010 largest = POSE_UnsetTS;
00011 mem_usage = 0;
00012 eventCount = 0;
00013 tsOfLastInserted = 0;
00014
00015 e = new Event();
00016 e->timestamp = POSE_UnsetTS;
00017 e->done = -1;
00018 e->fnIdx = -99;
00019 e->msg = NULL;
00020 e->commitBfr = NULL;
00021 e->spawnedList = NULL;
00022 e->commitBfrLen = 0;
00023 e->next = e->prev = NULL;
00024 frontPtr = e;
00025
00026 e = new Event();
00027 e->timestamp=POSE_UnsetTS;
00028 e->done = -1;
00029 e->fnIdx = -100;
00030 e->msg = NULL;
00031 e->commitBfr = NULL;
00032 e->spawnedList = NULL;
00033 e->commitBfrLen = 0;
00034 e->next = e->prev = NULL;
00035 currentPtr = backPtr = e;
00036
00037 frontPtr->next = backPtr;
00038 backPtr->prev = frontPtr;
00039 RBevent = NULL;
00040
00041 recentAvgEventSparsity = 1;
00042 sparsityStartTime = 0;
00043 sparsityCalcCount = 0;
00044 tsDiffCount = 0;
00045 lastCommittedTS = 0;
00046 for (int i = 0; i < DIFFS_TO_STORE; i++) {
00047 tsCommitDiffs[i] = 0;
00048 }
00049 #ifdef MEM_TEMPORAL
00050 localTimePool = (TimePool *)CkLocalBranch(TempMemID);
00051 #endif
00052 #ifdef EQ_SANITIZE
00053 sanitize();
00054 #endif
00055 }
00056
00058 eventQueue::~eventQueue()
00059 {
00060 Event *tmp1 = frontPtr, *tmp2 = frontPtr->next;
00061 while (tmp2) {
00062 free(tmp1);
00063 tmp1 = tmp2;
00064 tmp2 = tmp1->next;
00065 }
00066 free(tmp1);
00067 delete eqh;
00068 }
00069
00071 void eventQueue::InsertEvent(Event *e)
00072 {
00073 tsOfLastInserted = e->timestamp;
00074 if(pose_config.deterministic)
00075 {
00076 InsertEventDeterministic(e);
00077 }
00078 else
00079 {
00080 #ifdef EQ_SANITIZE
00081 sanitize();
00082 #endif
00083 Event *tmp = backPtr->prev;
00084
00085 if (e->timestamp > largest) largest = e->timestamp;
00086 eventCount++;
00087
00088
00089
00090
00091 if ((tmp->timestamp < e->timestamp || (tmp->timestamp == e->timestamp && tmp->evID < e->evID)) && (currentPtr != backPtr))
00092 eqh->InsertEvent(e);
00093 else {
00094 if ((currentPtr != backPtr) && (currentPtr->timestamp > e->timestamp))
00095 tmp = currentPtr;
00096 while (tmp->timestamp > e->timestamp || (tmp->timestamp == e->timestamp && tmp->evID > e->evID))
00097 tmp = tmp->prev;
00098
00099 e->prev = tmp;
00100 e->next = tmp->next;
00101 e->next->prev = e;
00102 tmp->next = e;
00103
00104 if ((currentPtr->prev == e) && (currentPtr->done < 1))
00105 currentPtr = currentPtr->prev;
00106 else if ((currentPtr == backPtr) || (e->timestamp < currentPtr->timestamp || (e->timestamp == currentPtr->timestamp && e->evID < currentPtr->evID)))
00107 SetRBevent(e);
00108 }
00109 #ifdef EQ_SANITIZE
00110 sanitize();
00111 #endif
00112 }
00113 }
00114
00116
00118 void eventQueue::InsertEventDeterministic(Event *e)
00119 {
00120 Event *tmp = backPtr->prev;
00121 if (e->timestamp > largest) largest = e->timestamp;
00122 eventCount++;
00123
00124
00125
00126
00127 if ((tmp->timestamp < e->timestamp || (tmp->timestamp == e->timestamp && tmp->evID < e->evID)) && (currentPtr != backPtr))
00128 eqh->InsertEvent(e);
00129 else {
00130 if ((currentPtr != backPtr) && (currentPtr->timestamp > e->timestamp))
00131 tmp = currentPtr;
00132 while (tmp->timestamp > e->timestamp || (tmp->timestamp == e->timestamp && tmp->evID > e->evID))
00133 tmp = tmp->prev;
00134
00135 if (tmp->timestamp == e->timestamp)
00136 while ((tmp->timestamp == e->timestamp) && (e->evID < tmp->evID))
00137 tmp = tmp->prev;
00138
00139 e->prev = tmp;
00140 e->next = tmp->next;
00141 e->next->prev = e;
00142 tmp->next = e;
00143
00144 if ((currentPtr->prev == e) && (currentPtr->done < 1))
00145 currentPtr = currentPtr->prev;
00146 else if ((currentPtr == backPtr) ||
00147 ((e->timestamp < currentPtr->timestamp) ||
00148 ((e->timestamp == currentPtr->timestamp) &&
00149 (e->evID < currentPtr->evID))))
00150 SetRBevent(e);
00151 }
00152 #ifdef EQ_SANITIZE
00153 sanitize();
00154 #endif
00155 }
00156
00157 void eventQueue::CommitStatsHelper(sim *obj, Event *commitPtr) {
00158 #if !CMK_TRACE_DISABLED
00159 localStat *localStats = (localStat *)CkLocalBranch(theLocalStats);
00160 if (pose_config.stats) {
00161 localStats->Commit();
00162 }
00163
00164 if (pose_config.dop) {
00165
00166
00167 if (lastLoggedVT >= commitPtr->svt) {
00168 commitPtr->svt = commitPtr->evt = -1;
00169 } else {
00170 lastLoggedVT = commitPtr->evt;
00171 }
00172 localStats->WriteDopData(commitPtr->srt, commitPtr->ert, commitPtr->svt, commitPtr->evt);
00173 localStats->SetMaximums(commitPtr->evt, commitPtr->ert);
00174 }
00175 #endif
00176
00177
00178 if (obj->myStrat->STRAT_T == ADAPT5_T) {
00179
00180 sparsityCalcCount++;
00181
00182 if (sparsityCalcCount >= EVQ_SPARSE_CALC_PERIOD) {
00183 recentAvgEventSparsity = (int)((commitPtr->timestamp - sparsityStartTime) / sparsityCalcCount);
00184 if (recentAvgEventSparsity < 1) {
00185 recentAvgEventSparsity = 1;
00186 }
00187 ((adapt5 *)obj->myStrat)->setRecentAvgEventSparsity(recentAvgEventSparsity);
00188 sparsityCalcCount = 0;
00189 sparsityStartTime = commitPtr->timestamp;
00190 }
00191
00192
00193 POSE_TimeType diff = commitPtr->timestamp - lastCommittedTS;
00194 lastCommittedTS = commitPtr->timestamp;
00195 for (int i = 0; i < DIFFS_TO_STORE; i++) {
00196 if (diff > tsCommitDiffs[i]) {
00197
00198 for (int j = DIFFS_TO_STORE - 2; j > i; j--) {
00199 tsCommitDiffs[j+1] = tsCommitDiffs[j];
00200 }
00201 if (i < DIFFS_TO_STORE - 1) {
00202
00203 tsCommitDiffs[i+1] = tsCommitDiffs[i];
00204 }
00205 tsCommitDiffs[i] = diff;
00206 break;
00207 }
00208 }
00209 tsDiffCount++;
00210
00211
00212 if (tsDiffCount >= TS_DIFF_WIN_SIZE) {
00213 POSE_TimeType totalDiff = 0;
00214 for (int i = HIGHEST_DIFFS_TO_IGNORE; i < DIFFS_TO_STORE; i++) {
00215 totalDiff += tsCommitDiffs[i];
00216 }
00217 POSE_TimeType avgDiff = totalDiff / NUM_DIFFS_TO_AVERAGE;
00218 ((adapt5 *)obj->myStrat)->setTimeLeash(DIFFS_TO_STORE * avgDiff);
00219 tsDiffCount = 0;
00220 for (int i = 0; i < DIFFS_TO_STORE; i++) {
00221 tsCommitDiffs[i] = 0;
00222 }
00223 }
00224
00225 }
00226
00227 }
00228
00230 void eventQueue::CommitEvents(sim *obj, POSE_TimeType ts)
00231 {
00232 #ifdef EQ_SANITIZE
00233 sanitize();
00234 #endif
00235 Event *target = frontPtr->next, *commitPtr = frontPtr->next;
00236 if (ts == POSE_endtime) {
00237 CommitAll(obj);
00238 #ifdef MEM_TEMPORAL
00239 localTimePool->set_min_time(ts);
00240 localTimePool->empty_recycle_bin();
00241 #endif
00242 return;
00243 }
00244
00245
00246 if (obj->objID->usesAntimethods()) {
00247 while ((commitPtr->timestamp < ts) && (commitPtr != backPtr)
00248 && (commitPtr != currentPtr)) {
00249 CmiAssert(commitPtr->done == 1);
00250 obj->ResolveCommitFn(commitPtr->fnIdx, commitPtr->msg);
00251 obj->basicStats[0]++;
00252 CommitStatsHelper(obj, commitPtr);
00253 if (commitPtr->commitBfrLen > 0) {
00254 CkPrintf("%s", commitPtr->commitBfr);
00255 if (commitPtr->commitErr) CmiAbort("Commit ERROR");
00256 }
00257 commitPtr = commitPtr->next;
00258 }
00259 }
00260 else {
00261 while ((target != backPtr) && (target->timestamp < ts) &&
00262 (target != currentPtr)) {
00263 while (commitPtr != target) {
00264 CmiAssert(commitPtr->done == 1);
00265 obj->ResolveCommitFn(commitPtr->fnIdx, commitPtr->msg);
00266 obj->basicStats[0]++;
00267 CommitStatsHelper(obj, commitPtr);
00268 if (commitPtr->commitBfrLen > 0) {
00269 CkPrintf("%s", commitPtr->commitBfr);
00270 if (commitPtr->commitErr) CmiAbort("Commit ERROR");
00271 }
00272 commitPtr = commitPtr->next;
00273 }
00274
00275 target = target->next;
00276 #ifdef MEM_TEMPORAL
00277 while (!target->serialCPdata && (target->timestamp <ts) && (target != backPtr))
00278 #else
00279 while (!target->cpData && (target->timestamp <ts) && (target != backPtr))
00280 #endif
00281 target = target->next;
00282 }
00283 }
00284
00285 Event *link = commitPtr;
00286 commitPtr = commitPtr->prev;
00287 while (commitPtr != frontPtr) {
00288 #ifdef MEM_TEMPORAL
00289 if (commitPtr->serialCPdata) {
00290 localTimePool->tmp_free(commitPtr->timestamp, commitPtr->serialCPdata);
00291 #else
00292 if (commitPtr->cpData) {
00293 delete commitPtr->cpData;
00294 #endif
00295 }
00296 commitPtr = commitPtr->prev;
00297 delete commitPtr->next;
00298 mem_usage--;
00299 }
00300 frontPtr->next = link;
00301 link->prev = frontPtr;
00302 #ifdef EQ_SANITIZE
00303 sanitize();
00304 #endif
00305 }
00306
00308 void eventQueue::CommitAll(sim *obj)
00309 {
00310 #ifdef EQ_SANITIZE
00311 sanitize();
00312 #endif
00313 Event *commitPtr = frontPtr->next;
00314
00315
00316 while (commitPtr != backPtr) {
00317 if (commitPtr->done) {
00318 obj->ResolveCommitFn(commitPtr->fnIdx, commitPtr->msg);
00319 obj->basicStats[0]++;
00320 CommitStatsHelper(obj, commitPtr);
00321 if (commitPtr->commitBfrLen > 0) {
00322 CkPrintf("%s", commitPtr->commitBfr);
00323 if (commitPtr->commitErr) CmiAbort("Commit ERROR");
00324 }
00325 }
00326 commitPtr = commitPtr->next;
00327 }
00328
00329
00330
00331 Event *link = commitPtr;
00332 commitPtr = commitPtr->prev;
00333 while (commitPtr != frontPtr) {
00334 #ifdef MEM_TEMPORAL
00335 if (commitPtr->serialCPdata) {
00336 localTimePool->tmp_free(commitPtr->timestamp, commitPtr->serialCPdata);
00337 }
00338 #else
00339 if (commitPtr->cpData) {
00340 delete commitPtr->cpData;
00341 }
00342 #endif
00343 commitPtr = commitPtr->prev;
00344 mem_usage--;
00345 delete commitPtr->next;
00346 }
00347 frontPtr->next = link;
00348 link->prev = frontPtr;
00349 #ifdef EQ_SANITIZE
00350 sanitize();
00351 #endif
00352 }
00353
00355 void eventQueue::CommitDoneEvents(sim *obj) {
00356 #ifdef EQ_SANITIZE
00357 sanitize();
00358 #endif
00359 Event *commitPtr = frontPtr->next;
00360
00361
00362 while (commitPtr != backPtr) {
00363 if (commitPtr->done) {
00364 obj->ResolveCommitFn(commitPtr->fnIdx, commitPtr->msg);
00365 obj->basicStats[0]++;
00366 CommitStatsHelper(obj, commitPtr);
00367 if (commitPtr->commitBfrLen > 0) {
00368 CkPrintf("%s", commitPtr->commitBfr);
00369 if (commitPtr->commitErr) CmiAbort("Commit ERROR");
00370 }
00371 }
00372 commitPtr = commitPtr->next;
00373 }
00374
00375
00376 Event *link = commitPtr;
00377 commitPtr = commitPtr->prev;
00378 while (commitPtr != frontPtr) {
00379 if (commitPtr->done == 1) {
00380 #ifdef MEM_TEMPORAL
00381 if (commitPtr->serialCPdata) {
00382 localTimePool->tmp_free(commitPtr->timestamp, commitPtr->serialCPdata);
00383 }
00384 #else
00385 if (commitPtr->cpData) {
00386 delete commitPtr->cpData;
00387 }
00388 #endif
00389 }
00390 commitPtr = commitPtr->prev;
00391 if (commitPtr->next->done == 1) {
00392 mem_usage--;
00393 delete commitPtr->next;
00394 } else {
00395 link = commitPtr->next;
00396 }
00397 }
00398 frontPtr->next = link;
00399 link->prev = frontPtr;
00400 #ifdef EQ_SANITIZE
00401 sanitize();
00402 #endif
00403 }
00404
00406 void eventQueue::SetCurrentPtr(Event *e) {
00407 Event *tmp = e;
00408
00409 CmiAssert((e->done == 0) || (e->done == -1));
00410 CmiAssert(currentPtr->done != 2);
00411 currentPtr = e;
00412 if ((currentPtr == backPtr) && (eqh->top)) {
00413
00414 tmp = eqh->GetAndRemoveTopEvent();
00415 tmp->next = currentPtr;
00416 tmp->prev = currentPtr->prev;
00417 currentPtr->prev->next = tmp;
00418 currentPtr->prev = tmp;
00419 currentPtr = tmp;
00420 }
00421 }
00422
00424 void eventQueue::DeleteEvent(Event *ev)
00425 {
00426 #ifdef EQ_SANITIZE
00427 sanitize();
00428 #endif
00429 Event *tmp;
00430 CmiAssert(ev != currentPtr);
00431 CmiAssert(ev->spawnedList == NULL);
00432 CmiAssert(ev != frontPtr);
00433 CmiAssert(ev != backPtr);
00434
00435 if (RBevent == ev) {
00436 RBevent = NULL;
00437 tmp = ev->next;
00438 while ((tmp != currentPtr) && (tmp != backPtr) && (tmp->done == 1))
00439 tmp = tmp->next;
00440 if ((tmp != currentPtr) && (tmp != backPtr) && (tmp->done == 0))
00441 RBevent = tmp;
00442 }
00443
00444 ev->prev->next = ev->next;
00445 ev->next->prev = ev->prev;
00446 POSE_TimeType ts = ev->timestamp;
00447 if (!ev->done) eventCount--;
00448 else mem_usage--;
00449 delete ev;
00450 if (ts == largest) FindLargest();
00451 #ifdef EQ_SANITIZE
00452 sanitize();
00453 #endif
00454 }
00455
00457 void eventQueue::dump()
00458 {
00459 Event *e = frontPtr;
00460 CkPrintf("[EVENTQUEUE: \n");
00461 while (e) {
00462 #if USE_LONG_TIMESTAMPS
00463 CkPrintf("%lld[", e->timestamp); e->evID.dump(); CkPrintf(".%d", e->done); CkPrintf("]");
00464 #else
00465 CkPrintf("%d[", e->timestamp); e->evID.dump(); CkPrintf("]");
00466 #endif
00467 if (e == frontPtr) CkPrintf("(FP)");
00468 if (e == currentPtr) CkPrintf("(CP)");
00469 if (e == backPtr) CkPrintf("(BP)");
00470 CkPrintf(" ");
00471 e = e->next;
00472 }
00473 CkPrintf("\n");
00474 eqh->dump();
00475 CkPrintf("end EVENTQUEUE]\n");
00476 }
00477
00479 char *eventQueue::dumpString() {
00480 Event *e = frontPtr;
00481 char *str= new char[PVT_DEBUG_BUFFER_LINE_LENGTH];
00482 char *tempStr= new char[PVT_DEBUG_BUFFER_LINE_LENGTH];
00483 snprintf(str, PVT_DEBUG_BUFFER_LINE_LENGTH, "[EVQ: ");
00484 while (e) {
00485 #if USE_LONG_TIMESTAMPS
00486 snprintf(tempStr, PVT_DEBUG_BUFFER_LINE_LENGTH, "%lld[%u.%d.%d]", e->timestamp, e->evID.id, e->evID.getPE(), e->done);
00487 #else
00488 sprintf(tempStr, PVT_DEBUG_BUFFER_LINE_LENGTH, "%d[%u.%d.%d]", e->timestamp, e->evID.id, e->evID.getPE(), e->done);
00489 #endif
00490 strncat(str, tempStr, 32);
00491
00492 if (e == frontPtr) strncat(str, "(FP)", PVT_DEBUG_BUFFER_LINE_LENGTH);
00493 if (e == currentPtr) strncat(str, "(CP)", PVT_DEBUG_BUFFER_LINE_LENGTH);
00494 if (e == backPtr) strncat(str, "(BP)", PVT_DEBUG_BUFFER_LINE_LENGTH);
00495 strncat(str, " ", PVT_DEBUG_BUFFER_LINE_LENGTH);
00496 e = e->next;
00497 }
00498 char *eqs=eqh->dumpString();
00499 strncat(str, eqs, 32);
00500 delete [] tempStr;
00501 delete [] eqs;
00502 strncat(str, "end EVQ]", PVT_DEBUG_BUFFER_LINE_LENGTH);
00503 return str;
00504 }
00505
00507 void eventQueue::pup(PUP::er &p)
00508 {
00509 p|tsOfLastInserted; p|recentAvgEventSparsity;
00510 p|sparsityStartTime; p|sparsityCalcCount;
00511 Event *tmp;
00512 int i;
00513 int countlist = 0;
00514 if (p.isUnpacking()) {
00515 p(countlist);
00516 tmp = frontPtr;
00517 for (i=0; i<countlist; i++) {
00518 tmp->next = new Event;
00519 tmp->next->prev = tmp;
00520 tmp->next->next = NULL;
00521 tmp = tmp->next;
00522 tmp->pup(p);
00523 }
00524 tmp->next = backPtr;
00525 backPtr->prev = tmp;
00526 currentPtr = backPtr;
00527 if ((countlist > 0) && (backPtr->prev != frontPtr))
00528 while ((currentPtr->prev->done == 0) &&
00529 (currentPtr->prev != frontPtr))
00530 currentPtr = currentPtr->prev;
00531 eqh->pup(p);
00532 }
00533 else {
00534 tmp = frontPtr->next;
00535 while (tmp != backPtr) {
00536 countlist++;
00537 tmp = tmp->next;
00538 }
00539 p(countlist);
00540 tmp = frontPtr->next;
00541 for (i=0; i<countlist; i++) {
00542 tmp->pup(p);
00543 tmp = tmp->next;
00544 }
00545 eqh->pup(p);
00546 }
00547 }
00548
00550 void eventQueue::sanitize()
00551 {
00552
00553 CmiAssert(frontPtr != NULL);
00554 CmiAssert(frontPtr->timestamp == POSE_UnsetTS);
00555 CmiAssert(frontPtr->done == -1);
00556 CmiAssert(frontPtr->fnIdx == -99);
00557 CmiAssert(frontPtr->msg == NULL);
00558 CmiAssert(frontPtr->commitBfr == NULL);
00559 CmiAssert(frontPtr->spawnedList == NULL);
00560 CmiAssert(frontPtr->next != NULL);
00561 CmiAssert(frontPtr->prev == NULL);
00562 CmiAssert(frontPtr->commitBfrLen == 0);
00563 CmiAssert(backPtr != NULL);
00564 CmiAssert(backPtr->timestamp == POSE_UnsetTS);
00565 CmiAssert(backPtr->done == -1);
00566 CmiAssert(backPtr->fnIdx == -100);
00567 CmiAssert(backPtr->msg == NULL);
00568 CmiAssert(backPtr->commitBfr == NULL);
00569 CmiAssert(backPtr->spawnedList == NULL);
00570 CmiAssert(backPtr->next == NULL);
00571 CmiAssert(backPtr->prev != NULL);
00572 CmiAssert(backPtr->commitBfrLen == 0);
00573
00574
00575 Event *tmp = frontPtr->next;
00576 while (tmp != backPtr) {
00577 CmiAssert(tmp->next != NULL);
00578 tmp->sanitize();
00579 tmp = tmp->next;
00580 }
00581
00582
00583 tmp = backPtr->prev;
00584 while (tmp != frontPtr) {
00585 CmiAssert(tmp->prev != NULL);
00586 tmp->sanitize();
00587 tmp = tmp->prev;
00588 }
00589
00590
00591 CmiAssert(currentPtr != NULL);
00592
00593
00594 tmp = currentPtr;
00595 while (tmp != backPtr) {
00596 CmiAssert(tmp->next != NULL);
00597 CmiAssert((tmp->next == backPtr) ||
00598 (tmp->timestamp <= tmp->next->timestamp));
00599 tmp = tmp->next;
00600 }
00601
00602 while (tmp != currentPtr) {
00603 CmiAssert(tmp->prev != NULL);
00604 tmp = tmp->prev;
00605 }
00606
00607 while (tmp != frontPtr) {
00608 CmiAssert(tmp->prev != NULL);
00609 tmp = tmp->prev;
00610 }
00611
00612 while (tmp != currentPtr) {
00613 CmiAssert(tmp->next != NULL);
00614 tmp = tmp->next;
00615 }
00616
00617
00618 if ((frontPtr->next != backPtr) && (frontPtr->next->done))
00619 #ifdef MEM_TEMPORAL
00620 CmiAssert(frontPtr->next->serialCPdata);
00621 #else
00622 CmiAssert(frontPtr->next->cpData);
00623 #endif
00624
00625
00626 tmp = frontPtr->next;
00627 while ((tmp != currentPtr) && (tmp->done == 1))
00628 tmp = tmp->next;
00629 if (tmp == currentPtr) CmiAssert(RBevent == NULL);
00630 else CmiAssert((RBevent == NULL) || (tmp == RBevent));
00631
00632
00633 eqh->sanitize();
00634 }