00001
00010
00011 #include "converse.h"
00012
00013 #if CMK_PERSISTENT_COMM
00014
00015 #include "machine-persistent.h"
00016
/*
 * Bookkeeping for the two slot lists maintained by this file, declared
 * through Converse's CpvDeclare macro.
 *
 * Each list has a head pointer, a tail pointer and an element count.
 * getFreeSendSlot()/getFreeRecvSlot() append a new slot at the tail and
 * increment the count; the destroy routines unlink slots and decrement it.
 */
00017 CpvDeclare(PersistentSendsTable *, persistentSendsTableHead);
00018 CpvDeclare(PersistentSendsTable *, persistentSendsTableTail);
00019 CpvDeclare(int, persistentSendsTableCount);
00020 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableHead);
00021 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableTail);
00022 CpvDeclare(int, persistentReceivesTableCount);
00023
00024
/*
 * Setup request built by CmiCreatePersistent() and sent to the destination
 * PE, where persistentRequestHandler() consumes it.
 *
 *   core          - space reserved for the Converse message header
 *   requestorPE   - PE that issued the request; the grant is sent back to it
 *   maxBytes      - size passed on to setupRecvSlot() on the receiving side
 *   sourceHandler - sender-side handle, echoed back unchanged in the grant
 */
00025 typedef struct _PersistentRequestMsg {
00026 char core[CmiMsgHeaderSizeBytes];
00027 int requestorPE;
00028 int maxBytes;
00029 PersistentHandle sourceHandler;
00030 } PersistentRequestMsg;
00031
/*
 * Reply built by persistentRequestHandler() and consumed by
 * persistentReqGrantedHandler() on the requesting PE.
 *
 *   core          - space reserved for the Converse message header
 *   buf           - copies of the receiver slot's destBuf[] descriptors
 *   sourceHandler - the sender-side handle taken from the request
 *   destHandler   - value of getPersistentHandle(h, 1) for the new receive
 *                   slot; stored by the sender as slot->destHandle
 */
00032 typedef struct _PersistentReqGrantedMsg {
00033 char core[CmiMsgHeaderSizeBytes];
00034
00035
00036
00037
00038 PersistentBuf buf[PERSIST_BUFFERS_NUM];
00039 PersistentHandle sourceHandler;
00040 PersistentHandle destHandler;
00041 } PersistentReqGrantedMsg;
00042
/*
 * Tear-down notice built by CmiDestoryPersistent() and consumed by
 * persistentDestoryHandler() on the destination PE.
 *
 *   core             - space reserved for the Converse message header
 *   destHandlerIndex - the sender's slot->destHandle; the receiver maps it
 *                      back to its slot with getPersistentHandle(..., 0)
 */
00043 typedef struct _PersistentDestoryMsg {
00044 char core[CmiMsgHeaderSizeBytes];
00045 PersistentHandle destHandlerIndex;
00046 } PersistentDestoryMsg;
00047
00048
/*
 * Handler indices for the three protocol messages; each is assigned from
 * CmiRegisterHandler() in CmiPersistentInit().
 */
00049 int persistentRequestHandlerIdx;
00050 int persistentReqGrantedHandlerIdx;
00051 int persistentDestoryHandlerIdx;
00052
/*
 * Currently selected persistent handles:
 *   phs     - handle array installed by CmiUsePersistentHandle() (or NULL)
 *   phsSize - number of entries in phs
 *   curphs  - reset to 0 by CmiUsePersistentHandle() and advanced by
 *             CmiPersistentOneSend() while phs is non-NULL
 */
00053 CpvDeclare(PersistentHandle *, phs);
00054 CpvDeclare(int, phsSize);
00055 CpvDeclare(int, curphs);
00056
00057
00058
00059
00060
/*
 * Slot initializers defined outside this file. getFreeRecvSlot() and
 * getFreeSendSlot() call them on every freshly malloc'ed slot before the
 * slot is linked into its list.
 */
