conv-com/routerstrategy.C

Go to the documentation of this file.
00001 
00002 #include "routerstrategy.h"
00003 
00004 CkpvDeclare(int, RecvHandle);
00005 CkpvDeclare(int, ProcHandle);
00006 CkpvDeclare(int, DummyHandle);
00007 
00008 //Handlers that call the entry funtions of routers 
00009 //Refer to router.h for details on these entry functions
00010 
00011 //Correspods to Router::ProcManyMsg
00012 void procManyCombinedMsg(char *msg)
00013 {
00014     //comID id;
00015     int instance_id;
00016 
00017     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
00018     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
00019 
00020     //Comid specific
00021     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
00022            , sizeof(int));
00023 
00024     Strategy *s = ConvComlibGetStrategy(instance_id);
00025     ((RouterStrategy *)s)->ProcManyMsg(msg);
00026 }
00027 
00028 //Correspods to Router::DummyEP
00029 void dummyEP(DummyMsg *m)
00030 {
00031     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
00032     
00033     ((RouterStrategy *)s)->DummyEP(m);
00034 }
00035 
00036 //Correspods to Router::RecvManyMsg
00037 void recvManyCombinedMsg(char *msg)
00038 {
00039     //comID id;
00040     int instance_id;
00041     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
00042     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
00043     
00044     //Comid specific
00045     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
00046            , sizeof(int));
00047 
00048     Strategy *s = ConvComlibGetStrategy(instance_id);
00049     ((RouterStrategy *)s)->RecvManyMsg(msg);
00050 }
00051 
00052 
00053 void doneHandler(DummyMsg *m){
00054     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
00055     
00056     ((RouterStrategy *)s)->Done(m);
00057 }
00058 
00059 void RouterStrategy::setReverseMap(){
00060     int pcount;
00061     for(pcount = 0; pcount < CkNumPes(); pcount++)
00062         procMap[pcount] = -1;
00063 
00064     //All processors not in the domain will point to -1
00065     for(pcount = 0; pcount < npes; pcount++) {
00066         if (pelist[pcount] == CkMyPe())
00067             myPe = pcount;
00068 
00069         procMap[pelist[pcount]] = pcount;
00070     }
00071 }
00072 
00073 RouterStrategy::RouterStrategy(int stratid, int handle, int _npes, 
00074                                int *_pelist) 
00075     : Strategy(){
00076     
00077     setType(CONVERSE_STRATEGY);
00078 
00079     CkpvInitialize(int, RecvHandle);
00080     CkpvInitialize(int, ProcHandle);
00081     CkpvInitialize(int, DummyHandle);
00082 
00083     id.instanceID = 0; //Set later in doneInserting
00084     
00085     id.isAllToAll = 0;
00086     id.refno = 0;
00087 
00088     CkpvAccess(RecvHandle) =
00089         CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
00090     CkpvAccess(ProcHandle) =
00091         CkRegisterHandler((CmiHandler)procManyCombinedMsg);
00092     CkpvAccess(DummyHandle) = 
00093         CkRegisterHandler((CmiHandler)dummyEP);    
00094 
00095     myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);    
00096 
00097     //Array strategy done handle
00098     doneHandle = handle;
00099 
00100     routerID = stratid;
00101 
00102     npes = _npes;
00103     //pelist = new int[npes];
00104     pelist = _pelist;
00105     //memcpy(pelist, _pelist, sizeof(int) * npes);    
00106 
00107     if(npes <= 1)
00108         routerID = USE_DIRECT;
00109 
00110     myPe = -1;
00111     procMap = new int[CkNumPes()];    
00112     setReverseMap();
00113 
00114     bcast_pemap = NULL;
00115 
00116     ComlibPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid, 
00117                  myPe, npes);
00118 
00119     if(myPe < 0) {
00120         //I am not part of this strategy
00121         
00122         doneFlag = 0;
00123         router = NULL;
00124         bufferedDoneInserting = 0;
00125         return;        
00126     }
00127 
00128     //Start with all iterations done
00129     doneFlag = 1;
00130     
00131     //No Buffered doneInserting at the begining
00132     bufferedDoneInserting = 0;
00133 
00134     switch(stratid) {
00135     case USE_TREE: 
00136         router = new TreeRouter(npes, myPe);
00137         break;
00138         
00139     case USE_MESH:
00140         router = new GridRouter(npes, myPe);
00141         break;
00142         
00143     case USE_HYPERCUBE:
00144         router = new DimexRouter(npes, myPe);
00145         break;
00146         
00147     case USE_GRID:
00148         router = new D3GridRouter(npes, myPe);
00149         break;
00150         
00151     case USE_PREFIX:
00152         router = new PrefixRouter(npes, myPe);
00153         break;
00154 
00155     case USE_DIRECT: router = NULL;
00156         break;
00157         
00158     default: CmiAbort("Unknown Strategy\n");
00159         break;
00160     }
00161 
00162     if(router) {
00163         router->SetMap(pelist);
00164         router->setDoneHandle(myDoneHandle);
00165         //router->SetID(id);
00166     }
00167 }
00168 
00169 
00170 RouterStrategy::~RouterStrategy() {
00171     //delete [] pelist;
00172 
00173     if(bcast_pemap)
00174         delete [] bcast_pemap;
00175     
00176     delete [] procMap;
00177     if(router)
00178         delete router;
00179 }
00180 
00181 
00182 void RouterStrategy::insertMessage(MessageHolder *cmsg){
00183 
00184     if(myPe < 0)
00185         CmiAbort("insertMessage: mype < 0\n");
00186 
00187     int count = 0;
00188     if(routerID == USE_DIRECT) {
00189         if(cmsg->dest_proc == IS_BROADCAST) {
00190             for(count = 0; count < cmsg->npes-1; count ++)
00191                 CmiSyncSend(cmsg->pelist[count], cmsg->size, 
00192                             cmsg->getMessage());
00193             if(cmsg->npes > 0)
00194                 CmiSyncSendAndFree(cmsg->pelist[cmsg->npes-1], cmsg->size, 
00195                                    cmsg->getMessage());
00196         }
00197         else
00198             CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, 
00199                                cmsg->getMessage());
00200         delete cmsg;
00201     }
00202     else {
00203         if(cmsg->dest_proc >= 0) {
00204             cmsg->pelist = &procMap[cmsg->dest_proc];
00205             cmsg->npes = 1;
00206         }
00207         else if (cmsg->dest_proc == IS_BROADCAST){
00208 
00209             if(bcast_pemap == NULL) {
00210                 bcast_pemap = new int[npes];
00211                 for(count = 0; count < npes; count ++) {
00212                     bcast_pemap[count] = count;
00213                 }
00214             }
00215 
00216             cmsg->pelist = bcast_pemap;
00217             cmsg->npes = npes;
00218         }
00219         
00220         msgQ.push(cmsg);
00221     }
00222 }
00223 
00224 void RouterStrategy::doneInserting(){
00225     
00226     if(myPe < 0)
00227         CmiAbort("insertMessage: mype < 0\n");
00228 
00229     id.instanceID = getInstance();
00230 
00231     //ComlibPrintf("Instance ID = %d\n", getInstance());
00232     ComlibPrintf("%d: DoneInserting %d \n", CkMyPe(), msgQ.length());
00233     
00234     if(doneFlag == 0) {
00235         ComlibPrintf("%d:Waiting for previous iteration to Finish\n", 
00236                      CkMyPe());
00237         bufferedDoneInserting = 1;
00238         return;
00239     }
00240     
00241     if(routerID == USE_DIRECT) {
00242         DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
00243         memset((char *)m, 0, sizeof(DummyMsg)); 
00244         m->id.instanceID = getInstance();
00245         
00246         Done(m);
00247         return;
00248     }
00249 
00250     doneFlag = 0;
00251     bufferedDoneInserting = 0;
00252 
00253     id.refno ++;
00254 
00255     if(msgQ.length() == 0) {
00256         if(routerID == USE_DIRECT)
00257             return;
00258 
00259         DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
00260         ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
00261         CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
00262         
00263         MessageHolder *cmsg = new MessageHolder((char *)dummymsg, 
00264                                                      myPe, 
00265                                                      sizeof(DummyMsg));
00266         cmsg->isDummy = 1;
00267         cmsg->pelist = &myPe;
00268         cmsg->npes = 1;
00269         msgQ.push(cmsg);
00270     }
00271 
00272     router->EachToManyMulticastQ(id, msgQ);
00273 
00274     while(!recvQ.isEmpty()) {
00275         char *msg = recvQ.deq();
00276         RecvManyMsg(msg);
00277     }
00278 
00279     while(!procQ.isEmpty()) {
00280         char *msg = procQ.deq();
00281         ProcManyMsg(msg);
00282     }
00283 
00284     while(!dummyQ.isEmpty() > 0) {
00285         DummyMsg *m = dummyQ.deq();
00286         router->DummyEP(m->id, m->magic);
00287         CmiFree(m);
00288     }
00289 }
00290 
00291 void RouterStrategy::Done(DummyMsg *m){
00292 
00293     ComlibPrintf("%d: Finished iteration\n", CkMyPe());
00294 
00295     if(doneHandle > 0) {
00296         CmiSetHandler(m, doneHandle);
00297         CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
00298     }
00299     else
00300         CmiFree(m);
00301 
00302     doneFlag = 1;
00303 
00304     if(bufferedDoneInserting)
00305         doneInserting();
00306 }
00307 
00308 
00309 //Implement it later while implementing checkpointing of Comlib
00310 void RouterStrategy::pup(PUP::er &p){}
00311 
00312 PUPable_def(RouterStrategy);

Generated on Sun Jun 29 13:29:11 2008 for Charm++ by  doxygen 1.5.1