arch/net/machine-pxshm.c

Go to the documentation of this file.
00001 
00031 #include <sys/types.h>
00032 #include <sys/mman.h>
00033 #include <unistd.h>
00034 #include <sys/stat.h>
00035 #include <fcntl.h>
00036 #include <errno.h>
00037 
/**************
   Determine which type of synchronization guards each shared buffer:
     PXSHM_OSSPINLOCK - an OSSpinLock stored in the buffer header
     PXSHM_LOCK       - one named POSIX semaphore per buffer
     default          - PXSHM_FENCE: Peterson's two-party lock built from the
                        flagSender/flagReceiver/turn fields of the header
*/
#if PXSHM_OSSPINLOCK
#include <libkern/OSAtomic.h>
#elif PXSHM_LOCK
#include <semaphore.h>
#else
/* Default to using fences */
#define PXSHM_FENCE 1
#endif


/* Debug hook: expands to nothing. Redefine the body as `x` to enable the
 * CmiMemoryCheck() calls sprinkled through this file. */
#define MEMDEBUG(x)

/* Set to 1 to collect per-process counters/timers, printed by CmiExitPxshm. */
#define PXSHM_STATS 0


/*** Memory fences (adapted from pcqueue.h) ***
 * Every user of these macros issues CmiMemoryReadFence followed by
 * CmiMemoryWriteFence right after storing flag/turn and right before reading
 * the peer's flag. Peterson's algorithm is only correct if those stores are
 * visible before that load (a StoreLoad barrier). x86 may satisfy a load
 * before an earlier store to a different address becomes visible, and
 * neither sfence nor lfence prevent this; on PowerPC, eieio does not order
 * cacheable stores against later loads either. The write fence is therefore
 * a full barrier: mfence on x86, sync on PowerPC.
 * __asm__/__volatile__ are used so this also builds in strict ISO C modes.
 */
#if PXSHM_FENCE
#ifdef POWER_PC
#define CmiMemoryWriteFence(startPtr,nBytes) __asm__ __volatile__("sync":::"memory")
#define CmiMemoryReadFence(startPtr,nBytes) __asm__ __volatile__("eieio":::"memory")
#else
#define CmiMemoryWriteFence(startPtr,nBytes) __asm__ __volatile__("mfence":::"memory")
#define CmiMemoryReadFence(startPtr,nBytes) __asm__ __volatile__("lfence":::"memory")
#endif
#else
#define CmiMemoryWriteFence(startPtr,nBytes)
#define CmiMemoryReadFence(startPtr,nBytes)
#endif
00078 
/***************************************************************************************/

/* The two parties of a shared buffer; stored in sharedBufHeader.turn by the
 * fence-based (Peterson) lock to say which side may proceed on contention. */
enum entities {SENDER,RECEIVER};

/************************
 * The implementation currently assumes that
 *   1) all nodes have the same number of processors, and
 *   2) in the node list, the processors of a node appear consecutively:
 *        0 1 2 3      4 5 6 7
 *        -------      -------
 *         node 1       node 2
 ************************/

