arch/shmem/machine.c

Go to the documentation of this file.
00001 /*****************************************************************************
00002  * $Source: /cvsroot/charm/src/arch/shmem/machine.c,v $
00003  * $Author: gioachin $
00004  * $Date: 2004-10-06 01:28:55 $
00005  * $Revision: 1.6 $
00006  *****************************************************************************/
00007 
00017 #include <stdlib.h>
00018 #include <unistd.h>
00019 #include <malloc.h>
00020 #include "converse.h"
00021 #include CMK_SHMEM_H
00022 
/* When CMK_SHMEM_T3E is false, globalexit is simply another name for exit().
 * NOTE(review): presumably T3E builds get a real globalexit() from the
 * system library -- confirm. */
00023 #if !CMK_SHMEM_T3E
00024 #define globalexit exit
00025 #endif 
00026 
00027 /*
00028  *  We require statically allocated variables for locks.  This defines
00029  *  the max number of processors available.
00030  */
/* MAX_PES sizes head_lock[] and bcast_lock[]; McInitList() compares
 * _Cmi_numpes against it at startup. */
00031 #define MAX_PES 2048
00032 
00033 /*
00034  * Some constants
00035  */
/* Home-grown boolean, used for McMsgHdr.received_f and local flags. */
00036 enum boolean {false = 0, true = 1};
/* Value of McDistList.nxt_node that marks the end of a distributed list. */
00037 enum {list_empty = -1 };
00038 
00039 
00040 /*
00041  * Local declarations for Cmi, used by common code
00042  */
/* FIFO created in McInit(); the send functions enqueue messages addressed
 * to this PE here instead of going through the distributed list. */
00043 CpvDeclare(void*, CmiLocalQueue);
/* This PE's number, set from _my_pe() in McInit(). */
00044 int _Cmi_mype;
/* Total number of PEs, set from _num_pes() in McInit(). */
00045 int _Cmi_numpes;
/* Always set to 0 by McInit(). */
00046 int _Cmi_myrank;
00047 
00048 /*
00049  * Local queue functions, used by common code to store messages 
00050  * to my own node efficiently.  These are used when 
00051  * CMK_CMIDELIVERS_USE_COMMON_CODE is true.
00052  */
00053 /*
00054  * Distributed list declarations.  This linked list goes across machines,
00055  * storing all the messages for this node until this processor copies them
00056  * into local memory.
00057  */
/*
 * One link of the distributed list.  A link names a message that still
 * lives in the sender's memory:
 *   nxt_node - PE that holds the next message, or list_empty at the end
 *   nxt_addr - address of that message on nxt_node
 *   msg_sz   - size in bytes handed to McEnqueueRemote(); the receiver
 *              rounds it up with ALIGN8 before fetching
 * The struct is transferred with shmem_int_get/shmem_int_put as
 * sizeof(McDistList)/sizeof(int) ints, so do not reorder the fields.
 */
00058 typedef struct McDistListS
00059 {
00060   int nxt_node;
00061   struct McMsgHdrS *nxt_addr;
00062   int msg_sz;
00063 } McDistList;
00064 
/*
 * Header overlaid on the start of every message handled by this file.
 *   list_node      - filled by McEnqueueRemote() with the destination's
 *                    previous list head, i.e. the link to the next message
 *   msg_type       - Message for ordinary sends, BcastMessage for the small
 *                    token that points at a shared broadcast payload
 *   received_f     - cleared by the sender in McEnqueueRemote(), set remotely
 *                    by the receiver in McRetrieveRemote(), and polled by
 *                    McCleanUpInTransit() to decide when to free the message
 *   bcast.ptr      - (token) address of the payload on the sending PE
 *   bcast.count    - (payload) number of receivers that have not yet fetched it
 *   bcast_msg_size - (token) payload size in bytes
 *   handler        - not written anywhere in this file; presumably the Converse
 *                    handler index -- TODO confirm against converse.h
 */
00065 typedef struct McMsgHdrS
00066 {
00067   McDistList list_node;
00068   enum {Unknown, Message, BcastMessage } msg_type;
00069   enum boolean received_f;
00070   union
00071   {
00072     struct McMsgHdrS *ptr;
00073     int count;
00074   } bcast;
00075   int bcast_msg_size;
00076   int handler;
00077 } McMsgHdr;
00078 
00079 
/*
 * Mc functions, used in machine.c only.
 *
 * Every declaration is a full prototype: McInit and McInitList are defined
 * with a (void) parameter list further down, and declaring them with empty
 * parentheses would switch off argument checking for calls made before the
 * definitions.
 */
static void McInit(void);
static void McInitList(void);
static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe);
static void McRetrieveRemote(void);
static void McCleanUpInTransit(void);
00088 
00089 /*
00090  * These declarations are for a local linked list to hold messages which
00091  * have been copied into local memory.  It is a modified version of the
00092  * Origin2000 code with the locks removed.
00093  */
00094 /* Allocation block size, to reduce num of mallocs */
00095 #define BLK_LEN  512  
00096 
/*
 * Growable circular buffer of element pointers.
 *   blk     - malloc'ed array of blk_len slots
 *   blk_len - current capacity; starts at BLK_LEN and is tripled by
 *             McQueueAddToBack() when the buffer is full
 *   first   - index of the front element
 *   len     - number of elements currently stored
 */
