00001 #ifndef MESSAGE_PACKER_H
00002 #define MESSAGE_PACKER_H
00003
00014 #include "charm++.h"
00015 #include "envelope.h"
00016 #include "ComlibManager.h"
00017 #include "register.h"
00018 #include "pup_cmialloc.h"
00019
00020 #define MAX_MESSAGE_SIZE 32768
00021
00022 class short_envelope {
00023 public:
00024 UShort epIdx;
00025 UShort size;
00026
00027 CkArrayIndex idx;
00028 char *data;
00029
00030 short_envelope();
00031 ~short_envelope();
00032 inline short_envelope(CkMigrateMessage *){}
00033
00034 void pup(PUP::er &p);
00035 };
00036
00037 inline short_envelope::short_envelope(){
00038 epIdx = 0;
00039 data = NULL;
00040 }
00041
00042 inline short_envelope::~short_envelope(){
00043
00044
00045
00046
00047
00048 }
00049
00050 inline void short_envelope::pup(PUP::er &p){
00051 char nints = 0;
00052
00053 p | epIdx;
00054
00055
00056
00057
00058 if(!p.isUnpacking())
00059 nints = idx.nInts;
00060
00061 p | nints;
00062 idx.nInts = nints;
00063 p((int *)(idx.data()), idx.nInts);
00064
00065 if(p.isUnpacking()) {
00066 p.pupCmiAllocBuf((void **)&data);
00067 size = SIZEFIELD(data);
00068 }
00069 else
00070 p.pupCmiAllocBuf((void **)&data, size);
00071 }
00072
00073 struct CombinedMessage{
00074 char header[CmiReservedHeaderSize];
00075 CkArrayID aid;
00076 unsigned short srcPE;
00077 unsigned short nmsgs;
00078 };
00079
00080 PUPbytes(CombinedMessage)
00081
00082 class MsgPacker {
00083 CkArrayID aid;
00084 short_envelope * msgList;
00085 int nShortMsgs;
00086
00087 public:
00088 MsgPacker();
00089 ~MsgPacker();
00090
00091
00092 MsgPacker(CkQ<CharmMessageHolder*> &cmsg_list, int n_msgs);
00093
00094
00095
00096 MsgPacker(CkQ<char *> &msgq, int n_msgs);
00097
00098 void getMessage(CombinedMessage *&msg, int &size);
00099 static void deliver(CombinedMessage *cmb_msg);
00100 };
00101
00102 inline void MsgPacker::deliver(CombinedMessage *cmb_msg){
00103
00104 CombinedMessage cmb_hdr;
00105
00106 PUP_fromCmiAllocMem fp(cmb_msg);
00107 fp | cmb_hdr;
00108
00109 int nmsgs = cmb_hdr.nmsgs;
00110
00111 ComlibPrintf("In MsgPacker::deliver\n");
00112 CkArrayID aid = cmb_hdr.aid;
00113 int src_pe = cmb_hdr.srcPE;
00114 CkArray *a=(CkArray *)_localBranch(aid);
00115
00116 ArrayElement *a_elem=NULL, *prev_elem=NULL;
00117 CkArrayIndex prev_idx;
00118 prev_idx.nInts = -1;
00119
00120 for(int count = 0; count < nmsgs; count ++){
00121 short_envelope senv;
00122 fp | senv;
00123
00124 int ep = senv.epIdx;
00125 int size = senv.size;
00126
00127 if(senv.idx == prev_idx) {
00128 a_elem = prev_elem;
00129 }
00130 else {
00131 CProxyElement_ArrayBase ap(aid, senv.idx);
00132 a_elem = ap.ckLocal();
00133 }
00134
00135 int msgIdx = _entryTable[ep]->msgIdx;
00136 if(_entryTable[ep]->noKeep && a_elem != NULL) {
00137
00138 senv.data = (char *)_msgTable[msgIdx]->unpack(senv.data);
00139 CkDeliverMessageReadonly(ep, senv.data, a_elem);
00140
00141 prev_elem = a_elem;
00142 prev_idx = senv.idx;
00143 CmiFree(senv.data);
00144 }
00145 else {
00146
00147 envelope *env = _allocEnv(ForArrayEltMsg,
00148 sizeof(envelope) + size);
00149
00150 void *data = EnvToUsr(env);
00151 memcpy(data, senv.data, size);
00152
00153
00154 data = (char *)_msgTable[msgIdx]->unpack(data);
00155
00156 env->getsetArrayMgr() = aid;
00157 env->getsetArrayIndex() = senv.idx;
00158 env->getsetArrayEp() = ep;
00159 env->setPacked(0);
00160 env->getsetArraySrcPe()=src_pe;
00161 env->getsetArrayHops()=1;
00162 env->setQueueing(CK_QUEUEING_FIFO);
00163 env->setUsed(0);
00164 env->setMsgIdx(msgIdx);
00165
00166 env->setTotalsize(sizeof(envelope) + size);
00167
00168
00169
00170
00171
00172
00173 a->deliver((CkArrayMessage *)data, CkDeliver_queue);
00174
00175 prev_elem = a_elem;
00176 prev_idx = senv.idx;
00177 CmiFree(senv.data);
00178 }
00179 }
00180
00181 CmiFree(cmb_msg);
00182 }
00183
00185 #endif
00186