#define NAMESTRLEN 50      /* capacity of each shared-object name buffer      */
#define PREFIXSTRLEN 30    /* capacity of PxshmContext.prefixStr               */
#define SHMBUFLEN 1000000  /* payload bytes per shared buffer (header excluded) */
#define SENDQSTARTSIZE 128 /* initial capacity of every per-peer send queue    */
00099 
00101 typedef struct {
00102         int count; //number of messages
00103         int bytes; //number of bytes
00104 
00105 #if PXSHM_OSSPINLOCK
00106         OSSpinLock lock;
00107 #endif
00108 
00109 #if PXSHM_FENCE
00110         volatile int flagSender;
00111         volatile int flagReceiver;
00112         volatile int turn;
00113 #endif  
00114 
00115 } sharedBufHeader;
00116 
00117 
00118 typedef struct {
00119 #if PXSHM_LOCK
00120         sem_t *mutex;
00121 #endif
00122         sharedBufHeader *header;        
00123         char *data;
00124 } sharedBufData;
00125 
00126 typedef struct {
00127         int size; //total size of data array
00128         int begin; //position of first element
00129         int end;        //position of next element
00130         int numEntries; //number of entries
00131 
00132         OutgoingMsg *data;
00133 
00134 } PxshmSendQ;
00135 
00136 typedef struct {
00137         int nodesize;
00138         int noderank;
00139         int nodestart,nodeend;//proc numbers for the start and end of this node
00140         char prefixStr[PREFIXSTRLEN];
00141         char **recvBufNames;
00142         char **sendBufNames;
00143 
00144         sharedBufData *recvBufs;
00145         sharedBufData *sendBufs;
00146 
00147         PxshmSendQ **sendQs;
00148 
00149 
00150 #if PXSHM_STATS
00151         int sendCount;
00152         int validCheckCount;
00153         int lockRecvCount;
00154         double validCheckTime;
00155         double sendTime;
00156         double commServerTime;
00157 #endif
00158 
00159 } PxshmContext;
00160 
00161 
00162 
00163 PxshmContext *pxshmContext=NULL; //global context
00164 
00165 
00166 void calculateNodeSizeAndRank(char **);
00167 void setupSharedBuffers();
00168 void initAllSendQs();
00169 
00170 /******************
00171  *      Initialization routine
00172  *      currently just testing start up
00173  * ****************/
00174 void CmiInitPxshm(char **argv){
00175         MACHSTATE(3,"CminitPxshm start");
00176         pxshmContext = (PxshmContext *)malloc(sizeof(PxshmContext));
00177 
00178         if(Cmi_charmrun_pid <= 0){
00179                 CmiAbort("pxshm must be run with charmrun");
00180         }
00181         calculateNodeSizeAndRank(argv);
00182         if(pxshmContext->nodesize == 1){
00183                 return;
00184         }
00185         
00186         MACHSTATE1(3,"CminitPxshm  %d calculateNodeSizeAndRank",pxshmContext->nodesize);
00187 
00188         snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);
00189 
00190 
00191         MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
00192 
00193         setupSharedBuffers();
00194 
00195         MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
00196 
00197 
00198         initAllSendQs();
00199         
00200         MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);
00201 
00202         MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);
00203 
00204 
00205 #if PXSHM_STATS
00206         pxshmContext->sendCount=0;
00207         pxshmContext->sendTime=0.0;
00208         pxshmContext->validCheckCount=0;
00209         pxshmContext->validCheckTime=0.0;
00210         pxshmContext->commServerTime = 0;
00211         pxshmContext->lockRecvCount = 0;
00212 #endif
00213 
00214 };
00215 
00216 /**************
00217  * shutdown shmem objects and semaphores
00218  *
00219  * *******************/
00220 void tearDownSharedBuffers();
00221 
00222 void CmiExitPxshm(){
00223         int i=0;
00224         
00225         if(pxshmContext->nodesize != 1){
00226                 tearDownSharedBuffers();
00227         
00228                 for(i=0;i<pxshmContext->nodesize;i++){
00229                         if(i != pxshmContext->noderank){
00230                                 break;
00231                         }
00232                         free(pxshmContext->recvBufNames[i]);
00233                         free(pxshmContext->sendBufNames[i]);
00234                 }
00235                 free(pxshmContext->recvBufNames);
00236                 free(pxshmContext->sendBufNames);
00237 
00238                 free(pxshmContext->recvBufs);
00239                 free(pxshmContext->sendBufs);
00240 
00241         }
00242 #if PXSHM_STATS
00243 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
00244 #endif
00245         free(pxshmContext);
00246 }
00247 
00248 /******************
00249  *Should this message be sent using PxShm or not ?
00250  * ***********************/
00251 
00252 inline int CmiValidPxshm(OutgoingMsg ogm, OtherNode node){
00253 #if PXSHM_STATS
00254         pxshmContext->validCheckCount++;
00255 #endif
00256 
00257 /*      if(pxshmContext->nodesize == 1){
00258                 return 0;
00259         }*/
00260         //replace by bitmap later
00261         if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
00262                 return 1;
00263         }else{
00264                 return 0;
00265         }
00266 };
00267 
00268 
00269 inline int PxshmRank(int dst){
00270         return dst - pxshmContext->nodestart;
00271 }
00272 inline void pushSendQ(PxshmSendQ *q,OutgoingMsg msg);
00273 inline int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
00274 inline int flushSendQ(int dstRank);
00275 
00276 /***************
00277  *
00278  *Send this message through shared memory
00279  *if you cannot get lock, put it in the sendQ
00280  *Before sending messages pick them from sendQ
00281  *
00282  * ****************************/
00283 
00284 void CmiSendMessagePxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot){
00285 
00286         
00287 #if PXSHM_STATS
00288         double _startSendTime = CmiWallTimer();
00289 #endif
00290 
00291         
00292         int dstRank = PxshmRank(ogm->dst);
00293         MEMDEBUG(CmiMemoryCheck());
00294   
00295         DgramHeaderMake(ogm->data,rank,ogm->src,Cmi_charmrun_pid,1, broot);
00296         
00297   
00298         MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
00299 
00300         CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);
00301         
00302         sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
00303 
00304 
00305 #if PXSHM_OSSPINLOCK
00306         if(! OSSpinLockTry(&dstBuf->header->lock)){
00307 #elif PXSHM_LOCK
00308         if(sem_trywait(dstBuf->mutex) < 0){
00309 #elif PXSHM_FENCE
00310         dstBuf->header->flagSender = 1;
00311         dstBuf->header->turn = RECEIVER;
00312         CmiMemoryReadFence(0,0);
00313         CmiMemoryWriteFence(0,0);
00314         if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
00315           dstBuf->header->flagSender = 0;
00316 #endif
00317 
00321                 pushSendQ(pxshmContext->sendQs[dstRank],ogm);
00322                 ogm->refcount++;
00323                 MEMDEBUG(CmiMemoryCheck());
00324                 return;
00325         }else{
00326 
00327                 /***
00328                  * We got the lock for this buffer
00329                  * first write all the messages in the sendQ and then write this guy
00330                  * */
00331                  if(pxshmContext->sendQs[dstRank]->numEntries == 0){
00332                    // send message user event
00333                                 int ret = sendMessage(ogm,dstBuf,pxshmContext->sendQs[dstRank]);
00334                                 MACHSTATE(3,"Pxshm Send succeeded immediately");
00335                  }else{
00336                                 ogm->refcount+=2;/*this message should not get deleted when the queue is flushed*/
00337                                 pushSendQ(pxshmContext->sendQs[dstRank],ogm);
00338                                 MACHSTATE3(3,"Pxshm ogm %p pushed to sendQ length %d refcount %d",ogm,pxshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
00339                                 int sent = flushSendQ(dstRank);
00340                                 ogm->refcount--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
00341                                 MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
00342                  }
00343                  /* unlock the recvbuffer*/
00344 
00345 #if PXSHM_OSSPINLOCK
00346                  OSSpinLockUnlock(&dstBuf->header->lock);
00347 #elif PXSHM_LOCK
00348                  sem_post(dstBuf->mutex);
00349 #elif PXSHM_FENCE
00350                  CmiMemoryReadFence(0,0);                       
00351                  CmiMemoryWriteFence(0,0);
00352                  dstBuf->header->flagSender = 0;
00353 #endif
00354         }
00355 #if PXSHM_STATS
00356                 pxshmContext->sendCount ++;
00357                 pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
00358 #endif
00359         MEMDEBUG(CmiMemoryCheck());
00360 
00361 };
00362 
inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * One progress step of the pxshm layer: first drain every receive buffer
 * that can be locked, then retry every non-empty send queue.
 * ***/
inline void CommunicationServerPxshm(){
#if PXSHM_STATS
	double _startCommServerTime = CmiWallTimer();
#endif

	MEMDEBUG(CmiMemoryCheck());

	emptyAllRecvBufs();	/* incoming traffic ...          */
	flushAllSendQs();	/* ... then queued outgoing one  */

#if PXSHM_STATS
	pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
#endif

	MEMDEBUG(CmiMemoryCheck());
}
00386 
00387 static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
00388         CommunicationServerPxshm();
00389 }
00390 
00391 
00392 static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
00393 {
00394         CmiNotifyStillIdle(s);
00395 }
00396 
00397 
00398 void calculateNodeSizeAndRank(char **argv){
00399         pxshmContext->nodesize=1;
00400         MACHSTATE(3,"calculateNodeSizeAndRank start");
00401         //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
00402         CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
00403         MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);
00404 
00405         pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);
00406         
00407         MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);
00408         
00409         pxshmContext->nodestart = _Cmi_mynode -pxshmContext->noderank;
00410         
00411         MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
00412 
00413         pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;
00414 
00415         if(pxshmContext->nodeend >= _Cmi_numnodes){
00416                 pxshmContext->nodeend = _Cmi_numnodes-1;
00417                 pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
00418         }
00419         
00420         MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
00421 }
00422 
00423 void allocBufNameStrings(char ***bufName);
00424 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
00425 /***************
00426  *      calculate the name of the shared objects and semaphores
00427  *      
00428  *      name scheme
00429  *      shared memory: charm_pxshm_<recvernoderank>_<sendernoderank>  
00430  *  semaphore    : charm_pxshm_<recvernoderank>_<sendernoderank>.sem for semaphore for that shared object
00431  *                the semaphore name used by us is the same as the shared memory object name
00432  *                the posix library adds the semaphore tag // in linux at least . other machines might need more portable code
00433  *
00434  *      open these shared objects and semaphores
00435  * *********/
00436 void setupSharedBuffers(){
00437         int i=0;
00438 
00439         allocBufNameStrings(&(pxshmContext->recvBufNames));
00440         
00441         MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
00442         MEMDEBUG(CmiMemoryCheck());
00443 
00444         allocBufNameStrings((&pxshmContext->sendBufNames));
00445         
00446         MACHSTATE(3,"allocBufNameStrings for sendBufNames done");
00447 
00448         for(i=0;i<pxshmContext->nodesize;i++){
00449                 if(i != pxshmContext->noderank){
00450                         snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
00451                         MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i)
00452                         snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
00453                         MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
00454                 }
00455         }
00456         
00457         createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
00458         createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);
00459         
00460         for(i=0;i<pxshmContext->nodesize;i++){
00461                 if(i != pxshmContext->noderank){
00462                         CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
00463                         pxshmContext->sendBufs[i].header->count = 0;
00464                         pxshmContext->sendBufs[i].header->bytes = 0;
00465                 }
00466         }
00467 }
00468 
00469 void allocBufNameStrings(char ***bufName){
00470         int i,count;
00471         
00472         int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
00473         char *tmp = malloc(totalAlloc);
00474         
00475         MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
00476 
00477         *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);
00478         
00479         for(i=0,count=0;i<pxshmContext->nodesize;i++){
00480                 if(i != pxshmContext->noderank){
00481                         (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
00482                         count++;
00483                 }else{
00484                         (*bufName)[i] = NULL;
00485                 }
00486         }
00487 }
00488 
00489 void createShmObject(char *name,int size,char **pPtr);
00490 
00491 void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
00492         int i=0;
00493         
00494         *bufs = (sharedBufData *)malloc(sizeof(sharedBufData)*pxshmContext->nodesize);
00495         
00496         for(i=0;i<pxshmContext->nodesize;i++){
00497                 if(i != pxshmContext->noderank){
00498                         createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
00499                         (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00500 #if PXSHM_OSSPINLOCK
00501                         (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
00502 #elif PXSHM_LOCK
00503                         (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
00504 #endif
00505                 }else{
00506                         (*bufs)[i].header = NULL;
00507                         (*bufs)[i].data = NULL;
00508 #if PXSHM_LOCK
00509                         (*bufs)[i].mutex = NULL;
00510 #endif
00511                 }
00512         }       
00513 }
00514 
00515 
00516 
/* Open (creating it if needed) the POSIX shared-memory object `name`, make it
 * at least `size` bytes long, and map it read/write and shared into *pPtr.
 * shm_open is attempted up to 100 times, reporting failures after the 10th
 * attempt. Aborts if the object cannot be opened, sized or mapped. The
 * descriptor is closed afterwards; the mapping stays valid. */
void createShmObject(char *name,int size,char **pPtr){
	int fd=-1;
	int flags;	// opening flags for shared object
	int open_repeat_count = 0;
	void *addr;

	flags= O_RDWR | O_CREAT; // open file in read-write mode and create it if its not there

	while(fd<0 && open_repeat_count < 100){
		open_repeat_count++;
		fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with permissions for only the user to read and write

		if(fd < 0 && open_repeat_count > 10){
			fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
			fflush(stderr);
		}
	}

	/* CmiAssert may be compiled out; a missing object must still stop us */
	if(fd < 0){
		CmiAbort("pxshm: shm_open failed");
	}

	if(ftruncate(fd,size) < 0){
		/* Some systems (e.g. Darwin) refuse to resize a shm object that already
		 * has a size, which happens when the peer created it first. That is
		 * fine as long as the existing object is large enough. */
		struct stat st;
		if(fstat(fd,&st) < 0 || st.st_size < size){
			fprintf(stderr,"Error from ftruncate %s while sizing %s \n",strerror(errno),name);
			fflush(stderr);
			close(fd);
			CmiAbort("pxshm: ftruncate failed");
		}
	}

	/* mmap reports failure with MAP_FAILED, not NULL */
	addr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	if(addr == MAP_FAILED){
		fprintf(stderr,"Error from mmap %s while mapping %s \n",strerror(errno),name);
		fflush(stderr);
		close(fd);
		CmiAbort("pxshm: mmap failed");
	}
	*pPtr = (char *)addr;

	close(fd);
}
00543 
00544 void tearDownSharedBuffers(){
00545         int i;
00546         for(i= 0;i<pxshmContext->nodesize;i++){
00547                 if(i != pxshmContext->noderank){
00548                         if(     shm_unlink(pxshmContext->recvBufNames[i]) < 0){
00549                                 fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
00550                         }
00551 
00552 #if PXSHM_LOCK
00553                         sem_close(pxshmContext->recvBufs[i].mutex);
00554                         sem_unlink(pxshmContext->recvBufNames[i]);
00555                         sem_close(pxshmContext->sendBufs[i].mutex);
00556 #endif
00557                 }
00558         }
00559 };
00560 
00561 
00562 void initSendQ(PxshmSendQ *q,int size);
00563 
00564 void initAllSendQs(){
00565         int i=0;
00566         pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
00567         for(i=0;i<pxshmContext->nodesize;i++){
00568                 if(i != pxshmContext->noderank){
00569                         (pxshmContext->sendQs)[i] = (PxshmSendQ *)malloc(sizeof(PxshmSendQ));
00570                         initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
00571                 }else{
00572                         (pxshmContext->sendQs)[i] = NULL;
00573                 }
00574         }
00575 };
00576 
00577 
00578 /****************
00579  *copy this message into the sharedBuf
00580  If it does not succeed
00581  *put it into the sendQ 
00582  *NOTE: This method is called only after obtaining the corresponding mutex
00583  * ********/
00584 int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
00585 
00586         if(dstBuf->header->bytes+ogm->size <= SHMBUFLEN){
00588                 dstBuf->header->count++;
00589                 memcpy(dstBuf->data+dstBuf->header->bytes,ogm->data,ogm->size);
00590                 dstBuf->header->bytes += ogm->size;
00591                 MACHSTATE4(3,"Pxshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
00592                 return 1;
00593         }
00594         /***
00595          * Shared Buffer is too full for this message
00596          * **/
00597         printf("send buffer is too full\n");
00598         pushSendQ(dstSendQ,ogm);
00599         ogm->refcount++;
00600         MACHSTATE3(3,"Pxshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
00601         return 0;
00602 }
00603 
00604 inline OutgoingMsg popSendQ(PxshmSendQ *q);
00605 
00606 /****
00607  *Try to send all the messages in the sendq to this destination rank
00608  *NOTE: This method is called only after obtaining the corresponding mutex
00609  * ************/
00610 
00611 inline int flushSendQ(int dstRank){
00612         sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
00613         PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];
00614         int count=dstSendQ->numEntries;
00615         int sent=0;
00616         while(count > 0){
00617                 OutgoingMsg ogm = popSendQ(dstSendQ);
00618                 ogm->refcount--;
00619                 MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
00620                 int ret = sendMessage(ogm,dstBuf,dstSendQ);
00621                 if(ret==1){
00622                         sent++;
00623       GarbageCollectMsg(ogm);
00624                 }
00625                 count--;
00626         }
00627         return sent;
00628 }
00629 
00630 inline void emptyRecvBuf(sharedBufData *recvBuf);
00631 
00632 inline void emptyAllRecvBufs(){
00633         int i;
00634         for(i=0;i<pxshmContext->nodesize;i++){
00635                 if(i != pxshmContext->noderank){
00636                         sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
00637                         if(recvBuf->header->count > 0){
00638 
00639 #if PXSHM_STATS
00640                                 pxshmContext->lockRecvCount++;
00641 #endif
00642 
00643 
00644 #if PXSHM_OSSPINLOCK
00645                                 if(! OSSpinLockTry(&recvBuf->header->lock)){
00646 #elif PXSHM_LOCK
00647                                 if(sem_trywait(recvBuf->mutex) < 0){
00648 #elif PXSHM_FENCE
00649                                 recvBuf->header->flagReceiver = 1;
00650                                 recvBuf->header->turn = SENDER;
00651                                 CmiMemoryReadFence(0,0);
00652                                 CmiMemoryWriteFence(0,0);
00653                                 if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
00654                                         recvBuf->header->flagReceiver = 0;
00655 #endif
00656                                 }else{
00657 
00658 
00659                                         MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);                        
00660                                         emptyRecvBuf(recvBuf);
00661 
00662 #if PXSHM_OSSPINLOCK
00663                                         OSSpinLockUnlock(&recvBuf->header->lock);
00664 #elif PXSHM_LOCK
00665                                         sem_post(recvBuf->mutex);
00666 #elif PXSHM_FENCE
00667                                         CmiMemoryReadFence(0,0);                        
00668                                         CmiMemoryWriteFence(0,0);
00669                                         recvBuf->header->flagReceiver = 0;
00670 #endif
00671 
00672                                 }
00673                         
00674                         }
00675                 }
00676         }
00677 };
00678 
00679 inline void flushAllSendQs(){
00680         int i=0;
00681         
00682         for(i=0;i<pxshmContext->nodesize;i++){
00683                 if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){
00684         
00685 #if PXSHM_OSSPINLOCK
00686                         if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
00687 #elif PXSHM_LOCK
00688                         if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
00689 #elif PXSHM_FENCE
00690                         pxshmContext->sendBufs[i].header->flagSender = 1;
00691                         pxshmContext->sendBufs[i].header->turn = RECEIVER;
00692                         CmiMemoryReadFence(0,0);                        
00693                         CmiMemoryWriteFence(0,0);
00694                         if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
00695 #endif
00696 
00697                                 MACHSTATE1(3,"flushSendQ %d",i);
00698                                 flushSendQ(i);
00699 
00700 
00701                                 
00702 #if PXSHM_OSSPINLOCK    
00703                                 OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
00704 #elif PXSHM_LOCK
00705                                 sem_post(pxshmContext->sendBufs[i].mutex);
00706 #elif PXSHM_FENCE
00707                                 CmiMemoryReadFence(0,0);                        
00708                                 CmiMemoryWriteFence(0,0);
00709                                 pxshmContext->sendBufs[i].header->flagSender = 0;
00710 #endif
00711                         }else{
00712 
00713 #if PXSHM_FENCE
00714                           pxshmContext->sendBufs[i].header->flagSender = 0;
00715 #endif                          
00716 
00717                         }
00718 
00719                 }        
00720         }       
00721 };
00722 
00723 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
00724 
00725 void emptyRecvBuf(sharedBufData *recvBuf){
00726         int numMessages = recvBuf->header->count;
00727         int i=0;
00728 
00729         char *ptr=recvBuf->data;
00730 
00731         for(i=0;i<numMessages;i++){
00732                 int size;
00733                 int rank, srcpe, seqno, magic, i;
00734                 unsigned int broot;
00735                 char *msg = ptr;
00736                 char *newMsg;
00737 
00738                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00739                 size = CmiMsgHeaderGetLength(msg);
00740         
00741                 newMsg = (char *)CmiAlloc(size);
00742                 memcpy(newMsg,msg,size);
00743 
00744                 handoverPxshmMessage(newMsg,size,rank,broot);
00745                 
00746                 ptr += size;
00747 
00748                 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
00749         }
00750         /*
00751   if(ptr - recvBuf->data != recvBuf->header->bytes){
00752                 CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d initialBytes %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages,initialBytes);
00753         }*/
00754         CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
00755         recvBuf->header->count=0;
00756         recvBuf->header->bytes=0;
00757 }
00758 
00759 
00760 void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
00761         CmiAssert(rank == 0);
00762 #if CMK_BROADCAST_SPANNING_TREE
00763         if (rank == DGRAM_BROADCAST
00764 #if CMK_NODE_QUEUE_AVAILABLE
00765           || rank == DGRAM_NODEBROADCAST
00766 #endif
00767          ){
00768                 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
00769                                         }
00770 #elif CMK_BROADCAST_HYPERCUBE
00771         if (rank == DGRAM_BROADCAST
00772 #if CMK_NODE_QUEUE_AVAILABLE
00773           || rank == DGRAM_NODEBROADCAST
00774 #endif
00775          ){
00776                         SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
00777                                         }
00778 #endif
00779 
00780                 switch (rank) {
00781         case DGRAM_BROADCAST: {
00782           CmiPushPE(0, newmsg);
00783           break;
00784       }
00785         default:
00786                                 {
00787                                         
00788           CmiPushPE(rank, newmsg);
00789                                 }
00790         }    /* end of switch */
00791 }
00792 
00793 
00794 /**************************
00795  *sendQ helper functions
00796  * ****************/
00797 
00798 void initSendQ(PxshmSendQ *q,int size){
00799         q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*size);
00800 
00801         q->size = size;
00802         q->numEntries = 0;
00803 
00804         q->begin = 0;
00805         q->end = 0;
00806 }
00807 
00808 void pushSendQ(PxshmSendQ *q,OutgoingMsg msg){
00809         if(q->numEntries == q->size){
00810                 //need to resize 
00811                 OutgoingMsg *oldData = q->data;
00812                 int newSize = q->size<<1;
00813                 q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*newSize);
00814                 //copy head to the beginning of the new array
00815                 
00816                 CmiAssert(q->begin == q->end);
00817 
00818                 CmiAssert(q->begin < q->size);
00819                 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsg)*(q->size - q->begin));
00820 
00821                 if(q->end != 0){
00822                         memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsg)*(q->end));
00823                 }
00824                 free(oldData);
00825                 q->begin = 0;
00826                 q->end = q->size;
00827                 q->size = newSize;
00828         }
00829         q->data[q->end] = msg;
00830         (q->end)++;
00831         if(q->end >= q->size){
00832                 q->end -= q->size;
00833         }
00834         q->numEntries++;
00835 }
00836 
00837 OutgoingMsg popSendQ(PxshmSendQ *q){
00838         OutgoingMsg ret;
00839         if(0 == q->numEntries){
00840                 return NULL;
00841         }
00842 
00843         ret = q->data[q->begin];
00844         (q->begin)++;
00845         if(q->begin >= q->size){
00846                 q->begin -= q->size;
00847         }
00848         
00849         q->numEntries--;
00850         return ret;
00851 }

Generated on Sun Jun 29 13:29:06 2008 for Charm++ by  doxygen 1.5.1