00001
00007 #ifndef STREAMING_STRATEGY
00008 #define STREAMING_STRATEGY
00009
00010 #include "convcomlibstrategy.h"
00011
00012 #define MAX_STREAMING_MESSAGE_SIZE 2048*2
00013 #define MAX_NUM_STREAMING_MESSAGES 1000
00014 #define DEFAULT_TIMEOUT 10
00015
00016 CpvExtern(int, streaming_handler_id);
00017 extern void StreamingHandlerFn(void *msg);
00018
00023 struct StreamingMessage {
00024 char header[CmiReservedHeaderSize];
00025 CmiUInt4 srcPE;
00026 CmiUInt4 nmsgs;
00027 };
00028
00029 PUPbytes(StreamingMessage)
00030
00031
00047 class StreamingStrategy : public Strategy {
00048 protected:
00049 CkQ<MessageHolder *> *streamingMsgBuf;
00050 int *streamingMsgCount;
00051 int *bufSize;
00052 int bufferMax;
00053 int msgSizeMax;
00054 int bufSizeMax;
00055 double PERIOD;
00056
00057 CmiBool idleFlush;
00058
00059
00060
00062 void flushPE(int destPE);
00063
00064 public:
00075 StreamingStrategy(int periodMs=DEFAULT_TIMEOUT,
00076 int bufferMax=MAX_NUM_STREAMING_MESSAGES,
00077 int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE,
00078 int bufSizeMax=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES);
00079 StreamingStrategy(double periodMs=DEFAULT_TIMEOUT,
00080 int bufferMax=MAX_NUM_STREAMING_MESSAGES,
00081 int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE,
00082 int bufSizeMax=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES);
00083
00084 StreamingStrategy(CkMigrateMessage *m) : Strategy(m) {}
00085
00086 virtual void insertMessage(MessageHolder *msg);
00087 virtual void doneInserting();
00088
00089 virtual void handleMessage(void *msg) {
00090 CmiAbort("[%d] StreamingStrategy::handleMessage should never be called\n");
00091 }
00092
00093
00094
00095 virtual void pup(PUP::er &p);
00096
00097
00098
00099
00100 virtual void disableIdleFlush() { idleFlush = CmiFalse;}
00101
00103 void registerFlush(void);
00105 void periodicFlush();
00106
00107 PUPable_decl(StreamingStrategy);
00108 };
00109 #endif
00110