
00001 00007 #include "PipeBroadcastStrategy.h" 00008 00009 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, CkArrayID _aid, int _pipeSize) 00010 : PipeBroadcastConverse(_topology, _pipeSize), CharmStrategy() { 00011 ComlibPrintf("Creating charm pipebcast (%x)\n",this); 00012 setType(ARRAY_STRATEGY); 00013 ainfo.setDestinationArray(_aid); 00014 //commonInit(_topology, _pipeSize); 00015 } 00016 00017 PipeBroadcastStrategy::PipeBroadcastStrategy(CkGroupID _gid, int _topology, int _pipeSize) 00018 : PipeBroadcastConverse(_topology, _pipeSize), CharmStrategy() { 00019 setType(GROUP_STRATEGY); 00020 //ginfo.setSourceGroup(_gid); 00021 //commonInit(_topology, _pipeSize); 00022 } 00023 00024 CmiFragmentHeader *PipeBroadcastStrategy::getFragmentHeader(char *msg) { 00025 return (CmiFragmentHeader*)EnvToUsr((envelope*)msg); 00026 } 00027 00028 void PipeBroadcastStrategy::deliver(char *msg, int dim) { 00029 envelope *env = (envelope*)msg; 00030 ComlibPrintf("[%d] PipeBroadcastStrategy::deliver\n",CkMyPe()); 00031 CkUnpackMessage(&env); 00032 //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY)); 00033 00034 if (getType() == ARRAY_STRATEGY) { 00035 // deliver the message to the predefined group "ainfo" 00036 ainfo.localBroadcast(env); 00037 } 00038 00039 if (getType() == GROUP_STRATEGY) { 00040 // deliver the message to the predifined group "ginfo" 00041 //CkGroupID gid; 00042 //ginfo.getSourceGroup(gid); 00043 CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), env->getGroupNum()); 00044 } 00045 } 00046 00047 /* 00048 void PipeBroadcastStrategy::commonInit(int _topology, int _pipeSize) { 00049 converseStrategy = new PipeBroadcastConverse(_topology, _pipeSize, this); 00050 } 00051 00052 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, int _pipeSize) 00053 : CharmStrategy() { 00054 //isArray = 0; 00055 commonInit(_topology, _pipeSize); 00056 } 00057 00058 void PipeBroadcastStrategy::insertMessage(CharmMessageHolder *cmsg){ 00059 messageBuf->enq(cmsg); 00060 doneInserting(); 00061 } 00062 00063 00064 // routine for interfacing with converse. 00065 // Require only the converse reserved header if forceSplit is true 00066 void PipeBroadcastStrategy::conversePipeBcast(envelope *env, int totalSize) { 00067 // set the instance ID to be used by the receiver using the XHandler variable 00068 CmiSetXHandler(env, myInstanceID); 00069 ComlibPrintf("[%d] PipeBroadcast charm, setting instid to %d\n",CkMyPe(),myInstanceID); 00070 00071 if (totalSize > ((PipeBroadcastConverse*)converseStrategy)->getPipeSize()) { 00072 ((PipeBroadcastConverse*)converseStrategy)->conversePipeBcast((char*)env, totalSize); 00073 } else { 00074 // the message fit into the pipe, so send it in a single chunk 00075 ComlibPrintf("[%d] Propagating message in one single chunk (%d)\n",CkMyPe(),CsvAccess(pipeBcastPropagateHandle)); 00076 CmiSetHandler(env, CsvAccess(pipeBcastPropagateHandle)); 00077 env->setSrcPe(CkMyPe()); 00078 ((PipeBroadcastConverse*)converseStrategy)->propagate((char*)env, false, CkMyPe(), totalSize, &envelope::setSrcPe); 00079 } 00080 } 00081 00082 void PipeBroadcastStrategy::doneInserting(){ 00083 ComlibPrintf("[%d] DoneInserting\n",CkMyPe()); 00084 while (!messageBuf->isEmpty()) { 00085 CharmMessageHolder *cmsg = messageBuf->deq(); 00086 // modify the Handler to deliver the message to the propagator 00087 envelope *env = UsrToEnv(cmsg->getCharmMessage()); 00088 00089 delete cmsg; 00090 conversePipeBcast(env, env->getTotalsize()); 00091 } 00092 } 00093 */ 00094 00095 void PipeBroadcastStrategy::pup(PUP::er &p){ 00096 ComlibPrintf("[%d] PipeBroadcastStrategy::pup %s\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing")); 00097 PipeBroadcastConverse::pup(p); 00098 CharmStrategy::pup(p); 00099 00100 /* 00101 if (p.isUnpacking()) { 00102 converseStrategy = new PipeBroadcastConverse(0,0,this); 00103 } 00104 p | *converseStrategy; 00105 00106 if (p.isUnpacking()) { 00107 //propagateHandle = CmiRegisterHandler((CmiHandler)propagate_handler); 00108 00109 ComlibPrintf("[%d] registered handler single to %d\n",CmiMyPe(),CsvAccess(pipeBcastPropagateHandle)); 00110 messageBuf = new CkQ<CharmMessageHolder *>; 00111 converseStrategy->setHigherLevel(this); 00112 } 00113 */ 00114 } 00115 00116 /* 00117 void PipeBroadcastStrategy::beginProcessing(int x){ 00118 CsvAccess(pipeBcastPropagateHandle) = CkRegisterHandler((CmiHandler)propagate_handler); 00119 } 00120 */ 00121
1.5.5