00001
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
/*
 * Completion strategies for a persistent send (stored in PMSG_LIST.strategy).
 *
 * STRATEGY_ONE_ELANPUT: the sender stores the real size in the message
 *   header before the single elan_put; the send is complete as soon as
 *   that put finishes.
 * STRATEGY_TWO_ELANPUT: the sender stores 0 in the message header, and once
 *   the payload put finishes a second elan_put writes the size to the
 *   remote size address.
 */
#define STRATEGY_ONE_ELANPUT 0
#define STRATEGY_TWO_ELANPUT 1
00023
00024
00025
00026
00027 typedef struct pmsg_list {
00028 ELAN_EVENT *e;
00029 char *msg;
00030 struct pmsg_list *next;
00031 int size, destpe;
00032 void *addr;
00033 PersistentHandle h;
00034 int strategy, phase;
00035 } PMSG_LIST;
00036
00037 static PMSG_LIST *pending_persistent_msgs = NULL;
00038 static PMSG_LIST *end_pending_persistent_msgs = NULL;
00039 static PMSG_LIST *free_list_head = NULL;
00040
00041
/*
 * Obtain a PMSG_LIST node (recycled from free_list_head when possible,
 * otherwise freshly allocated) and fill it in for a new pending send.
 *
 * The node is stored in a variable named `msg_tmp` that the CALLER must
 * have declared as `PMSG_LIST *msg_tmp;` before using this macro.
 *
 *   evt   - ELAN_EVENT* returned by the elan_put being tracked
 *   m     - message buffer to free when the put completes
 *   s     - message size in bytes
 *   dest  - destination PE
 *   _addr - remote address used for the follow-up size put
 *   ph    - PersistentHandle of the channel
 *   stra  - STRATEGY_* constant
 *
 * Wrapped in do { } while (0) so it behaves as a single statement; invoke
 * it with a trailing semicolon.
 */
#define NEW_PMSG_LIST(evt, m, s, dest, _addr, ph, stra) \
  do { \
    if (free_list_head) { \
      msg_tmp = free_list_head; \
      free_list_head = free_list_head->next; \
    } else { \
      msg_tmp = (PMSG_LIST *) CmiAlloc(sizeof(PMSG_LIST)); \
      _MEMCHECK(msg_tmp); \
    } \
    msg_tmp->msg = (m); \
    msg_tmp->e = (evt); \
    msg_tmp->size = (s); \
    msg_tmp->next = NULL; \
    msg_tmp->destpe = (dest); \
    msg_tmp->addr = (_addr); \
    msg_tmp->h = (ph); \
    msg_tmp->phase = 0; \
    msg_tmp->strategy = (stra); \
  } while (0)
00054
/*
 * Append node `msg_tmp` to the tail of the pending-send list, updating both
 * pending_persistent_msgs (when the list was empty) and
 * end_pending_persistent_msgs.
 *
 * The argument is evaluated exactly once, and the body is wrapped in
 * do { } while (0) so the macro is safe inside an unbraced if/else; invoke
 * it with a trailing semicolon.
 */
#define APPEND_PMSG_LIST(msg_tmp) \
  do { \
    PMSG_LIST *pmsg_node_ = (msg_tmp); \
    if (pending_persistent_msgs == NULL) \
      pending_persistent_msgs = pmsg_node_; \
    else \
      end_pending_persistent_msgs->next = pmsg_node_; \
    end_pending_persistent_msgs = pmsg_node_; \
  } while (0)