00061 extern void initRecvSlot(PersistentReceivesTable *slot);
00062 extern void initSendSlot(PersistentSendsTable *slot);
00063
00064 void swapSendSlotBuffers(PersistentSendsTable *slot)
00065 {
00066 if (PERSIST_BUFFERS_NUM == 2) {
00067 #if 0
00068 void *tmp = slot->destAddress[0];
00069 slot->destAddress[0] = slot->destAddress[1];
00070 slot->destAddress[1] = tmp;
00071 tmp = slot->destSizeAddress[0];
00072 slot->destSizeAddress[0] = slot->destSizeAddress[1];
00073 slot->destSizeAddress[1] = tmp;
00074 #else
00075 PersistentBuf tmp = slot->destBuf[0];
00076 slot->destBuf[0] = slot->destBuf[1];
00077 slot->destBuf[1] = tmp;
00078 #endif
00079 }
00080 }
00081
00082 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
00083 {
00084 if (PERSIST_BUFFERS_NUM == 2) {
00085 #if 0
00086 void *tmp = slot->messagePtr[0];
00087 slot->messagePtr[0] = slot->messagePtr[1];
00088 slot->messagePtr[1] = tmp;
00089 tmp = slot->recvSizePtr[0];
00090 slot->recvSizePtr[0] = slot->recvSizePtr[1];
00091 slot->recvSizePtr[1] = tmp;
00092 #else
00093 PersistentBuf tmp = slot->destBuf[0];
00094 slot->destBuf[0] = slot->destBuf[1];
00095 slot->destBuf[1] = tmp;
00096 #endif
00097 }
00098 }
00099
00100 PersistentHandle getFreeSendSlot()
00101 {
00102 PersistentSendsTable *slot = (PersistentSendsTable *)malloc(sizeof(PersistentSendsTable));
00103 initSendSlot(slot);
00104 if (CpvAccess(persistentSendsTableHead) == NULL) {
00105 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = slot;
00106 }
00107 else {
00108 CpvAccess(persistentSendsTableTail)->next = slot;
00109 slot->prev = CpvAccess(persistentSendsTableTail);
00110 CpvAccess(persistentSendsTableTail) = slot;
00111 }
00112 CpvAccess(persistentSendsTableCount)++;
00113 return slot;
00114 }
00115
00116 PersistentHandle getFreeRecvSlot()
00117 {
00118 PersistentReceivesTable *slot = (PersistentReceivesTable *)malloc(sizeof(PersistentReceivesTable));
00119 initRecvSlot(slot);
00120 if (CpvAccess(persistentReceivesTableHead) == NULL) {
00121 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = slot;
00122 }
00123 else {
00124 CpvAccess(persistentReceivesTableTail)->next = slot;
00125 slot->prev = CpvAccess(persistentReceivesTableTail);
00126 CpvAccess(persistentReceivesTableTail) = slot;
00127 }
00128 CpvAccess(persistentReceivesTableCount)++;
00129 return slot;
00130 }
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
00150 {
00151 PersistentHandle h;
00152 PersistentSendsTable *slot;
00153
00154 if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
00155
00156
00157
00158
00159
00160
00161
00162
00163 h = getFreeSendSlot();
00164 slot = (PersistentSendsTable *)h;
00165
00166 slot->destPE = destPE;
00167 slot->sizeMax = maxBytes;
00168
00169 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
00170 msg->maxBytes = maxBytes;
00171 msg->sourceHandler = h;
00172 msg->requestorPE = CmiMyPe();
00173
00174 CmiSetHandler(msg, persistentRequestHandlerIdx);
00175 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
00176
00177 return h;
00178 }
00179
00180
00181 PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes)
00182 {
00183
00184
00185 int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
00186 return CmiCreatePersistent(pe, maxBytes);
00187 }
00188
00189 static void persistentRequestHandler(void *env)
00190 {
00191 PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
00192 char *buf;
00193 int i;
00194
00195 PersistentHandle h = getFreeRecvSlot();
00196 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00197
00198
00199
00200 PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
00201
00202 setupRecvSlot(slot, msg->maxBytes);
00203
00204 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00205 #if 0
00206 gmsg->msgAddr[i] = slot->messagePtr[i];
00207 gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
00208 #else
00209 gmsg->buf[i] = slot->destBuf[i];
00210 #endif
00211 }
00212
00213 gmsg->sourceHandler = msg->sourceHandler;
00214 gmsg->destHandler = getPersistentHandle(h, 1);
00215
00216 CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
00217 CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
00218
00219 CmiFree(msg);
00220 }
00221
00222 static void persistentReqGrantedHandler(void *env)
00223 {
00224 int i;
00225
00226 PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
00227 PersistentHandle h = msg->sourceHandler;
00228 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00229
00230
00231
00232 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00233 #if 0
00234 slot->destAddress[i] = msg->msgAddr[i];
00235 slot->destSizeAddress[i] = msg->slotFlagAddress[i];
00236 #else
00237 slot->destBuf[i] = msg->buf[i];
00238 #endif
00239 }
00240 slot->destHandle = msg->destHandler;
00241
00242 if (slot->messageBuf) {
00243 LrtsSendPersistentMsg(h, CmiNodeOf(slot->destPE), slot->messageSize, slot->messageBuf);
00244 slot->messageBuf = NULL;
00245 }
00246 CmiFree(msg);
00247 }
00248
00249
00250
00251
00252
00253 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
00254 {
00255 PersistentReq ret;
00256 int i;
00257
00258 PersistentHandle h = getFreeRecvSlot();
00259 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00260
00261 setupRecvSlot(slot, maxBytes);
00262
00263 ret.pe = CmiMyPe();
00264 ret.maxBytes = maxBytes;
00265 ret.myHand = h;
00266 ret.bufPtr = (void **)malloc(PERSIST_BUFFERS_NUM*sizeof(void*));
00267 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00268 #if 0
00269 ret.messagePtr[i] = slot->messagePtr[i];
00270 ret.recvSizePtr[i] = slot->recvSizePtr[i];
00271 #else
00272 ret.bufPtr[i] = malloc(sizeof(PersistentBuf));
00273 memcpy(&ret.bufPtr[i], &slot->destBuf[i], sizeof(PersistentBuf));
00274 #endif
00275 }
00276
00277 return ret;
00278 }
00279
00280 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
00281 {
00282 int i;
00283 PersistentHandle h = getFreeSendSlot();
00284
00285 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00286 slot->destPE = recvHand.pe;
00287 slot->sizeMax = recvHand.maxBytes;
00288
00289 #if 0
00290 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00291 slot->destAddress[i] = recvHand.messagePtr[i];
00292 slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
00293 }
00294 #else
00295 memcpy(slot->destBuf, recvHand.bufPtr, PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
00296 #endif
00297 slot->destHandle = recvHand.myHand;
00298 return h;
00299 }
00300
00301
00302
00303
00304
00305
00306 void persistentDestoryHandler(void *env)
00307 {
00308 int i;
00309 PersistentDestoryMsg *msg = (PersistentDestoryMsg *)env;
00310 PersistentHandle h = getPersistentHandle(msg->destHandlerIndex, 0);
00311 CmiAssert(h!=NULL);
00312 CmiFree(msg);
00313 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00314
00315 CpvAccess(persistentReceivesTableCount) --;
00316 if (slot->prev) {
00317 slot->prev->next = slot->next;
00318 }
00319 else
00320 CpvAccess(persistentReceivesTableHead) = slot->next;
00321 if (slot->next) {
00322 slot->next->prev = slot->prev;
00323 }
00324 else
00325 CpvAccess(persistentReceivesTableTail) = slot->prev;
00326
00327 for (i=0; i<PERSIST_BUFFERS_NUM; i++)
00328 if (slot->destBuf[i].destAddress)
00329 PerFree((char*)slot->destBuf[i].destAddress);
00330
00331 clearRecvSlot(slot);
00332
00333 free(slot);
00334 }
00335
00336
00337 void CmiDestoryPersistent(PersistentHandle h)
00338 {
00339 if (h == NULL) return;
00340
00341 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00342
00343
00344 PersistentDestoryMsg *msg = (PersistentDestoryMsg *)
00345 CmiAlloc(sizeof(PersistentDestoryMsg));
00346 msg->destHandlerIndex = slot->destHandle;
00347
00348 CmiSetHandler(msg, persistentDestoryHandlerIdx);
00349 CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestoryMsg),msg);
00350
00351
00352 if (slot->prev) {
00353 slot->prev->next = slot->next;
00354 }
00355 else
00356 CpvAccess(persistentSendsTableHead) = slot->next;
00357 if (slot->next) {
00358 slot->next->prev = slot->prev;
00359 }
00360 else
00361 CpvAccess(persistentSendsTableTail) = slot->prev;
00362 free(slot);
00363
00364 CpvAccess(persistentSendsTableCount) --;
00365 }
00366
00367
00368 void CmiDestoryAllPersistent()
00369 {
00370 PersistentSendsTable *sendslot = CpvAccess(persistentSendsTableHead);
00371 while (sendslot) {
00372 PersistentSendsTable *next = sendslot->next;
00373 free(sendslot);
00374 sendslot = next;
00375 }
00376 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
00377 CpvAccess(persistentSendsTableCount) = 0;
00378
00379 PersistentReceivesTable *slot = CpvAccess(persistentReceivesTableHead);
00380 while (slot) {
00381 PersistentReceivesTable *next = slot->next;
00382 int i;
00383 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00384 if (slot->destBuf[i].destSizeAddress)
00385 CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
00386 if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
00387 }
00388 free(slot);
00389 slot = next;
00390 }
00391 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
00392 CpvAccess(persistentReceivesTableCount) = 0;
00393 }
00394
00395 void CmiPersistentInit()
00396 {
00397 int i;
00398
00399 persistentRequestHandlerIdx =
00400 CmiRegisterHandler((CmiHandler)persistentRequestHandler);
00401 persistentReqGrantedHandlerIdx =
00402 CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
00403 persistentDestoryHandlerIdx =
00404 CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
00405
00406
00407 CpvInitialize(PersistentHandle*, phs);
00408 CpvAccess(phs) = NULL;
00409 CpvInitialize(int, phsSize);
00410 CpvInitialize(int, curphs);
00411 CpvAccess(curphs) = 0;
00412
00413 persist_machine_init();
00414
00415 CpvInitialize(PersistentSendsTable *, persistentSendsTableHead);
00416 CpvInitialize(PersistentSendsTable *, persistentSendsTableTail);
00417 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
00418 CpvInitialize(int, persistentSendsTableCount);
00419 CpvAccess(persistentSendsTableCount) = 0;
00420
00421 CpvInitialize(PersistentReceivesTable *, persistentReceivesTableHead);
00422 CpvInitialize(PersistentReceivesTable *, persistentReceivesTableTail);
00423 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
00424 CpvInitialize(int, persistentReceivesTableCount);
00425 CpvAccess(persistentReceivesTableCount) = 0;
00426 }
00427
00428
00429 void CmiUsePersistentHandle(PersistentHandle *p, int n)
00430 {
00431 if (n==1 && *p == NULL) { p = NULL; n = 0; }
00432 #if CMK_ERROR_CHECKING && 0
00433 {
00434 int i;
00435 for (i=0; i<n; i++)
00436 if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
00437 }
00438 #endif
00439 CpvAccess(phs) = p;
00440 CpvAccess(phsSize) = n;
00441 CpvAccess(curphs) = 0;
00442 }
00443
00444 void CmiPersistentOneSend()
00445 {
00446 if (CpvAccess(phs)) CpvAccess(curphs)++;
00447 }
00448
00449 #endif
00450