00001
00015 #include "convcomlibmanager.h"
00016 #include "routerstrategy.h"
00017 #include "StreamingStrategy.h"
00018 #include "MeshStreamingStrategy.h"
00019 #include "pipebroadcastconverse.h"
00020 #include "converse.h"
00021
00022 int com_debug=0;
00023
00025 CkpvDeclare(ConvComlibManager, conv_com_object);
00026
00027
00028
00029
00030
00031
00032
00033
00038 CkpvDeclare(int, comlib_handler);
00040 void *strategyHandler(void *msg) {
00041 CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
00042 int instid = conv_header->stratid;
00043
00044 #ifndef CMK_OPTIMIZE
00045
00046 if (instid == 0) {
00047 CmiAbort("Comlib strategy ID is zero, did you forget to initialize a variable?\n");
00048 }
00049 #endif
00050
00051
00052 Strategy *strat = ConvComlibGetStrategy(instid);
00053
00054 strat->handleMessage(msg);
00055 return NULL;
00056 }
00057
00088 CkpvDeclare(int, comlib_ready);
00090 void *comlibReadyHandler(void *msg) {
00091 ComlibPrintf("[%d] Received ready acknowledgement\n",CmiMyPe());
00092 CmiAssert(CkpvAccess(conv_com_object).acksReceived > 0);
00093 if (--CkpvAccess(conv_com_object).acksReceived == 0) {
00094
00095 ComlibPrintf("Strategy table propagation finished\n");
00096 CkpvAccess(conv_com_object).busy = CmiFalse;
00097 if (CkpvAccess(conv_com_object).doneCreatingScheduled) {
00098 CkpvAccess(conv_com_object).doneCreating();
00099 }
00100 }
00101 CmiFree(msg);
00102 return NULL;
00103 }
00104
00108 CkpvDeclare(int, comlib_table_received);
00109
00111 void *comlibTableReceivedHandler(void *msg) {
00112 if (CmiMyPe() == 0) {
00113
00114 if (--CkpvAccess(conv_com_object).acksReceived == 0) {
00115 CkpvAccess(conv_com_object).tableReady();
00116
00117
00118 CkpvAccess(conv_com_object).acksReceived = CmiNumPes() - 1;
00119 CmiSyncBroadcastAndFree(CmiReservedHeaderSize, (char*)msg);
00120 } else {
00121 CmiFree(msg);
00122 }
00123 } else {
00124 CkpvAccess(conv_com_object).tableReady();
00125 CmiSetHandler(msg, CkpvAccess(comlib_ready));
00126 CmiSyncSendAndFree(0, CmiReservedHeaderSize, (char*)msg);
00127 }
00128 return NULL;
00129 }
00130
00134 CkpvDeclare(int, comlib_receive_table);
00135
00137 void *comlibReceiveTableHandler(void *msg) {
00138
00139 ComlibPrintf("Received new strategy table\n");
00140 StrategyWrapper sw;
00141 PUP::fromMem pm(((char*)msg)+CmiReservedHeaderSize);
00142 pm|sw;
00143
00144
00145 for (int i=0; i<sw.nstrats; ++i) {
00146 Strategy *current = CkpvAccess(conv_com_object).getStrategy(sw.position[i]);
00147 if (sw.replace[i] && current != NULL) {
00148
00149 delete current;
00150 current = NULL;
00151 CkpvAccess(conv_com_object).decrementNumStrats();
00152 }
00153 if (current == NULL) {
00154
00155
00156 CkpvAccess(conv_com_object).setStrategy(sw.position[i], sw.strategy[i]);
00157 CkpvAccess(conv_com_object).incrementNumStrats();
00158 } else {
00159
00160 delete sw.strategy[i];
00161 }
00162 CkpvAccess(conv_com_object).inSync(sw.position[i]);
00163 }
00164
00165
00166 CmiSetHandler(msg, CkpvAccess(comlib_table_received));
00167 CmiSyncSendAndFree(0, CmiReservedHeaderSize, (char*)msg);
00168 return NULL;
00169 }
00170
00171
00172
00173
00174
00175
00176
00177 ConvComlibManager::ConvComlibManager(): strategyTable(MAX_NUM_STRATS+1){
00178 nstrats = 0;
00179 init_flag = CmiFalse;
00180 acksReceived = 0;
00181 doneCreatingScheduled = CmiFalse;
00182 busy = CmiFalse;
00183 }
00184
00188 int ConvComlibManager::insertStrategy(Strategy *s) {
00189
00190
00191 if(nstrats >= MAX_NUM_STRATS)
00192 CmiAbort("Too Many strategies\n");
00193
00194 int index = ++nstrats;
00195 StrategyTableEntry &st = strategyTable[index];
00196
00197 if(st.strategy != NULL) CmiAbort("Trying to insert a strategy over another one!");
00198
00199 st.strategy = s;
00200 st.isNew = 1;
00201 st.bracketedSetupFinished = 2*CkNumPes();
00202
00203
00204
00205
00206
00207
00208
00209
00210 return index;
00211 }
00212
00213 void ConvComlibManager::doneCreating() {
00214 ComlibPrintf("Called doneCreating\n");
00215 if (busy) {
00216
00217 doneCreatingScheduled = CmiTrue;
00218 return;
00219 }
00220
00221 busy = CmiTrue;
00222 acksReceived = CmiNumPes() - 1;
00223 int count = 0;
00224 for (int i=1; i<=nstrats; ++i) {
00225 if (strategyTable[i].isNew) {
00226 count++;
00227 }
00228 }
00229
00230 if (count > 0) {
00231
00232 StrategyWrapper sw(count);
00233 count = 0;
00234 for (int i=1; i<=nstrats; ++i) {
00235 if (strategyTable[i].isNew) {
00236 sw.position[count] = i;
00237 sw.replace[count] = CmiFalse;
00238 sw.strategy[count] = strategyTable[i].strategy;
00239 count++;
00240 CkpvAccess(conv_com_object).inSync(i);
00241 }
00242 }
00243
00244
00245 PUP::sizer ps;
00246 ps|sw;
00247 char *msg = (char*)CmiAlloc(ps.size() + CmiReservedHeaderSize);
00248 PUP::toMem pm(msg+CmiReservedHeaderSize);
00249
00250
00251 pm|sw;
00252
00253
00254
00255
00256 CmiSetHandler(msg, CkpvAccess(comlib_receive_table));
00257 CmiSyncBroadcastAndFree(ps.size()+CmiReservedHeaderSize, msg);
00258
00259
00260
00261
00262
00263
00264
00265 } else {
00266 busy = CmiFalse;
00267 }
00268 }
00269
00270 void ConvComlibManager::tableReady() {
00271 for (int i=1; i<strategyTable.size(); ++i) {
00272 if (strategyTable[i].isInSync) {
00273 ComlibPrintf("[%d] ConvComlibManager::tableReady Enabling strategy %d\n",CmiMyPe(),i);
00274 strategyTable[i].isInSync = 0;
00275 enableStrategy(i);
00276 }
00277 }
00278 }
00279
00280 #include "ComlibStrategy.h"
00281
00282 void ConvComlibManager::enableStrategy(int i) {
00283 strategyTable[i].isReady = 1;
00284
00285 MessageHolder *mh;
00286 while ((mh=strategyTable[i].tmplist.deq()) != NULL) {
00287 CharmMessageHolder*cmh = (CharmMessageHolder*)mh;
00288
00289 cmh->sec_id = cmh->copy_of_sec_id;
00290
00291 #if DEBUG_MULTICAST
00292 int nelem =cmh->sec_id->_nElems;
00293 CkPrintf("[%d] enableStrategy() pushing message into strategy %d using copy of sec_id stored when enqueuing message (message=%p nelem=%d)\n",CmiMyPe(),i, mh, nelem);
00294 #endif
00295
00296 strategyTable[i].strategy->insertMessage(mh);
00297
00298 }
00299 for (int j=0; j<strategyTable[i].call_doneInserting; ++j) {
00300 strategyTable[i].strategy->doneInserting();
00301 }
00302 strategyTable[i].call_doneInserting = 0;
00303 }
00304
00305
00306
00309 CkpvDeclare(int, RecvdummyHandle);
00310
00311 void recv_dummy(void *msg){
00312 ComlibPrintf("Received Dummy %d\n", CmiMyPe());
00313 CmiFree(msg);
00314 }
00315
00317
00318 extern void propagate_handler_frag(void *);
00319
00320
00326 void initConvComlibManager(){
00327
00328 if(!CkpvInitialized(conv_com_object))
00329 CkpvInitialize(ConvComlibManager, conv_com_object);
00330
00331
00332 if(CkpvAccess(conv_com_object).getInitialized()) {
00333 CmiPrintf("Comlib initialized more than once!\n");
00334 return;
00335 }
00336
00337 CkpvInitialize(int, RecvdummyHandle);
00338 CkpvAccess(RecvdummyHandle) = CkRegisterHandler((CmiHandler)recv_dummy);
00339
00340 CkpvInitialize(int, comlib_receive_table);
00341 CkpvAccess(comlib_receive_table) = CkRegisterHandler((CmiHandler)comlibReceiveTableHandler);
00342 CkpvInitialize(int, comlib_table_received);
00343 CkpvAccess(comlib_table_received) = CkRegisterHandler((CmiHandler)comlibTableReceivedHandler);
00344 CkpvInitialize(int, comlib_ready);
00345 CkpvAccess(comlib_ready) = CkRegisterHandler((CmiHandler)comlibReadyHandler);
00346
00347
00348
00349
00350 CkpvInitialize(int, RouterRecvHandle);
00351 CkpvAccess(RouterRecvHandle) = CkRegisterHandler((CmiHandler)routerRecvManyCombinedMsg);
00352 CkpvInitialize(int, RouterProcHandle);
00353 CkpvAccess(RouterProcHandle) = CkRegisterHandler((CmiHandler)routerProcManyCombinedMsg);
00354 CkpvInitialize(int, RouterDummyHandle);
00355 CkpvAccess(RouterDummyHandle) = CkRegisterHandler((CmiHandler)routerDummyMsg);
00356
00357
00358 CpvInitialize(int, streaming_handler_id);
00359 CpvAccess(streaming_handler_id) = CmiRegisterHandler(StreamingHandlerFn);
00360
00361
00362 CkpvInitialize(int, streaming_column_handler_id);
00363 CkpvAccess(streaming_column_handler_id) = CkRegisterHandler(streaming_column_handler);
00364
00365
00366 CkpvInitialize(int, pipeline_handler);
00367 CkpvInitialize(int, pipeline_frag_handler);
00368 CkpvAccess(pipeline_handler) = CkRegisterHandler((CmiHandler)PipelineHandler);
00369 CkpvAccess(pipeline_frag_handler) = CkRegisterHandler((CmiHandler)PipelineFragmentHandler);
00370
00371
00372 CkpvInitialize(int, comlib_handler);
00373 CkpvAccess(comlib_handler) = CkRegisterHandler((CmiHandler) strategyHandler);
00374
00375
00376
00377 if (CmiMyRank() == 0) {
00378 PUPable_reg(RouterStrategy);
00379 PUPable_reg(StreamingStrategy);
00380 PUPable_reg(MeshStreamingStrategy);
00381 PUPable_reg(PipeBroadcastConverse);
00382 PUPable_reg(MessageHolder);
00383 }
00384 CkpvAccess(conv_com_object).setInitialized();
00385 }
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402 Strategy *ConvComlibGetStrategy(int loc) {
00403
00404
00405 return CkpvAccess(conv_com_object).getStrategy(loc);
00406 }
00407
00408
00409 void ConvComlibScheduleDoneInserting(int loc) {
00410 CkpvAccess(conv_com_object).getStrategyTable(loc)->call_doneInserting++;
00411 }
00412
00413
00414
00415 void ConvComlibManager::insertMessage(MessageHolder* msg, int instid) {
00416 ComlibPrintf("[%d] enqueuing message for strategy %d in tmplist\n",CmiMyPe(),instid);
00417 #ifndef CMK_OPTIMIZE
00418 if (instid == 0) CmiAbort("Trying to send a message through comlib strategy zero, did you forget to initialize zome variable?\n");
00419 #endif
00420 if (isReady(instid)) {
00421 ComlibPrintf("[%d] insertMessage inserting into strategy\n", CmiMyPe());
00422 strategyTable[instid].strategy->insertMessage(msg);
00423 }
00424 else{
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438 #if DEBUG_MULTICAST
00439 int nelem = ((CharmMessageHolder*)msg)->sec_id->_nElems;
00440 void * si = ((CharmMessageHolder*)msg)->sec_id;
00441 ComlibPrintf("[%d] insertMessage inserting into tmplist with msg=%p si=%p nelem=%d\n", CmiMyPe(), msg, si, nelem);
00442 #endif
00443
00444 ComlibPrintf("[%d] msg=%p\n", CkMyPe(), dynamic_cast<CharmMessageHolder*>(msg));
00445 ((CharmMessageHolder*)msg)->saveCopyOf_sec_id();
00446 ComlibPrintf("[%d] insertMessage inserting into tmplist intid=%d\n", CmiMyPe(), instid);
00447 strategyTable[instid].tmplist.enq(msg);
00448 }
00449
00450 }
00451
00452
00453
00454
00455 void ConvComlibManager::printDiagnostics(){
00456
00457
00458
00459 int ready = 0;
00460 int tmplistTotal = 0;
00461
00462 int size = strategyTable.size();
00463
00464 for(int i=0;i<size;i++){
00465 if(strategyTable[i].isReady){
00466 ready++;
00467
00468 } else {
00469
00470 }
00471
00472 int nmsg = strategyTable[i].tmplist.length();
00473
00474 tmplistTotal += nmsg;
00475
00476 }
00477
00478 if(tmplistTotal>0){
00479 CkPrintf("[%d] %d of %d converse strategies are ready (%d msgs buffered)\n", CkMyPe(), ready, size, tmplistTotal);
00480 }
00481 }
00482
00483
00484
00485
00486