arch/elan/persistent.c

Go to the documentation of this file.
00001 
00006 /*
00007   included in machine.c
00008   common code for persistent communication now is moved to persist-comm.c
00009   Gengbin Zheng, 12/5/2003
00010 */
00011 
00012 /*
00013   machine specific persistent comm functions:
00014   * CmiSendPersistentMsg
00015   * CmiSyncSendPersistent
00016   * PumpPersistent
00017   * PerAlloc PerFree      // persistent message memory allocation/free functions
00018   * persist_machine_init  // machine specific initialization call
00019 */
00020 
/*
 * Send strategies recorded in each pending-send entry.
 */
/* single elan_put: the size travels inside the message itself */
#define STRATEGY_ONE_ELANPUT   0
/* two elan_puts: message first, then the size once the first put is done */
#define STRATEGY_TWO_ELANPUT   1
00023 
00024 /****************************************************************************
00025        Pending send messages
00026 ****************************************************************************/
00027 typedef struct pmsg_list {
00028   ELAN_EVENT *e;
00029   char *msg;
00030   struct pmsg_list *next;
00031   int size, destpe;
00032   void *addr;
00033   PersistentHandle  h;
00034   int strategy, phase;
00035 } PMSG_LIST;
00036 
00037 static PMSG_LIST *pending_persistent_msgs = NULL;
00038 static PMSG_LIST *end_pending_persistent_msgs = NULL;
00039 static PMSG_LIST *free_list_head = NULL;
00040 
/*
 * NEW_PMSG_LIST: obtain a PMSG_LIST node and initialise it.
 *
 * A node is popped from free_list_head when one is available, otherwise a new
 * one is obtained from CmiAlloc.  The result is stored in a variable named
 * `msg_tmp`, which the caller must have declared (PMSG_LIST *msg_tmp).
 *
 *   evt   - ELAN event of the put that was just issued
 *   m     - message buffer being sent
 *   s     - message size in bytes
 *   dest  - destination PE
 *   _addr - remote address used for the size in the second put
 *   ph    - persistent handle
 *   stra  - send strategy (STRATEGY_*_ELANPUT)
 *
 * The body is wrapped in do { } while (0) so that the macro expands to exactly
 * one statement and can be used safely under an unbraced if/else.
 */
#define NEW_PMSG_LIST(evt, m, s, dest, _addr, ph, stra) \
  do {                                                          \
    if (free_list_head) {                                       \
      msg_tmp = free_list_head;                                 \
      free_list_head = free_list_head->next;                    \
    } else {                                                    \
      msg_tmp = (PMSG_LIST *) CmiAlloc(sizeof(PMSG_LIST));      \
    }                                                           \
    msg_tmp->msg = (m);                                         \
    msg_tmp->e = (evt);                                         \
    msg_tmp->size = (s);                                        \
    msg_tmp->next = NULL;                                       \
    msg_tmp->destpe = (dest);                                   \
    msg_tmp->addr = (_addr);                                    \
    msg_tmp->h = (ph);                                          \
    msg_tmp->phase = 0;                                         \
    msg_tmp->strategy = (stra);                                 \
  } while (0)
00054 
/*
 * APPEND_PMSG_LIST: append `node` to the tail of the pending-send queue,
 * updating both pending_persistent_msgs (head) and
 * end_pending_persistent_msgs (tail).
 *
 * Wrapped in do { } while (0) so the head/tail updates always happen together
 * and the macro is a single statement under an unbraced if/else.
 */
#define APPEND_PMSG_LIST(node)                          \
  do {                                                  \
    if (pending_persistent_msgs == 0)                   \
      pending_persistent_msgs = (node);                 \
    else                                                \
      end_pending_persistent_msgs->next = (node);       \
    end_pending_persistent_msgs = (node);               \
  } while (0)
