arch/util/persist-comm.c

Go to the documentation of this file.
00001 
00010 
00011 #include "converse.h"
00012 
00013 #if CMK_PERSISTENT_COMM
00014 
00015 #include "persist_impl.h"
00016 
00017 #define TABLESIZE  512
00018 PersistentSendsTable persistentSendsTable[TABLESIZE];
00019 int persistentSendsTableCount = 0;
00020 PersistentReceivesTable *persistentReceivesTableHead;
00021 PersistentReceivesTable *persistentReceivesTableTail;
00022 int persistentReceivesTableCount = 0;
00023 
00024 /* Converse message type */
00025 typedef struct _PersistentRequestMsg {
00026   char core[CmiMsgHeaderSizeBytes];
00027   int requestorPE;
00028   int maxBytes;
00029   PersistentHandle sourceHandlerIndex;
00030 } PersistentRequestMsg;
00031 
00032 typedef struct _PersistentReqGrantedMsg {
00033   char core[CmiMsgHeaderSizeBytes];
00034   void *msgAddr[PERSIST_BUFFERS_NUM];
00035   void *slotFlagAddress[PERSIST_BUFFERS_NUM];
00036   PersistentHandle sourceHandlerIndex;
00037   PersistentHandle destHandlerIndex;
00038 } PersistentReqGrantedMsg;
00039 
00040 typedef struct _PersistentDestoryMsg {
00041   char core[CmiMsgHeaderSizeBytes];
00042   PersistentHandle destHandlerIndex;
00043 } PersistentDestoryMsg;
00044 
/* Converse handler indices, assigned by CmiRegisterHandler() in
   CmiPersistentInit(). */
