arch/t3e/machine.c

Go to the documentation of this file.
00001 /*****************************************************************************
00002  * $Source: /cvsroot/charm/src/arch/t3e/machine.c,v $
00003  * $Author: gioachin $
00004  * $Date: 2004-10-06 01:28:55 $
00005  * $Revision: 1.27 $
00006  *****************************************************************************/
00007 
00017 #include <stdlib.h>
00018 #include <unistd.h>
00019 #include <malloc.h>
00020 #include <mpp/shmem.h>
00021 #include "converse.h"
00022 
/*
 *  We require statically allocated variables for locks.  This defines
 *  the max number of processors available: it sizes the static
 *  head_lock[] and bcast_lock[] arrays, one entry per PE.
 */
#define MAX_PES 2048

/*
 * Some constants
 */
/* File-local boolean type.  NOTE(review): the enumerators `false`/`true`
   would clash with <stdbool.h> if that header were ever included. */
enum boolean {false = 0, true = 1};
/* Value of McDistList.nxt_node that marks the end of a distributed list
   (an empty incoming list has head.nxt_node == list_empty). */
enum {list_empty = -1 };
00034 
00035 
/*
 * Local declarations for Cmi, used by common code.
 * McInit() fills these in: _Cmi_mype from _my_pe(), _Cmi_numpes from
 * _num_pes(), and _Cmi_myrank is always 0.  CmiLocalQueue receives the
 * messages this PE sends to itself.
 */
CpvDeclare(void*, CmiLocalQueue);
int _Cmi_mype;
int _Cmi_numpes;
int _Cmi_myrank;
00043 
/*
 * Local queue functions, used by common code to store messages 
 * to my own node efficiently.  These are used when 
 * CMK_CMIDELIVERS_USE_COMMON_CODE is true.
 * NOTE(review): no such functions are defined here.  Messages a PE sends
 * to itself are placed on CpvAccess(CmiLocalQueue) with CdsFifo_Enqueue
 * (see CmiSyncSendFn / CmiFreeSendFn).
 */
/*
 * Distributed list declarations.  This linked list goes across machines,
 * storing all the messages for this node until this processor copies them
 * into local memory.
 *
 * Each link names a message by (PE, address on that PE, size).  Messages
 * stay in the sender's memory until the receiver copies them with
 * shmem_get.
 */
typedef struct McDistListS
{
  int nxt_node;                /* PE holding the next message, or list_empty */
  struct McMsgHdrS *nxt_addr;  /* address of that message on PE nxt_node */
  int msg_sz;                  /* size in bytes of that message */
} McDistList;

/*
 * Header overlaid on the start of every message this layer transfers.
 * User messages are cast to McMsgHdr, so the first sizeof(McMsgHdr) bytes
 * of a message are reused by this layer.  The layout must be identical on
 * all PEs because it is moved with shmem_get/shmem_put.
 */
typedef struct McMsgHdrS
{
  McDistList list_node;        /* link to the next message in the receiver's list */
  enum {Unknown, Message, BcastMessage } msg_type;
  enum boolean received_f;     /* set true remotely by the receiver once copied */
  union
  {
    struct McMsgHdrS *ptr;     /* in a BcastMessage token: payload address on the origin PE */
    int count;                 /* in a broadcast payload: receivers that have not fetched it yet */
  } bcast;
  int bcast_msg_size;          /* in a BcastMessage token: payload size in bytes */
  int handler;                 /* NOTE(review): only read by commented-out debug prints here;
                                  presumably overlaps the Converse handler field — confirm */
} McMsgHdr;
00074 
00075 
00076 /*
00077  * Mc functions, used in machine.c only.
00078  */
00079 static void McInit();
00080 static void McInitList();
00081 static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe);
00082 static void McRetrieveRemote(void);
00083 static void McCleanUpInTransit(void);
00084 
/*
 * These declarations are for a local FIFO/deque of pointers used to hold
 * messages in local memory.  It is implemented as a growable circular
 * array (not a linked list): it is a modified version of the Origin2000
 * code with the locks removed.
 */
/* Initial number of slots per queue; a full queue grows by a factor of 3. */
#define BLK_LEN  512  

typedef struct McQueueS
{
  void     **blk;          /* circular array of element pointers */
  unsigned int blk_len;    /* number of slots in blk */
  unsigned int first;      /* index of the front element */
  unsigned int len;        /* number of stored elements */
} McQueue;

static McQueue *McQueueCreate(void);
static void McQueueAddToBack(McQueue *queue, void *element);
static void *McQueueRemoveFromFront(McQueue *queue);
static void *McQueueRemoveFromBack(McQueue *queue);
00105 
/*************************************************************
 * static variable declarations
 */
