
00001 // #ifdef filippo 00002 00003 // #include <math.h> 00004 // #include "pipelinestrategy.h" 00005 00006 // inline int log_of_2 (int i) { 00007 // int m; 00008 // for (m=0; i>(1<<m); ++m); 00009 // return m; 00010 // } 00011 00012 // //PipelineHashKey CODE 00013 // int PipelineHashKey::staticCompare(const void *k1,const void *k2,size_t ){ 00014 // return ((const PipelineHashKey *)k1)-> 00015 // compare(*(const PipelineHashKey *)k2); 00016 // } 00017 00018 // CkHashCode PipelineHashKey::staticHash(const void *v,size_t){ 00019 // return ((const PipelineHashKey *)v)->hash(); 00020 // } 00021 00022 // void PipelineStrategy::commonInit(){ 00023 // //log_of_2_inv = 1/log((double)2); 00024 // seqNumber = 0; 00025 // } 00026 00027 // //extern void propagate_handler(void *); 00028 00029 // void deliver_handler(void *message) { 00030 // int instid = CmiGetXHandler(message); 00031 // PipelineStrategy *myStrategy = (PipelineStrategy*)ConvComlibGetStrategy(instid); 00032 // ComlibPrintf("[%d] propagate_handler_frag: calling on instid %d %x\n",CkMyPe(),instid,myStrategy); 00033 // //CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid); 00034 // PipelineInfo *info = (PipelineInfo*)(((char*)message)+CmiReservedHeaderSize); 00035 // myStrategy->storing((char*)message); 00036 // } 00037 00038 // void PipelineStrategy::storing(char* fragment) { 00039 // char *complete; 00040 // int isFinished=0; 00041 // int totalDimension; 00042 // //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY)); 00043 00044 // // store the fragment in the hash table until completed 00045 // ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe()); 00046 // PipelineInfo *info = (PipelineInfo*)(fragment+CmiReservedHeaderSize); 00047 00048 // PipelineHashKey key (info->bcastPe, info->seqNumber); 00049 // PipelineHashObj *position = fragments.get(key); 00050 00051 // char *incomingMsg; 00052 // if (position) { 00053 // // the message already exist, add to it 00054 // ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CkMyPe(),info->bcastPe,info->seqNumber,position->remaining-1); 00055 // incomingMsg = position->message; 00056 // memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipelineInfo), info->chunkSize); 00057 00058 // if (--position->remaining == 0) { // message completely received 00059 // isFinished = 1; 00060 // complete = incomingMsg; 00061 // totalDimension = position->dimension; 00062 // // delete from the hash table 00063 // fragments.remove(key); 00064 // } 00065 00066 // } else { 00067 // // the message doesn't exist, create it 00068 // ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CkMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize); 00069 // incomingMsg = (char*)CmiAlloc(info->messageSize); 00070 // memcpy (incomingMsg, fragment, CmiReservedHeaderSize); 00071 // memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipelineInfo), info->chunkSize); 00072 // int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo)))-1; 00073 // if (remaining) { // more than one chunk (it was not forced to be splitted) 00074 // PipelineHashObj *object = new PipelineHashObj(info->messageSize, remaining, incomingMsg); 00075 // fragments.put(key) = object; 00076 // } else { // only one chunk, it was forces to be splitted 00077 // isFinished = 1; 00078 // complete = incomingMsg; 00079 // // nothing to delete from fragments since nothing has been added 00080 // } 00081 // } 00082 // CmiFree(fragment); 00083 00084 // if (isFinished) { 00085 // higherLevel->deliverer(complete, totalDimension); 00086 // } 00087 // } 00088 00089 // void PipelineStrategy::deliverer(char *msg, int dimension) { 00090 // ComlibPrintf("{%d} dest = %d, %d, %x\n",CkMyPe(),destinationHandler, dimension,CmiHandlerToInfo(destinationHandler).hdlr); 00091 // if (destinationHandler) { 00092 // CmiSetHandler(msg, destinationHandler); 00093 // CmiSyncSendAndFree(CkMyPe(), dimension, msg); 00094 // } else { 00095 // CmiPrintf("[%d] Pipelined Broadcast: message not delivered since destination not set!"); 00096 // } 00097 // } 00098 00099 // PipelineStrategy::PipelineStrategy(int _pipeSize, Strategy *parent) : Strategy(), pipeSize(_pipeSize) { 00100 // if (parent) higherLevel = parent; 00101 // else higherLevel = this; 00102 // seqNumber = 0; 00103 // messageBuf = new CkQ<MessageHolder *>; 00104 // //if (!parent) propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag); 00105 // ComlibPrintf("init: %d (%x)\n",pipeSize,this); 00106 // //if (!parent) ComlibPrintf("[%d] registered handler fragmented to %d\n",CkMyPe(),propagateHandle_frag); 00107 // } 00108 00109 // void PipelineStrategy::insertMessage(MessageHolder *cmsg){ 00110 // ComlibPrintf("[%d] Pipelined Broadcast with converse strategy\n",CkMyPe()); 00111 // messageBuf->enq(cmsg); 00112 // doneInserting(); 00113 // } 00114 00115 // void PipelineStrategy::doneInserting(){ 00116 // ComlibPrintf("[%d] DoneInserting\n",CkMyPe()); 00117 // while (!messageBuf->isEmpty()) { 00118 // MessageHolder *cmsg = messageBuf->deq(); 00119 // // modify the Handler to deliver the message to the propagator 00120 // char *env = cmsg->getMessage(); 00121 // //CmiSetHandler(env, deliverHandle); 00122 // conversePipeline(env, cmsg->getSize(), cmsg->dest_proc); 00123 // delete cmsg; 00124 // //conversePipeline(env, env->getTotalsize(), false); 00125 // } 00126 // } 00127 00128 // // routine for interfacing with converse. 00129 // // Require only the converse reserved header if forceSplit is true 00130 // void PipelineStrategy::conversePipeline(char *env, int totalSize, int destination) { 00131 // // set the instance ID to be used by the receiver using the XHandler variable 00132 // CmiSetXHandler(env, myInstanceID); 00133 00134 // ++seqNumber; 00135 // // message doesn't fit into the pipe: split it into chunks and propagate them individually 00136 // ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CkMyPe(),totalSize); 00137 00138 // char *sendingMsg; 00139 // char *nextChunk = env;//+CmiReservedHeaderSize; 00140 // int remaining = totalSize;//-CmiReservedHeaderSize; 00141 // int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo); 00142 // int sendingMsgSize; 00143 // ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipelineInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipelineInfo)); 00144 // ComlibPrintf("sending %d chunks of size %d, total=%d to handle %d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining,deliverHandle); 00145 // //CmiSetHandler(env, deliverHandle); 00146 // ComlibPrintf("setting env handler to %d\n",deliverHandle); 00147 // for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) { 00148 // sendingMsgSize = reducedPipe<remaining? pipeSize : remaining+CmiReservedHeaderSize+sizeof(PipelineInfo); 00149 // sendingMsg = (char*)CmiAlloc(sendingMsgSize); 00150 // //memcpy (sendingMsg, env, CmiReservedHeaderSize); 00151 // CmiSetHandler(sendingMsg, deliverHandle); 00152 // PipelineInfo *info = (PipelineInfo*)(sendingMsg+CmiReservedHeaderSize); 00153 // info->srcPe = CkMyPe(); 00154 // info->bcastPe = CkMyPe(); 00155 // info->seqNumber = seqNumber; 00156 // info->chunkNumber = i; 00157 // info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining; 00158 // info->messageSize = totalSize; 00159 // memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipelineInfo), nextChunk, info->chunkSize); 00160 00161 // remaining -= info->chunkSize; 00162 // nextChunk += info->chunkSize; 00163 00164 // //propagate(sendingMsg, true, CkMyPe(), sendingMsgSize, NULL); 00165 // CmiSyncSendAndFree(destination, sendingMsgSize, sendingMsg); 00166 // } 00167 // CmiFree(env); 00168 // } 00169 00170 // void PipelineStrategy::pup(PUP::er &p){ 00171 // Strategy::pup(p); 00172 // ComlibPrintf("[%d] initial of Pipeconverse pup %s\n",CkMyPe(),(p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing")); 00173 00174 // p | pipeSize; 00175 // p | seqNumber; 00176 00177 // ComlibPrintf("[%d] PipeBroadcast converse pupping %s, size=%d\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"),pipeSize); 00178 00179 // if (p.isUnpacking()) { 00180 // //log_of_2_inv = 1/log((double)2); 00181 // messageBuf = new CkQ<MessageHolder *>; 00182 // deliverHandle = CmiRegisterHandler((CmiHandler)deliver_handler); 00183 // //propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag); 00184 // //ComlibPrintf("[%d] registered handler to %d\n",CkMyPe(),deliverHandle); 00185 // } 00186 // if (p.isPacking()) { 00187 // delete messageBuf; 00188 // } 00189 // //p|(*messageBuf); 00190 // //p|fragments; 00191 00192 // } 00193 00194 // PUPable_def(PipelineStrategy); 00195 00196 // #endif
1.5.5