00097 typedef struct McQueueS
00098 {
00099   void     **blk;
00100   unsigned int blk_len;
00101   unsigned int first;
00102   unsigned int len;
00103 } McQueue;
00104 
/* Queue operations; the two Remove functions return a null pointer when
 * the queue is empty. */
00105 static McQueue *McQueueCreate(void);
00106 static void McQueueAddToBack(McQueue *queue, void *element);
00107 static void *McQueueRemoveFromFront(McQueue *queue);
00108 static void *McQueueRemoveFromBack(McQueue *queue);
00109 
00110 /*************************************************************
00111  * static variable declarations
00112  */
00113 /*
00114  *  Local queues used for mem management.
00115  *
00116  * These queues hold outgoing messages which will be picked up by
00117  * receiver PEs.  Garbage collection works by scanning the 
00118  * in_transit_queue for messages, freeing delivered ones, and moving
00119  * others to in_transit_tmp_queue.  Then the pointers are swapped,
00120  * so in_transit_queue contains all the undelivered messages, and
00121  * in_transit_tmp_queue is empty.
00122  */
00123 static McQueue *in_transit_queue;
00124 static McQueue *in_transit_tmp_queue;
00125 
00126 /* tmp_queue is used to invert the order of incoming messages */
00127 static McQueue *tmp_queue;  
00128 
00129 /* received_queue holds all the messages which have been moved
00130  * into local memory.  Messages are dequeued from here.
00131  */
00132 static McQueue *received_queue;
00133 
00134 /* received_token_queue saves incoming broadcast-message tokens,
00135  * until McRetrieveRemote is done with them.
00136  */
00137 static McQueue *received_token_queue;
00138 
00139 /* outgoing broadcast message queue, holds messages until all receivers have
00140  * picked it up
00141  */
00142 static McQueue *broadcast_queue;
00143 static McQueue *broadcast_tmp_queue;
00144 
00145 /*
00146  * head is the pointer to my next incoming message.
00147  */
/* Remote PEs read and overwrite head with shmem_int_get/shmem_int_put in
 * McEnqueueRemote(); this PE drains it in McRetrieveRemote(). */
00148 static McDistList head;
00149 
00150 /* Static variables are necessary for locks. */
/* my_lock points at head_lock[_Cmi_mype] (set in McInitList()).
 * head_lock[pe] is taken around every update of PE pe's head;
 * bcast_lock[pe] is taken while a receiver decrements the count of a
 * broadcast payload stored on PE pe. */
00151 static long *my_lock;
00152 static long head_lock[MAX_PES];
00153 static long bcast_lock[MAX_PES];
00154 
/* Round x up to the next multiple of 8 (messages are fetched with
 * shmem_get64, i.e. in 8-byte words). */
00155 #define ALIGN8(x)  (8*(((x)+7)/8))
00156 
/*
 * XOR-fold the first `size` bytes of `msg` into an int that starts at 0xff.
 * Each byte is read as a plain `char`, so on platforms where char is signed
 * a byte with the top bit set is sign-extended before it is XORed in.
 * A size of zero (or less) returns 0xff.
 */
int McChecksum(char *msg, int size)
{
  int folded = 0xff;
  int pos = 0;

  while (pos < size)
  {
    folded ^= msg[pos];
    pos++;
  }
  return folded;
}
00167 
00168 /**********************************************************************
00169  *  CMI Functions START HERE
00170  */
00171 
00172 
00173 /**********************************************************************
00174  * Cmi Message calls.  This implementation uses sync-type sends for
00175  * everything.  An async interface would be efficient, and not difficult
00176  * to add
00177  */
00178 void CmiSyncSendFn(int dest_pe, int size, char *msg)
00179 {
00180   McMsgHdr *dup_msg;
00181 
00182   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00183   memcpy(dup_msg,msg,size);
00184   dup_msg->msg_type = Message;
00185 
00186   McRetrieveRemote();
00187 
00188   if (dest_pe == _Cmi_mype)
00189     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dup_msg);
00190   else
00191   {
00192     McEnqueueRemote(dup_msg,ALIGN8(size),dest_pe); 
00193   }
00194   CQdCreate(CpvAccess(cQdState), 1);
00195 }
00196 
00197 CmiCommHandle CmiAsyncSendFn(int dest_pe, int size, char *msg)
00198 {
00199   CmiSyncSendFn(dest_pe, size, msg);
00200   return 0;
00201 }
00202 
00203 void CmiFreeSendFn(int dest_pe, int size, char *msg)
00204 {
00205   /* No need to copy message, since we will immediately free it */
00206   McRetrieveRemote();
00207   ((McMsgHdr *)msg)->msg_type = Message;
00208 
00209   if (dest_pe == _Cmi_mype)
00210     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00211   else
00212   {
00213     McEnqueueRemote(msg,size,dest_pe); 
00214   }
00215   CQdCreate(CpvAccess(cQdState), 1);
00216 }
00217 
00218 void CmiSyncBroadcastFn(int size, char *msg)
00219 {
00220   int i;
00221   McMsgHdr *dup_msg;
00222   McMsgHdr bcast_msg_tok;
00223   McMsgHdr *dup_tok;
00224   int hdr_size;
00225 
00226   /*
00227    * Copy user's message, and set count to the correct number of recients
00228    */
00229   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00230   memcpy(dup_msg,msg,size);
00231   dup_msg->bcast.count = _Cmi_numpes - 1;
00232   /*
00233   CmiPrintf("PE %d broadcast handler=%d\n",_Cmi_mype,dup_msg->handler);
00234   */
00235   /*
00236    * Make the broadcast token point to the copied message
00237    */
00238   bcast_msg_tok.msg_type = BcastMessage;
00239   bcast_msg_tok.bcast.ptr = dup_msg;
00240   bcast_msg_tok.bcast_msg_size = size;
00241 
00242   hdr_size = ALIGN8(sizeof(McMsgHdr));
00243 
00244   /*
00245    * Enqueue copies of the token message on other nodes.  This code should
00246    * be similar to CmiSyncSend
00247    */
00248   for(i=0; i<_Cmi_numpes; i++)
00249     if (i != _Cmi_mype)
00250     {
00251       dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
00252       memcpy(dup_tok,&bcast_msg_tok,hdr_size);
00253       McEnqueueRemote(dup_tok,hdr_size,i); 
00254     }
00255   /*
00256    * The token message will be deleted as a normal message,
00257    * but the message being broadcast needs to be saved for future
00258    * garbage collection.
00259    */
00260   McQueueAddToBack(broadcast_queue,dup_msg);
00261   CQdCreate(CpvAccess(cQdState), _Cmi_numpes-1);
00262 }
00263 
00264 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00265 {
00266   CmiSyncBroadcastFn(size,msg);
00267   return 0;
00268 }
00269 
/*
 * Broadcast `msg` to all other PEs and release the caller's buffer.
 * CmiSyncBroadcastFn() works on its own copy, so `msg` can be freed as soon
 * as it returns.
 */
