arch/net/machine-sysvshm.c

Go to the documentation of this file.
00001 
00027 #include <sys/types.h>
00028 #include <sys/mman.h>
00029 #include <unistd.h>
00030 #include <sys/stat.h>
00031 #include <fcntl.h>
00032 #include <errno.h>
00033 
00034 #include <sys/shm.h>
00035 #include <sys/sem.h>
00036 
#define SYSVSHM_BITS 5                  /* Number of bits used to encode a rank inside a node */
#define SYSVSHM_SIZE (1<<SYSVSHM_BITS)  /* Maximum number of cpus in a node - currently 32 cpus supported */

#define MEMDEBUG(x) /* x */

#define SYSVSHM_STATS 0

/*
 * Fill in the caller's local `struct sembuf sb` for a semaphore operation on
 * semaphore number i; the caller then passes &sb to semop().
 *   ACQUIRENW : decrement without blocking (semop fails instead of waiting)
 *   ACQUIRE   : decrement, blocking until the semaphore is available
 *   RELEASE   : increment
 * All three carry SEM_UNDO so that every undone acquire is matched by an
 * undone release and the per-process undo adjustment stays balanced.
 */
#define ACQUIRENW(i) do { sb.sem_num=(i); sb.sem_op=-1; sb.sem_flg=IPC_NOWAIT|SEM_UNDO; } while(0)
#define ACQUIRE(i)   do { sb.sem_num=(i); sb.sem_op=-1; sb.sem_flg=SEM_UNDO; } while(0)
#define RELEASE(i)   do { sb.sem_num=(i); sb.sem_op=1;  sb.sem_flg=SEM_UNDO; } while(0)

#define TESTARRAY 1

/*
 * IPC key layout: {pid bits}{SYSVSHM_BITS bits}{SYSVSHM_BITS bits}.
 * SNDBITS(i) encodes rank i of this node, RCVBITS(i) encodes this process
 * (its argument is accepted for symmetry and not used).
 * SYSVSHM_PIDWIDTH is the number of bits left for the charmrun pid once the
 * two rank fields and the sign bit of an int are reserved (8 bits per byte
 * assumed); PIDBITS reduces the pid to that width so the shifted key cannot
 * overflow an int.
 */
