00001
00002 #include <stdio.h>
00003 #include <string.h>
00004 #include <stdlib.h>
00005 #include "charm++.h"
00006 #include "srtable.h"
00007 #include "gvt.h"
00008
00010 char *SRentry::dumpString() {
00011 char *str= new char[32];
00012 #if USE_LONG_TIMESTAMPS
00013 snprintf(str, 32, "%llds%dr%d ", timestamp, sends, recvs);
00014 #else
00015 snprintf(str, 32, "%ds%dr%d ", timestamp, sends, recvs);
00016 #endif
00017 return str;
00018 }
00019
00021 SRtable::SRtable() : offset(0), b(0), size_b(0), numOverflow(0), overflow(NULL), end_overflow(NULL), ofSends(0), ofRecvs(0)
00022 {
00023 int i;
00024 for (i=0; i<MAX_B; i++) {
00025 buckets[i] = end_bucket[i] = NULL;
00026 numEntries[i] = sends[i] = recvs[i] = 0;
00027 }
00028 }
00029
00031 void SRtable::Initialize()
00032 {
00033 offset = 0; b = MAX_B; size_b = -1;
00034
00035
00036
00037 }
00038
00040
00042 void SRtable::Restructure(POSE_TimeType newGVTest, POSE_TimeType firstTS,
00043 int firstSR)
00044 {
00045 #ifdef SR_SANITIZE
00046 sanitize();
00047 #endif
00048 int i;
00049 POSE_TimeType long_i;
00050
00051 int sumS_old=0, sumR_old=0, sumS=0, sumR=0;
00052 POSE_TimeType keepBkt;
00053 if (size_b == -1) {
00054 size_b = 1;
00055 keepBkt = 0;
00056 }
00057 else keepBkt = (newGVTest-offset)/size_b;
00058 if (keepBkt < b)
00059 for (long_i=keepBkt; long_i<b; long_i++)
00060 CompressAndSortBucket(long_i, 0);
00061 CompressAndSortBucket(b, 1);
00062 int b_old = b, size_b_old = size_b, offset_old = offset;
00063 SRentry *buckets_old[MAX_B], *end_bucket_old[MAX_B],
00064 *overflow_old, *end_overflow_old, *tmp;
00065 int sends_old[MAX_B], recvs_old[MAX_B], ofSends_old, ofRecvs_old;
00066 for (i=0; i<b_old; i++) {
00067 buckets_old[i] = buckets[i];
00068 end_bucket_old[i] = end_bucket[i];
00069 sends_old[i] = sends[i];
00070 recvs_old[i] = recvs[i];
00071 }
00072 overflow_old = overflow;
00073 end_overflow_old = end_overflow;
00074 ofSends_old = ofSends;
00075 ofRecvs_old = ofRecvs;
00076 for (i=0; i<b; i++) { sumS_old += sends_old[i]; sumR_old += recvs_old[i]; }
00077 sumS_old += ofSends_old; sumR_old += ofRecvs_old;
00078
00079 overflow = end_overflow = NULL;
00080 ofSends = ofRecvs = 0;
00081 offset = newGVTest;
00082 POSE_TimeType d = firstTS - offset;
00083 CkAssert(d>=0);
00084 if ((d > MAX_B) || (d == 0)) b = MAX_B;
00085 size_b = 1 + d/b;
00086 for (i=0; i<b; i++) {
00087 buckets[i] = end_bucket[i] = NULL;
00088 sends[i] = recvs[i] = 0;
00089 }
00090
00091 #ifdef SR_SANITIZE
00092 sanitize();
00093 #endif
00094
00095 if (keepBkt < b_old) {
00096 for (long_i=0; long_i<keepBkt; long_i++) {
00097 tmp = buckets_old[long_i];
00098 while (tmp) {
00099 buckets_old[long_i] = tmp->next;
00100 delete tmp;
00101 tmp = buckets_old[long_i];
00102 }
00103 sumS_old -= sends_old[long_i];
00104 sumR_old -= recvs_old[long_i];
00105 sends_old[long_i] = recvs_old[long_i] = 0;
00106 }
00107 #ifdef SR_SANITIZE
00108 sanitize();
00109 #endif
00110
00111 tmp = buckets_old[keepBkt];
00112 while (tmp && (tmp->timestamp < offset)) {
00113 buckets_old[keepBkt] = tmp->next;
00114 sumS_old -= tmp->sends;
00115 sumR_old -= tmp->recvs;
00116 sends_old[keepBkt] -= tmp->sends;
00117 recvs_old[keepBkt] -= tmp->recvs;
00118 delete tmp;
00119 tmp = buckets_old[keepBkt];
00120 }
00121 if (tmp)
00122 MapToBuckets(buckets_old[keepBkt], end_bucket_old[keepBkt],
00123 &sends_old[keepBkt], &recvs_old[keepBkt]);
00124 #ifdef SR_SANITIZE
00125 sanitize();
00126 #endif
00127 for (long_i=keepBkt+1; long_i<b_old; long_i++) {
00128 if (buckets_old[long_i])
00129 MapToBuckets(buckets_old[long_i], end_bucket_old[long_i],
00130 &sends_old[long_i], &recvs_old[long_i]);
00131 }
00132 #ifdef SR_SANITIZE
00133 sanitize();
00134 #endif
00135 if (overflow_old)
00136 MapToBuckets(overflow_old, end_overflow_old, &ofSends_old,
00137 &ofRecvs_old);
00138 #ifdef SR_SANITIZE
00139 sanitize();
00140 #endif
00141 }
00142 else {
00143 for (i=0; i<b_old; i++) {
00144 tmp = buckets_old[i];
00145 while (tmp) {
00146 buckets_old[i] = tmp->next;
00147 delete tmp;
00148 tmp = buckets_old[i];
00149 }
00150 sumS_old -= sends_old[i];
00151 sumR_old -= recvs_old[i];
00152 sends_old[i] = recvs_old[i] = 0;
00153 }
00154 #ifdef SR_SANITIZE
00155 sanitize();
00156 #endif
00157
00158 tmp = overflow_old;
00159 while (tmp && (tmp->timestamp < offset)) {
00160 overflow_old = tmp->next;
00161 sumS_old -= tmp->sends;
00162 sumR_old -= tmp->recvs;
00163 ofSends_old -= tmp->sends;
00164 ofRecvs_old -= tmp->recvs;
00165 delete tmp;
00166 tmp = overflow_old;
00167 }
00168 if (overflow_old)
00169 MapToBuckets(overflow_old, end_overflow_old, &ofSends_old, &ofRecvs_old);
00170 #ifdef SR_SANITIZE
00171 sanitize();
00172 #endif
00173 }
00174
00175 for (i=0; i<b; i++) { sumS += sends[i]; sumR += recvs[i]; }
00176 sumS += ofSends; sumR += ofRecvs;
00177
00178 if ((sumS_old != sumS) || (sumR_old != sumR))
00179 CkPrintf("Old sends=%d; old recvs=%d; new sends=%d, new recvs=%d\n",
00180 sumS_old, sumR_old, sumS, sumR);
00181
00182 if (firstSR != -1) Insert(firstTS, firstSR);
00183 #ifdef SR_SANITIZE
00184 sanitize();
00185 #endif
00186 }
00187
00188 void SRtable::MapToBuckets(SRentry *bkt, SRentry *endBkt, int *s,
00189 int *r)
00190 {
00191 POSE_TimeType destBkt1 = (bkt->timestamp-offset)/size_b;
00192 POSE_TimeType destBkt2 = (endBkt->timestamp-offset)/size_b;
00193 SRentry *tmp = bkt;
00194 CkAssert(destBkt1 <= destBkt2);
00195 while (destBkt1 != destBkt2) {
00196 CkAssert(destBkt1 < destBkt2);
00197 if (destBkt1 >= b) break;
00198 bkt = tmp->next;
00199 (*s) -= tmp->sends;
00200 (*r) -= tmp->recvs;
00201 #ifdef SR_SANITIZE
00202 sanitize();
00203 #endif
00204 if (end_bucket[destBkt1]) {
00205 if (end_bucket[destBkt1]->timestamp == tmp->timestamp) {
00206 end_bucket[destBkt1]->sends += tmp->sends;
00207 end_bucket[destBkt1]->recvs += tmp->recvs;
00208 sends[destBkt1] += tmp->sends;
00209 recvs[destBkt1] += tmp->recvs;
00210 delete tmp;
00211 #ifdef SR_SANITIZE
00212 sanitize();
00213 #endif
00214 }
00215 else {
00216 #ifdef SR_SANITIZE
00217 sanitize();
00218 #endif
00219 end_bucket[destBkt1]->next = tmp;
00220 end_bucket[destBkt1] = tmp;
00221 tmp->next = NULL;
00222 sends[destBkt1] += tmp->sends;
00223 recvs[destBkt1] += tmp->recvs;
00224 #ifdef SR_SANITIZE
00225 sanitize();
00226 #endif
00227 }
00228 }
00229 else {
00230 buckets[destBkt1] = end_bucket[destBkt1] = tmp;
00231 tmp->next = NULL;
00232 sends[destBkt1] = tmp->sends;
00233 recvs[destBkt1] = tmp->recvs;
00234 #ifdef SR_SANITIZE
00235 sanitize();
00236 #endif
00237 }
00238 tmp = bkt;
00239 destBkt1 = (bkt->timestamp-offset)/size_b;
00240 }
00241 if (destBkt1 >= b) {
00242 if (end_overflow) {
00243 end_overflow->next = bkt;
00244 end_overflow = endBkt;
00245 ofSends += (*s);
00246 ofRecvs += (*r);
00247 #ifdef SR_SANITIZE
00248 sanitize();
00249 #endif
00250 }
00251 else {
00252 overflow = bkt;
00253 end_overflow = endBkt;
00254 ofSends = (*s);
00255 ofRecvs = (*r);
00256 #ifdef SR_SANITIZE
00257 sanitize();
00258 #endif
00259 }
00260 }
00261 else {
00262 if (end_bucket[destBkt1]) {
00263 end_bucket[destBkt1]->next = bkt;
00264 end_bucket[destBkt1] = endBkt;
00265 sends[destBkt1] += (*s);
00266 recvs[destBkt1] += (*r);
00267 #ifdef SR_SANITIZE
00268 sanitize();
00269 #endif
00270 }
00271 else {
00272 buckets[destBkt1] = bkt;
00273 end_bucket[destBkt1] = endBkt;
00274 sends[destBkt1] = (*s);
00275 recvs[destBkt1] = (*r);
00276 #ifdef SR_SANITIZE
00277 sanitize();
00278 #endif
00279 }
00280 }
00281 }
00282
00284 UpdateMsg *SRtable::PackTable(POSE_TimeType pvt, POSE_TimeType *maxSR)
00285 {
00286 #ifdef SR_SANITIZE
00287 sanitize();
00288 #endif
00289 int i;
00290 int packSize = 0, nEntries = 0, entryIdx = 0;
00291 POSE_TimeType nBkts = 0;
00292 POSE_TimeType destBkt;
00293 SRentry *tmp;
00294
00295 if (pvt == POSE_UnsetTS) destBkt = b;
00296 else destBkt = (pvt-offset)/size_b;
00297
00298 SortTable();
00299 nBkts = destBkt;
00300 if (destBkt >= b) {
00301 tmp = overflow;
00302 while (tmp && (tmp->timestamp < pvt)) {
00303 nEntries++;
00304 tmp = tmp->next;
00305 }
00306 nBkts = b-1;
00307 }
00308 for (i=0; i<=nBkts; i++) {
00309 tmp = buckets[i];
00310 while (tmp) {
00311 (*maxSR) = tmp->timestamp;
00312 if (tmp->sends != tmp->recvs)
00313 nEntries++;
00314 tmp = tmp->next;
00315 }
00316 }
00317
00318 packSize = nEntries * sizeof(SRentry);
00319 UpdateMsg *um = new (packSize, 0) UpdateMsg;
00320 for (i=0; i<=nBkts; i++) {
00321 tmp = buckets[i];
00322 while (tmp) {
00323 if (tmp->sends != tmp->recvs) {
00324 um->SRs[entryIdx] = *tmp;
00325 entryIdx++;
00326 }
00327 tmp = tmp->next;
00328 }
00329 }
00330
00331 um->maxSR=*maxSR;
00332 if (destBkt >= b) {
00333 tmp = overflow;
00334 while (tmp && (tmp->timestamp < pvt)) {
00335 if (tmp->sends != tmp->recvs) {
00336 um->SRs[entryIdx] = *tmp;
00337 entryIdx++;
00338 }
00339 tmp = tmp->next;
00340 }
00341 }
00342
00343 for (i=0; i<entryIdx-1; i++) {
00344 if (um->SRs[i].timestamp > um->SRs[i+1].timestamp)
00345 CkPrintf("WARNING: SRtable sorting code is broken!\n");
00346 }
00347 CkAssert(entryIdx <= nEntries);
00348
00349
00350
00351 um->numEntries = entryIdx;
00352 #ifdef SR_SANITIZE
00353 sanitize();
00354 #endif
00355 return um;
00356 }
00357
00359 void SRtable::SortTable()
00360 {
00361 #ifdef SR_SANITIZE
00362 sanitize();
00363 #endif
00364 int i;
00365 for (i=0; i<b; i++) CompressAndSortBucket(i, 0);
00366 CompressAndSortBucket(b, 1);
00367 #ifdef SR_SANITIZE
00368 sanitize();
00369 #endif
00370 }
00371
00373 void SRtable::CompressAndSortBucket(POSE_TimeType i, int is_overflow)
00374 {
00375 #ifdef SR_SANITIZE
00376 sanitize();
00377 #endif
00378 SRentry *tmp, *e, *newBucket = NULL, *lastInserted = NULL;
00379 int nEntries = 0;
00380 if (is_overflow) tmp = overflow;
00381 else tmp = buckets[i];
00382 while (tmp) {
00383 e = tmp;
00384 tmp = tmp->next;
00385
00386 if (!newBucket) {
00387 newBucket = lastInserted = e;
00388 e->next = NULL;
00389 nEntries++;
00390 }
00391 else if (lastInserted->timestamp == e->timestamp) {
00392 lastInserted->sends += e->sends;
00393 lastInserted->recvs += e->recvs;
00394 delete e;
00395 }
00396 else if (lastInserted->timestamp < e->timestamp) {
00397 while (lastInserted->next &&
00398 (lastInserted->next->timestamp < e->timestamp))
00399 lastInserted = lastInserted->next;
00400 if (lastInserted->next) {
00401 if (lastInserted->next->timestamp == e->timestamp) {
00402 lastInserted->next->sends += e->sends;
00403 lastInserted->next->recvs += e->recvs;
00404 lastInserted = lastInserted->next;
00405 delete e;
00406 }
00407 else {
00408 e->next = lastInserted->next;
00409 lastInserted->next = e;
00410 lastInserted = e;
00411 nEntries++;
00412 }
00413 }
00414 else {
00415 lastInserted->next = e;
00416 e->next = NULL;
00417 lastInserted = e;
00418 nEntries++;
00419 }
00420 }
00421 else if (newBucket->timestamp > e->timestamp) {
00422 e->next = newBucket;
00423 newBucket = lastInserted = e;
00424 nEntries++;
00425 }
00426 else if (newBucket->timestamp == e->timestamp) {
00427 newBucket->sends += e->sends;
00428 newBucket->recvs += e->recvs;
00429 delete e;
00430 }
00431 else {
00432 lastInserted = newBucket;
00433 while (lastInserted->next &&
00434 (lastInserted->next->timestamp < e->timestamp))
00435 lastInserted = lastInserted->next;
00436 if (lastInserted->next) {
00437 if (lastInserted->next->timestamp == e->timestamp) {
00438 lastInserted->next->sends += e->sends;
00439 lastInserted->next->recvs += e->recvs;
00440 lastInserted = lastInserted->next;
00441 delete e;
00442 }
00443 else {
00444 e->next = lastInserted->next;
00445 lastInserted->next = e;
00446 lastInserted = e;
00447 nEntries++;
00448 }
00449 }
00450 else {
00451 lastInserted->next = e;
00452 e->next = NULL;
00453 lastInserted = e;
00454 nEntries++;
00455 }
00456 }
00457 }
00458 SRentry *lastEntry = newBucket;
00459 if (lastEntry) while (lastEntry->next) lastEntry = lastEntry->next;
00460 if (is_overflow) {
00461 overflow = newBucket;
00462 end_overflow = lastEntry;
00463 numOverflow = nEntries;
00464 }
00465 else {
00466 buckets[i] = newBucket;
00467 end_bucket[i] = lastEntry;
00468 numEntries[i] = nEntries;
00469 }
00470 #ifdef SR_SANITIZE
00471 sanitize();
00472 #endif
00473 }
00474
00476 void SRtable::FreeTable()
00477 {
00478 #ifdef SR_SANITIZE
00479 sanitize();
00480 #endif
00481 int i;
00482 SRentry *tmp;
00483 for (i=0; i<b; i++) {
00484 tmp = buckets[i];
00485 while (tmp) {
00486 buckets[i] = tmp->next;
00487 delete(tmp);
00488 tmp = buckets[i];
00489 }
00490 numEntries[i] = sends[i] = recvs[i] = 0;
00491 }
00492 tmp = overflow;
00493 while (tmp) {
00494 overflow = tmp->next;
00495 delete(tmp);
00496 tmp = overflow;
00497 }
00498 offset = b = size_b = 0;
00499 ofSends = ofRecvs = 0;
00500 }
00501
00503 void SRtable::dump()
00504 {
00505 SRentry *tmp;
00506 CkPrintf("\nSRtable: offset=%d b=%d size_b=%d\n", offset, b, size_b);
00507 for (int i=0; i<b; i++) {
00508 tmp = buckets[i];
00509 CkPrintf("... Bucket[%d]: ", i);
00510 while (tmp) {
00511 tmp->dump();
00512 tmp = tmp->next;
00513 }
00514 CkPrintf("\n");
00515 }
00516 tmp = overflow;
00517 CkPrintf("... Overflow: ");
00518 while (tmp) {
00519 tmp->dump();
00520 tmp = tmp->next;
00521 }
00522 CkPrintf("\n");
00523 }
00524
00526 char *SRtable::dumpString() {
00527 char *str= new char[PVT_DEBUG_BUFFER_LINE_LENGTH];
00528 char *tempStr= new char[32];
00529 SRentry *tmp;
00530 snprintf(str, PVT_DEBUG_BUFFER_LINE_LENGTH, "SRtable[");
00531 for (int i=0; i<b; i++) {
00532 tmp = buckets[i];
00533 snprintf(tempStr, 32, "b%d: ", i);
00534 strncat(str, tempStr, 32);
00535 while (tmp) {
00536 char *stemp=tmp->dumpString();
00537 strcat(str, stemp);
00538 delete stemp;
00539 tmp = tmp->next;
00540 }
00541 }
00542 tmp = overflow;
00543 strncat(str, " OF: ", PVT_DEBUG_BUFFER_LINE_LENGTH);
00544 while (tmp) {
00545 char *stemp=tmp->dumpString();
00546 strcat(str, stemp);
00547 delete [] stemp;
00548 tmp = tmp->next;
00549 }
00550 strncat(str, "]", PVT_DEBUG_BUFFER_LINE_LENGTH);
00551 delete [] tempStr;
00552 return str;
00553 }
00554
00556 void SRtable::sanitize()
00557 {
00558 POSE_TimeType bktMin, bktMax;
00559 int sCount, rCount;
00560 SRentry *tmp;
00561 CmiAssert(offset > -1);
00562 CmiAssert((b>-1) && (b <= MAX_B));
00563 CmiAssert(size_b > -1);
00564 for (int i=0; i<b; i++) {
00565 sCount = rCount = 0;
00566 tmp = buckets[i];
00567 bktMin = i*size_b + offset;
00568 bktMax = i*size_b + (size_b-1) + offset;
00569 if (!tmp) CkAssert(!end_bucket[i]);
00570 while (tmp) {
00571 CkAssert((tmp->timestamp >= bktMin) && (tmp->timestamp <= bktMax));
00572 if (!tmp->next) CkAssert(end_bucket[i] == tmp);
00573 tmp->sanitize();
00574 sCount += tmp->sends;
00575 rCount += tmp->recvs;
00576 tmp = tmp->next;
00577 }
00578 CkAssert(sCount == sends[i]);
00579 CkAssert(rCount == recvs[i]);
00580 }
00581 tmp = overflow;
00582 sCount = rCount = 0;
00583 if (!tmp) CkAssert(!end_overflow);
00584 while (tmp) {
00585 CkAssert(tmp->timestamp >= b*size_b + offset);
00586 if (!tmp->next) CkAssert(end_overflow == tmp);
00587 tmp->sanitize();
00588 sCount += tmp->sends;
00589 rCount += tmp->recvs;
00590 tmp = tmp->next;
00591 }
00592 CkAssert(sCount == ofSends);
00593 CkAssert(rCount == ofRecvs);
00594 }