void CmiFreeBroadcastFn(int size, char *msg)
{
  CmiSyncBroadcastFn(size, msg);

  /* The broadcast kept a private copy; the original is no longer needed. */
  CmiFree(msg);
}
00275 
00276 void CmiSyncBroadcastAllFn(int size, char *msg)
00277 {
00278   int i;
00279   CmiSyncBroadcastFn(size,msg);
00280   CmiSyncSendFn(_Cmi_mype, size, msg);
00281 }
00282 
00283 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00284 {
00285   CmiSyncBroadcastAllFn(size,msg);
00286   return 0;
00287 }
00288 
/*
 * Broadcast `msg` to every PE including this one, then release the caller's
 * buffer.  Both the remote and the local delivery use copies made by
 * CmiSyncBroadcastAllFn().
 */
void CmiFreeBroadcastAllFn(int size, char *msg)
{
  CmiSyncBroadcastAllFn(size, msg);

  /* Only copies are in flight; the original can go. */
  CmiFree(msg);
}
00294 
00295 
00296 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg)
00297 {
00298   int i;
00299   McMsgHdr *dup_msg;
00300   McMsgHdr bcast_msg_tok;
00301   McMsgHdr *dup_tok;
00302   int hdr_size;
00303   int n_remote_pes;
00304 
00305   /*
00306    * Count how many remote PEs, and send to the local PE if it is in the list
00307    */
00308   /*
00309   CmiPrintf("CmiSyncListSendFn: size %d handler %d\n",
00310             size,((McMsgHdr *)msg)->handler);
00311   CmiPrintf("CmiSyncListSendFn: size %d npes %d\n",size,npes);
00312   */
00313   n_remote_pes = 0;
00314   for (i=0; i < npes; i++)
00315   {
00316     if (pes[i] == _Cmi_mype)
00317       CmiSyncSendFn(_Cmi_mype, size, msg);
00318     else
00319       n_remote_pes++;
00320   }
00321   if (n_remote_pes == 0)  // Nothing to do
00322     return;
00323   
00324   /*
00325    * Copy user's message, and set count to the correct number of recients
00326    */
00327   dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00328   memcpy(dup_msg,msg,size);
00329   dup_msg->bcast.count = n_remote_pes;
00330   /*
00331    * Make the broadcast token point to the copied message
00332    */
00333   bcast_msg_tok.msg_type = BcastMessage;
00334   bcast_msg_tok.bcast.ptr = dup_msg;
00335   bcast_msg_tok.bcast_msg_size = size;
00336 
00337   hdr_size = ALIGN8(sizeof(McMsgHdr));
00338 
00339   /*
00340    * Enqueue copies of the token message on other nodes.  This code should
00341    * be similar to CmiSyncSend
00342    */
00343   for(i=0; i<npes; i++)
00344     if (pes[i] != _Cmi_mype)
00345     {
00346       dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
00347       memcpy(dup_tok,&bcast_msg_tok,hdr_size);
00348       McEnqueueRemote(dup_tok,hdr_size,pes[i]); 
00349       CQdCreate(CpvAccess(cQdState), 1);
00350     }
00351   /*
00352    * The token message will be deleted as a normal message,
00353    * but the message being broadcast needs to be saved for future
00354    * garbage collection.
00355    */
00356   McQueueAddToBack(broadcast_queue,dup_msg);
00357 }
00358 
00359 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg)
00360 {
00361   CmiSyncListSendFn(npes, pes, size, msg);
00362   return 0;
00363 }
00364 
/*
 * Send `msg` to the listed PEs and release the caller's buffer;
 * CmiSyncListSendFn() only ever queues copies.
 */