/*
 *  Local queues used for mem management.
 *
 * These queues hold outgoing messages which will be picked up by
 * receiver PEs.  Garbage collection works by scanning the 
 * in_transit_queue for messages, freeing delivered ones, and moving
 * others to in_transit_tmp_queue.  Then the pointers are swapped,
 * so in_transit_queue contains all the undelivered messages, and
 * in_transit_tmp_queue is empty.
 */
static McQueue *in_transit_queue;
static McQueue *in_transit_tmp_queue;

/* tmp_queue is used to invert the order of incoming messages */
static McQueue *tmp_queue;  

/* received_queue holds all the messages which have been moved
 * into local memory.  Messages are dequeued from here.
 */
static McQueue *received_queue;

/* received_token_queue saves incoming broadcast-message tokens,
 * until McRetrieveRemote is done with them.
 */
static McQueue *received_token_queue;

/* outgoing broadcast message queue, holds messages until all receivers have
 * picked it up
 */
static McQueue *broadcast_queue;
static McQueue *broadcast_tmp_queue;

/*
 * head is the pointer to my next incoming message.  Senders read and
 * overwrite it remotely (shmem_get/shmem_put) while holding
 * head_lock[this PE].
 */
static McDistList head;

/* Static variables are necessary for locks. */
static long *my_lock;              /* &head_lock[_Cmi_mype], set in McInitList */
static long head_lock[MAX_PES];    /* head_lock[p] guards PE p's incoming list head */
static long bcast_lock[MAX_PES];   /* bcast_lock[p] guards bcast.count of payloads owned by PE p */

/* Round x up to the next multiple of 8. */
#define ALIGN8(x)  (8*(((x)+7)/8))
00152 
/*
 * McChecksum: XOR every one of the SIZE bytes at MSG into an accumulator
 * seeded with 0xff and return it.  A non-positive SIZE yields 0xff.
 * Bytes are read through plain `char`, so the result for bytes >= 0x80
 * depends on whether char is signed on the platform.
 */
int McChecksum(char *msg, int size)
{
  int acc = 0xff;
  int left;

  for (left = size; left > 0; left--)
    acc ^= *msg++;
  return acc;
}
00163 
00164 /**********************************************************************
00165  *  CMI Functions START HERE
00166  */
00167 
00168 
00169 /**********************************************************************
00170  * Cmi Message calls.  This implementation uses sync-type sends for
00171  * everything.  An async interface would be efficient, and not difficult
00172  * to add
00173  */
00174 void CmiSyncSendFn(int dest_pe, int size, char *msg)
00175 {
00176   McMsgHdr *dup_msg;
00177 
00178   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00179   memcpy(dup_msg,msg,size);
00180   dup_msg->msg_type = Message;
00181 
00182   McRetrieveRemote();
00183 
00184   if (dest_pe == _Cmi_mype)
00185     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dup_msg);
00186   else
00187   {
00188     McEnqueueRemote(dup_msg,ALIGN8(size),dest_pe); 
00189   }
00190   CQdCreate(CpvAccess(cQdState), 1);
00191 }
00192 
00193 CmiCommHandle CmiAsyncSendFn(int dest_pe, int size, char *msg)
00194 {
00195   CmiSyncSendFn(dest_pe, size, msg);
00196   return 0;
00197 }
00198 
00199 void CmiFreeSendFn(int dest_pe, int size, char *msg)
00200 {
00201   /* No need to copy message, since we will immediately free it */
00202   McRetrieveRemote();
00203   ((McMsgHdr *)msg)->msg_type = Message;
00204 
00205   if (dest_pe == _Cmi_mype)
00206     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00207   else
00208   {
00209     McEnqueueRemote(msg,size,dest_pe); 
00210   }
00211   CQdCreate(CpvAccess(cQdState), 1);
00212 }
00213 
00214 void CmiSyncBroadcastFn(int size, char *msg)
00215 {
00216   int i;
00217   McMsgHdr *dup_msg;
00218   McMsgHdr bcast_msg_tok;
00219   McMsgHdr *dup_tok;
00220   int hdr_size;
00221 
00222   /*
00223    * Copy user's message, and set count to the correct number of recients
00224    */
00225   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00226   memcpy(dup_msg,msg,size);
00227   dup_msg->bcast.count = _Cmi_numpes - 1;
00228   /*
00229   CmiPrintf("PE %d broadcast handler=%d\n",_Cmi_mype,dup_msg->handler);
00230   */
00231   /*
00232    * Make the broadcast token point to the copied message
00233    */
00234   bcast_msg_tok.msg_type = BcastMessage;
00235   bcast_msg_tok.bcast.ptr = dup_msg;
00236   bcast_msg_tok.bcast_msg_size = size;
00237 
00238   hdr_size = ALIGN8(sizeof(McMsgHdr));
00239 
00240   /*
00241    * Enqueue copies of the token message on other nodes.  This code should
00242    * be similar to CmiSyncSend
00243    */
00244   for(i=0; i<_Cmi_numpes; i++)
00245     if (i != _Cmi_mype)
00246     {
00247       dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
00248       memcpy(dup_tok,&bcast_msg_tok,hdr_size);
00249       McEnqueueRemote(dup_tok,hdr_size,i); 
00250     }
00251   /*
00252    * The token message will be deleted as a normal message,
00253    * but the message being broadcast needs to be saved for future
00254    * garbage collection.
00255    */
00256   McQueueAddToBack(broadcast_queue,dup_msg);
00257   CQdCreate(CpvAccess(cQdState), _Cmi_numpes-1);
00258 }
00259 
00260 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00261 {
00262   CmiSyncBroadcastFn(size,msg);
00263   return 0;
00264 }
00265 
/*
 * CmiFreeBroadcastFn: broadcast MSG to all other PEs, then release it.
 * CmiSyncBroadcastFn takes its own copy, so MSG can be freed immediately.
 */
