00001
00002 #ifndef SRTABLE_H
00003 #define SRTABLE_H
00004 #include "pose.h"
00005
00006 #define MAX_B 10
00007
00008
00009
00010 class UpdateMsg;
00011
00013
00014 class SRentry {
00015 public:
00017 POSE_TimeType timestamp;
00019 int sends;
00021 int recvs;
00023 SRentry *next;
00025
00026 SRentry() :timestamp(POSE_UnsetTS), sends(0), recvs(0), next(NULL)
00027 {
00028 }
00030
00031 SRentry(POSE_TimeType ts, SRentry *p) :timestamp(ts), sends(0), recvs(0), next(p)
00032 {
00033 }
00035
00037 SRentry(POSE_TimeType ts, int sr, SRentry *p) :timestamp(ts), sends(0), recvs(0), next(p)
00038 {
00039 if (sr == SEND) { sends = 1; recvs = 0; }
00040 else { sends = 0; recvs = 1; }
00041 }
00043
00044 SRentry(POSE_TimeType ts, int sr) :timestamp(ts), sends(0), recvs(0), next(NULL)
00045 {
00046 if (sr == SEND) { sends = 1; recvs = 0; }
00047 else { sends = 0; recvs = 1; }
00048 }
00050
00052 void pup(PUP::er &p) {
00053 p|timestamp; p|sends; p|recvs;
00054 int nullFlag;
00055 if (next == NULL) {
00056 nullFlag = 1;
00057 } else {
00058 nullFlag = 0;
00059 }
00060 p|nullFlag;
00061 if (p.isUnpacking()) {
00062 if (nullFlag) {
00063 next = NULL;
00064 } else {
00065 next = new SRentry();
00066 next->pup(p);
00067 }
00068 } else {
00069 if (!nullFlag) {
00070 next->pup(p);
00071 }
00072 }
00073 }
00075 inline SRentry& operator=(const SRentry& e) {
00076 timestamp = e.timestamp;
00077 sends = e.sends;
00078 recvs = e.recvs;
00079 return *this;
00080 }
00082 void dump() {
00083 if (next)
00084 CkPrintf("TS:%d #s:%d #r:%d n:!NULL ", timestamp, sends, recvs);
00085 else CkPrintf("TS:%d #s:%d #r:%d n:NULL ",timestamp, sends, recvs);
00086 }
00088 char *dumpString();
00090 void sanitize() {
00091 CmiAssert(timestamp >= POSE_UnsetTS);
00092 CmiAssert(sends >= 0);
00093 CmiAssert(recvs >= 0);
00094 if (next == NULL) return;
00095
00096 POSE_TimeType test_ts = next->timestamp;
00097 int test_sendCount = next->sends;
00098 int test_recvCount = next->recvs;
00099 SRentry *test_next = next->next;
00100 next->timestamp = test_ts;
00101 next->sends = test_sendCount;
00102 next->recvs = test_recvCount;
00103 next->next = test_next;
00104 }
00105 };
00106
00108
00109 class SRtable {
00110 public:
00112
00113 POSE_TimeType offset;
00115
00116 int b;
00118
00119 int size_b;
00121
00122 SRentry *buckets[MAX_B];
00124
00125 SRentry *end_bucket[MAX_B];
00127 int sends[MAX_B], recvs[MAX_B], ofSends, ofRecvs;
00129
00130 SRentry *overflow;
00132 SRentry *end_overflow;
00134
00135 int numEntries[MAX_B];
00137
00138 int numOverflow;
00139
00141
00142 SRtable();
00144 ~SRtable() { FreeTable(); }
00146 void pup(PUP::er &p) {
00147 p|offset; p|b; p|size_b; p|numOverflow;
00148 PUParray(p, sends, MAX_B);
00149 PUParray(p, recvs, MAX_B);
00150 p|ofSends; p|ofRecvs;
00151 PUParray(p, numEntries, MAX_B);
00152
00153
00154 int nullFlag;
00155 SRentry *tmp;
00156 for (int i = 0; i < MAX_B; i++) {
00157 if (buckets[i] == NULL) {
00158 nullFlag = 1;
00159 } else {
00160 nullFlag = 0;
00161 }
00162 p|nullFlag;
00163 if (p.isUnpacking()) {
00164 if (nullFlag) {
00165 buckets[i] = end_bucket[i] = NULL;
00166 } else {
00167 buckets[i] = new SRentry();
00168 buckets[i]->pup(p);
00169 }
00170 } else {
00171 if (!nullFlag) {
00172 buckets[i]->pup(p);
00173 }
00174 }
00175 }
00176
00177
00178 if (overflow == NULL) {
00179 nullFlag = 1;
00180 } else {
00181 nullFlag = 0;
00182 }
00183 p|nullFlag;
00184 if (p.isUnpacking()) {
00185 if (nullFlag) {
00186 overflow = end_overflow = NULL;
00187 } else {
00188 overflow = new SRentry();
00189 tmp = overflow;
00190 do {
00191 tmp->pup(p);
00192
00193
00194
00195
00196
00197
00198 if (tmp->next) {
00199 tmp->next = new SRentry();
00200 } else {
00201
00202
00203 end_overflow = tmp;
00204 }
00205 tmp = tmp->next;
00206 } while (tmp);
00207 }
00208 } else {
00209 if (!nullFlag) {
00210 tmp = overflow;
00211 while (tmp) {
00212 tmp->pup(p);
00213 tmp = tmp->next;
00214 }
00215 }
00216 }
00217 }
00219 void Initialize();
00221 void Insert(POSE_TimeType ts, int sr) {
00222 #ifdef SR_SANITIZE
00223 sanitize();
00224 #endif
00225 CmiAssert(ts >= offset);
00226 CmiAssert((sr == 0) || (sr == 1));
00227 if (size_b == -1) size_b = 1 + ts/b;
00228 POSE_TimeType destBkt = (ts-offset)/size_b;
00229 SRentry *e = new SRentry(ts, sr, NULL);
00230 if (destBkt >= b) {
00231 if (overflow) {
00232 if (end_overflow->timestamp == ts) {
00233 if (sr == SEND) end_overflow->sends++;
00234 else end_overflow->recvs++;
00235 delete e;
00236 }
00237 else {
00238 end_overflow->next = e;
00239 end_overflow = e;
00240 }
00241 }
00242 else overflow = end_overflow = e;
00243 if (sr == SEND) ofSends++;
00244 else ofRecvs++;
00245 }
00246 else {
00247 if (buckets[destBkt]) {
00248 if (end_bucket[destBkt]->timestamp == ts) {
00249
00250 if (sr == SEND) end_bucket[destBkt]->sends++;
00251 else end_bucket[destBkt]->recvs++;
00252 delete e;
00253 }
00254 else {
00255 end_bucket[destBkt]->next = e;
00256 end_bucket[destBkt] = e;
00257 }
00258 }
00259 else buckets[destBkt] = end_bucket[destBkt] = e;
00260 if (sr == SEND) sends[destBkt]++;
00261 else recvs[destBkt]++;
00262 }
00263 #ifdef SR_SANITIZE
00264 sanitize();
00265 #endif
00266 }
00268 void Insert(SRentry *e) {
00269 #ifdef SR_SANITIZE
00270 sanitize();
00271 #endif
00272 CmiAssert(e != NULL);
00273 CmiAssert(e->timestamp >= offset);
00274 POSE_TimeType destBkt = (e->timestamp-offset)/size_b;
00275 e->next = NULL;
00276 if (destBkt >= b) {
00277 if (overflow) {
00278 end_overflow->next = e;
00279 end_overflow = e;
00280 }
00281 else overflow = end_overflow = e;
00282 ofSends += e->sends;
00283 ofRecvs += e->recvs;
00284 }
00285 else {
00286 if (buckets[destBkt]) {
00287 end_bucket[destBkt]->next = e;
00288 end_bucket[destBkt] = e;
00289 }
00290 else buckets[destBkt] = end_bucket[destBkt] = e;
00291 sends[destBkt] += e->sends;
00292 recvs[destBkt] += e->recvs;
00293 }
00294 #ifdef SR_SANITIZE
00295 sanitize();
00296 #endif
00297 }
00299
00301 void Restructure(POSE_TimeType newGVTest, POSE_TimeType firstTS,int firstSR);
00303 void MapToBuckets(SRentry *bkt, SRentry *endBkt, int *s, int *r);
00305 UpdateMsg *PackTable(POSE_TimeType pvt, POSE_TimeType *maxSRb);
00307 void SortTable();
00309 void CompressAndSortBucket(POSE_TimeType i, int is_overflow);
00311 void FreeTable();
00313 void dump();
00315 char *dumpString();
00317 void sanitize();
00319 void self_test();
00320 };
00321
00322 #endif