00001 #include <stdio.h>
00002 #include <string.h>
00003 #include <stdlib.h>
00004
00005 #include "charm++.h"
00006
00007 #include "receiver.h"
00008
00009
00010 receiver::receiver()
00011 {
00012 msgTbl = CmmNew();
00013 reqTbl = CmmNew();
00014
00015 callback = NULL;
00016 counter = -1;
00017 startwaiting = 0;
00018 }
00019
00020 receiver::receiver(CkMigrateMessage *m)
00021 {
00022 msgTbl = CmmNew();
00023 reqTbl = CmmNew();
00024
00025 callback = NULL;
00026 }
00027
00028 void receiver::pup(PUP::er &p)
00029 {
00030 p(counter);
00031 p(startwaiting);
00032
00033 pupCmmTable(msgTbl, p);
00034 pupCmmTable(reqTbl, p);
00035 }
00036
00037 receiver::~receiver()
00038 {
00039 CmmFree(msgTbl);
00040 CmmFree(reqTbl);
00041 }
00042
00043 #define MYMIN(a,b) (a)<(b)?(a):(b)
00044
00045
00046 void receiver::sendTo(receiverMsg *msg, int tag, char *pointer, int size, int from, int refno)
00047 {
00048 int tags[3], ret_tags[3];
00049
00050
00051 tags[0] = tag; tags[1] = from; tags[2] = refno;
00052 tblEntry *req = (tblEntry *)CmmGet(reqTbl, 3, tags, ret_tags);
00053
00054 if (req) {
00055
00056 memcpy(req->buf, pointer, MYMIN(size, req->size));
00057 delete msg;
00058 delete req;
00059
00060 recvAlready();
00061 }
00062 else {
00063
00064 tags[0] = tag; tags[1] = from; tags[2] = refno;
00065 req = new tblEntry;
00066 req->msg = msg;
00067 req->size = size;
00068 CmmPut(msgTbl, 3, tags, req);
00069 }
00070
00071 }
00072
00073 void receiver::generic(receiverMsg *msg)
00074 {
00075 sendTo(msg, msg->tag, msg->buf, msg->size, msg->sendFrom, msg->refno);
00076 }
00077
00078 void receiver::syncSend(receiverMsg *msg)
00079 {
00080 sendTo(msg, msg->tag, msg->buf, msg->size, msg->sendFrom, msg->refno);
00081 }
00082
00083 extern "C" int typesize(int type, int count)
00084 {
00085 switch(type) {
00086 case CMPI_DOUBLE_PRECISION : return count*sizeof(double);
00087 case CMPI_INTEGER : return count*sizeof(int);
00088 case CMPI_REAL : return count*sizeof(float);
00089 case CMPI_COMPLEX: return 2*count*sizeof(double);
00090 case CMPI_LOGICAL: return 2*count*sizeof(int);
00091 case CMPI_CHARACTER:
00092 case CMPI_BYTE:
00093 case CMPI_PACKED:
00094 default:
00095 return count;
00096 }
00097 }
00098
00099 void receiver::isend(void *buf, int count, int datatype, int dest, int tag, int refno)
00100 {
00101 int size = typesize(datatype, count);
00102 receiverMsg * d = new (size, 0) receiverMsg;
00103 d->tag = tag;
00104 d->sendFrom = thisIndex;
00105 d->refno = refno;
00106 d->size = size;
00107 memcpy(d->buf, buf, size);
00108 CProxy_receiver B(thisArrayID);
00109 B[dest].generic(d);
00110 }
00111
00112 void receiver::irecv(void *buf, int count, int datatype, int source, int tag, int refno)
00113 {
00114 int tags[3], ret_tags[3];
00115 int size = typesize(datatype, count);
00116
00117 tags[0] = tag; tags[1] = source; tags[2] = refno;
00118 tblEntry *req = (tblEntry *)CmmGet(msgTbl, 3, tags, ret_tags);
00119
00120 if (req) {
00121
00122 memcpy(buf, req->msg->buf, MYMIN(size, req->size));
00123 delete req->msg;
00124 delete req;
00125 }
00126 else {
00127
00128
00129 tags[0] = tag; tags[1] = source; tags[2] = refno;
00130 req = new tblEntry;
00131 req->buf = (char *)buf;
00132 req->size = size;
00133 CmmPut(reqTbl, 3, tags, req);
00134 }
00135 }
00136
00137 int receiver::iAlltoAll(void *sendbuf, int sendcount, int sendtype,
00138 void *recvbuf, int recvcount, int recvtype, int refno)
00139 {
00140 int nPe = ckGetArraySize();
00141 int tag = 65535;
00142 int i;
00143 for (i=0; i<nPe; i++)
00144 isend(((char *)sendbuf)+i*typesize(sendtype, sendcount), sendcount, sendtype, i, tag, refno);
00145 for (i=0; i<nPe; i++)
00146 irecv(((char *)recvbuf)+i*typesize(recvtype, recvcount), recvcount, recvtype, i, tag, refno);
00147 return 0;
00148 }
00149
00150 int receiver::iAlltoAllv(void *sendbuf, int *sendcount, int *sdispls, int sendtype, void *recvbuf, int *recvcount, int *rdispls, int recvtype, int refno)
00151 {
00152 int nPe = ckGetArraySize();
00153 int tag = 65535;
00154 int i;
00155 for (i=0; i<nPe; i++)
00156 isend(((char *)sendbuf)+sdispls[i]*typesize(sendtype, 1), sendcount[i], sendtype, i, tag, refno);
00157 for (i=0; i<nPe; i++)
00158 irecv(((char *)recvbuf)+rdispls[i]*typesize(recvtype, 1), recvcount[i], recvtype, i, tag, refno);
00159 return 0;
00160 }
00161
00162 void receiver::iwaitAll(recvCallBack f, void *data, int ref)
00163 {
00164 if (callback != NULL)
00165 {
00166 CkPrintf("iwaitAll wrong!\n");
00167 CkExit();
00168 }
00169 callback = f;
00170 this->cb_data = data;
00171 counter = ref;
00172
00173 startwaiting = 1;
00174
00175 recvAlready();
00176 }
00177
00178 void receiver::iwaitAll(int ref)
00179 {
00180 callback = NULL;
00181 this->cb_data = NULL;
00182 counter = ref;
00183 startwaiting = 1;
00184
00185 recvAlready();
00186 }
00187
00188 void receiver::recvAlready()
00189 {
00190
00191 if (!startwaiting) return;
00192
00193 int tags[3], ret_tags[3];
00194 tags[0] = CmmWildCard; tags[1] = CmmWildCard; tags[2] = counter;
00195 tblEntry *req1 = (tblEntry *)CmmProbe(reqTbl, 3, tags, ret_tags);
00196 tblEntry *req2 = (tblEntry *)CmmProbe(msgTbl, 3, tags, ret_tags);
00197 if (req1 == NULL && req2 == NULL && startwaiting)
00198 {
00199 startwaiting = 0;
00200 CProxy_receiver B(thisArrayID);
00201 B[thisIndex].ready2go();
00202
00203 }
00204 }
00205
00206 void receiver::ready2go()
00207 {
00208
00209
00210
00211
00212
00213
00214
00215 if (callback) {
00216 recvCallBack tmpfn = callback;
00217 callback = NULL;
00218 tmpfn(cb_data);
00219 }
00220 else {
00221
00222 resumeFromWait();
00223 }
00224 }
00225
00226 void receiver::resumeFromWait()
00227 {
00228 CkPrintf("Please write your own resumeFromWait\n");
00229 }
00230
00231 void receiver::pupCmmTable(CmmTable &t, PUP::er &p)
00232 {
00233 tblEntry *msg;
00234 int tags[3], rtags[3];
00235 int num = CmmEntries(t);
00236 p(num);
00237 if (p.isPacking()) {
00238 tags[0] = tags[1] = tags[2] = CmmWildCard;
00239 tblEntry *msg = (tblEntry *)CmmProbe(t, 3, tags, rtags);
00240 while (msg) {
00241 p(rtags, 3);
00242 p(msg->size);
00243 p(msg->buf, msg->size);
00244 msg = (tblEntry *)CmmProbe(t, 3, tags, rtags);
00245 }
00246 }
00247 else {
00248 int s;
00249
00250 for (int i=0; i<num; i++) {
00251 p(rtags, 3);
00252 msg = new tblEntry;
00253 p(msg->size);
00254 msg->buf = new char[msg->size];
00255 p(msg->buf, msg->size);
00256 }
00257 }
00258 }
00259
00260 #include "receiver.def.h"
00261
00262