void CmiFreeListSendFn(int npes, int *pes, int size, char *msg)
{
  CmiSyncListSendFn(npes, pes, size, msg);

  /* Only copies are in flight; the original can go. */
  CmiFree(msg);
}
00370 
/*
 * Self-addressed message used by CmiSyncMulticastFn() when the group's
 * member list is not available yet.
 *   header   - room for the Converse message header (CmiSetHandler is
 *              applied to it)
 *   grp      - group the multicast is for
 *   size     - size in bytes of user_msg
 *   user_msg - CmiAlloc'ed copy of the user's message
 */
00371 typedef struct {
00372   char header[CmiMsgHeaderSizeBytes];
00373   CmiGroup grp;
00374   int size;
00375   char *user_msg;
00376 } McMultiMsg;
00377 
/* Handler index for McMulticastWaitFn, registered in CmiMulticastInit(). */
00378 CpvDeclare(int,McMulticastWaitHandler);
00379 
00380 void CmiSyncMulticastFn(CmiGroup grp, int size, char *msg)
00381 {
00382   int npes;
00383   int *pes;
00384   McMultiMsg multi_msg;
00385   
00386   /*
00387   CmiPrintf("CmiSyncMulticastFn: size %d handler %d\n",
00388             size,((McMsgHdr *)msg)->handler);
00389    */
00390   /*
00391    *  Check for group, and busy-wait, if necessary, for group info
00392    */
00393   CmiLookupGroup(grp, &npes, &pes);
00394   if (pes != 0)
00395     CmiSyncListSendFn( npes, pes, size, msg);
00396   else
00397   {
00398     multi_msg.grp = grp;
00399     multi_msg.size = size;
00400     multi_msg.user_msg = (char *) CmiAlloc(ALIGN8(size));
00401     memcpy(multi_msg.user_msg,msg,size);
00402 
00403     CmiSetHandler(&multi_msg,CpvAccess(McMulticastWaitHandler));
00404     CmiSyncSendFn(CmiMyPe(),sizeof(McMultiMsg),(char *)&multi_msg);
00405   }
00406 }
00407 
00408 void McMulticastWaitFn(McMultiMsg *msg)
00409 {
00410   CmiFreeMulticastFn(msg->grp,msg->size,msg->user_msg);
00411 }
00412 
00413 void CmiMulticastInit(void)
00414 {
00415   CpvInitialize(int,McMulticastWaitHandler);
00416   CpvAccess(McMulticastWaitHandler) = CmiRegisterHandler(McMulticastWaitFn);
00417 }
00418 
00419 CmiCommHandle CmiAsyncMulticastFn(CmiGroup grp, int size, char *msg)
00420 {
00421   CmiSyncMulticastFn(grp, size, msg);
00422   return 0;
00423 }
00424 
00425 void CmiFreeMulticastFn(CmiGroup grp, int size, char *msg)
00426 {
00427   CmiSyncMulticastFn(grp, size, msg);
00428   CmiFree(msg);
00429 }
00430 
00431 
00432 /***********************************************************************
00433  *
00434  * Abort function:
00435  *
00436  ************************************************************************/
00437 
/*
 * Report `message` through CmiError() and terminate with status 1 via
 * globalexit().
 *
 * The text is passed as an argument to a fixed "%s" format rather than as
 * the format string itself, so a '%' inside `message` is printed literally
 * instead of being interpreted as a conversion.
 */
void CmiAbort(const char *message)
{
  CmiError("%s", message);
  globalexit(1);
}
00443 
00444 /**********************************************************************
00445  * CMI utility functions for startup, shutdown, and other miscellaneous
00446  * activities.
00447  */
00448 
00449 /*
00450  * This port uses the common CmiDeliver code, so we only provide
00451  * CmiGetNonLocal()
00452  */
00453 void *CmiGetNonLocal()
00454 {
00455   McRetrieveRemote();
00456 
00457   return (void *)McQueueRemoveFromFront(received_queue);
00458 }
00459 
00460 void 
00461 ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
00462 {
00463   McInit();
00464   CthInit(argv);
00465   ConverseCommonInit(argv);
00466   for(argc=0;argv[argc];argc++);
00467   if (initret==0)
00468   {
00469     fn(argc,argv);
00470     if (usched==0) CsdScheduler(-1);
00471     ConverseExit();
00472   }
00473 }
00474 
/*
 * Shut Converse down on this PE: run the common exit code and leave the
 * process with status 0.  In debug, web or convhost builds PE 0 first prints
 * an end-of-program line.
 */
void ConverseExit()
{
#if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
  if (0 == CmiMyPe())
    CmiPrintf("End of program\n");
#endif

  ConverseCommonExit();
  exit(0);
}
00485 
/*
 * Idle hook: spend the idle time reclaiming outgoing messages that their
 * receivers have already fetched.
 */