void CmiFreeBroadcastFn(int size, char *msg)
{
  CmiSyncBroadcastFn(size, msg);   /* copies msg */
  CmiFree(msg);                    /* caller's buffer is no longer needed */
}
00271 
00272 void CmiSyncBroadcastAllFn(int size, char *msg)
00273 {
00274   int i;
00275   CmiSyncBroadcastFn(size,msg);
00276   CmiSyncSendFn(_Cmi_mype, size, msg);
00277 }
00278 
00279 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00280 {
00281   CmiSyncBroadcastAllFn(size,msg);
00282   return 0;
00283 }
00284 
/*
 * CmiFreeBroadcastAllFn: broadcast MSG to every PE (including this one),
 * then release it.  The broadcast takes its own copies of MSG.
 */
void CmiFreeBroadcastAllFn(int size, char *msg)
{
  CmiSyncBroadcastAllFn(size, msg);   /* copies msg */
  CmiFree(msg);                       /* caller's buffer is no longer needed */
}
00290 
00291 
00292 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg)
00293 {
00294   int i;
00295   McMsgHdr *dup_msg;
00296   McMsgHdr bcast_msg_tok;
00297   McMsgHdr *dup_tok;
00298   int hdr_size;
00299   int n_remote_pes;
00300 
00301   /*
00302    * Count how many remote PEs, and send to the local PE if it is in the list
00303    */
00304   /*
00305   CmiPrintf("CmiSyncListSendFn: size %d handler %d\n",
00306             size,((McMsgHdr *)msg)->handler);
00307   CmiPrintf("CmiSyncListSendFn: size %d npes %d\n",size,npes);
00308   */
00309   n_remote_pes = 0;
00310   for (i=0; i < npes; i++)
00311   {
00312     if (pes[i] == _Cmi_mype)
00313       CmiSyncSendFn(_Cmi_mype, size, msg);
00314     else
00315       n_remote_pes++;
00316   }
00317   if (n_remote_pes == 0)  // Nothing to do
00318     return;
00319   
00320   /*
00321    * Copy user's message, and set count to the correct number of recients
00322    */
00323   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00324   memcpy(dup_msg,msg,size);
00325   dup_msg->bcast.count = n_remote_pes;
00326   /*
00327    * Make the broadcast token point to the copied message
00328    */
00329   bcast_msg_tok.msg_type = BcastMessage;
00330   bcast_msg_tok.bcast.ptr = dup_msg;
00331   bcast_msg_tok.bcast_msg_size = size;
00332 
00333   hdr_size = ALIGN8(sizeof(McMsgHdr));
00334 
00335   /*
00336    * Enqueue copies of the token message on other nodes.  This code should
00337    * be similar to CmiSyncSend
00338    */
00339   for(i=0; i<npes; i++)
00340     if (pes[i] != _Cmi_mype)
00341     {
00342       dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
00343       memcpy(dup_tok,&bcast_msg_tok,hdr_size);
00344       McEnqueueRemote(dup_tok,hdr_size,pes[i]); 
00345       CQdCreate(CpvAccess(cQdState), 1);
00346     }
00347   /*
00348    * The token message will be deleted as a normal message,
00349    * but the message being broadcast needs to be saved for future
00350    * garbage collection.
00351    */
00352   McQueueAddToBack(broadcast_queue,dup_msg);
00353 }
00354 
00355 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg)
00356 {
00357   CmiSyncListSendFn(npes, pes, size, msg);
00358   return 0;
00359 }
00360 
/*
 * CmiFreeListSendFn: send MSG to the PEs in PES, then release it.
 * CmiSyncListSendFn takes its own copies of MSG.
 */
