00001
00022 #include "MulticastStrategy.h"
00023
00024 CkpvExtern(CkGroupID, cmgrID);
00025
00026
00027 MulticastStrategy::MulticastStrategy()
00028 : Strategy(), CharmStrategy() {
00029
00030 ComlibPrintf("MulticastStrategy constructor\n");
00031
00032 setType(ARRAY_STRATEGY);
00033 }
00034
00035
00036 MulticastStrategy::~MulticastStrategy() {
00037
00038 ComlibPrintf("MulticastStrategy destructor\n");
00039
00040 if(getLearner() != NULL)
00041 delete getLearner();
00042
00043 CkHashtableIterator *ht_iterator = sec_ht.iterator();
00044 ht_iterator->seekStart();
00045 while(ht_iterator->hasNext()){
00046 void **data;
00047 data = (void **)ht_iterator->next();
00048 ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
00049 if(obj != NULL)
00050 delete obj;
00051 }
00052 }
00053
00054 #if 0
00055 void rewritePEs(CharmMessageHolder *cmsg){
00056 ComlibPrintf("[%d] rewritePEs insertMessage \n",CkMyPe());
00057
00058 CkAssert(cmsg->dest_proc == IS_SECTION_MULTICAST);
00059
00060 void *m = cmsg->getCharmMessage();
00061 envelope *env = UsrToEnv(m);
00062
00063 ComlibMulticastMsg *msg = (ComlibMulticastMsg *)m;
00064
00065 }
00066 #endif
00067
00068 void MulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
00069
00070 ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n", CkMyPe());
00071
00072
00073
00074 ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
00075
00076
00077 if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) {
00078 ComlibPrintf("[%d] Comlib Section Multicast: looking up cur_sec_id\n",CkMyPe());
00079
00080 CkSectionID *sid = cmsg->sec_id;
00081
00082
00083
00084
00085
00086
00087 int cur_sec_id = sid->getSectionID();
00088
00089 if(cur_sec_id > 0) {
00090 sinfo.processOldSectionMessage(cmsg);
00091 ComlibPrintf("Array section id was %d, but now is %d\n", cur_sec_id, sid->getSectionID());
00092 CkAssert(cur_sec_id == sid->getSectionID());
00093
00094 ComlibPrintf("[%d] Comlib Section Multicast: insertMessage: cookiePE=%d\n",CkMyPe(),sid->_cookie.get_pe());
00095 ComlibSectionHashKey key(CkMyPe(), cur_sec_id);
00096 ComlibSectionHashObject *obj = sec_ht.get(key);
00097
00098 if(obj == NULL) {
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110 }
00111
00112
00113
00114
00115
00116
00117 if (obj != NULL && CkMyPe() == sid->_cookie.get_pe() && !obj->isOld) {
00118 envelope *env = UsrToEnv(cmsg->getCharmMessage());
00119 localMulticast(env, obj, (CkMcastBaseMsg*)cmsg->getCharmMessage());
00120 remoteMulticast(env, obj);
00121
00122 delete cmsg;
00123 return;
00124 }
00125 }
00126
00127
00128
00129
00130 ComlibPrintf("[%d] MulticastStrategy, creating a new multicast path\n", CkMyPe());
00131
00132
00133 ComlibMulticastMsg *newmsg = sinfo.getNewMulticastMessage(cmsg, needSorting(), getInstance());
00134
00135
00136 ComlibSectionHashObject *obj = NULL;
00137
00138
00139
00140 if(newmsg !=NULL){
00141
00142 ComlibPrintf("[%d] calling insertSectionID\n", CkMyPe());
00143 ComlibSectionHashObject *obj_inserted = insertSectionID(sid, newmsg->nPes, newmsg->indicesCount);
00144
00145 envelope *newenv = UsrToEnv(newmsg);
00146 CkPackMessage(&newenv);
00147
00148 ComlibSectionHashKey key(CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);
00149
00150 obj = sec_ht.get(key);
00151 ComlibPrintf("[%d] looking up key sid->_cookie.sInfo.cInfo.id=%d. Found obj=%p\n", CkMyPe(), (int)sid->_cookie.info.sInfo.cInfo.id, obj);
00152 CkAssert(obj_inserted == obj);
00153
00154
00155
00156 if(obj == NULL){
00157 CkPrintf("[%d] WARNING: Cannot Find ComlibRectSectionHashObject object in hash table sec_ht!\n", CkMyPe());
00158 CkAbort("Cannot Find object. sec_ht.get(key)==NULL");
00159
00160 } else {
00161
00162 char *msg = cmsg->getCharmMessage();
00163 localMulticast(UsrToEnv(msg), obj, (CkMcastBaseMsg*)msg);
00164 CkFreeMsg(msg);
00165
00166 if (newmsg != NULL) {
00167 remoteMulticast(UsrToEnv(newmsg), obj);
00168 }
00169
00170 }
00171 }
00172
00173 }
00174 else
00175 CkAbort("Section multicast cannot be used without a section proxy");
00176
00177 delete cmsg;
00178 }
00179
00180 ComlibSectionHashObject * MulticastStrategy::insertSectionID(CkSectionID *sid, int npes, ComlibMulticastIndexCount* pelist) {
00181
00182 ComlibPrintf("[%d] MulticastStrategy:insertSectionID\n",CkMyPe());
00183 ComlibPrintf("[%d] MulticastStrategy:insertSectionID sid->_cookie.sInfo.cInfo.id=%d \n",CkMyPe(), (int)sid->_cookie.info.sInfo.cInfo.id);
00184
00185
00186
00187 ComlibSectionHashKey key(CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);
00188
00189 ComlibSectionHashObject *obj = NULL;
00190 obj = sec_ht.get(key);
00191
00192 if(obj != NULL) {
00193 ComlibPrintf("MulticastStrategy:insertSectionID: Deleting old object on proc %d for id %d\n",
00194 CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);
00195 delete obj;
00196 }
00197
00198 ComlibPrintf("[%d] Creating new ComlibSectionHashObject in insertSectionID\n", CkMyPe());
00199 obj = new ComlibSectionHashObject();
00200 CkArrayID aid(sid->_cookie.get_aid());
00201 sinfo.getLocalIndices(sid->_nElems, sid->_elems, aid, obj->indices);
00202
00203 createObjectOnSrcPe(obj, npes, pelist);
00204 sec_ht.put(key) = obj;
00205 ComlibPrintf("[%d] Inserting object %p into sec_ht\n", CkMyPe(), obj);
00206 ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
00207
00208 return obj;
00209
00210
00211 }
00212
00213
00214 extern void CmiReference(void *);
00215
00216
00217
00218 void MulticastStrategy::localMulticast(envelope *env,
00219 ComlibSectionHashObject *obj,
00220 CkMcastBaseMsg *base) {
00221
00222
00223
00224 int nIndices = obj->indices.size();
00225
00226 if(obj->msg != NULL) {
00227 CmiFree(obj->msg);
00228 obj->msg = NULL;
00229 }
00230
00231 ComlibPrintf("[%d] localMulticast nIndices=%d\n", CkMyPe(), nIndices);
00232
00233 if(nIndices > 0) {
00234 void *msg = EnvToUsr(env);
00235 void *msg1 = msg;
00236
00237 msg1 = CkCopyMsg(&msg);
00238
00239 CmiReference(UsrToEnv(msg1));
00240 obj->msg = (void *)UsrToEnv(msg1);
00241
00242 int reply = ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
00243 if (reply > 0) {
00244
00245 CkMcastBaseMsg *errorMsg = sinfo.getNewDeliveryErrorMsg(base);
00246 envelope *errorEnv = UsrToEnv(errorMsg);
00247 CmiSetHandler(errorEnv, CkpvAccess(comlib_handler));
00248 ((CmiMsgHeaderExt *) errorEnv)->stratid = getInstance();
00249 CmiSyncSendAndFree(env->getSrcPe(), errorEnv->getTotalsize(), (char*)errorEnv);
00250 }
00251 }
00252
00253
00254
00255 }
00256
00257
00258
00259
00260
00261 void MulticastStrategy::remoteMulticast(envelope *env,
00262 ComlibSectionHashObject *obj) {
00263
00264
00265
00266 int npes = obj->npes;
00267 int *pelist = obj->pelist;
00268
00269 if(npes == 0) {
00270 CmiFree(env);
00271 return;
00272 }
00273
00274
00275 CmiSetHandler(env, CkpvAccess(comlib_handler));
00276
00277 ((CmiMsgHeaderExt *) env)->stratid = getInstance();
00278
00279
00280 RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
00281
00282 CkPackMessage(&env);
00283
00284
00285 ComlibPrintf("[%d] remoteMulticast Sending to %d PEs: \n", CkMyPe(), npes);
00286 for(int i=0;i<npes;i++){
00287 ComlibPrintf("[%d] %d\n", CkMyPe(), pelist[i]);
00288 }
00289
00290 CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
00291
00292
00293
00294
00295 }
00296
00297 void MulticastStrategy::pup(PUP::er &p){
00298 Strategy::pup(p);
00299 CharmStrategy::pup(p);
00300 }
00301
00302
00303 void MulticastStrategy::handleMessage(void *msg){
00304
00305
00306
00307
00308 envelope *env = (envelope *)msg;
00309 RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
00310
00311
00312 CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
00313 if (cbmsg->magic != _SECTION_MAGIC) CkAbort("MulticastStrategy received bad message! Did you forget to inherit from CkMcastBaseMsg?\n");
00314
00315 int status = cbmsg->_cookie.info.sInfo.cInfo.status;
00316 ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
00317
00318 if(status == COMLIB_MULTICAST_NEW_SECTION)
00319 handleNewMulticastMessage(env);
00320 else if (status == COMLIB_MULTICAST_SECTION_ERROR) {
00321
00322
00323
00324
00325 ComlibSectionHashKey key(cbmsg->_cookie.get_pe(),
00326 cbmsg->_cookie.info.sInfo.cInfo.id);
00327
00328 ComlibSectionHashObject *obj;
00329 obj = sec_ht.get(key);
00330
00331 if(obj == NULL)
00332 CkAbort("Destination indices is NULL\n");
00333
00334
00335 obj->isOld = 1;
00336 } else if (status == COMLIB_MULTICAST_OLD_SECTION) {
00337
00338 ComlibSectionHashKey key(cbmsg->_cookie.get_pe(),
00339 cbmsg->_cookie.info.sInfo.cInfo.id);
00340
00341 ComlibSectionHashObject *obj;
00342 obj = sec_ht.get(key);
00343
00344 if(obj == NULL)
00345 CkAbort("Destination indices is NULL\n");
00346
00347 localMulticast(env, obj, cbmsg);
00348 remoteMulticast(env, obj);
00349 } else {
00350 CkAbort("Multicast message status is zero\n");
00351 }
00352
00353
00354
00355 }
00356
00357
00358 void MulticastStrategy::handleNewMulticastMessage(envelope *env) {
00359
00360
00361
00362 ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
00363 ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
00364
00365 CkUnpackMessage(&env);
00366
00367 int localElems;
00368 envelope *newenv;
00369 CkArrayIndex *local_idx_list;
00370
00371
00372 sinfo.unpack(env, localElems, local_idx_list, newenv);
00373
00374 ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
00375 ComlibSectionHashKey key(cbmsg->_cookie.get_pe(),
00376 cbmsg->_cookie.info.sInfo.cInfo.id);
00377
00378 ComlibSectionHashObject *old_obj = NULL;
00379
00380 old_obj = sec_ht.get(key);
00381 if(old_obj != NULL) {
00382 delete old_obj;
00383 }
00384
00385
00386
00387
00388
00389
00390
00391 ComlibPrintf("[%d] Creating new ComlibSectionHashObject in handleNewMulticastMessage\n", CkMyPe());
00392 ComlibSectionHashObject *new_obj = new ComlibSectionHashObject();
00393 new_obj->indices.resize(0);
00394 for (int i=0; i<localElems; ++i) new_obj->indices.insertAtEnd(local_idx_list[i]);
00395
00396 createObjectOnIntermediatePe(new_obj, cbmsg->nPes, cbmsg->indicesCount, cbmsg->_cookie.get_pe());
00397
00398 ComlibPrintf("[%d] Inserting object into sec_ht\n", CkMyPe());
00399 ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
00400
00401 sec_ht.put(key) = new_obj;
00402
00403
00404
00405
00406
00407
00408
00409 localMulticast(newenv, new_obj, cbmsg);
00410 remoteMulticast(env, new_obj);
00411 CmiFree(newenv);
00412
00413 }
00414