00001
00002 #include "routerstrategy.h"
00003
00004 CkpvDeclare(int, RecvHandle);
00005 CkpvDeclare(int, ProcHandle);
00006 CkpvDeclare(int, DummyHandle);
00007
00008
00009
00010
00011
00012 void procManyCombinedMsg(char *msg)
00013 {
00014
00015 int instance_id;
00016
00017 ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
00018
00019
00020
00021 memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
00022 , sizeof(int));
00023
00024 Strategy *s = ConvComlibGetStrategy(instance_id);
00025 ((RouterStrategy *)s)->ProcManyMsg(msg);
00026 }
00027
00028
00029 void dummyEP(DummyMsg *m)
00030 {
00031 Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
00032
00033 ((RouterStrategy *)s)->DummyEP(m);
00034 }
00035
00036
00037 void recvManyCombinedMsg(char *msg)
00038 {
00039
00040 int instance_id;
00041 ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
00042
00043
00044
00045 memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
00046 , sizeof(int));
00047
00048 Strategy *s = ConvComlibGetStrategy(instance_id);
00049 ((RouterStrategy *)s)->RecvManyMsg(msg);
00050 }
00051
00052
00053 void doneHandler(DummyMsg *m){
00054 Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
00055
00056 ((RouterStrategy *)s)->Done(m);
00057 }
00058
00059 void RouterStrategy::setReverseMap(){
00060 int pcount;
00061 for(pcount = 0; pcount < CkNumPes(); pcount++)
00062 procMap[pcount] = -1;
00063
00064
00065 for(pcount = 0; pcount < npes; pcount++) {
00066 if (pelist[pcount] == CkMyPe())
00067 myPe = pcount;
00068
00069 procMap[pelist[pcount]] = pcount;
00070 }
00071 }
00072
00073 RouterStrategy::RouterStrategy(int stratid, int handle, int _npes,
00074 int *_pelist)
00075 : Strategy(){
00076
00077 setType(CONVERSE_STRATEGY);
00078
00079 CkpvInitialize(int, RecvHandle);
00080 CkpvInitialize(int, ProcHandle);
00081 CkpvInitialize(int, DummyHandle);
00082
00083 id.instanceID = 0;
00084
00085 id.isAllToAll = 0;
00086 id.refno = 0;
00087
00088 CkpvAccess(RecvHandle) =
00089 CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
00090 CkpvAccess(ProcHandle) =
00091 CkRegisterHandler((CmiHandler)procManyCombinedMsg);
00092 CkpvAccess(DummyHandle) =
00093 CkRegisterHandler((CmiHandler)dummyEP);
00094
00095 myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);
00096
00097
00098 doneHandle = handle;
00099
00100 routerID = stratid;
00101
00102 npes = _npes;
00103
00104 pelist = _pelist;
00105
00106
00107 if(npes <= 1)
00108 routerID = USE_DIRECT;
00109
00110 myPe = -1;
00111 procMap = new int[CkNumPes()];
00112 setReverseMap();
00113
00114 bcast_pemap = NULL;
00115
00116 ComlibPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid,
00117 myPe, npes);
00118
00119 if(myPe < 0) {
00120
00121
00122 doneFlag = 0;
00123 router = NULL;
00124 bufferedDoneInserting = 0;
00125 return;
00126 }
00127
00128
00129 doneFlag = 1;
00130
00131
00132 bufferedDoneInserting = 0;
00133
00134 switch(stratid) {
00135 case USE_TREE:
00136 router = new TreeRouter(npes, myPe);
00137 break;
00138
00139 case USE_MESH:
00140 router = new GridRouter(npes, myPe);
00141 break;
00142
00143 case USE_HYPERCUBE:
00144 router = new DimexRouter(npes, myPe);
00145 break;
00146
00147 case USE_GRID:
00148 router = new D3GridRouter(npes, myPe);
00149 break;
00150
00151 case USE_PREFIX:
00152 router = new PrefixRouter(npes, myPe);
00153 break;
00154
00155 case USE_DIRECT: router = NULL;
00156 break;
00157
00158 default: CmiAbort("Unknown Strategy\n");
00159 break;
00160 }
00161
00162 if(router) {
00163 router->SetMap(pelist);
00164 router->setDoneHandle(myDoneHandle);
00165
00166 }
00167 }
00168
00169
00170 RouterStrategy::~RouterStrategy() {
00171
00172
00173 if(bcast_pemap)
00174 delete [] bcast_pemap;
00175
00176 delete [] procMap;
00177 if(router)
00178 delete router;
00179 }
00180
00181
00182 void RouterStrategy::insertMessage(MessageHolder *cmsg){
00183
00184 if(myPe < 0)
00185 CmiAbort("insertMessage: mype < 0\n");
00186
00187 int count = 0;
00188 if(routerID == USE_DIRECT) {
00189 if(cmsg->dest_proc == IS_BROADCAST) {
00190 for(count = 0; count < cmsg->npes-1; count ++)
00191 CmiSyncSend(cmsg->pelist[count], cmsg->size,
00192 cmsg->getMessage());
00193 if(cmsg->npes > 0)
00194 CmiSyncSendAndFree(cmsg->pelist[cmsg->npes-1], cmsg->size,
00195 cmsg->getMessage());
00196 }
00197 else
00198 CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size,
00199 cmsg->getMessage());
00200 delete cmsg;
00201 }
00202 else {
00203 if(cmsg->dest_proc >= 0) {
00204 cmsg->pelist = &procMap[cmsg->dest_proc];
00205 cmsg->npes = 1;
00206 }
00207 else if (cmsg->dest_proc == IS_BROADCAST){
00208
00209 if(bcast_pemap == NULL) {
00210 bcast_pemap = new int[npes];
00211 for(count = 0; count < npes; count ++) {
00212 bcast_pemap[count] = count;
00213 }
00214 }
00215
00216 cmsg->pelist = bcast_pemap;
00217 cmsg->npes = npes;
00218 }
00219
00220 msgQ.push(cmsg);
00221 }
00222 }
00223
00224 void RouterStrategy::doneInserting(){
00225
00226 if(myPe < 0)
00227 CmiAbort("insertMessage: mype < 0\n");
00228
00229 id.instanceID = getInstance();
00230
00231
00232 ComlibPrintf("%d: DoneInserting %d \n", CkMyPe(), msgQ.length());
00233
00234 if(doneFlag == 0) {
00235 ComlibPrintf("%d:Waiting for previous iteration to Finish\n",
00236 CkMyPe());
00237 bufferedDoneInserting = 1;
00238 return;
00239 }
00240
00241 if(routerID == USE_DIRECT) {
00242 DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
00243 memset((char *)m, 0, sizeof(DummyMsg));
00244 m->id.instanceID = getInstance();
00245
00246 Done(m);
00247 return;
00248 }
00249
00250 doneFlag = 0;
00251 bufferedDoneInserting = 0;
00252
00253 id.refno ++;
00254
00255 if(msgQ.length() == 0) {
00256 if(routerID == USE_DIRECT)
00257 return;
00258
00259 DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
00260 ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
00261 CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
00262
00263 MessageHolder *cmsg = new MessageHolder((char *)dummymsg,
00264 myPe,
00265 sizeof(DummyMsg));
00266 cmsg->isDummy = 1;
00267 cmsg->pelist = &myPe;
00268 cmsg->npes = 1;
00269 msgQ.push(cmsg);
00270 }
00271
00272 router->EachToManyMulticastQ(id, msgQ);
00273
00274 while(!recvQ.isEmpty()) {
00275 char *msg = recvQ.deq();
00276 RecvManyMsg(msg);
00277 }
00278
00279 while(!procQ.isEmpty()) {
00280 char *msg = procQ.deq();
00281 ProcManyMsg(msg);
00282 }
00283
00284 while(!dummyQ.isEmpty() > 0) {
00285 DummyMsg *m = dummyQ.deq();
00286 router->DummyEP(m->id, m->magic);
00287 CmiFree(m);
00288 }
00289 }
00290
00291 void RouterStrategy::Done(DummyMsg *m){
00292
00293 ComlibPrintf("%d: Finished iteration\n", CkMyPe());
00294
00295 if(doneHandle > 0) {
00296 CmiSetHandler(m, doneHandle);
00297 CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
00298 }
00299 else
00300 CmiFree(m);
00301
00302 doneFlag = 1;
00303
00304 if(bufferedDoneInserting)
00305 doneInserting();
00306 }
00307
00308
00309
00310 void RouterStrategy::pup(PUP::er &p){}
00311
00312 PUPable_def(RouterStrategy);