00061
00062
00063 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
00064 {
00065 CmiAssert(h!=NULL);
00066 PersistentSendsTable *slot = (PersistentSendsTable *)h;
00067 CmiAssert(slot->used == 1);
00068 CmiAssert(slot->destPE == destPE);
00069 if (size > slot->sizeMax) {
00070 CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
00071 CmiAbort("Abort: Invalid size\n");
00072 }
00073
00074
00075
00076 if (slot->destAddress[0]) {
00077 ELAN_EVENT *e1, *e2;
00078 int strategy = STRATEGY_ONE_ELANPUT;
00079
00080 int *footer = (int*)((char*)m + size);
00081 footer[0] = size;
00082 footer[1] = 1;
00083 if (strategy == STRATEGY_ONE_ELANPUT) CMI_MESSAGE_SIZE(m) = size;
00084 else CMI_MESSAGE_SIZE(m) = 0;
00085 e1 = elan_put(elan_base->state, m, slot->destAddress[0], size+sizeof(int)*2, destPE);
00086 switch (strategy ) {
00087 case STRATEGY_ONE_ELANPUT:
00088 case STRATEGY_TWO_ELANPUT: {
00089 PMSG_LIST *msg_tmp;
00090 NEW_PMSG_LIST(e1, m, size, destPE, slot->destSizeAddress[0], h, strategy);
00091 APPEND_PMSG_LIST(msg_tmp);
00092 swapSendSlotBuffers(slot);
00093 break;
00094 }
00095 case 2:
00096 elan_wait(e1, ELAN_POLL_EVENT);
00097 e2 = elan_put(elan_base->state, &size, slot->destSizeAddress[0], sizeof(int), destPE);
00098 elan_wait(e2, ELAN_POLL_EVENT);
00099 CMI_MESSAGE_SIZE(m) = 0;
00100
00101 CmiFree(m);
00102 }
00103 }
00104 else {
00105 #if 1
00106 if (slot->messageBuf != NULL) {
00107 CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
00108 CmiAbort("");
00109 }
00110 slot->messageBuf = m;
00111 slot->messageSize = size;
00112 #else
00113
00114 PersistentHandle *phs_tmp = phs;
00115 int phsSize_tmp = phsSize;
00116 phs = NULL; phsSize = 0;
00117 CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
00118 CmiSyncSendAndFree(slot->destPE, size, m);
00119 phs = phs_tmp; phsSize = phsSize_tmp;
00120 #endif
00121 }
00122 }
00123
00124 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
00125 {
00126 CmiState cs = CmiGetState();
00127 char *dupmsg = (char *) CmiAlloc(size);
00128 memcpy(dupmsg, msg, size);
00129
00130
00131 CMI_SET_BROADCAST_ROOT(dupmsg, 0);
00132
00133 if (cs->pe==destPE) {
00134 CQdCreate(CpvAccess(cQdState), 1);
00135 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
00136 }
00137 else
00138 CmiSendPersistentMsg(h, destPE, size, dupmsg);
00139 }
00140
00141
00142
00143
00144
00145 static int remote_put_done(PMSG_LIST *smsg)
00146 {
00147 int flag = elan_poll(smsg->e, ELAN_POLL_EVENT);
00148 if (flag) {
00149 switch (smsg->strategy) {
00150 case 0:
00151 smsg->phase = 1;
00152 CmiFree(smsg->msg);
00153 return 2;
00154 case 1:
00155 if (smsg->phase == 1) {
00156
00157
00158
00159 return 2;
00160 }
00161 else {
00162 smsg->phase = 1;
00163 CmiFree(smsg->msg);
00164
00165 PersistentSendsTable *slot = (PersistentSendsTable *)(smsg->h);
00166 smsg->e = elan_put(elan_base->state, &smsg->size, smsg->addr, sizeof(int), smsg->destpe);
00167 return 1;
00168 }
00169 }
00170 }
00171 return 0;
00172 }
00173
00174
00175 void release_pmsg_list()
00176 {
00177 PMSG_LIST *prev=0, *temp;
00178 PMSG_LIST *msg_tmp = pending_persistent_msgs;
00179
00180 while (msg_tmp) {
00181 int status = remote_put_done(msg_tmp);
00182 if (status == 2) {
00183 temp = msg_tmp->next;
00184 if (prev==0)
00185 pending_persistent_msgs = temp;
00186 else
00187 prev->next = temp;
00188
00189 if (free_list_head) { msg_tmp->next = free_list_head; free_list_head = msg_tmp; }
00190 else free_list_head = msg_tmp;
00191 msg_tmp = temp;
00192 }
00193 else {
00194 prev = msg_tmp;
00195 msg_tmp = msg_tmp->next;
00196 }
00197 }
00198 end_pending_persistent_msgs = prev;
00199 }
00200
00201 extern void CmiReference(void *blk);
00202
00203
00204 int PumpPersistent()
00205 {
00206 int status = 0;
00207 PersistentReceivesTable *slot = persistentReceivesTableHead;
00208 while (slot) {
00209 char *msg = slot->messagePtr[0];
00210 int size = *(slot->recvSizePtr[0]);
00211 if (size)
00212 {
00213 int *footer = (int*)(msg + size);
00214 if (footer[0] == size && footer[1] == 1) {
00215
00216
00217 #if 0
00218 void *dupmsg;
00219 dupmsg = CmiAlloc(size);
00220
00221 _MEMCHECK(dupmsg);
00222 memcpy(dupmsg, msg, size);
00223 memset(msg, 0, size+2*sizeof(int));
00224 msg = dupmsg;
00225 #else
00226
00227
00228
00229 CmiReference(msg);
00230 swapRecvSlotBuffers(slot);
00231 #endif
00232
00233 CmiPushPE(CMI_DEST_RANK(msg), msg);
00234 #if CMK_BROADCAST_SPANNING_TREE
00235 if (CMI_BROADCAST_ROOT(msg))
00236 SendSpanningChildren(size, msg);
00237 #endif
00238
00239 *(slot->recvSizePtr[0]) = 0;
00240 footer[0] = footer[1] = 0;
00241
00242 #if 0
00243
00244
00245 msg=slot->messagePtr[0];
00246 size = *(slot->recvSizePtr[0]);
00247 footer = (int*)(msg + size);
00248 *(slot->recvSizePtr[0]) = 0;
00249 footer[0] = footer[1] = 0;
00250 #endif
00251 status = 1;
00252 }
00253 }
00254 slot = slot->next;
00255 }
00256 return status;
00257 }
00258
/* Allocate `size` bytes for a persistent receive buffer via CmiAlloc. */
void *PerAlloc(int size)
{
  void *block = CmiAlloc(size);
  return block;
}
00263
/*
 * Release a persistent buffer by passing it to elan_CmiStaticFree.
 * NOTE(review): PerAlloc obtains memory with CmiAlloc -- confirm that
 * elan_CmiStaticFree is the matching release routine.
 */
void PerFree(char *msg)
{
  elan_CmiStaticFree(msg);
  return;
}
00268
00269
/* Machine-specific initialisation hook for persistent messages; this
   implementation does nothing. */
void persist_machine_init(void)
{
  /* intentionally empty */
}
00273
00274 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
00275 {
00276 int i;
00277 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
00278 char *buf = PerAlloc(maxBytes+sizeof(int)*2);
00279 _MEMCHECK(buf);
00280 memset(buf, 0, maxBytes+sizeof(int)*2);
00281 slot->messagePtr[i] = buf;
00282
00283 slot->recvSizePtr[i] = (unsigned int*)buf;
00284 }
00285 slot->sizeMax = maxBytes;
00286 }
00287
00288