00001
00007 #include "PrioStreaming.h"
00008
00009
00010 PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio,
00011 int msgSizeMax_, int bufSizeMax_)
00012 : StreamingStrategy(periodMs, bufferMax_, msgSizeMax_, bufSizeMax_), CharmStrategy(), basePriority(prio)
00013 {
00014 }
00015
00016 void PrioStreaming::insertMessage(CharmMessageHolder *cmsg) {
00017
00018 ComlibPrintf("Prio Straming: InsertMessage %d, %d\n",
00019 PERIOD, bufferMax);
00020
00021 int pe=cmsg->dest_proc;
00022 char* msg = cmsg->getCharmMessage();
00023 envelope *env = UsrToEnv(msg);
00024 int msg_prio = *(int*)env->getPrioPtr();
00025
00026 if(streamingMsgCount[pe] == 0)
00027 minPrioVec[pe] = msg_prio;
00028 else if(minPrioVec[pe] > msg_prio)
00029 minPrioVec[pe] = msg_prio;
00030
00031 streamingMsgBuf[pe].enq(cmsg);
00032 streamingMsgCount[pe]++;
00033 bufSize[pe]+=cmsg->getSize();
00034
00035 if(msg_prio <= basePriority)
00036 flushPE(pe);
00037
00038 if (streamingMsgCount[pe] > bufferMax || bufSize[pe] > bufSizeMax)
00039 flushPE(pe);
00040 }
00041
00042 void PrioStreaming::pup(PUP::er &p){
00043
00044 StreamingStrategy::pup(p);
00045 CharmStrategy::pup(p);
00046 p | basePriority;
00047
00048 if(p.isPacking() || p.isUnpacking())
00049 minPrioVec.resize(CkNumPes());
00050 }
00051