int persistentRequestHandlerIdx;      /* persistentRequestHandler */
int persistentReqGrantedHandlerIdx;   /* persistentReqGrantedHandler */
int persistentDestoryHandlerIdx;      /* persistentDestoryHandler */
00049 
00050 PersistentHandle  *phs = NULL;
00051 int phsSize;
00052 
00053 /******************************************************************************
00054      Utilities
00055 ******************************************************************************/
00056 
00057 void initSendSlot(PersistentSendsTable *slot)
00058 {
00059   int i;
00060   slot->used = 0;
00061   slot->destPE = -1;
00062   slot->sizeMax = 0;
00063   slot->destHandle = 0; 
00064   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00065     slot->destAddress[i] = NULL;
00066     slot->destSizeAddress[i] = NULL;
00067   }
00068   slot->messageBuf = 0;
00069   slot->messageSize = 0;
00070 }
00071 
00072 void swapSendSlotBuffers(PersistentSendsTable *slot)
00073 {
00074   if (PERSIST_BUFFERS_NUM == 2) {
00075   void *tmp = slot->destAddress[0];
00076   slot->destAddress[0] = slot->destAddress[1];
00077   slot->destAddress[1] = tmp;
00078   tmp = slot->destSizeAddress[0];
00079   slot->destSizeAddress[0] = slot->destSizeAddress[1];
00080   slot->destSizeAddress[1] = tmp;
00081   }
00082 }
00083 
00084 void initRecvSlot(PersistentReceivesTable *slot)
00085 {
00086   int i;
00087   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00088     slot->messagePtr[i] = NULL;
00089     slot->recvSizePtr[i] = NULL;
00090   }
00091   slot->sizeMax = 0;
00092   slot->prev = slot->next = NULL;
00093 }
00094 
00095 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
00096 {
00097   if (PERSIST_BUFFERS_NUM == 2) {
00098   void *tmp = slot->messagePtr[0];
00099   slot->messagePtr[0] = slot->messagePtr[1];
00100   slot->messagePtr[1] = tmp;
00101   tmp = slot->recvSizePtr[0];
00102   slot->recvSizePtr[0] = slot->recvSizePtr[1];
00103   slot->recvSizePtr[1] = tmp;
00104   }
00105 }
00106 
00107 PersistentHandle getFreeSendSlot()
00108 {
00109   int i;
00110   if (persistentSendsTableCount == TABLESIZE) CmiAbort("persistentSendsTable full.\n");
00111   persistentSendsTableCount++;
00112   for (i=1; i<TABLESIZE; i++)
00113     if (persistentSendsTable[i].used == 0) break;
00114   return &persistentSendsTable[i];
00115 }
00116 
00117 PersistentHandle getFreeRecvSlot()
00118 {
00119   PersistentReceivesTable *slot = (PersistentReceivesTable *)CmiAlloc(sizeof(PersistentReceivesTable));
00120   initRecvSlot(slot);
00121   if (persistentReceivesTableHead == NULL) {
00122     persistentReceivesTableHead = persistentReceivesTableTail = slot;
00123   }
00124   else {
00125     persistentReceivesTableTail->next = slot;
00126     slot->prev = persistentReceivesTableTail;
00127     persistentReceivesTableTail = slot;
00128   }
00129   persistentReceivesTableCount++;
00130   return slot;
00131 }
00132 
00133 /******************************************************************************
00134      Create Persistent Comm handler
00135      When creating a persistent comm with destPE and maxSize:
00136      1. The sender allocates a free PersistentSendsTable entry and sends a
00137         PersistentRequestMsg message to destPE. Persistent messages sent
00138         before the persistent comm is set up are buffered.
00139      2. destPE executes the Converse handler persistentRequestHandler() on
00140         the PersistentRequestMsg message:
00141         it allocates a free PersistentReceivesTable entry,
00142         allocates a message buffer of size maxSize for the communication,
00143         and sends back a PersistentReqGrantedMsg with the message address,
00144         etc. for elan_put.
00145      3. The Converse handler persistentReqGrantedHandler() is executed on
00146         the sender for the PersistentReqGrantedMsg. Setup is finished and
00147         the buffered message is sent.
00148 ******************************************************************************/
00149 
00150 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
00151 {
00152   PersistentHandle h = getFreeSendSlot();
00153 
00154   PersistentSendsTable *slot = (PersistentSendsTable *)h;
00155   slot->used = 1;
00156   slot->destPE = destPE;
00157   slot->sizeMax = maxBytes;
00158 
00159   PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
00160   msg->maxBytes = maxBytes;
00161   msg->sourceHandlerIndex = h;
00162   msg->requestorPE = CmiMyPe();
00163 
00164   CmiSetHandler(msg, persistentRequestHandlerIdx);
00165   CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
00166 
00167   return h;
00168 }
00169 
00170 static void persistentRequestHandler(void *env)
00171 {             
00172   PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
00173   char *buf;
00174   int i;
00175 
00176   PersistentHandle h = getFreeRecvSlot();
00177   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00178   /*slot->messagePtr = elan_CmiStaticAlloc(msg->maxBytes);*/
00179 
00180   /* build reply message */
00181   PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
00182 
00183   setupRecvSlot(slot, msg->maxBytes);
00184 
00185   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00186     gmsg->msgAddr[i] = slot->messagePtr[i];
00187     gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
00188   }
00189 
00190   gmsg->sourceHandlerIndex = msg->sourceHandlerIndex;
00191   gmsg->destHandlerIndex = h;
00192 
00193   CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
00194   CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
00195 
00196   CmiFree(msg);
00197 }
00198 
00199 static void persistentReqGrantedHandler(void *env)
00200 {
00201   int i;
00202   /*CmiPrintf("Persistent handler granted\n");*/
00203   PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
00204   PersistentHandle h = msg->sourceHandlerIndex;
00205   PersistentSendsTable *slot = (PersistentSendsTable *)h;
00206   CmiAssert(slot->used == 1);
00207   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00208     slot->destAddress[i] = msg->msgAddr[i];
00209     slot->destSizeAddress[i] = msg->slotFlagAddress[i];
00210   }
00211   slot->destHandle = msg->destHandlerIndex;
00212 
00213   if (slot->messageBuf) {
00214     CmiSendPersistentMsg(h, slot->destPE, slot->messageSize, slot->messageBuf);
00215     slot->messageBuf = NULL;
00216   }
00217   CmiFree(msg);
00218 }
00219 
00220 /*
00221   Another API:
00222   receiver initiate the persistent communication
00223 */
00224 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
00225 {
00226   PersistentReq ret;
00227   int i;
00228 
00229   PersistentHandle h = getFreeRecvSlot();
00230   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00231 
00232   setupRecvSlot(slot, maxBytes);
00233 
00234   ret.pe = CmiMyPe();
00235   ret.maxBytes = maxBytes;
00236   ret.myHand = h;
00237   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00238     ret.messagePtr[i] = slot->messagePtr[i];
00239     ret.recvSizePtr[i] = slot->recvSizePtr[i];
00240   }
00241 
00242   return ret;
00243 }
00244 
00245 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
00246 {
00247   int i;
00248   PersistentHandle h = getFreeSendSlot();
00249 
00250   PersistentSendsTable *slot = (PersistentSendsTable *)h;
00251   slot->used = 1;
00252   slot->destPE = recvHand.pe;
00253   slot->sizeMax = recvHand.maxBytes;
00254 
00255   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00256     slot->destAddress[i] = recvHand.messagePtr[i];
00257     slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
00258   }
00259   slot->destHandle = recvHand.myHand;
00260   return h;
00261 }
00262 
00263 /******************************************************************************
00264      Destroy Persistent Comm handler
00265 ******************************************************************************/
00266 
00267 /* Converse Handler */
00268 void persistentDestoryHandler(void *env)
00269 {             
00270   int i;
00271   PersistentDestoryMsg *msg = (PersistentDestoryMsg *)env;
00272   PersistentHandle h = msg->destHandlerIndex;
00273   CmiAssert(h!=NULL);
00274   CmiFree(msg);
00275   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
00276 
00277   persistentReceivesTableCount --;
00278   if (slot->prev) {
00279     slot->prev->next = slot->next;
00280   }
00281   else
00282    persistentReceivesTableHead = slot->next;
00283   if (slot->next) {
00284     slot->next->prev = slot->prev;
00285   }
00286   else
00287     persistentReceivesTableTail = slot->prev;
00288 
00289   for (i=0; i<PERSIST_BUFFERS_NUM; i++) 
00290     if (slot->messagePtr[i]) /*elan_CmiStaticFree(slot->messagePtr);*/
00291       PerFree((char*)slot->messagePtr[i]);
00292 
00293   CmiFree(slot);
00294 }
00295 
00296 /* FIXME: need to buffer until ReqGranted message come back? */
00297 void CmiDestoryPersistent(PersistentHandle h)
00298 {
00299   if (h == 0) CmiAbort("CmiDestoryPersistent: not a valid PersistentHandle\n");
00300 
00301   PersistentSendsTable *slot = (PersistentSendsTable *)h;
00302   CmiAssert(slot->destHandle != 0);
00303 
00304   PersistentDestoryMsg *msg = (PersistentDestoryMsg *)
00305                               CmiAlloc(sizeof(PersistentDestoryMsg));
00306   msg->destHandlerIndex = slot->destHandle;
00307 
00308   CmiSetHandler(msg, persistentDestoryHandlerIdx);
00309   CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestoryMsg),msg);
00310 
00311   /* free this slot */
00312   initSendSlot(slot);
00313 
00314   persistentSendsTableCount --;
00315 }
00316 
00317 
00318 void CmiDestoryAllPersistent()
00319 {
00320   int i;
00321   for (i=0; i<TABLESIZE; i++) {
00322     if (persistentSendsTable[i].messageBuf) 
00323       CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered unsend message.\n");
00324     initSendSlot(&persistentSendsTable[i]);
00325   }
00326   persistentSendsTableCount = 0;
00327 
00328   PersistentReceivesTable *slot = persistentReceivesTableHead;
00329   while (slot) {
00330     PersistentReceivesTable *next = slot->next;
00331     int i;
00332     for (i=0; i<PERSIST_BUFFERS_NUM; i++)  {
00333       if (slot->recvSizePtr[i])
00334         CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
00335       if (slot->messagePtr[i]) PerFree((char*)slot->messagePtr[i]);
00336     }
00337     CmiFree(slot);
00338     slot = next;
00339   }
00340   persistentReceivesTableHead = persistentReceivesTableTail = NULL;
00341   persistentReceivesTableCount = 0;
00342 }
00343 
00344 void CmiPersistentInit()
00345 {
00346   int i;
00347   persistentRequestHandlerIdx = 
00348        CmiRegisterHandler((CmiHandler)persistentRequestHandler);
00349   persistentReqGrantedHandlerIdx = 
00350        CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
00351   persistentDestoryHandlerIdx = 
00352        CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
00353 
00354   persist_machine_init();
00355 
00356   for (i=0; i<TABLESIZE; i++) {
00357     initSendSlot(&persistentSendsTable[i]);
00358   }
00359   persistentSendsTableCount = 0;
00360   persistentReceivesTableHead = persistentReceivesTableTail = NULL;
00361   persistentReceivesTableCount = 0;
00362 }
00363 
00364 
00365 void CmiUsePersistentHandle(PersistentHandle *p, int n)
00366 {
00367 #ifndef CMK_OPTIMIZE
00368   int i;
00369   for (i=0; i<n; i++)
00370     if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
00371 #endif
00372   phs = p;
00373   phsSize = n;
00374 }
00375 
00376 #endif
00377 

Generated on Sun Jun 29 13:29:06 2008 for Charm++ by  doxygen 1.5.1