00001
00008 #include <math.h>
00009 #include "pipebroadcastconverse.h"
00010
00011 inline int log_of_2 (int i) {
00012 int m;
00013 for (m=0; i>(1<<m); ++m);
00014 return m;
00015 }
00016
00017
00018 int PipeBcastHashKey::staticCompare(const void *k1,const void *k2,size_t ){
00019 return ((const PipeBcastHashKey *)k1)->
00020 compare(*(const PipeBcastHashKey *)k2);
00021 }
00022
00023 CkHashCode PipeBcastHashKey::staticHash(const void *v,size_t){
00024 return ((const PipeBcastHashKey *)v)->hash();
00025 }
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 CkpvDeclare(int, pipeline_handler);
00042 void PipelineFragmentHandler(void *message) {
00043 int instid = CmiGetStrategy(message);
00044 PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse*)ConvComlibGetStrategy(instid);
00045 ComlibPrintf("[%d] PipelineFragmentHandler: %d\n",CkMyPe(),instid);
00046
00047 myStrategy->propagate((char*)message, true);
00048 }
00049
00050 CkpvDeclare(int, pipeline_frag_handler);
00056 void PipelineHandler(void *message) {
00057 int instid = CmiGetStrategy(message);
00058 PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse*)ConvComlibGetStrategy(instid);
00059 ComlibPrintf("[%d] PipelineHandler: %d\n",CkMyPe(),instid);
00060
00061 myStrategy->propagate((char*)message, false);
00062 }
00063
00064 PipeBroadcastConverse::PipeBroadcastConverse(short _topology, int _pipeSize) : Strategy(), topology(_topology), pipeSize(_pipeSize) {
00065 seqNumber = 0;
00066
00067
00068 ComlibPrintf("[%d] PipeBroadcastConverse constructor: %d %d\n",CkMyPe(),topology, pipeSize);
00069
00070 }
00071
00072 CmiFragmentHeader *PipeBroadcastConverse::getFragmentHeader(char *msg) {
00073 return (CmiFragmentHeader*)(msg+CmiReservedHeaderSize);
00074 }
00075
00076 void PipeBroadcastConverse::propagate(char *msg, int isFragmented) {
00077
00078 int destination, tmp, k;
00079 int num_pes, *dest_pes;
00080
00081
00082
00083
00084 int srcPeNumber, totalSendingSize;
00085 CmiFragmentHeader *frag = NULL;
00086 PipeBcastInfo *info = NULL;
00087 if (isFragmented) {
00088 info = (PipeBcastInfo*)(msg+CmiReservedHeaderSize);
00089 srcPeNumber = info->srcPe;
00090 totalSendingSize = info->messageSize;
00091 } else {
00092 frag = getFragmentHeader(msg);
00093 srcPeNumber = frag->senderPe;
00094 totalSendingSize = frag->msgSize;
00095 }
00096
00097 switch (topology) {
00098 case USE_LINEAR:
00099 if (srcPeNumber == (CkMyPe()+1)%CkNumPes()) break;
00100 destination = (CkMyPe()+1) % CkNumPes();
00101 ComlibPrintf("[%d] Pipebroadcast sending to %d\n",CkMyPe(), destination);
00102 CmiSyncSend(destination, totalSendingSize, msg);
00103 break;
00104 case USE_HYPERCUBE:
00105 tmp = srcPeNumber ^ CkMyPe();
00106 k = log_of_2(CkNumPes()) + 2;
00107 if (tmp) {
00108 do {--k;} while (!(tmp>>k));
00109 }
00110 ComlibPrintf("[%d] tmp=%d, k=%d\n",CkMyPe(),tmp,k);
00111
00112 if (isFragmented) info->srcPe = CkMyPe();
00113 else frag->senderPe = CkMyPe();
00114 dest_pes = (int *)malloc(k*sizeof(int));
00115 --k;
00116 num_pes = HypercubeGetBcastDestinations(CkMyPe(), CkNumPes(), k, dest_pes);
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134 #ifdef CMI_COMLIB_WITH_REFERENCE
00135 for (k=0; k<num_pes; ++k) {
00136
00137 CmiReference(msg);
00138 CmiSyncSendAndFree(dest_pes[k], totalSendingSize, msg);
00139 }
00140 #else
00141 CmiSyncListSend(num_pes, dest_pes, totalSendingSize, msg);
00142 #endif
00143
00144
00145 free(dest_pes);
00146 break;
00147
00148
00149
00150 default:
00151
00152 char error_msg[100];
00153 sprintf(error_msg, "Error, topology %d not known\n",topology);
00154 CmiAbort(error_msg);
00155 }
00156
00157
00158 if (isFragmented) store(msg);
00159 else deliver(msg, totalSendingSize);
00160
00161
00162
00163
00164
00165
00166 }
00167
00168
00169 void PipeBroadcastConverse::store(char* fragment) {
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179 PipeBcastInfo *info = (PipeBcastInfo*)(fragment+CmiReservedHeaderSize);
00180
00181 PipeBcastHashKey key (info->bcastPe, info->seqNumber);
00182 PipeBcastHashObj *position = fragments.get(key);
00183
00184 char *incomingMsg;
00185 if (position) {
00186
00187 ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CkMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
00188 incomingMsg = position->message;
00189 memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
00190
00191 if (--position->remaining == 0) {
00192
00193 deliver(incomingMsg, position->dimension);
00194
00195
00196
00197
00198 fragments.remove(key);
00199 delete position;
00200 }
00201
00202 } else {
00203
00204 ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CkMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
00205 incomingMsg = (char*)CmiAlloc(info->messageSize);
00206 memcpy (incomingMsg, fragment, CmiReservedHeaderSize);
00207 memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
00208 int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo)))-1;
00209 CmiAssert(remaining > 0);
00210
00211 PipeBcastHashObj *object = new PipeBcastHashObj(info->messageSize, remaining, incomingMsg);
00212 fragments.put(key) = object;
00213
00214
00215
00216
00217
00218
00219
00220 }
00221 CmiFree(fragment);
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233 }
00234
00235 void PipeBroadcastConverse::deliver(char *msg, int dimension) {
00236
00237 CmiFragmentHeader *info = (CmiFragmentHeader*)(msg+CmiReservedHeaderSize);
00238 CmiSetHandler(msg, info->destination);
00239 CmiSyncSendAndFree(CkMyPe(), dimension, msg);
00240
00241
00242
00243
00244
00245
00246
00247
00248 }
00249
00250 void PipeBroadcastConverse::insertMessage(MessageHolder *cmsg){
00251 ComlibPrintf("[%d] PipeBroadcastConverse::insertMessage %d\n",CkMyPe(),topology);
00252 char *msg = cmsg->getMessage();
00253 int size = cmsg->getSize();
00254 if (size < pipeSize) {
00255
00256 CmiSetHandler(msg, CkpvAccess(pipeline_handler));
00257 CmiFragmentHeader *frag = getFragmentHeader(msg);
00258 frag->senderPe = CkMyPe();
00259 frag->msgSize = size;
00260 propagate(msg, false);
00261
00262 } else {
00263
00264
00265 ++seqNumber;
00266 ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CkMyPe(),size);
00267
00268 char *sendingMsg;
00269 char *nextChunk = msg+CmiReservedHeaderSize;
00270 int remaining = size-CmiReservedHeaderSize;
00271 int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
00272 int sendingMsgSize;
00273 CmiSetHandler(msg, CkpvAccess(pipeline_frag_handler));
00274
00275
00276 for (int i=0; i<(int)ceil(((double)size-CmiReservedHeaderSize)/reducedPipe); ++i) {
00277 sendingMsgSize = reducedPipe<remaining? pipeSize : remaining+CmiReservedHeaderSize+sizeof(PipeBcastInfo);
00278 sendingMsg = (char*)CmiAlloc(sendingMsgSize);
00279 memcpy (sendingMsg, msg, CmiReservedHeaderSize);
00280 PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
00281 info->srcPe = CkMyPe();
00282 info->bcastPe = CkMyPe();
00283 info->seqNumber = seqNumber;
00284 info->chunkNumber = i;
00285 info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
00286 info->messageSize = size;
00287 memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipeBcastInfo), nextChunk, info->chunkSize);
00288
00289 remaining -= info->chunkSize;
00290 nextChunk += info->chunkSize;
00291
00292 propagate(sendingMsg, true);
00293 }
00294
00295 }
00296
00297
00298 delete cmsg;
00299 }
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360 void PipeBroadcastConverse::pup(PUP::er &p){
00361 Strategy::pup(p);
00362 ComlibPrintf("[%d] initial of PipeBroadcastConverse::pup %s\n",CkMyPe(),(p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
00363
00364 p | pipeSize;
00365 p | topology;
00366 p | seqNumber;
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383 }
00384
00385 PUPable_def(PipeBroadcastConverse)
00386
00387