#define SNDBITS(i) ((sysvshmContext->nodestart+(i))%SYSVSHM_SIZE)
#define RCVBITS(i) ((sysvshmContext->nodestart+sysvshmContext->noderank)%SYSVSHM_SIZE)
#define SYSVSHM_PIDWIDTH ((int)(sizeof(int)*8)-2*SYSVSHM_BITS-1)
#define PIDBITS    ((int)(Cmi_charmrun_pid%(1<<SYSVSHM_PIDWIDTH)))
#define SHMSNDNAME(i)   ((PIDBITS<<(SYSVSHM_BITS*2))+(RCVBITS(i)<<SYSVSHM_BITS)+SNDBITS(i))
#define SHMRCVNAME(i)   ((PIDBITS<<(SYSVSHM_BITS*2))+(SNDBITS(i)<<SYSVSHM_BITS)+RCVBITS(i))
#define SEMSNDNAME(i)   ((PIDBITS<<(SYSVSHM_BITS*2))+SNDBITS(i))
#define SEMRCVNAME(i)   ((PIDBITS<<(SYSVSHM_BITS*2))+RCVBITS(i))
00057 
00058 
00059 /************************
00060  *      Implementation currently assumes that
00061  *      1) all nodes have the same number of processors
00062  *  2) in the nodelist all processors in a node are listed in sequence
00063  *   0 1 2 3      4 5 6 7 
00064  *   -------      -------
00065  *    node 1       node 2 
00066  ************************/
00067 
00068 #define SHMBUFLEN 1000000
00069 
00070 #define SENDQSTARTSIZE 128
00071 
00072 
00073 /* This struct is used as the first portion of a shared memory region, followed by data */
00074 typedef struct {
00075         int count; /* number of messages */
00076         int bytes; /* number of bytes */
00077 
00078 } sharedBufHeader;
00079 
00080 
00081 typedef struct {
00082         int semid;
00083         int shmid;
00084         sharedBufHeader *header;        
00085         char *data;
00086 } sharedBufData;
00087 
00088 typedef struct {
00089         int size;  /* total size of data array */
00090         int begin; /* position of first element */
00091         int end;   /*   position of next element */
00092         int numEntries; /* number of entries */
00093 
00094         OutgoingMsg *data;
00095 
00096 } SysvshmSendQ;
00097 
00098 typedef struct {
00099         int nodesize;
00100         int noderank;
00101         int nodestart,nodeend; /* proc numbers for the start and end of this node */
00102 
00103         ushort *semarray;
00104 
00105         int *sendbufnames;
00106         int *recvbufnames;
00107 
00108         sharedBufData *recvBufs;
00109         sharedBufData *sendBufs;
00110 
00111         SysvshmSendQ **sendQs;
00112 
00113 
00114 #if SYSVSHM_STATS
00115         int sendCount;
00116         int validCheckCount;
00117         int lockRecvCount;
00118         double validCheckTime;
00119         double sendTime;
00120         double commServerTime;
00121 #endif
00122 
00123 } SysvshmContext;
00124 
00125 SysvshmContext *sysvshmContext=NULL; /* global context */
00126 
00127 
00128 void calculateNodeSizeAndRank(char **);
00129 void setupSharedBuffers();
00130 void initAllSendQs();
00131 
00132 /******************
00133  *      Initialization routine
00134  *      currently just testing start up
00135  * ****************/
00136 void CmiInitSysvshm(char **argv){
00137         MACHSTATE(3,"CminitSysvshm start");
00138         sysvshmContext = (SysvshmContext *)malloc(sizeof(SysvshmContext));
00139 
00140         if(Cmi_charmrun_pid <= 0){
00141                 CmiAbort("sysvshm must be run with charmrun");
00142         }
00143         calculateNodeSizeAndRank(argv);
00144         if(sysvshmContext->nodesize == 1){
00145                 return;
00146         }
00147         MACHSTATE1(3,"CminitSysvshm  %d calculateNodeSizeAndRank",sysvshmContext->nodesize);
00148 
00149         setupSharedBuffers();
00150 
00151         MACHSTATE2(3,"CminitSysvshm %d %d setupSharedBuffers",Cmi_charmrun_pid,sysvshmContext->nodesize);
00152 
00153         initAllSendQs();
00154         
00155         MACHSTATE2(3,"CminitSysvshm %d %d initAllSendQs",Cmi_charmrun_pid,sysvshmContext->nodesize);
00156 
00157         MACHSTATE2(3,"CminitSysvshm %d %d done",Cmi_charmrun_pid,sysvshmContext->nodesize);
00158 
00159 
00160 #if SYSVSHM_STATS
00161         sysvshmContext->sendCount=0;
00162         sysvshmContext->sendTime=0.0;
00163         sysvshmContext->validCheckCount=0;
00164         sysvshmContext->validCheckTime=0.0;
00165         sysvshmContext->commServerTime = 0;
00166         sysvshmContext->lockRecvCount = 0;
00167 #endif
00168 
00169 };
00170 
00171 /**************
00172  * shutdown shmem objects and semaphores
00173  *
00174  * *******************/
00175 void tearDownSharedBuffers();
00176 
00177 void CmiExitSysvshm(){
00178         int i=0;
00179         
00180         if(sysvshmContext->nodesize != 1){
00181                 tearDownSharedBuffers();
00182 
00183 
00184                 free(sysvshmContext->recvbufnames);
00185                 free(sysvshmContext->sendbufnames);
00186                 free(sysvshmContext->semarray);
00187 
00188                 free(sysvshmContext->recvBufs);
00189                 free(sysvshmContext->sendBufs);
00190         }
00191 #if SYSVSHM_STATS
00192         CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",
00193         _Cmi_mynode,sysvshmContext->sendCount,sysvshmContext->sendTime,sysvshmContext->validCheckCount,sysvshmContext->validCheckTime,sysvshmContext->commServerTime,sysvshmContext->lockRecvCount);
00194 #endif
00195         free(sysvshmContext);
00196 }
00197 
00198 /******************
00199  *Should this message be sent using SysvShm or not ?
00200  * ***********************/
00201 
00202 inline int CmiValidSysvshm(OutgoingMsg ogm, OtherNode node){
00203 #if SYSVSHM_STATS
00204         sysvshmContext->validCheckCount++;
00205 #endif
00206 
00207         if(ogm->dst >= sysvshmContext->nodestart && ogm->dst <= sysvshmContext->nodeend && ogm->size < SHMBUFLEN ){
00208                 return 1;
00209         }else{
00210                 return 0;
00211         }
00212 };
00213 
00214 
00215 inline int SysvshmRank(int dst){
00216         return dst - sysvshmContext->nodestart;
00217 }
00218 inline void pushSendQ(SysvshmSendQ *q,OutgoingMsg msg);
00219 inline int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,SysvshmSendQ *dstSendQ);
00220 inline int flushSendQ(int dstRank);
00221 
00222 /***************
00223  *
00224  *Send this message through shared memory
00225  *if you cannot get lock, put it in the sendQ
00226  *Before sending messages pick them from sendQ
00227  *
00228  * ****************************/
00229 
00230 void CmiSendMessageSysvshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot){
00231         struct sembuf sb;
00232         
00233 #if SYSVSHM_STATS
00234         double _startSendTime = CmiWallTimer();
00235 #endif
00236 
00237         
00238         int dstRank = SysvshmRank(ogm->dst);
00239         MEMDEBUG(CmiMemoryCheck());
00240   
00241         DgramHeaderMake(ogm->data,rank,ogm->src,Cmi_charmrun_pid,1, broot);
00242         
00243   
00244         MACHSTATE4(3,"Send Msg Sysvshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
00245 
00246         CmiAssert(dstRank >=0 && dstRank != sysvshmContext->noderank);
00247         
00248         sharedBufData *dstBuf = &(sysvshmContext->sendBufs[dstRank]);
00249 
00250         ACQUIRENW(sysvshmContext->noderank);
00251         if(semop(dstBuf->semid, &sb, 1)<0) {
00255                 pushSendQ(sysvshmContext->sendQs[dstRank],ogm);
00256                 ogm->refcount++;
00257                 MEMDEBUG(CmiMemoryCheck());
00258                 return;
00259         }else{
00260                 /***
00261                  * We got the lock for this buffer
00262                  * first write all the messages in the sendQ and then write this guy
00263                  * */
00264                  if(sysvshmContext->sendQs[dstRank]->numEntries == 0){
00265                                 /* send message user event */
00266                                 int ret = sendMessage(ogm,dstBuf,sysvshmContext->sendQs[dstRank]);
00267                                 MACHSTATE(3,"Sysvshm Send succeeded immediately");
00268                  }else{
00269                                 ogm->refcount+=2;/*this message should not get deleted when the queue is flushed*/
00270                                 pushSendQ(sysvshmContext->sendQs[dstRank],ogm);
00271                                 MACHSTATE3(3,"Sysvshm ogm %p pushed to sendQ length %d refcount %d",ogm,sysvshmContext->sendQs[dstRank]->numEntries,ogm->refcount);
00272                                 int sent = flushSendQ(dstRank);
00273                                 ogm->refcount--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
00274                                 MACHSTATE1(3,"Sysvshm flushSendQ sent %d messages",sent);
00275                  }
00276                  /* unlock the recvbuffer*/
00277                 RELEASE(sysvshmContext->noderank);
00278                 CmiAssert(semop(dstBuf->semid, &sb, 1)>=0);
00279         }
00280 #if SYSVSHM_STATS
00281                 sysvshmContext->sendCount ++;
00282                 sysvshmContext->sendTime += (CmiWallTimer()-_startSendTime);
00283 #endif
00284         MEMDEBUG(CmiMemoryCheck());
00285 
00286 };
00287 
inline void emptyAllRecvBufs();
inline void flushAllSendQs();

/**********
 * One round of communication progress:
 * first drain the receive buffers, then retry every pending send queue.
 * ***/
inline void CommunicationServerSysvshm(){
#if SYSVSHM_STATS
	double commServerStart = CmiWallTimer();
#endif

	MEMDEBUG(CmiMemoryCheck());

	emptyAllRecvBufs();
	flushAllSendQs();

#if SYSVSHM_STATS
	sysvshmContext->commServerTime += (CmiWallTimer()-commServerStart);
#endif

	MEMDEBUG(CmiMemoryCheck());
}
00311 
00312 static void CmiNotifyStillIdleSysvshm(CmiIdleState *s){
00313         CommunicationServerSysvshm();
00314 }
00315 
00316 
00317 static void CmiNotifyBeginIdleSysvshm(CmiIdleState *s)
00318 {
00319         CmiNotifyStillIdle(s);
00320 }
00321 
00322 
00323 void calculateNodeSizeAndRank(char **argv){
00324         sysvshmContext->nodesize=1;
00325         MACHSTATE(3,"calculateNodeSizeAndRank start");
00326         CmiGetArgIntDesc(argv, "+nodesize", &(sysvshmContext->nodesize),"Number of cores in this node");
00327         MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",sysvshmContext->nodesize);
00328 
00329         sysvshmContext->noderank = _Cmi_mynode % (sysvshmContext->nodesize);
00330         
00331         MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",sysvshmContext->noderank);
00332         
00333         sysvshmContext->nodestart = _Cmi_mynode -sysvshmContext->noderank;
00334         
00335         MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
00336 
00337         sysvshmContext->nodeend = sysvshmContext->nodestart + sysvshmContext->nodesize -1;
00338 
00339         if(sysvshmContext->nodeend >= _Cmi_numnodes){
00340                 sysvshmContext->nodeend = _Cmi_numnodes-1;
00341                 sysvshmContext->nodesize = (sysvshmContext->nodeend - sysvshmContext->nodestart) +1;
00342         }
00343         
00344         MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",sysvshmContext->nodestart,sysvshmContext->nodesize,sysvshmContext->noderank);
00345 }
00346 
00347 void createShmObjectsAndSems(sharedBufData **bufs, int *bufnames,int issend);
00348 /***************
00349  *      calculate the name of the shared objects and semaphores
00350  *      
00351  *      name scheme
00352  *      shared memory: 00..00{rcvbits}{sndbits} as key
00353  *      semaphore : 00..00{rcvbits}{sndbits} as key
00354  *      open these shared objects and semaphores
00355  * *********/
00356 
00357 void setupSharedBuffers(){
00358         int i=0;
00359 
00360         CmiAssert((sysvshmContext->recvbufnames=(int *)malloc(sizeof(int)*(sysvshmContext->nodesize)))!=NULL);
00361         CmiAssert((sysvshmContext->sendbufnames=(int *)malloc(sizeof(int)*(sysvshmContext->nodesize)))!=NULL);
00362         CmiAssert((sysvshmContext->semarray= (ushort *)malloc(sizeof(int)*(sysvshmContext->nodesize)))!=NULL);
00363 
00364                 
00365         for(i=0;i<sysvshmContext->nodesize;i++){
00366                 sysvshmContext->semarray[i]=1;
00367                 if(i != sysvshmContext->noderank){
00368                         sysvshmContext->sendbufnames[i]=SHMSNDNAME(i);
00369                         sysvshmContext->recvbufnames[i]=SHMRCVNAME(i);
00370                 }
00371         }
00372         
00373         createShmObjectsAndSems(&(sysvshmContext->recvBufs),sysvshmContext->recvbufnames,0);
00374         createShmObjectsAndSems(&(sysvshmContext->sendBufs),sysvshmContext->sendbufnames,1);
00375 
00376         for(i=0;i<sysvshmContext->nodesize;i++){
00377                 if(i != sysvshmContext->noderank){
00378                         CmiAssert(sysvshmContext->sendBufs[i].header->count == 0);
00379                         sysvshmContext->sendBufs[i].header->count = 0;
00380                         sysvshmContext->sendBufs[i].header->bytes = 0;
00381                 }
00382         }
00383 }
00384 
00385 
00386 
00387 void createShmObjectsAndSems(sharedBufData **bufs, int *bufnames,int issend) {
00388         int i;
00389         int name;
00390 
00391         union semun {
00392         int val;
00393         struct semid_ds *buf;
00394         ushort *array;
00395         } arg;
00396         struct semid_ds seminfo;
00397         arg.array=sysvshmContext->semarray;
00398 
00399         *bufs = (sharedBufData *)malloc(sizeof(sharedBufData)*sysvshmContext->nodesize);
00400         
00401         for(i=0;i<sysvshmContext->nodesize;i++){
00402                 if(i != sysvshmContext->noderank){
00403                         if(issend)
00404                                 name=SEMSNDNAME(i);
00405                          else
00406                                 name=SEMRCVNAME(i);
00407 
00408                         if(((*bufs)[i].semid=semget(name,sysvshmContext->nodesize,0666|IPC_CREAT|IPC_EXCL))>=0) {
00409                                 CmiAssert((semctl((*bufs)[i].semid,sysvshmContext->nodesize,SETALL,arg))>=0);
00410                         } else if(errno==EEXIST) {
00411                                 CmiAssert((((*bufs)[i].semid)=semget(name,sysvshmContext->nodesize,0666))>=0);
00412                         } else {
00413                                 tearDownSharedBuffers();
00414                                 CmiPrintf("problem getting sem : %s\n",strerror(errno));
00415                                 CmiAbort("sem\n");
00416                         }
00417 
00418                         (*bufs)[i].shmid=-1;
00419                         (*bufs)[i].shmid=shmget(bufnames[i],SHMBUFLEN+sizeof(sharedBufHeader),0666|IPC_CREAT|IPC_EXCL); /*Attempt to get shmid*/
00420                         if(errno==EEXIST) 
00421                                 (*bufs)[i].shmid=shmget(bufnames[i],SHMBUFLEN+sizeof(sharedBufHeader),0666);
00422                         CmiAssert(((*bufs)[i].shmid)>0);
00423 
00424                         CmiAssert(((*bufs)[i].header=shmat((*bufs)[i].shmid, (void *)0, 0))>0);
00425                         (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
00426                 }else{
00427                         (*bufs)[i].shmid=-1;
00428                         (*bufs)[i].semid=-1;
00429                         (*bufs)[i].header = NULL;
00430                         (*bufs)[i].data = NULL;
00431                 }
00432         }       
00433 }
00434 
00435 
00436 void tearDownSharedBuffers(){
00437         int i,j,ret;
00438         struct shmid_ds arg;
00439         for(i= 0;i<sysvshmContext->nodesize;i++){
00440                 if(i != sysvshmContext->noderank){
00441                         /* Shared memory detach */
00442                         shmdt(sysvshmContext->sendBufs[i].header);
00443                         shmdt(sysvshmContext->recvBufs[i].header);
00444 
00445                         shmctl(sysvshmContext->sendBufs[i].shmid,IPC_STAT,&arg); /* See if anyone is attached */
00446                         if(arg.shm_nattch==0) { /* No one is attached remove id's */
00447                                 shmctl(sysvshmContext->sendBufs[i].shmid,IPC_RMID,NULL);
00448                                 shmctl(sysvshmContext->recvBufs[i].shmid,IPC_RMID,NULL);
00449                                 semctl(sysvshmContext->sendBufs[i].semid,0, IPC_RMID,0);
00450                                 semctl(sysvshmContext->recvBufs[i].semid,0, IPC_RMID,0);
00451                         }
00452                 }
00453         }
00454 };
00455 
00456 
00457 void initSendQ(SysvshmSendQ *q,int size);
00458 
00459 void initAllSendQs(){
00460         int i=0;
00461         sysvshmContext->sendQs = (SysvshmSendQ **) malloc(sizeof(SysvshmSendQ *)*sysvshmContext->nodesize);
00462         for(i=0;i<sysvshmContext->nodesize;i++){
00463                 if(i != sysvshmContext->noderank){
00464                         (sysvshmContext->sendQs)[i] = (SysvshmSendQ *)malloc(sizeof(SysvshmSendQ));
00465                         initSendQ((sysvshmContext->sendQs)[i],SENDQSTARTSIZE);
00466                 }else{
00467                         (sysvshmContext->sendQs)[i] = NULL;
00468                 }
00469         }
00470 };
00471 
00472 
00473 /****************
00474  *copy this message into the sharedBuf
00475  If it does not succeed
00476  *put it into the sendQ 
00477  *NOTE: This method is called only after obtaining the corresponding mutex
00478  * ********/
00479 int sendMessage(OutgoingMsg ogm,sharedBufData *dstBuf,SysvshmSendQ *dstSendQ){
00480 
00481         if(dstBuf->header->bytes+ogm->size <= SHMBUFLEN){
00483                 dstBuf->header->count++;
00484                 memcpy(dstBuf->data+dstBuf->header->bytes,ogm->data,ogm->size);
00485                 dstBuf->header->bytes += ogm->size;
00486                 MACHSTATE4(3,"Sysvshm send done ogm %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",ogm,ogm->size,dstBuf->header->count,dstBuf->header->bytes);
00487                 return 1;
00488         }
00489         /***
00490          * Shared Buffer is too full for this message
00491          * **/
00492         printf("send buffer is too full\n");
00493         pushSendQ(dstSendQ,ogm);
00494         ogm->refcount++;
00495         MACHSTATE3(3,"Sysvshm send ogm %p size %d queued refcount %d",ogm,ogm->size,ogm->refcount);
00496         return 0;
00497 }
00498 
00499 inline OutgoingMsg popSendQ(SysvshmSendQ *q);
00500 
00501 /****
00502  *Try to send all the messages in the sendq to this destination rank
00503  *NOTE: This method is called only after obtaining the corresponding mutex
00504  * ************/
00505 
00506 inline int flushSendQ(int dstRank){
00507         sharedBufData *dstBuf = &(sysvshmContext->sendBufs[dstRank]);
00508         SysvshmSendQ *dstSendQ = sysvshmContext->sendQs[dstRank];
00509         int count=dstSendQ->numEntries;
00510         int sent=0;
00511         while(count > 0){
00512                 OutgoingMsg ogm = popSendQ(dstSendQ);
00513                 ogm->refcount--;
00514                 MACHSTATE4(3,"Sysvshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
00515                 int ret = sendMessage(ogm,dstBuf,dstSendQ);
00516                 if(ret==1){
00517                         sent++;
00518                         GarbageCollectMsg(ogm);
00519                 }
00520                 count--;
00521         }
00522         return sent;
00523 }
00524 
00525 inline void emptyRecvBuf(sharedBufData *recvBuf);
00526 
00527 inline void emptyAllRecvBufs(){
00528         struct sembuf sb;
00529         int i;
00530         int j,ret;
00531         union semun {
00532         int val;
00533         struct semid_ds *buf;
00534         ushort array[1];
00535         } arg;
00536         for(i=0;i<sysvshmContext->nodesize;i++){
00537                 if(i != sysvshmContext->noderank){
00538                         sharedBufData *recvBuf = &(sysvshmContext->recvBufs[i]);
00539                         if(recvBuf->header->count > 0){
00540 
00541 #if SYSVSHM_STATS
00542                                 sysvshmContext->lockRecvCount++;
00543 #endif
00544 
00545                                 ACQUIRE(i);
00546                                 if(semop(recvBuf->semid, &sb, 1)>=0) {
00547                                         MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);
00548                                         emptyRecvBuf(recvBuf);
00549                                         RELEASE(i);
00550                                         CmiAssert((semop(recvBuf->semid, &sb, 1))>=0);
00551                                 }
00552 
00553                         }
00554                 }
00555         }
00556 };
00557 
00558 inline void flushAllSendQs(){
00559         struct sembuf sb;
00560         int i=0;
00561         
00562         for(i=0;i<sysvshmContext->nodesize;i++){
00563                 if(i != sysvshmContext->noderank && sysvshmContext->sendQs[i]->numEntries > 0){
00564                         ACQUIRE(sysvshmContext->noderank);
00565                         if(semop(sysvshmContext->sendBufs[i].semid, &sb, 1)>=0) {
00566                                 MACHSTATE1(3,"flushSendQ %d",i);
00567                                 flushSendQ(i);
00568                                 RELEASE(sysvshmContext->noderank);
00569                                 CmiAssert(semop(sysvshmContext->sendBufs[i].semid, &sb, 1)>=0);
00570                         }
00571 
00572                 }        
00573         }       
00574 };
00575 
00576 void static inline handoverSysvshmMessage(char *newmsg,int total_size,int rank,int broot);
00577 
00578 void emptyRecvBuf(sharedBufData *recvBuf){
00579         int numMessages = recvBuf->header->count;
00580         int i=0;
00581 
00582         char *ptr=recvBuf->data;
00583 
00584         for(i=0;i<numMessages;i++){
00585                 int size;
00586                 int rank, srcpe, seqno, magic, i;
00587                 unsigned int broot;
00588                 char *msg = ptr;
00589                 char *newMsg;
00590 
00591                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00592                 size = CmiMsgHeaderGetLength(msg);
00593         
00594                 newMsg = (char *)CmiAlloc(size);
00595                 memcpy(newMsg,msg,size);
00596 
00597                 handoverSysvshmMessage(newMsg,size,rank,broot);
00598                 
00599                 ptr += size;
00600 
00601                 MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
00602         }
00603         CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
00604         recvBuf->header->count=0;
00605         recvBuf->header->bytes=0;
00606 }
00607 
00608 
00609 void static inline handoverSysvshmMessage(char *newmsg,int total_size,int rank,int broot){
00610         CmiAssert(rank == 0);
00611 #if CMK_BROADCAST_SPANNING_TREE
00612         if (rank == DGRAM_BROADCAST
00613 #if CMK_NODE_QUEUE_AVAILABLE
00614           || rank == DGRAM_NODEBROADCAST
00615 #endif
00616          ){
00617                 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
00618                                         }
00619 #elif CMK_BROADCAST_HYPERCUBE
00620         if (rank == DGRAM_BROADCAST
00621 #if CMK_NODE_QUEUE_AVAILABLE
00622           || rank == DGRAM_NODEBROADCAST
00623 #endif
00624          ){
00625                         SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
00626                                         }
00627 #endif
00628 
00629                 switch (rank) {
00630         case DGRAM_BROADCAST: {
00631           CmiPushPE(0, newmsg);
00632           break;
00633       }
00634         default:
00635                                 {
00636                                         
00637           CmiPushPE(rank, newmsg);
00638                                 }
00639         }    /* end of switch */
00640 }
00641 
00642 
00643 /**************************
00644  *sendQ helper functions
00645  * ****************/
00646 
00647 void initSendQ(SysvshmSendQ *q,int size){
00648         q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*size);
00649 
00650         q->size = size;
00651         q->numEntries = 0;
00652 
00653         q->begin = 0;
00654         q->end = 0;
00655 }
00656 
00657 void pushSendQ(SysvshmSendQ *q,OutgoingMsg msg){
00658         if(q->numEntries == q->size){
00659                 /* need to resize */
00660                 OutgoingMsg *oldData = q->data;
00661                 int newSize = q->size<<1;
00662                 q->data = (OutgoingMsg *)malloc(sizeof(OutgoingMsg)*newSize);
00663                 /* copy head to the beginning of the new array */
00664                 
00665                 CmiAssert(q->begin == q->end);
00666 
00667                 CmiAssert(q->begin < q->size);
00668                 memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsg)*(q->size - q->begin));
00669 
00670                 if(q->end != 0){
00671                         memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsg)*(q->end));
00672                 }
00673                 free(oldData);
00674                 q->begin = 0;
00675                 q->end = q->size;
00676                 q->size = newSize;
00677         }
00678         q->data[q->end] = msg;
00679         (q->end)++;
00680         if(q->end >= q->size){
00681                 q->end -= q->size;
00682         }
00683         q->numEntries++;
00684 }
00685 
00686 OutgoingMsg popSendQ(SysvshmSendQ *q){
00687         OutgoingMsg ret;
00688         if(0 == q->numEntries){
00689                 return NULL;
00690         }
00691 
00692         ret = q->data[q->begin];
00693         (q->begin)++;
00694         if(q->begin >= q->size){
00695                 q->begin -= q->size;
00696         }
00697         
00698         q->numEntries--;
00699         return ret;
00700 }

Generated on Sun Jun 29 13:29:06 2008 for Charm++ by  doxygen 1.5.1