00061 
00062 
00063 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
00064 {
00065   CmiAssert(h!=NULL);
00066   PersistentSendsTable *slot = (PersistentSendsTable *)h;
00067   CmiAssert(slot->used == 1);
00068   CmiAssert(slot->destPE == destPE);
00069   if (size > slot->sizeMax) {
00070     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
00071     CmiAbort("Abort: Invalid size\n");
00072   }
00073 
00074 /*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destPE=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), destPE, slot->destAddress[0], size);*/
00075 
00076   if (slot->destAddress[0]) {
00077     ELAN_EVENT *e1, *e2;
00078     int strategy = STRATEGY_ONE_ELANPUT;
00079     /* if (size > 280) strategy = STRATEGY_TWO_ELANPUT; */
00080     int *footer = (int*)((char*)m + size);
00081     footer[0] = size;
00082     footer[1] = 1;
00083     if (strategy == STRATEGY_ONE_ELANPUT) CMI_MESSAGE_SIZE(m) = size;
00084     else CMI_MESSAGE_SIZE(m) = 0;
00085     e1 = elan_put(elan_base->state, m, slot->destAddress[0], size+sizeof(int)*2, destPE);
00086     switch (strategy ) {
00087     case STRATEGY_ONE_ELANPUT:
00088     case STRATEGY_TWO_ELANPUT:  {
00089       PMSG_LIST *msg_tmp;
00090       NEW_PMSG_LIST(e1, m, size, destPE, slot->destSizeAddress[0], h, strategy);
00091       APPEND_PMSG_LIST(msg_tmp);
00092       swapSendSlotBuffers(slot);
00093       break;
00094       }
00095     case 2:
00096       elan_wait(e1, ELAN_POLL_EVENT);
00097       e2 = elan_put(elan_base->state, &size, slot->destSizeAddress[0], sizeof(int), destPE);
00098       elan_wait(e2, ELAN_POLL_EVENT);
00099       CMI_MESSAGE_SIZE(m) = 0;
00100       /*CmiPrintf("[%d] elan finished. \n", CmiMyPe());*/
00101       CmiFree(m);
00102     }
00103   }
00104   else {
00105 #if 1
00106     if (slot->messageBuf != NULL) {
00107       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
00108       CmiAbort("");
00109     }
00110     slot->messageBuf = m;
00111     slot->messageSize = size;
00112 #else
00113     /* normal send */
00114     PersistentHandle  *phs_tmp = phs;
00115     int phsSize_tmp = phsSize;
00116     phs = NULL; phsSize = 0;
00117     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
00118     CmiSyncSendAndFree(slot->destPE, size, m);
00119     phs = phs_tmp; phsSize = phsSize_tmp;
00120 #endif
00121   }
00122 }
00123 
00124 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
00125 {
00126   CmiState cs = CmiGetState();
00127   char *dupmsg = (char *) CmiAlloc(size);
00128   memcpy(dupmsg, msg, size);
00129 
00130   /*  CmiPrintf("Setting root to %d\n", 0); */
00131   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
00132 
00133   if (cs->pe==destPE) {
00134     CQdCreate(CpvAccess(cQdState), 1);
00135     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
00136   }
00137   else
00138     CmiSendPersistentMsg(h, destPE, size, dupmsg);
00139 }
00140 
00141 /* 
00142   1: finish the first put but still need to be in the queue for the second put.
00143   2: finish and should be removed from queue.
00144 */
00145 static int remote_put_done(PMSG_LIST *smsg)
00146 {
00147   int flag = elan_poll(smsg->e, ELAN_POLL_EVENT);
00148   if (flag) {
00149       switch (smsg->strategy) {
00150       case 0: 
00151         smsg->phase = 1;
00152         CmiFree(smsg->msg);
00153         return 2;
00154       case 1:
00155         if (smsg->phase == 1) {
00156           /*
00157             CmiPrintf("remote_put_done on %d\n", CmiMyPe());
00158           */
00159           return 2;
00160         }
00161         else {
00162           smsg->phase = 1;
00163           CmiFree(smsg->msg);
00164                                                                                 
00165           PersistentSendsTable *slot = (PersistentSendsTable *)(smsg->h);
00166           smsg->e = elan_put(elan_base->state, &smsg->size, smsg->addr, sizeof(int), smsg->destpe);
00167           return 1;
00168         }
00169      }
00170   }
00171   return 0;
00172 }
00173 
00174 /* called in CmiReleaseSentMessages */
00175 void release_pmsg_list()
00176 {
00177   PMSG_LIST *prev=0, *temp;
00178   PMSG_LIST *msg_tmp = pending_persistent_msgs;
00179 
00180   while (msg_tmp) {
00181     int status = remote_put_done(msg_tmp);
00182     if (status == 2) {
00183       temp = msg_tmp->next;
00184       if (prev==0)
00185         pending_persistent_msgs = temp;
00186       else
00187         prev->next = temp;
00188       /*CmiFree(msg_tmp);*/
00189       if (free_list_head) { msg_tmp->next = free_list_head; free_list_head = msg_tmp; }
00190       else free_list_head = msg_tmp;
00191       msg_tmp = temp;
00192     }
00193     else {
00194       prev = msg_tmp;
00195       msg_tmp = msg_tmp->next;
00196     }
00197   }
00198   end_pending_persistent_msgs = prev;
00199 }
00200 
00201 extern void CmiReference(void *blk);
00202 
00203 /* called in PumpMsgs */
00204 int PumpPersistent()
00205 {
00206   int status = 0;
00207   PersistentReceivesTable *slot = persistentReceivesTableHead;
00208   while (slot) {
00209     char *msg = slot->messagePtr[0];
00210     int size = *(slot->recvSizePtr[0]);
00211     if (size)
00212     {
00213       int *footer = (int*)(msg + size);
00214       if (footer[0] == size && footer[1] == 1) {
00215 /*CmiPrintf("[%d] PumpPersistent messagePtr=%p size:%d\n", CmiMyPe(), slot->messagePtr, size);*/
00216 
00217 #if 0
00218       void *dupmsg;
00219       dupmsg = CmiAlloc(size);
00220                                                                                 
00221       _MEMCHECK(dupmsg);
00222       memcpy(dupmsg, msg, size);
00223       memset(msg, 0, size+2*sizeof(int));
00224       msg = dupmsg;
00225 #else
00226       /* return messagePtr directly and user MUST make sure not to delete it. */
00227       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
00228 
00229       CmiReference(msg);
00230       swapRecvSlotBuffers(slot);
00231 #endif
00232 
00233       CmiPushPE(CMI_DEST_RANK(msg), msg);
00234 #if CMK_BROADCAST_SPANNING_TREE
00235       if (CMI_BROADCAST_ROOT(msg))
00236           SendSpanningChildren(size, msg);
00237 #endif
00238       /* clear footer after message used */
00239       *(slot->recvSizePtr[0]) = 0;
00240       footer[0] = footer[1] = 0;
00241 
00242 #if 0
00243       /* not safe at all! */
00244       /* instead of clear before use, do it earlier */
00245       msg=slot->messagePtr[0];
00246       size = *(slot->recvSizePtr[0]);
00247       footer = (int*)(msg + size);
00248       *(slot->recvSizePtr[0]) = 0;
00249       footer[0] = footer[1] = 0;
00250 #endif
00251       status = 1;
00252       }
00253     }
00254     slot = slot->next;
00255   }
00256   return status;
00257 }
00258 
/*
 * Allocate `size` bytes for a persistent message buffer.
 * The memory comes from CmiAlloc; the result is returned unchanged.
 */
void *PerAlloc(int size)
{
  void *block = CmiAlloc(size);
  return block;
}
00263                                                                                 
/*
 * Release a persistent message buffer by passing it to elan_CmiStaticFree.
 * NOTE(review): PerAlloc obtains memory with CmiAlloc -- confirm that
 * elan_CmiStaticFree is the matching release routine.
 */
void PerFree(char *msg)
{
  elan_CmiStaticFree(msg);
  return;
}
00268 
/*
 * Machine dependent initialization hook for persistent communication.
 * This layer needs no setup, so the function does nothing.
 */
void persist_machine_init(void)
{
  return;
}
00273 
00274 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
00275 {
00276   int i;
00277   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00278     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
00279     _MEMCHECK(buf);
00280     memset(buf, 0, maxBytes+sizeof(int)*2);
00281     slot->messagePtr[i] = buf;
00282     /* note: assume first integer in elan converse header is the msg size */
00283     slot->recvSizePtr[i] = (unsigned int*)buf;
00284   }
00285   slot->sizeMax = maxBytes;
00286 }
00287 
00288 

Generated on Sun Jun 29 13:29:05 2008 for Charm++ by  doxygen 1.5.1