void CmiFreeListSendFn(int npes, int *pes, int size, char *msg)
{
  CmiSyncListSendFn(npes, pes, size, msg);   /* copies msg */
  CmiFree(msg);                              /* caller's buffer is no longer needed */
}
00366 
/*
 * Wrapper message built by CmiSyncMulticastFn when a group's member list
 * is not available yet.  It is sent to this PE and handled by
 * McMulticastWaitFn, which retries the multicast.
 */
typedef struct {
  char header[CmiMsgHeaderSizeBytes];  /* Converse header; handler set with CmiSetHandler */
  CmiGroup grp;                        /* group to multicast to */
  int size;                            /* size in bytes of user_msg */
  char *user_msg;                      /* private CmiAlloc'd copy of the user message */
} McMultiMsg;

/* Handler index of McMulticastWaitFn, registered in CmiMulticastInit. */
CpvDeclare(int,McMulticastWaitHandler);
00375 
00376 void CmiSyncMulticastFn(CmiGroup grp, int size, char *msg)
00377 {
00378   int npes;
00379   int *pes;
00380   McMultiMsg multi_msg;
00381   
00382   /*
00383   CmiPrintf("CmiSyncMulticastFn: size %d handler %d\n",
00384             size,((McMsgHdr *)msg)->handler);
00385    */
00386   /*
00387    *  Check for group, and busy-wait, if necessary, for group info
00388    */
00389   CmiLookupGroup(grp, &npes, &pes);
00390   if (pes != 0)
00391     CmiSyncListSendFn( npes, pes, size, msg);
00392   else
00393   {
00394     multi_msg.grp = grp;
00395     multi_msg.size = size;
00396     multi_msg.user_msg = (char *) CmiAlloc(ALIGN8(size));
00397     memcpy(multi_msg.user_msg,msg,size);
00398 
00399     CmiSetHandler(&multi_msg,CpvAccess(McMulticastWaitHandler));
00400     CmiSyncSendFn(CmiMyPe(),sizeof(McMultiMsg),(char *)&multi_msg);
00401   }
00402 }
00403 
00404 void McMulticastWaitFn(McMultiMsg *msg)
00405 {
00406   CmiFreeMulticastFn(msg->grp,msg->size,msg->user_msg);
00407 }
00408 
00409 void CmiMulticastInit(void)
00410 {
00411   CpvInitialize(int,McMulticastWaitHandler);
00412   CpvAccess(McMulticastWaitHandler) = CmiRegisterHandler(McMulticastWaitFn);
00413 }
00414 
00415 CmiCommHandle CmiAsyncMulticastFn(CmiGroup grp, int size, char *msg)
00416 {
00417   CmiSyncMulticastFn(grp, size, msg);
00418   return 0;
00419 }
00420 
00421 void CmiFreeMulticastFn(CmiGroup grp, int size, char *msg)
00422 {
00423   CmiSyncMulticastFn(grp, size, msg);
00424   CmiFree(msg);
00425 }
00426 
00427 
00428 /***********************************************************************
00429  *
00430  * Abort function:
00431  *
00432  ************************************************************************/
00433 
/*
 * CmiAbort: print MESSAGE via CmiError and terminate the whole job with
 * globalexit(1).
 */
void CmiAbort(const char *message)
{
  /* MESSAGE is data, not a format: CmiError is printf-style, so any '%'
     in a caller's text would otherwise be interpreted (undefined behavior). */
  CmiError("%s", message);
  globalexit(1);
}
00439 
00440 /**********************************************************************
00441  * CMI utility functions for startup, shutdown, and other miscellaneous
00442  * activities.
00443  */
00444 
00445 /*
00446  * This port uses the common CmiDeliver code, so we only provide
00447  * CmiGetNonLocal()
00448  */
00449 void *CmiGetNonLocal()
00450 {
00451   McRetrieveRemote();
00452 
00453   return (void *)McQueueRemoveFromFront(received_queue);
00454 }
00455 
00456 void 
00457 ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
00458 {
00459   McInit();
00460   CthInit(argv);
00461   ConverseCommonInit(argv);
00462   for(argc=0;argv[argc];argc++);
00463   if (initret==0)
00464   {
00465     fn(argc,argv);
00466     if (usched==0) CsdScheduler(-1);
00467     ConverseExit();
00468   }
00469 }
00470 
/*
 * ConverseExit: shut down common Converse state and exit this PE with
 * status 0.  In debug/web/convhost builds PE 0 first prints a banner.
 */