void CmiNotifyIdle(void)
{
  McCleanUpInTransit();
}
00491 
/*
 * Locks.
 *
 * With CMK_SHMEM_LOCK set, a lock is a long on PE `pe` that is driven with
 * shmem_long_swap(): set_lock() keeps swapping in 1 until the value it gets
 * back is 0, and clear_lock() swaps 0 back in.  Otherwise both names map
 * onto shmem_set_lock()/shmem_clear_lock(), which do not take the PE.
 */
#if CMK_SHMEM_LOCK

void set_lock(long *lock, int pe)
{
  for (;;)
  {
    /* A previous value of 0 means the lock was free and is now ours. */
    if (shmem_long_swap(lock, 1L, pe) == 0)
      break;
  }
}

void clear_lock(long *lock, int pe)
{
  shmem_long_swap(lock, 0L, pe);
}

#else
#define set_lock(lock, pe)  shmem_set_lock(lock)
#define clear_lock(lock, pe)  shmem_clear_lock(lock)
#endif
00517 
00518 /**********************************************************************
00519  * Mc Functions:
00520  * Mc functions are used internally in machine.c only
00521  */
00522 
/*
 * One-time machine-layer startup, called first from ConverseInit().
 * Runs the CMK_SHMEM_INIT macro, creates the local-delivery FIFO, records
 * this PE's number and the PE count, and then sets up the queues, list head
 * and locks in McInitList().  Keep the statement order: McInitList() reads
 * _Cmi_mype and _Cmi_numpes.
 */
