00001
00007 #include "StreamingStrategy.h"
00008
00009 #include "pup_cmialloc.h"
00010
// Integer handler index declared through Converse's CpvDeclare macro.
// flushPE() reads it with CpvAccess() and stamps it onto every combined
// message via CmiSetHandler(). Where the value is assigned is not visible
// in this file.
00012 CpvDeclare(int, streaming_handler_id);
00017 void StreamingHandlerFn(void *msg) {
00018 StreamingMessage hdr;
00019
00020 ComlibPrintf("[%d] In streaming handler fn\n",CmiMyPe());
00021
00022 PUP_fromCmiAllocMem fp(msg);
00023 fp | hdr;
00024
00025 for(int count = 0; count < hdr.nmsgs; count ++) {
00026 char *msg;
00027 fp.pupCmiAllocBuf((void **)&msg);
00028 int size = SIZEFIELD(msg);
00029 CmiSyncSendAndFree(CmiMyPe(), size, msg);
00030 }
00031 CmiFree(msg);
00032 return;
00033 }
00034
00035 StreamingStrategy::StreamingStrategy(int periodMs, int bufferMax_,
00036 int msgSizeMax_, int bufSizeMax_)
00037 : PERIOD(periodMs), bufferMax(bufferMax_), msgSizeMax(msgSizeMax_),
00038 bufSizeMax(bufSizeMax_), Strategy() {
00039 streamingMsgBuf = NULL;
00040 streamingMsgCount = NULL;
00041 bufSize = NULL;
00042
00043 idleFlush = CmiTrue;
00044
00045 setType(CONVERSE_STRATEGY);
00046 }
00047
00048 StreamingStrategy::StreamingStrategy(double periodMs, int bufferMax_,
00049 int msgSizeMax_, int bufSizeMax_)
00050 : PERIOD(periodMs), bufferMax(bufferMax_), msgSizeMax(msgSizeMax_),
00051 bufSizeMax(bufSizeMax_), Strategy() {
00052 streamingMsgBuf = NULL;
00053 streamingMsgCount = NULL;
00054 bufSize = NULL;
00055
00056 idleFlush = CmiTrue;
00057
00058 setType(CONVERSE_STRATEGY);
00059 }
00060
00061 void StreamingStrategy::insertMessage(MessageHolder *cmsg) {
00062
00063 int pe=cmsg->dest_proc;
00064 char *msg = cmsg->getMessage();
00065
00066 int size = cmsg->getSize();
00067
00068 if(size > msgSizeMax) {
00069 ComlibPrintf("[%d] StreamingStrategy::insertMessage: to %d direct send %d\n",CmiMyPe(),pe,size);
00070 CmiSyncSendAndFree(pe, size, msg);
00071 delete cmsg;
00072 return;
00073 }
00074
00075 ComlibPrintf("[%d] StreamingStrategy::insertMessage: buffering t=%g, n=%d, d=%d, s=%d\n",
00076 CmiMyPe(), PERIOD, bufferMax, pe, size);
00077
00078 streamingMsgBuf[pe].enq(cmsg);
00079 streamingMsgCount[pe]++;
00080 bufSize[pe]+=size;
00081 if (streamingMsgCount[pe] >= bufferMax || bufSize[pe] >= bufSizeMax) flushPE(pe);
00082 }
00083
00084 void StreamingStrategy::doneInserting() {
00085 ComlibPrintf("[%d] StreamingStrategy::doneInserting\n", CmiMyPe());
00086
00087
00088 periodicFlush();
00089 }
00090
00092 void StreamingStrategy::flushPE(int pe) {
00093
00094
00095
00096 if(streamingMsgCount[pe] == 0)
00097 return;
00098
00099 MessageHolder *cmsg;
00100 int size = 0;
00101
00102
00103
00104 int msg_count=streamingMsgCount[pe];
00105
00106
00107 if(msg_count == 1) {
00108 cmsg = streamingMsgBuf[pe].deq();
00109 char *msg = cmsg->getMessage();
00110
00111 int size = cmsg->getSize();
00112 CmiSyncSendAndFree(pe, size, msg);
00113 ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n",
00114 CmiMyPe(), pe);
00115 delete cmsg;
00116 streamingMsgCount[pe] = 0;
00117 bufSize[pe] = 0;
00118 return;
00119 }
00120
00121
00122 PUP_cmiAllocSizer sp;
00123 StreamingMessage hdr;
00124
00125 sp | hdr;
00126
00127 int nmsgs = streamingMsgCount[pe];
00128 int count;
00129 for(count = 0; count < nmsgs; count++) {
00130 cmsg = streamingMsgBuf[pe][count];
00131 char *msg = cmsg->getMessage();
00132
00133 size = cmsg->getSize();
00134
00135 sp.pupCmiAllocBuf((void**)&msg, size);
00136 }
00137
00138 char *newmsg = (char *)CmiAlloc(sp.size());
00139 PUP_toCmiAllocMem mp(newmsg);
00140
00141 hdr.srcPE = CmiMyPe();
00142 hdr.nmsgs = nmsgs;
00143 mp | hdr;
00144
00145 for(count = 0; count < nmsgs; count++) {
00146 cmsg = streamingMsgBuf[pe][count];
00147 char *msg = cmsg->getMessage();
00148
00149 size = cmsg->getSize();
00150
00151 mp.pupCmiAllocBuf((void**)&msg, size);
00152 }
00153
00154 for(count = 0; count < nmsgs; count++) {
00155 cmsg = streamingMsgBuf[pe].deq();
00156
00157 CmiFree(cmsg->getMessage());
00158 delete cmsg;
00159 }
00160
00161 streamingMsgCount[pe] = 0;
00162 bufSize[pe] = 0;
00163 CmiSetHandler(newmsg, CpvAccess(streaming_handler_id));
00164 CmiSyncSendAndFree(pe, sp.size(), newmsg);
00165
00166 }
00167
00168 void StreamingStrategy::periodicFlush() {
00169 for (int proc = 0; proc < CmiNumPes(); proc++)
00170 flushPE(proc);
00171 }
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00193 static void call_delayFlush(void *arg,double curWallTime) {
00194 StreamingStrategy *s=(StreamingStrategy *)arg;
00195 s->periodicFlush();
00196 s->registerFlush();
00197 }
00198
00199 void StreamingStrategy::registerFlush(void) {
00200
00201 CcdCallFnAfterOnPE(call_delayFlush, (void *)this, PERIOD, CmiMyPe());
00202 }
00203
00205 static void call_idleFlush(void *arg,double curWallTime) {
00206 StreamingStrategy *s=(StreamingStrategy *)arg;
00207 s->periodicFlush();
00208 }
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232 void StreamingStrategy::pup(PUP::er &p){
00233
00234 Strategy::pup(p);
00235 p | PERIOD;
00236 p | bufferMax;
00237 p | msgSizeMax;
00238
00239 p | bufSizeMax;
00240 p | idleFlush;
00241
00242
00243 if(p.isPacking() || p.isUnpacking()) {
00244 streamingMsgBuf = new CkQ<MessageHolder *>[CmiNumPes()];
00245 streamingMsgCount = new int[CmiNumPes()];
00246 bufSize = new int[CmiNumPes()];
00247 for(int count = 0; count < CmiNumPes(); count ++) {
00248 streamingMsgCount[count] = 0;
00249 bufSize[count] = 0;
00250 }
00251 }
00252
00253
00254 if (p.isPacking() || p.isUnpacking()) registerFlush();
00255 }
00256
// Invokes the PUPable_def macro for StreamingStrategy.
// NOTE(review): presumably this emits the PUP::able registration
// definitions for the class — confirm against the Charm++ PUP headers.
00257 PUPable_def(StreamingStrategy)
00258
00259