void ConverseExit()
{
#if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
  if (CmiMyPe() == 0)
    CmiPrintf("End of program\n");
#endif
  ConverseCommonExit();
  localexit(0);
}
00481 
/*
 * CmiNotifyIdle: called when the scheduler is idle.  Uses the time to
 * free outgoing messages that receivers have already picked up.
 */
void CmiNotifyIdle(void)
{
  McCleanUpInTransit();
}
00487 
00488 /**********************************************************************
00489  * Mc Functions:
00490  * Mc functions are used internally in machine.c only
00491  */
00492 static void McInit(void)
00493 {
00494   CpvInitialize(void *, CmiLocalQueue);
00495   CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00496   _Cmi_mype = _my_pe();
00497   _Cmi_numpes = _num_pes();
00498   _Cmi_myrank = 0;
00499 
00500   McInitList();
00501 }
00502 
00503 static void McInitList(void)
00504 {
00505   int i;
00506 
00507   received_queue = McQueueCreate();
00508   tmp_queue = McQueueCreate();
00509   received_token_queue = McQueueCreate();
00510   broadcast_queue = McQueueCreate();
00511   broadcast_tmp_queue = McQueueCreate();
00512   in_transit_tmp_queue = McQueueCreate();
00513   in_transit_queue = McQueueCreate();
00514 
00515   head.nxt_node = list_empty;
00516   head.nxt_addr = NULL;
00517   head.msg_sz = 0;
00518   if (_Cmi_numpes > MAX_PES)
00519   {
00520     CmiPrintf("Not enough processors allocated in machine.c.\n");
00521     CmiPrintf("Change MAX_PES in t3e/machine.c to at least %d and recompile Converse\n",
00522     _Cmi_numpes);
00523   }
00524   for(i=0; i < _Cmi_numpes; i++)
00525   {
00526     head_lock[i] = 0;
00527     bcast_lock[i] = 0;
00528   }
00529   my_lock = &(head_lock[_Cmi_mype]);
00530   barrier();
00531   shmem_clear_lock(my_lock);
00532   shmem_clear_lock(&bcast_lock[_Cmi_mype]);
00533 }
00534 
/*
 * McEnqueueRemote: push MSG (MSG_SZ bytes, beginning with an McMsgHdr)
 * onto the head of DST_PE's incoming distributed list.
 *
 * Only a link (this PE, address of MSG, MSG_SZ) is written to DST_PE.
 * The message itself stays in this PE's memory on in_transit_queue until
 * the receiver has copied it and set its received_f flag.  After that,
 * McCleanUpInTransit frees it.  Ownership of MSG passes to this layer.
 */
static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe)
{
 /*
  * To enqueue on a remote node, we should:
  * 0. Free any delivered messages from the message_in_transit list.
  * 1. Add message in the "message_in_transit" list
  * 2. Fill in the fields in the message header
  * 3. Lock the head pointer on the remote node.
  * 4. Swap the list pointer with that on the other node.
  * 5. Release lock
  */

  McDistList tmp_link;
  McDistList *msg_link;

  /*  CmiPrintf("PE %d outgoing msg = %d msg_type = %d size = %d\n",
            _Cmi_mype,msg,((McMsgHdr *)msg)->msg_type,msg_sz);*/
  /* 0. Free any delivered messages from the in_transit_queue list. */
  McCleanUpInTransit();

  /* 1. Add message in the "in_transit_queue" list */
  McQueueAddToBack(in_transit_queue,msg);

  /* 2. Fill in the fields in the message header */
  msg_link = &(((McMsgHdr *)msg)->list_node);
  ((McMsgHdr *)msg)->received_f = false;

  /* Set list fields to point back to this processor, this message.  */
  tmp_link.nxt_node = _Cmi_mype;
  tmp_link.nxt_addr = msg;
  tmp_link.msg_sz = msg_sz;

  /* 3. Lock the head pointer on the remote node.
     Acquire lock on the destination queue.  If locks turn out to
     be inefficient, use fetch and increment to imp. lock
   */

  shmem_set_lock(&(head_lock[dst_pe]));


  /* 4. Swap the list pointer with that on the other node.
   */
  /* First, get current head pointer, and stick it in this 
   * message data area.  The old head becomes this message's successor.
   * NOTE(review): the element count is sizeof(McDistList)/sizeof(int);
   * this assumes shmem_get/shmem_put move int-sized (8-byte on the T3E)
   * words — confirm for the target ABI.
   */
  shmem_get(msg_link, &head, sizeof(McDistList)/sizeof(int), dst_pe);
  /* Next, write the new message into the top of the list */
  shmem_put(&head, &tmp_link, sizeof(McDistList)/sizeof(int),dst_pe);

#ifdef DEBUG
  printf("[%d] Adding Message to pe %d\n",_Cmi_mype,dst_pe);
  printf("[%d]   nxt_node = %d\n",_Cmi_mype,tmp_link.nxt_node);
  printf("[%d]   nxt_addr = %x\n",_Cmi_mype,tmp_link.nxt_addr);
  printf("[%d]   msg_sz = %x\n",_Cmi_mype,tmp_link.msg_sz);
  printf("[%d] Old Message is now at %x\n",_Cmi_mype,msg_link);
  printf("[%d]   nxt_node = %d\n",_Cmi_mype,msg_link->nxt_node);
  printf("[%d]   nxt_addr = %x\n",_Cmi_mype,msg_link->nxt_addr);
  printf("[%d]   msg_sz = %x\n",_Cmi_mype,msg_link->msg_sz);
#endif

  /* 5. Release lock */
  shmem_clear_lock(&(head_lock[dst_pe]));
}
00598 
/*
 * McRetrieveRemote: copy every message currently linked on this PE's
 * incoming distributed list into local memory.  The messages are appended
 * to received_queue (consumed by CmiGetNonLocal) in the order they were
 * enqueued.
 *
 * For each link, the message is fetched from the sender with shmem_get,
 * and the sender's copy is flagged received_f so the sender can free it.
 * A BcastMessage token is replaced by a local copy of the payload it
 * points to.  That payload's bcast.count on the originating PE is
 * decremented under the originating PE's bcast_lock.
 */
