
00001 00007 #ifndef PRIO_STREAMING 00008 #define PRIO_STREAMING 00009 00010 #include "ComlibManager.h" 00011 #include "StreamingStrategy.h" 00012 00029 class PrioStreaming : public StreamingStrategy, public CharmStrategy { 00030 protected: 00031 int basePriority; 00032 CkVec<int> minPrioVec; 00033 00034 public: 00050 PrioStreaming(int periodMs=DEFAULT_TIMEOUT, 00051 int bufferMax=MAX_NUM_STREAMING_MESSAGES, 00052 int prio=0, 00053 int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE, 00054 int bufSizeMAX=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES); 00055 PrioStreaming(CkMigrateMessage *m) : StreamingStrategy(m), CharmStrategy(m) {} 00056 00057 void insertMessage(MessageHolder *msg) {insertMessage((CharmMessageHolder*)msg);} 00058 virtual void insertMessage(CharmMessageHolder *msg); 00059 00060 //If new priority is greater than current priority, 00061 //then flush all queues which have relatively high priority messages 00062 inline void setBasePriority(int p) { 00063 if(p > basePriority) { 00064 for(int count =0; count < CkNumPes(); count++) 00065 if(minPrioVec[count] <= p) 00066 flushPE(count); 00067 } 00068 basePriority = p; 00069 } 00070 00071 virtual void pup(PUP::er &p); 00072 PUPable_decl(PrioStreaming); 00073 }; 00074 #endif 00075
1.5.5