00001
00010
00011 #include "converse.h"
00012 #if CMK_PERSISTENT_COMM
00013
00014
00015
00016 #include "compress.C"
00017 #include "compress-external.C"
00018
00019 #include "machine-persistent.h"
00020 #include "machine-lrts.h"
/* NOTE(review): presumably the size in bytes of a message envelope; the macro
 * is not referenced in the visible part of this file -- confirm before use. */
#define ENVELOP_SIZE 104
00022
00023 CpvDeclare(PersistentSendsTable *, persistentSendsTableHead);
00024 CpvDeclare(PersistentSendsTable *, persistentSendsTableTail);
00025 CpvDeclare(int, persistentSendsTableCount);
00026 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableHead);
00027 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableTail);
00028 CpvDeclare(int, persistentReceivesTableCount);
00029
00030
00031 typedef struct _PersistentRequestMsg {
00032 char core[CmiMsgHeaderSizeBytes];
00033 int requestorPE;
00034 int maxBytes;
00035 PersistentHandle sourceHandler;
00036 #if DELTA_COMPRESS
00037 int compressStart;
00038 int dataType;
00039 #endif
00040 } PersistentRequestMsg;
00041
00042 typedef struct _PersistentReqGrantedMsg {
00043 char core[CmiMsgHeaderSizeBytes];
00044
00045
00046
00047
00048 PersistentBuf buf[PERSIST_BUFFERS_NUM];
00049 PersistentHandle sourceHandler;
00050 PersistentHandle destHandler;
00051 PersistentHandle destDataHandler;
00052 } PersistentReqGrantedMsg;
00053
00054 typedef struct _PersistentDestroyMsg {
00055 char core[CmiMsgHeaderSizeBytes];
00056 PersistentHandle destHandlerIndex;
00057 } PersistentDestroyMsg;
00058
00059
/* Converse handler indices, assigned in CmiPersistentInit(). */
int persistentRequestHandlerIdx;
int persistentReqGrantedHandlerIdx;
int persistentDestroyHandlerIdx;
/* The next two are only registered when DELTA_COMPRESS is enabled. */
int persistentDecompressHandlerIdx;
int persistentNoDecompressHandlerIdx;
00065
00066 CpvDeclare(PersistentHandle *, phs);
00067 CpvDeclare(int, phsSize);
00068 CpvDeclare(int, curphs);
00069
00070
00071
00072
00073
00074 void initRecvSlot(PersistentReceivesTable *slot);
00075 void initSendSlot(PersistentSendsTable *slot);
00076
00077 void swapSendSlotBuffers(PersistentSendsTable *slot)
00078 {
00079 if (PERSIST_BUFFERS_NUM == 2) {
00080 #if 0
00081 void *tmp = slot->destAddress[0];
00082 slot->destAddress[0] = slot->destAddress[1];
00083 slot->destAddress[1] = tmp;
00084 tmp = slot->destSizeAddress[0];
00085 slot->destSizeAddress[0] = slot->destSizeAddress[1];
00086 slot->destSizeAddress[1] = tmp;
00087 #else
00088 PersistentBuf tmp = slot->destBuf[0];
00089 slot->destBuf[0] = slot->destBuf[1];
00090 slot->destBuf[1] = tmp;
00091 #endif
00092 }
00093 }
00094
00095 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
00096 {
00097 if (PERSIST_BUFFERS_NUM == 2) {
00098 #if 0
00099 void *tmp = slot->messagePtr[0];
00100 slot->messagePtr[0] = slot->messagePtr[1];
00101 slot->messagePtr[1] = tmp;
00102 tmp = slot->recvSizePtr[0];
00103 slot->recvSizePtr[0] = slot->recvSizePtr[1];
00104 slot->recvSizePtr[1] = tmp;
00105 #else
00106 PersistentBuf tmp = slot->destBuf[0];
00107 slot->destBuf[0] = slot->destBuf[1];
00108 slot->destBuf[1] = tmp;
00109 #endif
00110 }
00111 }
00112
00113 PersistentHandle getFreeSendSlot(void)
00114 {
00115 PersistentSendsTable *slot = (PersistentSendsTable *)malloc(sizeof(PersistentSendsTable));
00116 initSendSlot(slot);
00117 if (CpvAccess(persistentSendsTableHead) == NULL) {
00118 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = slot;
00119 }
00120 else {
00121 CpvAccess(persistentSendsTableTail)->next = slot;
00122 slot->prev = CpvAccess(persistentSendsTableTail);
00123 CpvAccess(persistentSendsTableTail) = slot;
00124 }
00125 CpvAccess(persistentSendsTableCount)++;
00126 return slot;
00127 }
00128
00129 PersistentHandle getFreeRecvSlot(void)
00130 {
00131 PersistentReceivesTable *slot = (PersistentReceivesTable *)malloc(sizeof(PersistentReceivesTable));
00132 initRecvSlot(slot);
00133 if (CpvAccess(persistentReceivesTableHead) == NULL) {
00134 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = slot;
00135 }
00136 else {
00137 CpvAccess(persistentReceivesTableTail)->next = slot;
00138 slot->prev = CpvAccess(persistentReceivesTableTail);
00139 CpvAccess(persistentReceivesTableTail) = slot;
00140 }
00141 CpvAccess(persistentReceivesTableCount)++;
00142 return slot;
00143 }
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161 PersistentHandle CmiCreateCompressPersistent(int destPE, int maxBytes, int compressStart, int type)
00162 {
00163 PersistentHandle h;
00164 PersistentSendsTable *slot;
00165
00166 if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
00167
00168 h = getFreeSendSlot();
00169 slot = (PersistentSendsTable *)h;
00170
00171 slot->destPE = destPE;
00172 slot->sizeMax = ALIGN16(maxBytes);
00173 slot->addrIndex = 0;
00174 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
00175 msg->maxBytes = maxBytes;
00176 msg->sourceHandler = h;
00177 msg->requestorPE = CmiMyPe();
00178 #if DELTA_COMPRESS
00179 slot->previousMsg = NULL;
00180 slot->compressStart = msg->compressStart = compressStart;
00181 slot->dataType = msg->dataType = type;
00182 slot->compressSize = 0;
00183 slot->compressFlag = 1;
00184 #endif
00185 CmiSetHandler(msg, persistentRequestHandlerIdx);
00186 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
00187
00188 return h;
00189 }
00190
00191
00192 PersistentHandle CmiCreateCompressPersistentSize(int destPE, int maxBytes, int compressStart, int compressSize, int type)
00193 {
00194 PersistentHandle h;
00195 PersistentSendsTable *slot;
00196
00197 if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
00198
00199 h = getFreeSendSlot();
00200 slot = (PersistentSendsTable *)h;
00201
00202 slot->destPE = destPE;
00203 slot->sizeMax = ALIGN16(maxBytes);
00204 slot->addrIndex = 0;
00205 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
00206 msg->maxBytes = maxBytes;
00207 msg->sourceHandler = h;
00208 msg->requestorPE = CmiMyPe();
00209 #if DELTA_COMPRESS
00210 slot->previousMsg = NULL;
00211 slot->compressStart = msg->compressStart = compressStart;
00212 slot->compressSize = compressSize;
00213 slot->dataType = msg->dataType = type;
00214 slot->compressFlag = 1;
00215 #endif
00216 CmiSetHandler(msg, persistentRequestHandlerIdx);
00217 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
00218
00219 return h;
00220 }
00221
00222 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
00223 {
00224 PersistentHandle h;
00225 PersistentSendsTable *slot;
00226
00227 if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
00228
00229 h = getFreeSendSlot();
00230 slot = (PersistentSendsTable *)h;
00231
00232 slot->destPE = destPE;
00233 slot->sizeMax = ALIGN16(maxBytes);
00234 slot->addrIndex = 0;
00235 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
00236 msg->maxBytes = maxBytes;
00237 msg->sourceHandler = h;
00238 msg->requestorPE = CmiMyPe();
00239
00240 #if DELTA_COMPRESS
00241 slot->compressFlag = 0;
00242 #endif
00243 CmiSetHandler(msg, persistentRequestHandlerIdx);
00244 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
00245
00246 return h;
00247 }
00248
00249 #if DELTA_COMPRESS
00250 static void persistentNoDecompressHandler(void *msg)
00251 {
00252
00253 PersistentReceivesTable *slot = (PersistentReceivesTable *) (((CmiMsgHeaderExt*)msg)-> persistRecvHandler);
00254 int size = ((CmiMsgHeaderExt*)msg)->size;
00255 slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
00256
00257 #if COPY_HISTORY
00258 memcpy(slot->history, msg, size);
00259 #endif
00260 CldRestoreHandler((char *)msg);
00261 (((CmiMsgHeaderExt*)msg)->xhdl) = (((CmiMsgHeaderExt*)msg)->xxhdl);
00262 CmiHandleMessage(msg);
00263 }
00264
00265 static void persistentDecompressHandler(void *msg)
00266 {
00267
00268 PersistentReceivesTable *slot = (PersistentReceivesTable *) (((CmiMsgHeaderExt*)msg)-> persistRecvHandler);
00269 int historyIndex;
00270 int i;
00271 char *cmsg = (char*)msg;
00272 int size = ((CmiMsgHeaderExt*)msg)->size;
00273 int compressSize = *(int*)(msg+slot->compressStart);
00274 int originalSize = *(int*)(msg+slot->compressStart+sizeof(int));
00275
00276 char *decompressData =(char*) malloc(originalSize);
00277 #if COPY_HISTORY
00278 char *history = slot->history;
00279 #else
00280 historyIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
00281 slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
00282 char *history = (char*)(slot->destBuf[historyIndex].destAddress);
00283 #endif
00284
00285 int left_size = size - slot->compressStart - originalSize;
00286 char *base_dst = cmsg+size-1;
00287 char *base_src = cmsg+ size - originalSize +compressSize+sizeof(int) -1;
00288 for(i=0; i<left_size; i++)
00289 {
00290 *base_dst = *base_src;
00291 base_dst--;
00292 base_src--;
00293 }
00294
00295 if(slot->dataType == CMI_FLOATING)
00296 decompressFloatingPoint(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
00297 else if(slot->dataType == CMI_CHAR)
00298 decompressChar(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
00299 else if(slot->dataType == CMI_ZLIB)
00300 decompressZlib(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
00301 else if(slot->dataType == CMI_LZ4)
00302 decompressLz4(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
00303 memcpy(msg+slot->compressStart, decompressData, originalSize);
00304 free(decompressData);
00305 CldRestoreHandler(cmsg);
00306 (((CmiMsgHeaderExt*)msg)->xhdl) = (((CmiMsgHeaderExt*)msg)->xxhdl);
00307
00308 #if VERIFY
00309
00310 char real1 = cmsg[size - originalSize +sizeof(int)+compressSize];
00311 char real2 = cmsg[size - originalSize +sizeof(int)+compressSize+1];
00312 char checksum1 = cmsg[0];
00313 for(i=1; i< slot->compressStart; i++)
00314 checksum1 ^= cmsg[i];
00315 if(memcmp(&checksum1, &real1, 1))
00316 CmiPrintf("receiver chumsum wrong header \n");
00317 char checksum2 = cmsg[slot->compressStart];
00318 for(i=slot->compressStart+1; i< size; i++)
00319 checksum2 ^= cmsg[i];
00320 if(memcmp(&checksum2, &real2, 1))
00321 CmiPrintf("receiver chumsum wrong data \n");
00322
00323 #endif
00324 #if COPY_HISTORY
00325 memcpy(slot->history, msg, size);
00326 #endif
00327 CmiHandleMessage(msg);
00328 }
00329
00330 #if 0
00331 int CompressPersistentMsg(PersistentHandle h, int size, void **m)
00332 {
00333 void *msg = *m;
00334 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00335 int newSize;
00336 void *history = slot->previousMsg;
00337 void *dest=NULL;
00338 int compressSize=size;
00339 if(history == NULL)
00340 {
00341 newSize = size;
00342 slot->previousMsg = msg;
00343 CmiReference(msg);
00344 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
00345 CldSwitchHandler((char *)msg, persistentNoDecompressHandlerIdx);
00346 }else
00347 {
00348 if(slot->compressSize == 0)
00349 {
00350 slot->compressSize = size - slot->compressStart;
00351 }
00352 if(slot->compressSize>100)
00353 {
00354 dest = CmiAlloc(size);
00355 compressChar((char*)msg+slot->compressStart, (char*)dest+slot->compressStart+sizeof(int), slot->compressSize, &compressSize, (char*)history+slot->compressStart);
00356 }
00357
00358 CmiFree(history);
00359 history = msg;
00360 CmiReference(msg);
00361 if(slot->compressSize-compressSize <= 100)
00362 {
00363 newSize = size;
00364 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
00365 CldSwitchHandler((char *)msg, persistentNoDecompressHandlerIdx);
00366 if(dest != NULL)
00367 CmiFree(dest);
00368 }else
00369 {
00370
00371 memcpy(dest, msg, slot->compressStart);
00372
00373 *(int*)(dest+slot->compressStart) = compressSize;
00374
00375 int leftSize = size - slot->compressStart - slot->compressSize;
00376 if(leftSize > 0)
00377 memcpy((char*)dest+slot->compressStart+sizeof(int)+compressSize, msg+slot->compressStart+slot->compressSize, leftSize);
00378 newSize = size-slot->compressSize+compressSize+sizeof(int);
00379 (((CmiMsgHeaderExt*)dest)->xxhdl) = (((CmiMsgHeaderExt*)dest)->xhdl);
00380 CldSwitchHandler((char *)dest, persistentDecompressHandlerIdx);
00381 CmiPrintf(" handler =(%d : %d : %d) (%d:%d:%d) %d\n", (((CmiMsgHeaderExt*)dest)->hdl), (((CmiMsgHeaderExt*)dest)->xhdl), (((CmiMsgHeaderExt*)dest)->xxhdl), (((CmiMsgHeaderExt*)msg)->hdl), (((CmiMsgHeaderExt*)msg)->xhdl), (((CmiMsgHeaderExt*)msg)->xxhdl), persistentDecompressHandlerIdx);
00382 *m=dest;
00383 }
00384
00385 }
00386 ((CmiMsgHeaderExt*)*m)-> persistRecvHandler = slot->destDataHandle;
00387 ((CmiMsgHeaderExt*)*m)->size = size;
00388 return newSize;
00389 }
00390
00391 #else
00392 int CompressPersistentMsg(PersistentHandle h, int size, void *msg)
00393 {
00394 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00395 int newSize;
00396 void *history = slot->previousMsg;
00397 void *dest=NULL;
00398 int compressSize=size;
00399 int i;
00400 char *cmsg = (char*)msg;
00401
00402
00403 ((CmiMsgHeaderExt*)msg)-> persistRecvHandler = slot->destDataHandle;
00404 ((CmiMsgHeaderExt*)msg)->size = size;
00405
00406 if(history == NULL)
00407 {
00408 newSize = size;
00409 slot->previousMsg = msg;
00410 slot->previousSize = size;
00411 CmiReference(msg);
00412 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
00413 CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
00414 }else if(size != slot->previousSize)
00415 {
00416 newSize = size;
00417 CmiFree(slot->previousMsg);
00418 slot->previousMsg = msg;
00419 if(slot->compressSize == slot->previousSize - slot->compressStart)
00420 slot->compressSize = size - slot->compressStart;
00421 slot->previousSize = size;
00422 CmiReference(msg);
00423 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
00424 CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
00425 }
00426 else {
00427
00428 if(slot->compressSize == 0) {slot->compressSize = size-slot->compressStart; }
00429 #if VERIFY
00430 char checksum1;
00431 char checksum2;
00432 void *history_save = CmiAlloc(size);
00433 memcpy(history_save, history, size);
00434 checksum1 = cmsg[0];
00435 for(i=1; i< slot->compressStart; i++)
00436 checksum1 ^= cmsg[i];
00437 checksum2 = cmsg[slot->compressStart];
00438 for(i=slot->compressStart+1; i< size; i++)
00439 checksum2 ^= cmsg[i];
00440 #endif
00441
00442 #if EXTERNAL_COMPRESS
00443 int maxSize = (slot->compressSize+40)>LZ4_compressBound(slot->compressSize) ? slot->compressSize+40 : LZ4_compressBound(slot->compressSize);
00444 #else
00445 int maxSize = slot->compressSize;
00446 #endif
00447 dest = malloc(maxSize);
00448 if(slot->dataType == CMI_FLOATING)
00449 compressFloatingPoint(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
00450 else if(slot->dataType == CMI_CHAR)
00451 compressChar(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
00452 else if(slot->dataType == CMI_ZLIB)
00453 compressZlib(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
00454 else if(slot->dataType == CMI_LZ4)
00455 compressLz4(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
00456
00457 #if VERIFY
00458 void *recover = malloc(slot->compressSize);
00459 decompressChar(dest, recover, slot->compressSize, compressSize, history_save+slot->compressStart);
00460 if(memcmp(msg+slot->compressStart, recover, slot->compressSize))
00461 CmiPrintf("sth wrong with compression\n");
00462 #endif
00463 if(slot->compressSize - compressSize <= 100)
00464 {
00465 newSize = size;
00466 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
00467 CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
00468 CmiFree(slot->previousMsg);
00469 slot->previousMsg = msg;
00470 CmiReference(msg);
00471 }else
00472 {
00473 memcpy(history+slot->compressStart, msg+slot->compressStart, slot->compressSize);
00474 *(int*)(msg+slot->compressStart) = compressSize;
00475 *(int*)(msg+slot->compressStart+sizeof(int)) = slot->compressSize;
00476 memcpy(msg+slot->compressStart+2*sizeof(int), dest, compressSize);
00477 int leftSize = size-slot->compressStart-slot->compressSize;
00478
00479 if(leftSize > 0)
00480 memcpy(msg+slot->compressStart+compressSize+2*sizeof(int), msg+slot->compressStart+slot->compressSize, leftSize);
00481 newSize = slot->compressStart + compressSize + 2*sizeof(int) +leftSize;
00482 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
00483 CldSwitchHandler(cmsg, persistentDecompressHandlerIdx);
00484 #if VERIFY
00485 memcpy(msg+newSize, &checksum1, 1);
00486 memcpy(msg+newSize+1, &checksum2, 1);
00487 char *orig = CmiAlloc(size);
00488 memcpy(orig, msg, newSize);
00489
00490 char *decompressData =(char*) malloc(slot->compressSize);
00491 int left_size = size - slot->compressStart - slot->compressSize;
00492 char *base_dst = orig+size-1;
00493 char *base_src = orig + size - slot->compressSize +compressSize+2*sizeof(int) -1;
00494 for(i=0; i<left_size; i++)
00495 {
00496 *base_dst = *base_src;
00497 base_dst--;
00498 base_src--;
00499 }
00500
00501 decompressChar(orig+slot->compressStart+2*sizeof(int), decompressData, slot->compressSize, compressSize, history_save+slot->compressStart);
00502 memcpy(orig+slot->compressStart, decompressData, slot->compressSize);
00503 free(decompressData);
00504 CldRestoreHandler(orig);
00505 (((CmiMsgHeaderExt*)orig)->xhdl) = (((CmiMsgHeaderExt*)orig)->xxhdl);
00506 if(memcmp(orig, history, slot->compressStart))
00507 CmiPrintf("sth wrong header all \n");
00508 if(memcmp(orig+slot->compressStart, history+slot->compressStart, slot->compressSize))
00509 CmiPrintf("sth wrong data \n");
00510 newSize += 2;
00511 #endif
00512 }
00513 free(dest);
00514 }
00515
00516 return newSize;
00517 }
00518
00519 #endif
00520 #else
00521 #endif
00522
00523
00524 PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes)
00525 {
00526
00527
00528 int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
00529 return CmiCreatePersistent(pe, maxBytes);
00530 }
00531 PersistentHandle CmiCreateCompressNodePersistent(int destNode, int maxBytes, int start, int type)
00532 {
00533
00534
00535 int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
00536 return CmiCreateCompressPersistent(pe, maxBytes, start, type);
00537 }
00538
00539 PersistentHandle CmiCreateCompressNodePersistentSize(int destNode, int maxBytes, int start, int compressSize, int type)
00540 {
00541
00542
00543 int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
00544 return CmiCreateCompressPersistentSize(pe, maxBytes, start, compressSize, type);
00545 }
00546
00547
00548 static void persistentRequestHandler(void *env)
00549 {
00550 PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
00551 char *buf;
00552 int i;
00553
00554 PersistentHandle h = getFreeRecvSlot();
00555 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00556
00557
00558 auto gmsg = (PersistentReqGrantedMsg *)CmiAlloc(sizeof(PersistentReqGrantedMsg));
00559
00560 #if DELTA_COMPRESS
00561 slot->compressStart = msg->compressStart;
00562 slot->dataType = msg->dataType;
00563 #if COPY_HISTORY
00564 slot->history = malloc(msg->maxBytes);
00565 #endif
00566 #endif
00567 setupRecvSlot(slot, msg->maxBytes);
00568
00569 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00570 #if 0
00571 gmsg->msgAddr[i] = slot->messagePtr[i];
00572 gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
00573 #else
00574 gmsg->buf[i] = slot->destBuf[i];
00575 #endif
00576 }
00577
00578 gmsg->sourceHandler = msg->sourceHandler;
00579 gmsg->destHandler = getPersistentHandle(h, 1);
00580 #if DELTA_COMPRESS
00581 gmsg->destDataHandler = h;
00582
00583 #endif
00584 CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
00585 CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
00586
00587 CmiFree(msg);
00588 }
00589
00590 static void persistentReqGrantedHandler(void *env)
00591 {
00592 int i;
00593
00594 PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
00595 PersistentHandle h = msg->sourceHandler;
00596 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00597
00598
00599
00600 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00601 #if 0
00602 slot->destAddress[i] = msg->msgAddr[i];
00603 slot->destSizeAddress[i] = msg->slotFlagAddress[i];
00604 #else
00605 slot->destBuf[i] = msg->buf[i];
00606 #endif
00607 }
00608 slot->destHandle = msg->destHandler;
00609 #if DELTA_COMPRESS
00610 slot->destDataHandle = msg->destDataHandler;
00611
00612 #endif
00613 if (slot->messageBuf) {
00614 LrtsSendPersistentMsg(h, CmiGetNodeGlobal(CmiNodeOf(slot->destPE),CmiMyPartition()), slot->messageSize, slot->messageBuf);
00615 slot->messageBuf = NULL;
00616 }
00617 CmiFree(msg);
00618 }
00619
00620
00621
00622
00623
00624 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
00625 {
00626 PersistentReq ret;
00627 int i;
00628
00629 PersistentHandle h = getFreeRecvSlot();
00630 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00631
00632 setupRecvSlot(slot, maxBytes);
00633
00634 ret.pe = CmiMyPe();
00635 ret.maxBytes = maxBytes;
00636 ret.myHand = h;
00637 ret.bufPtr = (void **)malloc(PERSIST_BUFFERS_NUM*sizeof(void*));
00638 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00639 #if 0
00640 ret.messagePtr[i] = slot->messagePtr[i];
00641 ret.recvSizePtr[i] = slot->recvSizePtr[i];
00642 #else
00643 ret.bufPtr[i] = malloc(sizeof(PersistentBuf));
00644 memcpy(&ret.bufPtr[i], &slot->destBuf[i], sizeof(PersistentBuf));
00645 #endif
00646 }
00647
00648 return ret;
00649 }
00650
00651 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
00652 {
00653 int i;
00654 PersistentHandle h = getFreeSendSlot();
00655
00656 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00657 slot->destPE = recvHand.pe;
00658 slot->sizeMax = recvHand.maxBytes;
00659
00660 #if 0
00661 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00662 slot->destAddress[i] = recvHand.messagePtr[i];
00663 slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
00664 }
00665 #else
00666 memcpy(slot->destBuf, recvHand.bufPtr, PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
00667 #endif
00668 slot->destHandle = recvHand.myHand;
00669 return h;
00670 }
00671
00672
00673
00674
00675
00676
00677 void persistentDestroyHandler(void *env)
00678 {
00679 int i;
00680 PersistentDestroyMsg *msg = (PersistentDestroyMsg *)env;
00681 PersistentHandle h = getPersistentHandle(msg->destHandlerIndex, 0);
00682 CmiAssert(h!=NULL);
00683 CmiFree(msg);
00684 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00685
00686 CpvAccess(persistentReceivesTableCount) --;
00687 if (slot->prev) {
00688 slot->prev->next = slot->next;
00689 }
00690 else
00691 CpvAccess(persistentReceivesTableHead) = slot->next;
00692 if (slot->next) {
00693 slot->next->prev = slot->prev;
00694 }
00695 else
00696 CpvAccess(persistentReceivesTableTail) = slot->prev;
00697
00698 for (i=0; i<PERSIST_BUFFERS_NUM; i++)
00699 if (slot->destBuf[i].destAddress)
00700 PerFree((char*)slot->destBuf[i].destAddress);
00701
00702 clearRecvSlot(slot);
00703
00704 free(slot);
00705 }
00706
00707
00708 void CmiDestroyPersistent(PersistentHandle h)
00709 {
00710 if (h == NULL) return;
00711
00712 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00713
00714
00715 PersistentDestroyMsg *msg = (PersistentDestroyMsg *)
00716 CmiAlloc(sizeof(PersistentDestroyMsg));
00717 msg->destHandlerIndex = slot->destHandle;
00718
00719 CmiSetHandler(msg, persistentDestroyHandlerIdx);
00720 CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestroyMsg),msg);
00721
00722
00723 if (slot->prev) {
00724 slot->prev->next = slot->next;
00725 }
00726 else
00727 CpvAccess(persistentSendsTableHead) = slot->next;
00728 if (slot->next) {
00729 slot->next->prev = slot->prev;
00730 }
00731 else
00732 CpvAccess(persistentSendsTableTail) = slot->prev;
00733 free(slot);
00734
00735 CpvAccess(persistentSendsTableCount) --;
00736 }
00737
00738
00739 void CmiDestroyAllPersistent(void)
00740 {
00741 PersistentSendsTable *sendslot = CpvAccess(persistentSendsTableHead);
00742 while (sendslot) {
00743 PersistentSendsTable *next = sendslot->next;
00744 free(sendslot);
00745 sendslot = next;
00746 }
00747 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
00748 CpvAccess(persistentSendsTableCount) = 0;
00749
00750 PersistentReceivesTable *slot = CpvAccess(persistentReceivesTableHead);
00751 while (slot) {
00752 PersistentReceivesTable *next = slot->next;
00753 int i;
00754 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00755
00756
00757 if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
00758 }
00759 free(slot);
00760 slot = next;
00761 }
00762 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
00763 CpvAccess(persistentReceivesTableCount) = 0;
00764 }
00765
00766
00767 void CmiPersistentInit(void)
00768 {
00769 int i;
00770
00771 persistentRequestHandlerIdx =
00772 CmiRegisterHandler((CmiHandler)persistentRequestHandler);
00773 persistentReqGrantedHandlerIdx =
00774 CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
00775 persistentDestroyHandlerIdx =
00776 CmiRegisterHandler((CmiHandler)persistentDestroyHandler);
00777
00778 #if DELTA_COMPRESS
00779 persistentDecompressHandlerIdx =
00780 CmiRegisterHandler((CmiHandler)persistentDecompressHandler);
00781 persistentNoDecompressHandlerIdx =
00782 CmiRegisterHandler((CmiHandler)persistentNoDecompressHandler);
00783 #endif
00784
00785 CpvInitialize(PersistentHandle*, phs);
00786 CpvAccess(phs) = NULL;
00787 CpvInitialize(int, phsSize);
00788 CpvInitialize(int, curphs);
00789 CpvAccess(curphs) = 0;
00790
00791 extern void persist_machine_init();
00792 persist_machine_init();
00793
00794 CpvInitialize(PersistentSendsTable *, persistentSendsTableHead);
00795 CpvInitialize(PersistentSendsTable *, persistentSendsTableTail);
00796 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
00797 CpvInitialize(int, persistentSendsTableCount);
00798 CpvAccess(persistentSendsTableCount) = 0;
00799
00800 CpvInitialize(PersistentReceivesTable *, persistentReceivesTableHead);
00801 CpvInitialize(PersistentReceivesTable *, persistentReceivesTableTail);
00802 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
00803 CpvInitialize(int, persistentReceivesTableCount);
00804 CpvAccess(persistentReceivesTableCount) = 0;
00805 }
00806
00807 void CmiUsePersistentHandle(PersistentHandle *p, int n)
00808 {
00809 if (n==1 && *p == NULL) { p = NULL; n = 0; }
00810 #if CMK_ERROR_CHECKING && 0
00811 {
00812 int i;
00813 for (i=0; i<n; i++)
00814 if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
00815 }
00816 #endif
00817 CpvAccess(phs) = p;
00818 CpvAccess(phsSize) = n;
00819 CpvAccess(curphs) = 0;
00820 }
00821
00822 void CmiPersistentOneSend(void)
00823 {
00824 if (CpvAccess(phs)) CpvAccess(curphs)++;
00825 }
00826
00827 #endif
00828