static void McRetrieveRemote(void)
{
  /*
   * The local host should retrieve messages from the distributed list
   * and put them in local memory, in a messages queue.
   * Steps:
   * 0) Lock list pointer.
   * 1) Replace list pointer with NULL and unlock list
   * 2) Get each message into local memory
   * 3) Enqueue list into local message queue, in reverse order
   *    (senders push at the head, so the list is newest-first)
   * 4) Free the broadcast tokens
   */

  McDistList list_head;
  McDistList *cur_node;
  McMsgHdr *cur_msg; 
  int received_f;
  enum boolean bcast_msg;
  McMsgHdr *bcast_ptr;

  /* Get the head of the list */

  /* Unlocked peek: skip the lock when nothing is linked.  A message that
     arrives after this check stays on the list for a later call. */
  if (head.nxt_node == list_empty)  /* apparently there are no messages */
    return;

  /* 0) Lock list pointer. */
  shmem_set_lock(my_lock);

  /* 1) Replace list pointer with NULL and unlock list */
  list_head = head;
  head.nxt_node = list_empty;
  head.nxt_addr = NULL;
  head.msg_sz = 0;
  shmem_clear_lock(my_lock);

  /* 2) Get each message into local memory
   * Start copying the messages into local memory, putting messages into
   * a local list for future reversing.
   */
  cur_node = &list_head;
  received_f = true;   /* value written into each sender's received_f */

  while (cur_node->nxt_node != list_empty)
  {
    cur_msg = (McMsgHdr *)CmiAlloc(ALIGN8(cur_node->msg_sz));
    if (cur_msg ==NULL)
    {
      CmiError("%s:%d Cannot Allocate Memory\n",__FILE__,__LINE__);
      globalexit(1);
    }

    /* Copy the whole (8-byte padded) message from the sender. */
    shmem_get(cur_msg, cur_node->nxt_addr,
              ALIGN8(cur_node->msg_sz)/8, cur_node->nxt_node);

    /*    CmiPrintf("PE %d incoming msg = %d msg_type = %d, size = %d\n",
              _Cmi_mype,cur_msg,cur_msg->msg_type,cur_node->msg_sz);*/

    /* If it is a broadcast message, retrieve the actual message */
    if (cur_msg->msg_type == BcastMessage)
    {

      bcast_msg = true;
      /* NOTE(review): this allocation is not checked for NULL, unlike cur_msg above. */
      bcast_ptr = (McMsgHdr *)CmiAlloc(ALIGN8(cur_msg->bcast_msg_size));
      /* Serialize the fetch/decrement/write-back of bcast.count with the
         other receivers of the same payload. */
      shmem_set_lock(&(bcast_lock[cur_node->nxt_node]));

      /*
      CmiPrintf(
        "PE %d getting message from node %d at addr %d to %d, size=%d\n",
        _Cmi_mype,cur_node->nxt_node,cur_msg->bcast.ptr,bcast_ptr,
        cur_msg->bcast_msg_size
        );
        */
      /* Get the message */
      shmem_get(bcast_ptr,cur_msg->bcast.ptr,
                ALIGN8(cur_msg->bcast_msg_size)/8,cur_node->nxt_node);
      /* Decrement the count, and write it back to the original node. */
      /*
      CmiPrintf(
      "PE %d received broadcast message count=%d size=%d handler=%d\n",
      _Cmi_mype,bcast_ptr->bcast.count,
      cur_msg->bcast_msg_size,bcast_ptr->handler
      );
      */

      bcast_ptr->bcast.count--;

      shmem_put(&(cur_msg->bcast.ptr->bcast.count),
                &bcast_ptr->bcast.count,1,cur_node->nxt_node);
      shmem_clear_lock(&(bcast_lock[cur_node->nxt_node]));
    }
    else bcast_msg = false;

    /* Mark the remote message for future deletion */
    shmem_put(&(cur_node->nxt_addr->received_f),&received_f,
              1, cur_node->nxt_node);

    /* Add to list for reversing.  Tokens are kept (not freed yet) because
       cur_node is about to point into the token's list_node. */
    if (bcast_msg)
    {
      McQueueAddToBack(received_token_queue,cur_msg);
      McQueueAddToBack(tmp_queue,bcast_ptr);
    }
    else 
      McQueueAddToBack(tmp_queue,cur_msg);

    /* Move pointer to next message */
    cur_node = &(cur_msg->list_node);
  }

  /* 3) Enqueue list into local message queue, in reverse order */
  while ((cur_msg = McQueueRemoveFromBack(tmp_queue)) != NULL)  {
    McQueueAddToBack(received_queue,cur_msg);
  }

  /* 4) Delete broadcast-message tokens */
  while ((cur_msg = McQueueRemoveFromBack(received_token_queue)) != NULL)  {
    CmiFree(cur_msg);
  }
  return;
}
00718 
00719 static void McCleanUpInTransit(void)
00720 {
00721   McMsgHdr *msg;
00722   McQueue *swap_ptr;
00723 
00724   /* Check broadcast message queue, to see if messages have been retrieved
00725    */
00726   while ((msg = (McMsgHdr *)McQueueRemoveFromFront(broadcast_queue)) 
00727          != NULL)
00728   {
00729     if (msg->bcast.count == 0)
00730     {
00731       /* 
00732          CmiPrintf("PE %d freeing broadcast message at %d\n",_Cmi_mype,msg);
00733        */
00734       CmiFree(msg);
00735     }
00736     else
00737     {
00738       McQueueAddToBack(broadcast_tmp_queue,msg);
00739     }
00740   }
00741   /*
00742    * swap queues, so tmp_queue is now empty, and in_transit_queue has
00743    * only non-received messages.
00744    */
00745   swap_ptr = broadcast_tmp_queue;
00746   broadcast_tmp_queue = broadcast_queue;
00747   broadcast_queue = swap_ptr;
00748 
00749   /* 
00750    * Free received messages, and move others to tmp_queue.  Similar to
00751    * above
00752    */
00753   while ((msg = (McMsgHdr *)McQueueRemoveFromFront(in_transit_queue)) 
00754          != NULL)
00755   {
00756     if (msg->received_f)
00757     {
00758       CmiFree(msg);
00759     }
00760     else
00761     {
00762       McQueueAddToBack(in_transit_tmp_queue,msg);
00763     }
00764   }
00765   /*
00766    * swap queues, so tmp_queue is now empty, and in_transit_queue has
00767    * only non-received messages.
00768    */
00769   swap_ptr = in_transit_tmp_queue;
00770   in_transit_tmp_queue = in_transit_queue;
00771   in_transit_queue = swap_ptr;
00772 #ifdef DEBUG
00773   CmiPrintf("[%d] done in_transit_queue = %d, tmp_queue = %d\n",
00774         _Cmi_mype,in_transit_queue->len,in_transit_tmp_queue->len);
00775 #endif
00776 }
00777 
/*******************************************************************
 * The following internal functions implement FIFO queues for
 * messages in the local address space.  They are used for the
 * received_queue, the in_transit queues, the broadcast queues, and
 * tmp_queue.  The code originally comes from the Origin2000 port, with
 * modifications.
 */