00523 static void McInit(void)
00524 {
00525   CMK_SHMEM_INIT;
00526 
00527   CpvInitialize(void *, CmiLocalQueue);
00528   CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00529   _Cmi_mype = _my_pe();
00530   _Cmi_numpes = _num_pes();
00531   _Cmi_myrank = 0;
00532 
00533   McInitList();
00534 }
00535 
00536 static void McInitList(void)
00537 {
00538   int i;
00539 
00540   received_queue = McQueueCreate();
00541   tmp_queue = McQueueCreate();
00542   received_token_queue = McQueueCreate();
00543   broadcast_queue = McQueueCreate();
00544   broadcast_tmp_queue = McQueueCreate();
00545   in_transit_tmp_queue = McQueueCreate();
00546   in_transit_queue = McQueueCreate();
00547 
00548   head.nxt_node = list_empty;
00549   head.nxt_addr = NULL;
00550   head.msg_sz = 0;
00551   if (_Cmi_numpes > MAX_PES)
00552   {
00553     CmiPrintf("Not enough processors allocated in machine.c.\n");
00554     CmiPrintf("Change MAX_PES in t3e/machine.c to at least %d and recompile Converse\n",
00555     _Cmi_numpes);
00556   }
00557   for(i=0; i < _Cmi_numpes; i++)
00558   {
00559     head_lock[i] = 0;
00560     bcast_lock[i] = 0;
00561   }
00562   my_lock = &(head_lock[_Cmi_mype]);
00563   shmem_barrier_all();
00564 /*
00565   clear_lock(my_lock, _Cmi_mype);
00566   clear_lock(&bcast_lock[_Cmi_mype], _Cmi_mype);
00567 */
00568 }
00569 
00570 static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe)
00571 {
00572  /*
00573   * To enqueue on a remote node, we should:
00574   * 0. Free any delivered messages from the message_in_transit list.
00575   * 1. Add message in the "message_in_transit" list
00576   * 2. Fill in the fields in the message header
00577   * 3. Lock the head pointer on the remote node.
00578   * 4. Swap the list pointer with that on the other node.
00579   * 5. Release lock
00580   */
00581 
00582   McDistList tmp_link;
00583   McDistList *msg_link;
00584 
00585   /*  CmiPrintf("PE %d outgoing msg = %d msg_type = %d size = %d dst_pe=%d\n",
00586             _Cmi_mype,msg,((McMsgHdr *)msg)->msg_type,msg_sz, dst_pe); */
00587   /* 0. Free any delivered messages from the in_transit_queue list. */
00588   McCleanUpInTransit();
00589 
00590   /* 1. Add message in the "in_transit_queue" list */
00591   McQueueAddToBack(in_transit_queue,msg);
00592 
00593   /* 2. Fill in the fields in the message header */
00594   msg_link = &(((McMsgHdr *)msg)->list_node);
00595   ((McMsgHdr *)msg)->received_f = false;
00596 
00597   /* Set list fields to point back to this processor, this message.  */
00598   tmp_link.nxt_node = _Cmi_mype;
00599   tmp_link.nxt_addr = msg;
00600   tmp_link.msg_sz = msg_sz;
00601 
00602   /* 3. Lock the head pointer on the remote node.
00603      Acquire lock on the destination queue.  If locks turn oout to
00604      be inefficient, use fetch and increment to imp. lock
00605    */
00606 
00607   set_lock(&head_lock[dst_pe], dst_pe);
00608 
00609 
00610   /* 4. Swap the list pointer with that on the other node.
00611    */
00612   /* First, get current head pointer, and stick it in this 
00613    * message data area.
00614    */
00615   shmem_int_get((int*)msg_link, (int*)&head, sizeof(McDistList)/sizeof(int), dst_pe);
00616   /* Next, write the new message into the top of the list */
00617   shmem_int_put((int*)&head, (int*)&tmp_link, sizeof(McDistList)/sizeof(int),dst_pe);
00618   shmem_quiet();
00619 
00620 #ifdef DEBUG
00621   printf("[%d] Adding Message to pe %d\n",_Cmi_mype,dst_pe);
00622   printf("[%d]   nxt_node = %d\n",_Cmi_mype,tmp_link.nxt_node);
00623   printf("[%d]   nxt_addr = %x\n",_Cmi_mype,tmp_link.nxt_addr);
00624   printf("[%d]   msg_sz = %x\n",_Cmi_mype,tmp_link.msg_sz);
00625   printf("[%d] Old Message is now at %x\n",_Cmi_mype,msg_link);
00626   printf("[%d]   nxt_node = %d\n",_Cmi_mype,msg_link->nxt_node);
00627   printf("[%d]   nxt_addr = %x\n",_Cmi_mype,msg_link->nxt_addr);
00628   printf("[%d]   msg_sz = %x\n",_Cmi_mype,msg_link->msg_sz);
00629 #endif
00630 
00631   /* 5. Release lock */
00632   clear_lock(&head_lock[dst_pe], dst_pe);
00633 }
00634 
00635 static void McRetrieveRemote(void)
00636 {
00637   /*
00638    * The local host should retrieve messages from the distributed list
00639    * and put them in local memory, in a messages queue.
00640    * Steps:
00641    * 0) Lock list pointer.
00642    * 1) Replace list pointer with NULL and unlock list
00643    * 2) Get each message into local memory
00644    * 3) Enqueue list into local message queue, in reverse order
00645    */
00646 
00647   McDistList list_head;
00648   McDistList *cur_node;
00649   McMsgHdr *cur_msg; 
00650   int received_f;
00651   enum boolean bcast_msg;
00652   McMsgHdr *bcast_ptr;
00653 
00654   /* Get the head of the list */
00655 
00656   if (head.nxt_node == list_empty)  /* apparently there are no messages */
00657     return;
00658 
00659   /* 0) Lock list pointer. */
00660   set_lock(my_lock, _Cmi_mype);
00661 
00662   /* 1) Replace list pointer with NULL and unlock list */
00663   list_head = head;
00664   head.nxt_node = list_empty;
00665   head.nxt_addr = NULL;
00666   head.msg_sz = 0;
00667   clear_lock(my_lock, _Cmi_mype);
00668 
00669   /* 2) Get each message into local memory
00670    * Start copying the messages into local memory, putting messages into
00671    * a local list for future reversing.
00672    */
00673   cur_node = &list_head;
00674   received_f = true;
00675 
00676   while (cur_node->nxt_node != list_empty)
00677   {
00678     cur_msg = (McMsgHdr *)CmiAlloc(ALIGN8(cur_node->msg_sz));
00679     if (cur_msg ==NULL)
00680     {
00681       CmiError("%s:%d Cannot Allocate Memory\n",__FILE__,__LINE__);
00682       globalexit(1);
00683     }
00684 
00685     shmem_get64((long*)cur_msg, (long*)cur_node->nxt_addr,
00686               ALIGN8(cur_node->msg_sz)/8, cur_node->nxt_node);
00687 
00688     /*    CmiPrintf("PE %d incoming msg = %d msg_type = %d, size = %d\n",
00689               _Cmi_mype,cur_msg,cur_msg->msg_type,cur_node->msg_sz);*/
00690 
00691     /* If it is a broadcast message, retrieve the actual message */
00692     if (cur_msg->msg_type == BcastMessage)
00693     {
00694 
00695       bcast_msg = true;
00696       bcast_ptr = (McMsgHdr *)CmiAlloc(ALIGN8(cur_msg->bcast_msg_size));
00697       set_lock(&bcast_lock[cur_node->nxt_node], cur_node->nxt_node);
00698 
00699       /*
00700       CmiPrintf(
00701         "PE %d getting message from node %d at addr %d to %d, size=%d\n",
00702         _Cmi_mype,cur_node->nxt_node,cur_msg->bcast.ptr,bcast_ptr,
00703         cur_msg->bcast_msg_size
00704         );
00705         */
00706       /* Get the message */
00707       shmem_get64((long*)bcast_ptr,(long*)cur_msg->bcast.ptr,
00708                 ALIGN8(cur_msg->bcast_msg_size)/8,cur_node->nxt_node);
00709       /* Decrement the count, and write it back to the original node. */
00710       /*
00711       CmiPrintf(
00712       "PE %d received broadcast message count=%d size=%d handler=%d\n",
00713       _Cmi_mype,bcast_ptr->bcast.count,
00714       cur_msg->bcast_msg_size,bcast_ptr->handler
00715       );
00716       */
00717 
00718       bcast_ptr->bcast.count--;
00719 
00720       shmem_int_put(&(cur_msg->bcast.ptr->bcast.count),
00721                 &bcast_ptr->bcast.count,1,cur_node->nxt_node);
00722       shmem_quiet();
00723       clear_lock(&bcast_lock[cur_node->nxt_node], cur_node->nxt_node);
00724     }
00725     else bcast_msg = false;
00726 
00727     /* Mark the remote message for future deletion */
00728     shmem_int_put(&(cur_node->nxt_addr->received_f),&received_f,
00729               1, cur_node->nxt_node);
00730     shmem_quiet();
00731 
00732     /* Add to list for reversing */
00733     if (bcast_msg)
00734     {
00735       McQueueAddToBack(received_token_queue,cur_msg);
00736       McQueueAddToBack(tmp_queue,bcast_ptr);
00737     }
00738     else 
00739       McQueueAddToBack(tmp_queue,cur_msg);
00740 
00741     /* Move pointer to next message */
00742     cur_node = &(cur_msg->list_node);
00743   }
00744 
00745   /* 3) Enqueue list into local message queue, in reverse order */
00746   while ((cur_msg = McQueueRemoveFromBack(tmp_queue)) != NULL)  {
00747     McQueueAddToBack(received_queue,cur_msg);
00748   }
00749 
00750   /* 4) Delete broadcast-message tokens */
00751   while ((cur_msg = McQueueRemoveFromBack(received_token_queue)) != NULL)  {
00752     CmiFree(cur_msg);
00753   }
00754   return;
00755 }
00756 
00757 static void McCleanUpInTransit(void)
00758 {
00759   McMsgHdr *msg;
00760   McQueue *swap_ptr;
00761 
00762   /* Check broadcast message queue, to see if messages have been retrieved
00763    */
00764   while ((msg = (McMsgHdr *)McQueueRemoveFromFront(broadcast_queue)) 
00765          != NULL)
00766   {
00767     if (msg->bcast.count == 0)
00768     {
00769       /* 
00770          CmiPrintf("PE %d freeing broadcast message at %d\n",_Cmi_mype,msg);
00771        */
00772       CmiFree(msg);
00773     }
00774     else
00775     {
00776       McQueueAddToBack(broadcast_tmp_queue,msg);
00777     }
00778   }
00779   /*
00780    * swap queues, so tmp_queue is now empty, and in_transit_queue has
00781    * only non-received messages.
00782    */
00783   swap_ptr = broadcast_tmp_queue;
00784   broadcast_tmp_queue = broadcast_queue;
00785   broadcast_queue = swap_ptr;
00786 
00787   /* 
00788    * Free received messages, and move others to tmp_queue.  Similar to
00789    * above
00790    */
00791   while ((msg = (McMsgHdr *)McQueueRemoveFromFront(in_transit_queue)) 
00792          != NULL)
00793   {
00794     if (msg->received_f)
00795     {
00796       CmiFree(msg);
00797     }
00798     else
00799     {
00800       McQueueAddToBack(in_transit_tmp_queue,msg);
00801     }
00802   }
00803   /*
00804    * swap queues, so tmp_queue is now empty, and in_transit_queue has
00805    * only non-received messages.
00806    */
00807   swap_ptr = in_transit_tmp_queue;
00808   in_transit_tmp_queue = in_transit_queue;
00809   in_transit_queue = swap_ptr;
00810 #ifdef DEBUG
00811   CmiPrintf("[%d] done in_transit_queue = %d, tmp_queue = %d\n",
00812         _Cmi_mype,in_transit_queue->len,in_transit_tmp_queue->len);
00813 #endif
00814 }
00815 
00816 /*******************************************************************
00817  * The following internal functions implements FIFO queues for
00818  * messages in the local address space.  This is used for the
00819  * received_queue, the in_transit_queue, and tmp_queue.  Code
00820  * originally comes from the Origin2000 port, with modifications.
00821  */
/*
 * Allocate an array of `len` element pointers for a queue.  Never returns a
 * null pointer: on allocation failure an error is reported and the process
 * is aborted.
 */
