00001
00009 #include "ChunkMulticastStrategy.h"
00010 #include <string>
00011 #include <set>
00012 #include <vector>
00013 #include "queueing.h"
00014 #include "ck.h"
00015 #include "spanningTreeStrategy.h"
00016
00017 #define DEBUG 0
00018 #define CHUNK_LL 2048 //minimum chunk size
00019
00020 CkpvExtern(CkGroupID, cmgrID);
00021
00022 ChunkMulticastStrategy::ChunkMulticastStrategy()
00023 : Strategy(), CharmStrategy() {
00024
00025 setType(ARRAY_STRATEGY);
00026
00027
00028 sentCount = 0;
00029 }
00030
00031 ChunkMulticastStrategy::~ChunkMulticastStrategy() {
00032 }
00033
00034 void ChunkMulticastStrategy::pup(PUP::er &p){
00035 Strategy::pup(p);
00036 CharmStrategy::pup(p);
00037 }
00038
00039
00041 void ChunkMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
00042 #if DEBUG
00043 CkPrintf("[%d] ChunkMulticastStrategy::insertMessage\n", CkMyPe());
00044 fflush(stdout);
00045 #endif
00046
00047 if(cmsg->dest_proc != IS_SECTION_MULTICAST && cmsg->sec_id == NULL) {
00048 CkAbort("ChunkMulticastStrategy can only be used with an array section proxy");
00049 }
00050
00051
00052 envelope *env = UsrToEnv(cmsg->getCharmMessage());
00053 int npes = 1;
00054 int pes[1] = {0};
00055
00056
00057
00058
00059 const CkArrayID destArrayID(env->getsetArrayMgr());
00060 int nRemotePes=-1, nRemoteIndices=-1;
00061 ComlibMulticastIndexCount *indicesCount;
00062 int *belongingList;
00063 sinfo.getPeCount(cmsg->sec_id->_nElems, cmsg->sec_id->_elems, destArrayID, nRemotePes, nRemoteIndices, indicesCount, belongingList);
00064
00065
00066 delete [] belongingList;
00067 delete [] indicesCount;
00068
00069 #if DEBUG
00070 CkPrintf("[%d] after TRACE_CREATION_MULTICAST menv->event=%d\n", CkMyPe(), (int)env->getEvent());
00071 #endif
00072
00073
00074 CkUnpackMessage(&env);
00075
00076 int totalSize = cmsg->getSize() -sizeof(envelope) - env->getPrioBytes();
00077 int numChunks;
00078 if (totalSize/CHUNK_LL < nRemotePes) numChunks = totalSize/CHUNK_LL;
00079 else numChunks = nRemotePes;
00080 if (numChunks == 0) numChunks = 1;
00081 int chunkSize, sendingSize;
00082 char **sendingMsgArr = new char*[numChunks];
00083 char *sendingMsg;
00084 envelope *envchunk;
00085 CharmMessageHolder *holder;
00086 ComlibMulticastMsg *commsg;
00087 ChunkInfo *info;
00088
00089
00090 sentCount++;
00091
00092
00093 for(int i = 0; i < numChunks; i++){
00094 chunkSize = totalSize / numChunks;
00095 if (i < totalSize % numChunks)
00096 chunkSize++;
00097 chunkSize = CkMsgAlignLength(chunkSize);
00098 sendingSize = chunkSize+CkMsgAlignLength(sizeof(ChunkInfo));
00099 sendingMsg = (char*)CkAllocBuffer(EnvToUsr(env), sendingSize);
00100 info = (ChunkInfo*)(sendingMsg);
00101 info->srcPe = CkMyPe();
00102 info->chunkNumber = i;
00103 info->numChunks = numChunks;
00104 info->chunkSize = chunkSize;
00105
00106 info->idx = sentCount;
00107 sendingMsgArr[i] = sendingMsg;
00108 }
00109
00110
00111 CkPackMessage(&env);
00112 char *nextChunk = (char*)EnvToUsr(env);
00113 for(int i = 0; i < numChunks; i++){
00114 sendingMsg = sendingMsgArr[i];
00115 info = (ChunkInfo*)(sendingMsg);
00116 CmiMemcpy(sendingMsg+CkMsgAlignLength(sizeof(ChunkInfo)), nextChunk, info->chunkSize);
00117 envchunk = UsrToEnv(sendingMsg);
00118
00119 nextChunk += info->chunkSize;
00120
00121 CkPackMessage(&envchunk);
00122 envchunk->setPacked(1);
00123
00124 holder = cmsg;
00125 holder->data = (char*)envchunk;
00126 holder->size = envchunk->getTotalsize();
00127
00128 _TRACE_CREATION_MULTICAST(envchunk, npes, pes);
00129
00130 commsg = sinfo.getNewMulticastMessage(holder, 0, getInstance());
00131
00132 envchunk = UsrToEnv(commsg);
00133
00134
00135 remoteMulticast(commsg, true, i, numChunks);
00136
00137 }
00138
00139
00140 #if DEBUG
00141
00142 #endif
00143
00144
00145 cmsg->data = (char*)env;
00146 cmsg->size = env->getTotalsize();
00147 localMulticast(cmsg);
00148
00149 for (int i = 0; i < numChunks; i++){
00150 CmiFree(UsrToEnv(sendingMsgArr[i]));
00151 }
00152 delete [] sendingMsgArr;
00153 delete cmsg;
00154 }
00155
00156
00157
00159 void ChunkMulticastStrategy::localMulticast(CharmMessageHolder *cmsg) {
00160 double start = CmiWallTimer();
00161 CkSectionID *sec_id = cmsg->sec_id;
00162 CkVec< CkArrayIndex > localIndices;
00163 CkArrayID aid(sec_id->_cookie.get_aid());
00164 sinfo.getLocalIndices(sec_id->_nElems, sec_id->_elems, aid, localIndices);
00165 deliverToIndices(cmsg->getCharmMessage(), localIndices.size(), localIndices.getVec() );
00166
00167
00168 traceUserBracketEvent(10000, start, CmiWallTimer());
00169 }
00170
00171
00172
00173
00174
00179 void ChunkMulticastStrategy::remoteMulticast(ComlibMulticastMsg * multMsg, bool rootPE, int chunkNumber, int numChunks) {
00180 double start = CmiWallTimer();
00181
00182 envelope *env = UsrToEnv(multMsg);
00183
00184
00186 int myIndex = -10000;
00187 const int totalDestPEs = multMsg->nPes;
00188 const int myPe = CkMyPe();
00189
00190
00191 if(rootPE){
00192 myIndex = -1;
00193 } else {
00194 for (int i=0; i<totalDestPEs; ++i) {
00195 if(multMsg->indicesCount[i].pe == myPe){
00196 myIndex = i;
00197 break;
00198 }
00199 }
00200 }
00201
00202 if(myIndex == -10000)
00203 CkAbort("My PE was not found in the list of destination PEs in the ComlibMulticastMsg");
00204
00205 int npes;
00206 int *pelist = NULL;
00207
00208 if(totalDestPEs > 0)
00209 determineNextHopPEs(totalDestPEs, multMsg->indicesCount, myIndex, pelist, npes, chunkNumber, numChunks );
00210 else {
00211 npes = 0;
00212 }
00213
00214 if(npes == 0) {
00215 #if DEBUG
00216 CkPrintf("[%d] ChunkMulticastStrategy::remoteMulticast is not forwarding to any other PEs\n", CkMyPe());
00217 #endif
00218 traceUserBracketEvent(10001, start, CmiWallTimer());
00219 CmiFree(env);
00220 return;
00221 }
00222
00223
00224 RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
00225
00226
00227 CmiSetHandler(env, CkpvAccess(comlib_handler));
00228 ((CmiMsgHeaderExt *) env)->stratid = getInstance();
00229 CkPackMessage(&env);
00230 double middle = CmiWallTimer();
00231
00232
00233
00234
00235 #if DEBUG
00236 CkPrintf("[%d] remoteMulticast Sending to %d PEs: numChunks = %d\n", CkMyPe(), npes, numChunks);
00237 for(int i=0;i<npes;i++){
00238 CkPrintf("[%d] %d\n", CkMyPe(), pelist[i]);
00239 }
00240 #endif
00241
00242 CkAssert(npes > 0);
00243 CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
00244
00245 delete[] pelist;
00246
00247 double end = CmiWallTimer();
00248 traceUserBracketEvent(10001, start, middle);
00249 traceUserBracketEvent(10002, middle, end);
00250
00251 }
00252
00257 void ChunkMulticastStrategy::handleMessage(void *msg){
00258 #if DEBUG
00259
00260 #endif
00261 envelope *env = (envelope *)msg;
00262
00263
00264 CkUnpackMessage(&env);
00265
00266
00267
00268
00269
00270 ComlibMulticastMsg* multMsg = (ComlibMulticastMsg*)EnvToUsr(env);
00271
00272
00273
00274 RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
00275
00276
00277 int localElems;
00278 envelope *newenv;
00279 CkArrayIndex *local_idx_list;
00280 sinfo.unpack(env, localElems, local_idx_list, newenv);
00281 ComlibMulticastMsg *newmsg = (ComlibMulticastMsg *)EnvToUsr(newenv);
00282
00283 ChunkInfo *inf = (ChunkInfo*)newmsg;
00284 std::list< recvBuffer* >::iterator iter;
00285 recvBuffer* buf;
00286 int nrecv = -1;
00287 int cnumber = inf->chunkNumber;
00288 int numChunks;
00289 envelope** recvChunks;
00290 for (iter=recvList.begin(); iter != recvList.end(); iter++){
00291 buf = *iter;
00292 if (inf->srcPe == buf->srcPe && inf->idx == buf->idx){
00293 buf->nrecv++;
00294 nrecv = buf->nrecv;
00295 numChunks = buf->numChunks;
00296 recvChunks = buf->recvChunks;
00297 if (nrecv == numChunks){
00298 delete buf;
00299 recvList.erase(iter);
00300 }
00301 break;
00302 }
00303 }
00304 if ( nrecv == -1){
00305 numChunks = inf->numChunks;
00306 nrecv = 1;
00307 recvChunks = new envelope*[inf->numChunks];
00308 if (numChunks > 1){
00309 buf = new recvBuffer();
00310 buf->numChunks = inf->numChunks;
00311 buf->srcPe = inf->srcPe;
00312 buf->idx = inf->idx;
00313 buf->nrecv = 1;
00314 buf->recvChunks = recvChunks;
00315 recvList.push_back(buf);
00316 }
00317 }
00318 #if DEBUG
00319 CkPrintf("proc %d received %d chunks out of %d for message idx %d src %d chunk # = %d\n", CkMyPe(), nrecv, numChunks, inf->idx, inf->srcPe, inf->chunkNumber);
00320 #endif
00321 recvChunks[inf->chunkNumber] = newenv;
00322 if (nrecv == numChunks){
00323 void *wholemsg;
00324 int totalSize = 0;
00325 ChunkInfo* cinfo;
00326 for (int i = 0; i < numChunks; i++){
00327 cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[i]));
00328 totalSize += cinfo->chunkSize;
00329 }
00330 wholemsg = CkAllocBuffer(newmsg, totalSize);
00331 cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[0]));
00332 int offset = 0;
00333 for (int i = 0; i < numChunks; i++){
00334 cinfo = (ChunkInfo*)(EnvToUsr(recvChunks[i]));
00335 CmiMemcpy(((char*)wholemsg)+offset, ((char*)cinfo)+CkMsgAlignLength(sizeof(ChunkInfo)), cinfo->chunkSize);
00336 offset += cinfo->chunkSize;
00337 }
00338 envelope *envc = UsrToEnv(wholemsg);
00339 envc->setPacked(1);
00340 CkUnpackMessage(&envc);
00341 ComlibMulticastMsg *cmmsg = (ComlibMulticastMsg *)EnvToUsr(envc);
00342 deliverToIndices(cmmsg, localElems, local_idx_list );
00343 for (int i = 0; i < numChunks; i++){
00344 CmiFree(recvChunks[i]);
00345 }
00346 delete [] recvChunks;
00347
00348 }
00349
00350 remoteMulticast(multMsg, false, cnumber, numChunks);
00351 if (nrecv == numChunks) {
00352 nrecv = 0;
00353 numChunks = 0;
00354 }
00355 }
00356
00357
00358
00359
00360
00361 void ChunkMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
00362
00363 if(myIndex==-1){
00364
00365
00366 npes = totalDestPEs;
00367
00368 pelist = new int[npes];
00369 for (int i=0; i<npes; ++i) {
00370 pelist[i] = destPEs[i].pe;
00371 }
00372 } else {
00373
00374 npes = 0;
00375 }
00376
00377 }
00378
00379 void ChunkRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
00380 if (myIndex == -1){
00381 npes = 1;
00382 pelist = new int[1];
00383 pelist[0] = destPEs[chunkNumber*(totalDestPEs/numChunks)].pe;
00384 }
00385 else if (chunkNumber*(totalDestPEs/numChunks) != (myIndex+1) % totalDestPEs){
00386
00387 npes = 1;
00388 pelist = new int[1];
00389 pelist[0] = destPEs[(myIndex+1) % totalDestPEs].pe;
00390 }
00391 else {
00392 npes = 0;
00393 }
00394 }
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406 void ChunkTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
00407 int hop;
00408
00409 if (myIndex == -1){
00410 npes = 1;
00411 pelist = new int[1];
00412 hop = totalDestPEs/numChunks;
00413 pelist[0] = destPEs[chunkNumber*(totalDestPEs/numChunks)].pe;
00414 }
00415 else {
00416 int depth = 1;
00417 int idx = myIndex;
00418
00419 int ipes = totalDestPEs;
00420 while (1){
00421 hop = ipes/numChunks;
00422 if (hop == 0) hop = 1;
00423
00424 if (idx >= hop*(numChunks-1)){
00425 idx = idx - hop*(numChunks-1);
00426 ipes = ipes - hop*(numChunks-1) - 1;
00427 }
00428 else {
00429 idx = idx % hop;
00430 ipes = hop - 1;
00431 }
00432 depth++;
00433 if (idx == 0) break;
00434 else idx--;
00435 }
00436
00437 if ( depth == 2 && ((chunkNumber-1+numChunks)%numChunks)*(totalDestPEs/numChunks) != myIndex){
00438 if (numChunks < ipes) npes = numChunks + 1;
00439 else npes = ipes + 1;
00440 pelist = new int[npes];
00441
00442 if (myIndex == (totalDestPEs/numChunks)*(numChunks-1))
00443 pelist[0] = destPEs[0].pe;
00444 else
00445 pelist[0] = destPEs[(myIndex+(totalDestPEs/numChunks))].pe;
00446 hop = ipes/npes;
00447 if (hop == 0) hop = 1;
00448 for ( int i = 1; i < npes; i++ ){
00449 pelist[i] = destPEs[(i-1)*hop + myIndex + 1].pe;
00450 }
00451 }
00452
00453 else {
00454
00455 if (ipes <= 0){
00456 npes = 0;
00457 return;
00458 }
00459 if (numChunks < ipes) npes = numChunks;
00460 else npes = ipes ;
00461
00462 pelist = new int[npes];
00463 hop = ipes/npes;
00464 if (hop == 0) hop = 1;
00465 for ( int i = 0; i < npes; i++ ){
00466 pelist[i] = destPEs[i*hop + myIndex + 1].pe;
00467 }
00468 }
00469 }
00470 }
00471
00472
00473 void ChunkPipeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes, int chunkNumber, int numChunks) {
00474
00475 int *allpelist;
00476 CkPrintf("myindex = %d\n", myIndex);
00477 if (myIndex == -1) {
00478 allpelist = new int[totalDestPEs+1];
00479 allpelist[0] = CkMyPe();
00480 for (int i = 1; i < totalDestPEs; i++){
00481 allpelist[i] = destPEs[i-1].pe;
00482 }
00483 } else {
00484 allpelist = new int[totalDestPEs];
00485 for (int i = myIndex; i < totalDestPEs + myIndex; i++){
00486 allpelist[i-myIndex] = destPEs[i%totalDestPEs].pe;
00487 }
00488 }
00489 topo::SpanningTreeVertex *nextGenInfo;
00490 nextGenInfo = topo::buildSpanningTreeGeneration(allpelist, allpelist + totalDestPEs, degree);
00491 npes = nextGenInfo->childIndex.size();
00492 pelist = new int[npes];
00493 for (int i = 0; i < npes; i++){
00494 pelist[i] = nextGenInfo->childIndex[i];
00495 }
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546 }
00547