/*
 * McQueueAllocBlock: allocate an array of LEN element pointers with
 * malloc.  On allocation failure it reports the error and abort()s, so it
 * never returns NULL.
 */
static void **McQueueAllocBlock(unsigned int len)
{
  void **slots = (void **)malloc(len * sizeof(void *));

  if (slots == NULL) {
    CmiError("Cannot Allocate Memory!\n");
    abort();
  }
  return slots;
}
00795 
/*
 * McQueueSpillBlock: unroll the circular array SRCBLK (LEN slots, front
 * element at index FIRST) into DESTBLK, so the front element lands at
 * DESTBLK[0].  Used when growing a full queue, where LEN is both the
 * element count and the old array size.
 */
static void 
McQueueSpillBlock(void **srcblk, void **destblk, 
             unsigned int first, unsigned int len)
{
  unsigned int tail = len - first;   /* slots from FIRST to the end */

  memcpy(destblk, srcblk + first, tail * sizeof(void *));
  memcpy(destblk + tail, srcblk, first * sizeof(void *));
}
00803 
00804 static McQueue * McQueueCreate(void)
00805 {
00806   McQueue *queue;
00807 
00808   queue = (McQueue *) malloc(sizeof(McQueue));
00809   if(queue==(McQueue *)0) {
00810     CmiError("Cannot Allocate Memory!\n");
00811     abort();
00812   }
00813   queue->blk = McQueueAllocBlock(BLK_LEN);
00814   queue->blk_len = BLK_LEN;
00815   queue->first = 0;
00816   queue->len = 0;
00817   return queue;
00818 }
00819 
00820 int inside_comm = 0;
00821 
00822 static void McQueueAddToBack(McQueue *queue, void *element)
00823 {
00824   inside_comm = 1;
00825   if(queue->len==queue->blk_len) {
00826     void **blk;
00827 
00828     queue->blk_len *= 3;
00829     blk = McQueueAllocBlock(queue->blk_len);
00830     McQueueSpillBlock(queue->blk, blk, queue->first, queue->len);
00831     free(queue->blk);
00832     queue->blk = blk;
00833     queue->first = 0;
00834   }
00835 #ifdef DEBUG
00836   CmiPrintf("[%d] Adding %x\n",_Cmi_mype,element);
00837 #endif
00838   queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
00839   inside_comm = 0;
00840 }
00841 
00842 static void * McQueueRemoveFromBack(McQueue *queue)
00843 {
00844   void *element;
00845   element = (void *) 0;
00846   if(queue->len) {
00847     element = queue->blk[(queue->first+queue->len-1)%queue->blk_len];
00848     queue->len--;
00849   }
00850   return element;
00851 }
00852 
00853 static void * McQueueRemoveFromFront(McQueue *queue)
00854 {
00855   void *element;
00856   element = (void *) 0;
00857   if(queue->len) {
00858     element = queue->blk[queue->first++];
00859     queue->first = (queue->first+queue->blk_len)%queue->blk_len;
00860     queue->len--;
00861   }
00862   return element;
00863 }
00864 
00865 
/* Timer Functions */

