
00001 // #ifdef filippo 00002 00003 // #include "NodeMulticast.h" 00004 // #include "converse.h" 00005 00006 // #define MAX_BUF_SIZE 165000 00007 // #define MAX_SENDS_PER_BATCH 16 00008 // #define MULTICAST_DELAY 5 00009 00010 // static NodeMulticast *nm_mgr; 00011 00012 // static void call_doneInserting(void *ptr,double curWallTime){ 00013 // NodeMulticast *mgr = (NodeMulticast *)ptr; 00014 // mgr->doneInserting(); 00015 // } 00016 00017 // static void* NodeMulticastHandler(void *msg){ 00018 // ComlibPrintf("In Node MulticastHandler\n"); 00019 // nm_mgr->recvHandler(msg); 00020 // return NULL; 00021 // } 00022 00023 // static void* NodeMulticastCallbackHandler(void *msg){ 00024 // ComlibPrintf("[%d]:In Node MulticastCallbackHandler\n", CkMyPe()); 00025 // register envelope *env = (envelope *)msg; 00026 // CkUnpackMessage(&env); 00027 // //nm_mgr->getCallback().send(EnvToUsr(env)); 00028 00029 // //nm_mgr->getHandler()(env); 00030 // return NULL; 00031 // } 00032 00033 // //Handles multicast by sending only one message to a nodes and making 00034 // //them multicast locally 00035 // void NodeMulticast::setDestinationArray(CkArrayID a, int nelem, 00036 // CkArrayIndex **idx, int ep){ 00037 00038 // mode = ARRAY_MODE; 00039 // messageBuf = NULL; 00040 // pes_per_node = 4; 00041 // if(getenv("RMS_NODES") != NULL) 00042 // pes_per_node = CkNumPes()/atoi(getenv("RMS_NODES")); 00043 00044 // mAid = a; 00045 // nelements = nelem; 00046 // entryPoint = ep; 00047 00048 // numNodes = CkNumPes()/pes_per_node; 00049 // numCurDestPes = CkNumPes(); 00050 // myRank = 0; 00051 // nodeMap = new int[numNodes]; 00052 00053 // ComlibPrintf("In SetDestinationArray %d, %d, %d, %d\n", numNodes, 00054 // pes_per_node, nelements, ep); 00055 00056 // indexVec = new CkVec<CkArrayIndex> [CkNumPes()]; 00057 00058 // for(int count = 0; count < nelements; count++) { 00059 // ComlibPrintf("Before lastKnown %d\n", count); 00060 // int dest_proc = CkArrayID::CkLocalBranch(a)->lastKnown(*idx[count]); 00061 // ComlibPrintf("After lastKnown %d\n", dest_proc); 00062 // nodeMap[dest_proc/pes_per_node] = 1; 00063 00064 // indexVec[dest_proc].insertAtEnd(*idx[count]); 00065 // } 00066 00067 // ComlibPrintf("After SetDestinationArray\n"); 00068 // } 00069 // /* 00070 // void NodeMulticast::setPeList(int npes, int *pelist, ComlibMulticastHandler handler){ 00071 // mode = PROCESSOR_MODE; 00072 // messageBuf = NULL; 00073 // pes_per_node = 4; 00074 // //if(getenv("RMS_NODES") != NULL) 00075 // //pes_per_node = CkNumPes()/atoi(getenv("RMS_NODES")); 00076 00077 // //cb = callback; 00078 // this->handler = (long)handler; 00079 00080 // numNodes = CkNumPes()/pes_per_node; 00081 // numCurDestPes = npes; 00082 00083 // myRank = 0; 00084 // nodeMap = new int[numNodes]; 00085 00086 // this->npes = npes; 00087 // this->pelist = new int[npes]; 00088 // memcpy(this->pelist, pelist, npes * sizeof(int)); 00089 00090 // ComlibPrintf("In setPeList %d, %d, %d\n", numNodes, 00091 // pes_per_node, npes); 00092 00093 // for(int count = 0; count < npes; count++) 00094 // nodeMap[pelist[count]/pes_per_node] = 1; 00095 00096 // ComlibPrintf("After setPeList\n"); 00097 // } 00098 // */ 00099 00100 // void NodeMulticast::recvHandler(void *msg) { 00101 // register envelope* env = (envelope *)msg; 00102 // void *charm_msg = (void *)EnvToUsr(env); 00103 00104 // env->setUsed(0); 00105 // ComlibPrintf("In receive Handler\n"); 00106 // if(mode == ARRAY_MODE) { 00107 // env->getsetArrayMgr()=mAid; 00108 // env->getsetArrayEp()=entryPoint; 00109 // env->getsetArrayHops()=0; 00110 // CkUnpackMessage(&env); 00111 00112 // for(int count = 0; count < pes_per_node; count ++){ 00113 // int dest_pe = (CkMyPe()/pes_per_node) * pes_per_node + count; 00114 // int size = indexVec[dest_pe].size(); 00115 00116 // ComlibPrintf("[%d], %d elements to send to %d of size %d\n", CkMyPe(), size, dest_pe, env->getTotalsize()); 00117 00118 // CkArrayIndex * idx_arr = indexVec[dest_pe].getVec(); 00119 // for(int itr = 0; itr < size; itr ++) { 00120 // void *newcharmmsg = CkCopyMsg(&charm_msg); 00121 // envelope* newenv = UsrToEnv(newcharmmsg); 00122 // CProxyElement_ArrayBase ap(mAid, idx_arr[itr]); 00123 // newenv->getsetArrayIndex()=idx_arr[itr]; 00124 // ap.ckSend((CkArrayMessage *)newcharmmsg, entryPoint); 00125 // } 00126 // } 00127 // } 00128 // else { 00129 // CkUnpackMessage(&env); 00130 // for(int count = 0; count < pes_per_node; count++) 00131 // if(validRank[count]){ 00132 // void *newcharmmsg; 00133 // envelope* newenv; 00134 00135 // if(count < pes_per_node - 1) { 00136 // newcharmmsg = CkCopyMsg(&charm_msg); 00137 // newenv = UsrToEnv(newcharmmsg); 00138 // } 00139 // else { 00140 // newcharmmsg = charm_msg; 00141 // newenv = UsrToEnv(newcharmmsg); 00142 // } 00143 00144 // CmiSetHandler(newenv, NodeMulticastCallbackHandlerId); 00145 // ComlibPrintf("[%d] In receive Handler (proc mode), sending message to %d at handler %d\n", 00146 // CkMyPe(), (CkMyPe()/pes_per_node) * pes_per_node 00147 // + count, NodeMulticastCallbackHandlerId); 00148 00149 // CkPackMessage(&newenv); 00150 // CmiSyncSendAndFree((CkMyPe()/pes_per_node) *pes_per_node + count, 00151 // newenv->getTotalsize(), (char *)newenv); 00152 // } 00153 // } 00154 // ComlibPrintf("[%d] CmiFree (Code) (%x)\n", CkMyPe(), 00155 // (long) msg - 2*sizeof(int)); 00156 // //CmiFree(msg); 00157 // } 00158 00159 // void NodeMulticast::insertMessage(CharmMessageHolder *cmsg){ 00160 00161 // ComlibPrintf("In insertMessage \n"); 00162 // envelope *env = UsrToEnv(cmsg->getCharmMessage()); 00163 00164 // CmiSetHandler(env, NodeMulticastHandlerId); 00165 // messageBuf->enq(cmsg); 00166 // } 00167 00168 // void NodeMulticast::doneInserting(){ 00169 // CharmMessageHolder *cmsg; 00170 // char *msg; 00171 // register envelope *env; 00172 00173 // ComlibPrintf("NodeMulticast :: doneInserting\n"); 00174 00175 // if(messageBuf->length() > 1) { 00176 // //CkPrintf("NodeMulticast :: doneInserting length > 1\n"); 00177 // /* 00178 // char **msgComps; 00179 // int *sizes, msg_count; 00180 00181 // msgComps = new char*[messageBuf->length()]; 00182 // sizes = new int[messageBuf->length()]; 00183 // msg_count = 0; 00184 // while (!messageBuf->isEmpty()) { 00185 // cmsg = messageBuf->deq(); 00186 // msg = cmsg->getCharmMessage(); 00187 // env = UsrToEnv(msg); 00188 // sizes[msg_count] = env->getTotalsize(); 00189 // msgComps[msg_count] = (char *)env; 00190 // msg_count++; 00191 00192 // delete cmsg; 00193 // } 00194 00195 // for(int count = 0; count < numNodes; count++) 00196 // if(nodeMap[count]) 00197 // CmiMultipleSend(count * pes_per_node + myRank, msg_count, 00198 // sizes, msgComps); 00199 00200 // delete [] msgComps; 00201 // delete [] sizes; 00202 // */ 00203 // } 00204 // else if (messageBuf->length() == 1){ 00205 // static int prevCount = 0; 00206 // int count = 0; 00207 // ComlibPrintf("Sending Node Multicast\n"); 00208 // cmsg = messageBuf->deq(); 00209 // msg = cmsg->getCharmMessage(); 00210 // env = UsrToEnv(msg); 00211 00212 // if(mode == ARRAY_MODE) 00213 // env->getsetArraySrcPe()=CkMyPe(); 00214 // CkPackMessage(&env); 00215 00216 // CmiSetHandler(env, NodeMulticastHandlerId); 00217 // ComlibPrintf("After set handler\n"); 00218 00219 // //CmiPrintf("cursedtpes = %d, %d\n", cmsg->npes, numCurDestPes); 00220 00221 // if((mode != ARRAY_MODE) && cmsg->npes < numCurDestPes) { 00222 // numCurDestPes = cmsg->npes; 00223 // for(count = 0; count < numNodes; count++) 00224 // nodeMap[count] = 0; 00225 00226 // for(count = 0; count < cmsg->npes; count++) 00227 // nodeMap[(cmsg->pelist[count])/pes_per_node] = 1; 00228 // } 00229 00230 // for(count = prevCount; count < numNodes; count++) { 00231 // //int dest_node = count; 00232 // int dest_node = (count + (CkMyPe()/pes_per_node))%numNodes; 00233 // if(nodeMap[dest_node]) { 00234 // void *newcharmmsg; 00235 // envelope* newenv; 00236 00237 // if(count < numNodes - 1) { 00238 // newcharmmsg = CkCopyMsg((void **)&msg); 00239 // newenv = UsrToEnv(newcharmmsg); 00240 // } 00241 // else { 00242 // newcharmmsg = msg; 00243 // newenv = UsrToEnv(newcharmmsg); 00244 // } 00245 00246 // ComlibPrintf("[%d]In cmisyncsend to %d\n", CkMyPe(), 00247 // dest_node * pes_per_node + myRank); 00248 // #if CMK_PERSISTENT_COMM 00249 // if(env->getTotalsize() < MAX_BUF_SIZE) 00250 // CmiUsePersistentHandle(&persistentHandlerArray[dest_node],1); 00251 // #endif 00252 // CkPackMessage(&newenv); 00253 // CmiSyncSendAndFree(dest_node * pes_per_node + myRank, 00254 // newenv->getTotalsize(), (char *)newenv); 00255 // #if CMK_PERSISTENT_COMM 00256 // if(env->getTotalsize() < MAX_BUF_SIZE) 00257 // CmiUsePersistentHandle(NULL, 0); 00258 // #endif 00259 // } 00260 // prevCount ++; 00261 // if((prevCount % MAX_SENDS_PER_BATCH == 0) && 00262 // (prevCount != numNodes)) { 00263 // CcdCallFnAfterOnPE((CcdVoidFn)call_doneInserting, (void *)this, 00264 // MULTICAST_DELAY, CkMyPe()); 00265 // return; 00266 // } 00267 // prevCount = 0; 00268 // } 00269 00270 // ComlibPrintf("[%d] CmiFree (Code) (%x)\n", CkMyPe(), (char *)env - 2*sizeof(int)); 00271 // //CmiFree(env); 00272 // delete cmsg; 00273 // } 00274 // } 00275 00276 // void NodeMulticast::pup(PUP::er &p){ 00277 00278 // CharmStrategy::pup(p); 00279 00280 // p | pes_per_node; 00281 // p | numNodes; 00282 // p | nelements; 00283 // p | entryPoint; 00284 // p | npes; 00285 // p | mode; 00286 // p | numCurDestPes; 00287 // p | mAid; 00288 00289 // if(p.isUnpacking()) { 00290 // nodeMap = new int[numNodes]; 00291 00292 // if(mode == ARRAY_MODE) { 00293 // typedef CkVec<CkArrayIndex> CkVecArrayIndex; 00294 // CkVecArrayIndex *vec = new CkVecArrayIndex[CkNumPes()]; 00295 // indexVec = vec; 00296 // } 00297 00298 // if(mode == PROCESSOR_MODE) 00299 // pelist = new int[npes]; 00300 // } 00301 00302 // p | cb; 00303 // p | handler; 00304 // p(nodeMap, numNodes); 00305 00306 // if(mode == PROCESSOR_MODE) 00307 // p(pelist, npes); 00308 00309 // if(mode == ARRAY_MODE) 00310 // for(int count = 0; count < CkNumPes(); count++) 00311 // p | indexVec[count]; 00312 00313 // if(p.isUnpacking()) { 00314 // messageBuf = new CkQ <CharmMessageHolder *>; 00315 // myRank = CkMyPe() % pes_per_node; 00316 00317 // NodeMulticastHandlerId = CkRegisterHandler((CmiHandler)NodeMulticastHandler); 00318 // NodeMulticastCallbackHandlerId = CkRegisterHandler 00319 // ((CmiHandler)NodeMulticastCallbackHandler); 00320 00321 // nm_mgr = this; 00322 00323 // //validRank[0] = validRank[1] = validRank[2] = validRank[3] = 0; 00324 // memset(validRank, 0, MAX_PES_PER_NODE * sizeof(int)); 00325 // for(int count = 0; count < npes; count ++){ 00326 // if(CkMyPe()/pes_per_node == pelist[count] / pes_per_node) 00327 // validRank[pelist[count] % pes_per_node] = 1; 00328 // } 00329 00330 // #if CMK_PERSISTENT_COMM 00331 // persistentHandlerArray = new PersistentHandle[numNodes]; 00332 // for(int count = 0; count < numNodes; count ++) 00333 // //if(nodeMap[count]) 00334 // persistentHandlerArray[count] = CmiCreatePersistent 00335 // (count * pes_per_node + myRank, MAX_BUF_SIZE); 00336 // #endif 00337 // } 00338 // } 00339 00340 // //PUPable_def(NodeMulticast); 00341 00342 // #endif
1.5.5