static void **McQueueAllocBlock(unsigned int len)
{
  void **slots = (void **)malloc(len*sizeof(void *));

  if (slots != (void **)0)
    return slots;

  CmiError("Cannot Allocate Memory!\n");
  abort();
  return slots;
}
00833 
/*
 * Copy a full circular buffer of `len` pointers from `srcblk` into `destblk`
 * so that the element at index `first` lands at destblk[0]: first the part
 * from `first` up to the end, then the wrapped-around part before `first`.
 */
static void 
McQueueSpillBlock(void **srcblk, void **destblk, 
             unsigned int first, unsigned int len)
{
  const unsigned int tail_count = len - first;

  /* Elements first .. len-1 go to the start of the destination. */
  memcpy(destblk, srcblk + first, tail_count * sizeof(void *));
  /* Elements 0 .. first-1 follow them. */
  memcpy(destblk + tail_count, srcblk, first * sizeof(void *));
}
00841 
00842 static McQueue * McQueueCreate(void)
00843 {
00844   McQueue *queue;
00845 
00846   queue = (McQueue *) malloc(sizeof(McQueue));
00847   if(queue==(McQueue *)0) {
00848     CmiError("Cannot Allocate Memory!\n");
00849     abort();
00850   }
00851   queue->blk = McQueueAllocBlock(BLK_LEN);
00852   queue->blk_len = BLK_LEN;
00853   queue->first = 0;
00854   queue->len = 0;
00855   return queue;
00856 }
00857 
00858 int inside_comm = 0;
00859 
00860 static void McQueueAddToBack(McQueue *queue, void *element)
00861 {
00862   inside_comm = 1;
00863   if(queue->len==queue->blk_len) {
00864     void **blk;
00865 
00866     queue->blk_len *= 3;
00867     blk = McQueueAllocBlock(queue->blk_len);
00868     McQueueSpillBlock(queue->blk, blk, queue->first, queue->len);
00869     free(queue->blk);
00870     queue->blk = blk;
00871     queue->first = 0;
00872   }
00873 #ifdef DEBUG
00874   CmiPrintf("[%d] Adding %x\n",_Cmi_mype,element);
00875 #endif
00876   queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
00877   inside_comm = 0;
00878 }
00879 
00880 static void * McQueueRemoveFromBack(McQueue *queue)
00881 {
00882   void *element;
00883   element = (void *) 0;
00884   if(queue->len) {
00885     element = queue->blk[(queue->first+queue->len-1)%queue->blk_len];
00886     queue->len--;
00887   }
00888   return element;
00889 }
00890 
00891 static void * McQueueRemoveFromFront(McQueue *queue)
00892 {
00893   void *element;
00894   element = (void *) 0;
00895   if(queue->len) {
00896     element = queue->blk[queue->first++];
00897     queue->first = (queue->first+queue->blk_len)%queue->blk_len;
00898     queue->len--;
00899   }
00900   return element;
00901 }
00902 
00903 
00904 /* Timer Functions */
00905 #if 0
00906 
/*
 * Timer implementation based on _rtc() and cpused().  This whole section
 * sits under the "#if 0" above, so it is never compiled; it is kept for
 * reference only.
 */
