00001
00010
00011 #include "converse.h"
00012
00013 #if CMK_PERSISTENT_COMM
00014
00015 #include "persist_impl.h"
00016
00017 #define TABLESIZE 512
00018 PersistentSendsTable persistentSendsTable[TABLESIZE];
00019 int persistentSendsTableCount = 0;
00020 PersistentReceivesTable *persistentReceivesTableHead;
00021 PersistentReceivesTable *persistentReceivesTableTail;
00022 int persistentReceivesTableCount = 0;
00023
00024
00025 typedef struct _PersistentRequestMsg {
00026 char core[CmiMsgHeaderSizeBytes];
00027 int requestorPE;
00028 int maxBytes;
00029 PersistentHandle sourceHandlerIndex;
00030 } PersistentRequestMsg;
00031
00032 typedef struct _PersistentReqGrantedMsg {
00033 char core[CmiMsgHeaderSizeBytes];
00034 void *msgAddr[PERSIST_BUFFERS_NUM];
00035 void *slotFlagAddress[PERSIST_BUFFERS_NUM];
00036 PersistentHandle sourceHandlerIndex;
00037 PersistentHandle destHandlerIndex;
00038 } PersistentReqGrantedMsg;
00039
00040 typedef struct _PersistentDestoryMsg {
00041 char core[CmiMsgHeaderSizeBytes];
00042 PersistentHandle destHandlerIndex;
00043 } PersistentDestoryMsg;
00044
00045
00046 int persistentRequestHandlerIdx;
00047 int persistentReqGrantedHandlerIdx;
00048 int persistentDestoryHandlerIdx;
00049
00050 PersistentHandle *phs = NULL;
00051 int phsSize;
00052
00053
00054
00055
00056
00057 void initSendSlot(PersistentSendsTable *slot)
00058 {
00059 int i;
00060 slot->used = 0;
00061 slot->destPE = -1;
00062 slot->sizeMax = 0;
00063 slot->destHandle = 0;
00064 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00065 slot->destAddress[i] = NULL;
00066 slot->destSizeAddress[i] = NULL;
00067 }
00068 slot->messageBuf = 0;
00069 slot->messageSize = 0;
00070 }
00071
00072 void swapSendSlotBuffers(PersistentSendsTable *slot)
00073 {
00074 if (PERSIST_BUFFERS_NUM == 2) {
00075 void *tmp = slot->destAddress[0];
00076 slot->destAddress[0] = slot->destAddress[1];
00077 slot->destAddress[1] = tmp;
00078 tmp = slot->destSizeAddress[0];
00079 slot->destSizeAddress[0] = slot->destSizeAddress[1];
00080 slot->destSizeAddress[1] = tmp;
00081 }
00082 }
00083
00084 void initRecvSlot(PersistentReceivesTable *slot)
00085 {
00086 int i;
00087 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00088 slot->messagePtr[i] = NULL;
00089 slot->recvSizePtr[i] = NULL;
00090 }
00091 slot->sizeMax = 0;
00092 slot->prev = slot->next = NULL;
00093 }
00094
00095 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
00096 {
00097 if (PERSIST_BUFFERS_NUM == 2) {
00098 void *tmp = slot->messagePtr[0];
00099 slot->messagePtr[0] = slot->messagePtr[1];
00100 slot->messagePtr[1] = tmp;
00101 tmp = slot->recvSizePtr[0];
00102 slot->recvSizePtr[0] = slot->recvSizePtr[1];
00103 slot->recvSizePtr[1] = tmp;
00104 }
00105 }
00106
00107 PersistentHandle getFreeSendSlot()
00108 {
00109 int i;
00110 if (persistentSendsTableCount == TABLESIZE) CmiAbort("persistentSendsTable full.\n");
00111 persistentSendsTableCount++;
00112 for (i=1; i<TABLESIZE; i++)
00113 if (persistentSendsTable[i].used == 0) break;
00114 return &persistentSendsTable[i];
00115 }
00116
00117 PersistentHandle getFreeRecvSlot()
00118 {
00119 PersistentReceivesTable *slot = (PersistentReceivesTable *)CmiAlloc(sizeof(PersistentReceivesTable));
00120 initRecvSlot(slot);
00121 if (persistentReceivesTableHead == NULL) {
00122 persistentReceivesTableHead = persistentReceivesTableTail = slot;
00123 }
00124 else {
00125 persistentReceivesTableTail->next = slot;
00126 slot->prev = persistentReceivesTableTail;
00127 persistentReceivesTableTail = slot;
00128 }
00129 persistentReceivesTableCount++;
00130 return slot;
00131 }
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
00151 {
00152 PersistentHandle h = getFreeSendSlot();
00153
00154 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00155 slot->used = 1;
00156 slot->destPE = destPE;
00157 slot->sizeMax = maxBytes;
00158
00159 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
00160 msg->maxBytes = maxBytes;
00161 msg->sourceHandlerIndex = h;
00162 msg->requestorPE = CmiMyPe();
00163
00164 CmiSetHandler(msg, persistentRequestHandlerIdx);
00165 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
00166
00167 return h;
00168 }
00169
00170 static void persistentRequestHandler(void *env)
00171 {
00172 PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
00173 char *buf;
00174 int i;
00175
00176 PersistentHandle h = getFreeRecvSlot();
00177 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00178
00179
00180
00181 PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
00182
00183 setupRecvSlot(slot, msg->maxBytes);
00184
00185 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00186 gmsg->msgAddr[i] = slot->messagePtr[i];
00187 gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
00188 }
00189
00190 gmsg->sourceHandlerIndex = msg->sourceHandlerIndex;
00191 gmsg->destHandlerIndex = h;
00192
00193 CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
00194 CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
00195
00196 CmiFree(msg);
00197 }
00198
00199 static void persistentReqGrantedHandler(void *env)
00200 {
00201 int i;
00202
00203 PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
00204 PersistentHandle h = msg->sourceHandlerIndex;
00205 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00206 CmiAssert(slot->used == 1);
00207 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00208 slot->destAddress[i] = msg->msgAddr[i];
00209 slot->destSizeAddress[i] = msg->slotFlagAddress[i];
00210 }
00211 slot->destHandle = msg->destHandlerIndex;
00212
00213 if (slot->messageBuf) {
00214 CmiSendPersistentMsg(h, slot->destPE, slot->messageSize, slot->messageBuf);
00215 slot->messageBuf = NULL;
00216 }
00217 CmiFree(msg);
00218 }
00219
00220
00221
00222
00223
00224 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
00225 {
00226 PersistentReq ret;
00227 int i;
00228
00229 PersistentHandle h = getFreeRecvSlot();
00230 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00231
00232 setupRecvSlot(slot, maxBytes);
00233
00234 ret.pe = CmiMyPe();
00235 ret.maxBytes = maxBytes;
00236 ret.myHand = h;
00237 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00238 ret.messagePtr[i] = slot->messagePtr[i];
00239 ret.recvSizePtr[i] = slot->recvSizePtr[i];
00240 }
00241
00242 return ret;
00243 }
00244
00245 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
00246 {
00247 int i;
00248 PersistentHandle h = getFreeSendSlot();
00249
00250 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00251 slot->used = 1;
00252 slot->destPE = recvHand.pe;
00253 slot->sizeMax = recvHand.maxBytes;
00254
00255 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00256 slot->destAddress[i] = recvHand.messagePtr[i];
00257 slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
00258 }
00259 slot->destHandle = recvHand.myHand;
00260 return h;
00261 }
00262
00263
00264
00265
00266
00267
00268 void persistentDestoryHandler(void *env)
00269 {
00270 int i;
00271 PersistentDestoryMsg *msg = (PersistentDestoryMsg *)env;
00272 PersistentHandle h = msg->destHandlerIndex;
00273 CmiAssert(h!=NULL);
00274 CmiFree(msg);
00275 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00276
00277 persistentReceivesTableCount --;
00278 if (slot->prev) {
00279 slot->prev->next = slot->next;
00280 }
00281 else
00282 persistentReceivesTableHead = slot->next;
00283 if (slot->next) {
00284 slot->next->prev = slot->prev;
00285 }
00286 else
00287 persistentReceivesTableTail = slot->prev;
00288
00289 for (i=0; i<PERSIST_BUFFERS_NUM; i++)
00290 if (slot->messagePtr[i])
00291 PerFree((char*)slot->messagePtr[i]);
00292
00293 CmiFree(slot);
00294 }
00295
00296
00297 void CmiDestoryPersistent(PersistentHandle h)
00298 {
00299 if (h == 0) CmiAbort("CmiDestoryPersistent: not a valid PersistentHandle\n");
00300
00301 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00302 CmiAssert(slot->destHandle != 0);
00303
00304 PersistentDestoryMsg *msg = (PersistentDestoryMsg *)
00305 CmiAlloc(sizeof(PersistentDestoryMsg));
00306 msg->destHandlerIndex = slot->destHandle;
00307
00308 CmiSetHandler(msg, persistentDestoryHandlerIdx);
00309 CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestoryMsg),msg);
00310
00311
00312 initSendSlot(slot);
00313
00314 persistentSendsTableCount --;
00315 }
00316
00317
00318 void CmiDestoryAllPersistent()
00319 {
00320 int i;
00321 for (i=0; i<TABLESIZE; i++) {
00322 if (persistentSendsTable[i].messageBuf)
00323 CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered unsend message.\n");
00324 initSendSlot(&persistentSendsTable[i]);
00325 }
00326 persistentSendsTableCount = 0;
00327
00328 PersistentReceivesTable *slot = persistentReceivesTableHead;
00329 while (slot) {
00330 PersistentReceivesTable *next = slot->next;
00331 int i;
00332 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00333 if (slot->recvSizePtr[i])
00334 CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
00335 if (slot->messagePtr[i]) PerFree((char*)slot->messagePtr[i]);
00336 }
00337 CmiFree(slot);
00338 slot = next;
00339 }
00340 persistentReceivesTableHead = persistentReceivesTableTail = NULL;
00341 persistentReceivesTableCount = 0;
00342 }
00343
00344 void CmiPersistentInit()
00345 {
00346 int i;
00347 persistentRequestHandlerIdx =
00348 CmiRegisterHandler((CmiHandler)persistentRequestHandler);
00349 persistentReqGrantedHandlerIdx =
00350 CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
00351 persistentDestoryHandlerIdx =
00352 CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
00353
00354 persist_machine_init();
00355
00356 for (i=0; i<TABLESIZE; i++) {
00357 initSendSlot(&persistentSendsTable[i]);
00358 }
00359 persistentSendsTableCount = 0;
00360 persistentReceivesTableHead = persistentReceivesTableTail = NULL;
00361 persistentReceivesTableCount = 0;
00362 }
00363
00364
00365 void CmiUsePersistentHandle(PersistentHandle *p, int n)
00366 {
00367 #ifndef CMK_OPTIMIZE
00368 int i;
00369 for (i=0; i<n; i++)
00370 if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
00371 #endif
00372 phs = p;
00373 phsSize = n;
00374 }
00375
00376 #endif
00377