/* Seconds per tick, set in CmiTimerInit to 1/sysconf(_SC_CLK_TCK) and used
   to scale both _rtc() and cpused() differences.  NOTE(review): assumes
   both counters tick at the _SC_CLK_TCK rate on this system — confirm. */
static double clocktick;
/* _rtc() reading taken in CmiTimerInit; wall-clock timers are relative to it. */
static long inittime_wallclock;
/* cpused() reading taken in CmiTimerInit; CmiCpuTimer is relative to it. */
static long inittime_virtual;
00871 
00872 void CmiTimerInit()
00873 {
00874   inittime_wallclock = _rtc();
00875   inittime_virtual = cpused();
00876   clocktick = 1.0 / (sysconf(_SC_CLK_TCK));
00877 }
00878 
00879 double CmiWallTimer()
00880 {
00881   long now;
00882 
00883   now = _rtc();
00884   return (clocktick * (now - inittime_wallclock));
00885 }
00886 
00887 double CmiCpuTimer()
00888 {
00889   long now;
00890 
00891   now = cpused();
00892   return (clocktick * (now - inittime_virtual));
00893 }
00894 
/* CmiTimer: identical to CmiWallTimer (wall-clock seconds since CmiTimerInit). */
double CmiTimer()
{
  return CmiWallTimer();
}
00902 
/*
 * Memory lock and unlock functions.
 * Per the original note: on the T3E there is no shared memory and
 * quickthreads are used, so memory calls have no reentrancy problem.
 * These are dummy functions that only record the lock state in memflag.
 */
static volatile int memflag;

void CmiMemLock()
{
  memflag = 1;
}

void CmiMemUnlock()
{
  memflag = 0;
}
00910 

Generated on Sun Jun 29 13:29:05 2008 for Charm++ by  doxygen 1.5.1