00907 static double clocktick;
00908 static long inittime_wallclock;
00909 static long inittime_virtual;
00910 
/* Record the starting _rtc() and cpused() readings and derive the length of
 * one tick in seconds from sysconf(_SC_CLK_TCK). */
00911 void CmiTimerInit()
00912 {
00913   inittime_wallclock = _rtc();
00914   inittime_virtual = cpused();
00915   clocktick = 1.0 / (sysconf(_SC_CLK_TCK));
00916 }
00917 
/* _rtc() ticks since CmiTimerInit(), scaled by clocktick. */
00918 double CmiWallTimer()
00919 {
00920   long now;
00921 
00922   now = _rtc();
00923   return (clocktick * (now - inittime_wallclock));
00924 }
00925 
/* cpused() ticks since CmiTimerInit(), scaled by clocktick. */
00926 double CmiCpuTimer()
00927 {
00928   long now;
00929 
00930   now = cpused();
00931   return (clocktick * (now - inittime_virtual));
00932 }
00933 
/* Same computation as CmiWallTimer(). */
00934 double CmiTimer()
00935 {
00936   long now;
00937 
00938   now = _rtc();
00939   return (clocktick * (now - inittime_wallclock));
00940 }
00941 
00942 #elif CMK_TIMER_USE_CLOCK_GETTIME
00943 
00944 CpvStaticDeclare(double,inittime_wallclock);
00945 CpvStaticDeclare(double,inittime_virtual);
00946 
00947 void CmiTimerInit(void)
00948 {
00949   struct timespec temp;
00950   CpvInitialize(double, inittime_wallclock);
00951   CpvInitialize(double, inittime_virtual);
00952   clock_gettime(CLOCK_SGI_CYCLE, &temp);
00953   CpvAccess(inittime_wallclock) = (double) temp.tv_sec +
00954                                   1e-9 * temp.tv_nsec;
00955   CpvAccess(inittime_virtual) = CpvAccess(inittime_wallclock);
00956 }
00957 
00958 double CmiWallTimer(void)
00959 {
00960   struct timespec temp;
00961   double currenttime;
00962 
00963   clock_gettime(CLOCK_SGI_CYCLE, &temp); 
00964   currenttime = (double) temp.tv_sec +
00965                 1e-9 * temp.tv_nsec;
00966   return (currenttime - CpvAccess(inittime_wallclock));
00967 }
00968 
00969 double CmiCpuTimer(void)
00970 {
00971   struct timespec temp;
00972   double currenttime;
00973 
00974   clock_gettime(CLOCK_SGI_CYCLE, &temp);
00975   currenttime = (double) temp.tv_sec +
00976                 1e-9 * temp.tv_nsec;
00977   return (currenttime - CpvAccess(inittime_virtual));
00978 }
00979 
/* Default timer: reports the same value as CmiCpuTimer(). */
double CmiTimer(void)
{
  double elapsed = CmiCpuTimer();

  return elapsed;
}
00984 #endif
00985 
/*
 * Memory lock and unlock functions.
 * On T3E there is no shared memory and quickthreads are used, so the memory
 * calls have no reentrancy problem; these are only dummy functions that
 * record the requested state in a flag.
 */
static volatile int memflag;

void CmiMemLock()
{
  memflag = 1;
}

void CmiMemUnlock()
{
  memflag = 0;
}
00993 

Generated on Sun Jun 29 13:29:05 2008 for Charm++ by  doxygen 1.5.1