00001 #include <stdio.h>
00002 #include <stdlib.h>
00003 #include <errno.h>
00004 #include <string.h>
00005
00006 #include "converse.h"
00007 #include "conv-ccs.h"
00008 #include "ccs-server.h"
00009 #include "sockRoutines.h"
00010 #include "queueing.h"
00011
00012 #ifdef _WIN32
00013 # include <io.h>
00014 # define read _read
00015 #endif
00016
00017 void CpdEndConditionalDeliver_master(void);
00018
00019 #if CMK_CCS_AVAILABLE
00020
00021 int CcsReply(CcsImplHeader *rep,int repLen,const void *repData);
00022
00023
00024
00025
00026
00027
00028
00029 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
00030 if (strlen(name)>=CCS_MAXHANDLER)
00031 CmiAbort("CCS handler names cannot exceed 32 characters");
00032 c->name=strdup(name);
00033 c->fn=NULL;
00034 c->fnOld=NULL;
00035 c->userPtr=NULL;
00036 c->mergeFn=NULL;
00037 c->nCalls=0;
00038 }
00039
00040 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
00041 c->nCalls++;
00042 if (c->fnOld)
00043 {
00044
00045
00046
00047 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
00048 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
00049 (c->fnOld)(cmsg);
00050 }
00051 else {
00052 (c->fn)(c->userPtr, reqLen, reqData);
00053 }
00054 }
00055
00056
00057 CpvDeclare(CcsHandlerTable, ccsTab);
00058
00059 CpvStaticDeclare(CcsImplHeader*,ccsReq);
00060
00061 void CcsRegisterHandler(const char *name, CmiHandler fn) {
00062 CcsHandlerRec cp;
00063 initHandlerRec(&cp,name);
00064 cp.fnOld=fn;
00065 *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
00066 }
00067 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
00068 CcsHandlerRec cp;
00069 initHandlerRec(&cp,name);
00070 cp.fn=fn;
00071 cp.userPtr=ptr;
00072 *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
00073 }
00074 CcsHandlerRec *CcsGetHandler(const char *name) {
00075 return (CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
00076 }
00077 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
00078 CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
00079 if (rec==NULL) {
00080 CmiAbort("CCS: Unknown CCS handler name.\n");
00081 }
00082 rec->mergeFn=newMerge;
00083 rec->redID=CmiGetGlobalReduction();
00084 }
00085
00086 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
00087 CcsImplHeader *hdr;
00088 int total = *size;
00089 void *reply;
00090 char *ptr;
00091 int i;
00092 for (i=0; i<n; ++i) {
00093 hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
00094 total += ChMessageInt(hdr->len);
00095 }
00096 reply = CmiAlloc(total);
00097 memcpy(reply, local, *size);
00098 ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
00099 CmiFree(local);
00100 ptr = ((char*)reply)+*size;
00101 for (i=0; i<n; ++i) {
00102 int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
00103 memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
00104 ptr += len;
00105 }
00106 *size = total;
00107 return reply;
00108 }
00109
00110 #define SIMPLE_REDUCTION(name, dataType, loop) \
00111 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
00112 int i, m; \
00113 CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
00114 int lenLocal = ChMessageInt(hdrLocal->len); \
00115 int nElem = lenLocal / sizeof(dataType); \
00116 dataType *ret = (dataType *) (hdrLocal+1); \
00117 CcsImplHeader *hdr; \
00118 for (m=0; m<n; ++m) { \
00119 int len; \
00120 dataType *value; \
00121 hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
00122 len = ChMessageInt(hdr->len); \
00123 value = (dataType *)(hdr+1); \
00124 CmiAssert(lenLocal == len); \
00125 for (i=0; i<nElem; ++i) loop; \
00126 } \
00127 return local; \
00128 }
00129
00130 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
00131 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
00132 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
00133 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
00134
00135
00136 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
00137 SIMPLE_REDUCTION(nameBase##_int, int, loop) \
00138 SIMPLE_REDUCTION(nameBase##_float, float, loop) \
00139 SIMPLE_REDUCTION(nameBase##_double, double, loop)
00140
00141 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
00142 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
00143 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
00144 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
00145
00146 #undef SIMPLE_REDUCTION
00147 #undef SIMPLE_POLYMORPH_REDUCTION
00148
00149 int CcsEnabled(void)
00150 {
00151 return 1;
00152 }
00153
00154 int CcsIsRemoteRequest(void)
00155 {
00156 return CpvAccess(ccsReq)!=NULL;
00157 }
00158
00159 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
00160 {
00161 *pip = CpvAccess(ccsReq)->attr.ip;
00162 *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
00163 }
00164
00165 extern int rep_fw_handler_idx;
00166 int rep_fw_handler_idx;
00167
00168 CcsDelayedReply CcsDelayReply(void)
00169 {
00170 CcsDelayedReply ret;
00171 int len = sizeof(CcsImplHeader);
00172 if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
00173 len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
00174 ret.hdr = (CcsImplHeader*)malloc(len);
00175 memcpy(ret.hdr, CpvAccess(ccsReq), len);
00176 CpvAccess(ccsReq)=NULL;
00177 return ret;
00178 }
00179
00180 void CcsSendReply(int replyLen, const void *replyData)
00181 {
00182 if (CpvAccess(ccsReq)==NULL)
00183 CmiAbort("CcsSendReply: reply already sent!\n");
00184 CpvAccess(ccsReq)->len = ChMessageInt_new(1);
00185 CcsReply(CpvAccess(ccsReq),replyLen,replyData);
00186 CpvAccess(ccsReq) = NULL;
00187 }
00188
00189 void CcsSendReplyNoError(int replyLen, const void *replyData) {
00190 if (CpvAccess(ccsReq)==NULL) return;
00191 CcsSendReply(replyLen, replyData);
00192 }
00193
00194 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
00195 {
00196 CcsImplHeader *h = d.hdr;
00197 h->len=ChMessageInt_new(1);
00198 CcsReply(h,replyLen,replyData);
00199 free(h);
00200 }
00201
00202 void CcsNoReply(void)
00203 {
00204 if (CpvAccess(ccsReq)==NULL) return;
00205 CpvAccess(ccsReq)->len = ChMessageInt_new(0);
00206 CcsReply(CpvAccess(ccsReq),0,NULL);
00207 CpvAccess(ccsReq) = NULL;
00208 }
00209
00210 void CcsNoDelayedReply(CcsDelayedReply d)
00211 {
00212 CcsImplHeader *h = d.hdr;
00213 h->len = ChMessageInt_new(0);
00214 CcsReply(h,0,NULL);
00215 free(h);
00216 }
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
00230 {
00231 char *cmsg;
00232 int reqLen=ChMessageInt(hdr->len);
00233
00234 char *handlerStr=hdr->handler;
00235 CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
00236 if (fn==NULL) {
00237 CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
00238 hdr->handler);
00239 CpvAccess(ccsReq)=hdr;
00240 CcsSendReply(0,NULL);
00241 return;
00242
00243 }
00244
00245
00246 CpvAccess(ccsReq)=hdr;
00247 #if CMK_CHARMDEBUG
00248 if (conditionalPipe[1]!=0 && _conditionalDelivery==0) {
00249
00250 int bytes;
00251 if (4==read(conditionalPipe[0], &bytes, 4)) {
00252 char *buf = (char *)malloc(bytes);
00253 if (bytes == read(conditionalPipe[0], buf, bytes))
00254 {
00255 CcsSendReply(bytes,buf);
00256 free(buf);
00257 }
00258 else
00259 {
00260 free(buf);
00261 CpdEndConditionalDeliver_master();
00262 }
00263 } else {
00264
00265 CpdEndConditionalDeliver_master();
00266 }
00267 }
00268 else
00269 #endif
00270 {
00271 callHandlerRec(fn,reqLen,reqData);
00272
00273
00274 if (CpvAccess(ccsReq)!=NULL)
00275 CcsSendReply(0,NULL);
00276 }
00277 }
00278
00279 #if ! NODE_0_IS_CONVHOST || CMK_BIGSIM_CHARM
00280
00281
00282
00283 static char **bufferedMessages = NULL;
00284 static int CcsNumBufferedMsgs = 0;
00285 #define CCS_MAX_NUM_BUFFERED_MSGS 100
00286
00287 void CcsBufferMessage(char *msg) {
00288 CmiPrintf("Buffering CCS message\n");
00289 CmiAssert(CcsNumBufferedMsgs < CCS_MAX_NUM_BUFFERED_MSGS);
00290 if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
00291 if (bufferedMessages == NULL) bufferedMessages = (char **)malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
00292 bufferedMessages[CcsNumBufferedMsgs] = msg;
00293 CcsNumBufferedMsgs ++;
00294 }
00295 #endif
00296
00297
00298 int _ccsHandlerIdx = 0;
00299
00300 #if CMK_BIGSIM_CHARM
00301 CpvDeclare(int, _bgCcsHandlerIdx);
00302 CpvDeclare(int, _bgCcsAck);
00303 extern "C" int BgNodeSize(void);
00304 extern void addBgNodeInbuffer(char *, int);
00305
00306
00307
00308
00309 static void bg_req_fw_handler(char *msg) {
00310
00311 int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
00312 CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
00313 int destPE = (int)ChMessageInt(hdr->pe);
00314 if (CpvAccess(_bgCcsAck) < BgNodeSize()) {
00315 CcsBufferMessage(msg);
00316 return;
00317 }
00318
00319 if (destPE == -1) destPE = 0;
00320 if (destPE < -1) {
00321 ChMessageInt_t *pes_nbo = (ChMessageInt_t *)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
00322 destPE = ChMessageInt(pes_nbo[0]);
00323 }
00324
00325 (((CmiBlueGeneMsgHeader*)msg)->tID) = 0;
00326 (((CmiBlueGeneMsgHeader*)msg)->n) = 0;
00327 (((CmiBlueGeneMsgHeader*)msg)->flag) = 0;
00328 (((CmiBlueGeneMsgHeader*)msg)->t) = 0;
00329 (((CmiBlueGeneMsgHeader*)msg)->hID) = CpvAccess(_bgCcsHandlerIdx);
00330
00331 addBgNodeInbuffer(msg, destPE/CmiNumPes());
00332
00333 }
00334 #define req_fw_handler bg_req_fw_handler
00335 #endif
00336 void req_fw_handler(char *msg);
00337
00338 void CcsReleaseMessages(void) {
00339 #if ! NODE_0_IS_CONVHOST || CMK_BIGSIM_CHARM
00340 #if CMK_BIGSIM_CHARM
00341 if (CpvAccess(_bgCcsAck) == 0 || CpvAccess(_bgCcsAck) < BgNodeSize()) return;
00342 #endif
00343 if (CcsNumBufferedMsgs > 0) {
00344 int i;
00345
00346 for (i=0; i<CcsNumBufferedMsgs; ++i) {
00347 CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
00348 CsdEnqueue(bufferedMessages[i]);
00349 }
00350 free(bufferedMessages);
00351 bufferedMessages = NULL;
00352 CcsNumBufferedMsgs = -1;
00353 }
00354 #endif
00355 }
00356
00357
00358
00359 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
00360 {
00361 int reqLen=ChMessageInt(hdr->len);
00362 int destPE = ChMessageInt(hdr->pe);
00363 int len;
00364 char *msg;
00365 if (destPE < -1) reqLen -= destPE*sizeof(int);
00366 len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
00367 msg=(char *)CmiAlloc(len);
00368 memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
00369 memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
00370 if (ret_len!=NULL) *ret_len=len;
00371 if (_ccsHandlerIdx != 0) {
00372 CmiSetHandler(msg, _ccsHandlerIdx);
00373 return msg;
00374 } else {
00375 #if NODE_0_IS_CONVHOST
00376 CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
00377 #else
00378 CcsBufferMessage(msg);
00379 #endif
00380 return NULL;
00381 }
00382 }
00383
00384
00385
00386 static void rep_fw_handler(char *msg)
00387 {
00388 int len;
00389 char *r=msg+CmiReservedHeaderSize;
00390 CcsImplHeader *hdr=(CcsImplHeader *)r;
00391 r+=sizeof(CcsImplHeader);
00392 len=ChMessageInt(hdr->len);
00393 CcsImpl_reply(hdr,len,r);
00394 CmiFree(msg);
00395 }
00396
00397 #if NODE_0_IS_CONVHOST
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00437 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
00438 {
00439 const int repPE=0;
00440 rep->len=ChMessageInt_new(repLen);
00441 if (CmiMyPe()==repPE) {
00442
00443 CcsServer_sendReply(rep,repLen,repData);
00444 } else {
00445
00446 int len=CmiReservedHeaderSize+
00447 sizeof(CcsImplHeader)+repLen;
00448 char *msg = (char *)CmiAlloc(len);
00449 char *r=msg+CmiReservedHeaderSize;
00450 *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
00451 memcpy(r,repData,repLen);
00452 CmiSetHandler(msg,rep_fw_handler_idx);
00453 CmiSyncSendAndFree(repPE,len,msg);
00454 }
00455 }
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00470 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
00471 {
00472 char *msg;
00473 int len,repPE=ChMessageInt(hdr->pe);
00474 if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
00475 #if ! CMK_BIGSIM_CHARM
00476
00477 if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
00478 else CmiPrintf("Invalid processor index in CCS request.");
00479 CpvAccess(ccsReq)=hdr;
00480 CcsSendReply(0,NULL);
00481 return;
00482 #endif
00483 }
00484
00485 msg=CcsImpl_ccs2converse(hdr,reqData,&len);
00486 if (repPE >= 0) {
00487
00488
00489 CmiSyncSendAndFree(repPE%CmiNumPes(),len,msg);
00490 } else if (repPE == -1) {
00491
00492
00493 CmiSyncSendAndFree(0,len,msg);
00494 } else {
00495
00496 int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
00497
00498
00499 CmiSyncSendAndFree(firstPE%CmiNumPes(),len,msg);
00500 }
00501 }
00502
00503
00504
00505
00506
00507
00508
00509 #include <signal.h>
00510 #include "ccs-server.C"
00511 #include "ccs-auth.C"
00512
00513
00514 void CcsServerCheck(void)
00515 {
00516 while (1==skt_select1(CcsServer_fd(),0)) {
00517 CcsImplHeader hdr;
00518 void *data;
00519
00520 if (CcsServer_recvRequest(&hdr,&data))
00521 {
00522
00523 if (! check_stdio_header(&hdr)) {
00524 CcsImpl_netRequest(&hdr,data);
00525 }
00526 free(data);
00527 }
00528 }
00529 }
00530
00531 #endif
00532
00533 int _isCcsHandlerIdx(int hIdx) {
00534 if (hIdx==_ccsHandlerIdx) return 1;
00535 if (hIdx==rep_fw_handler_idx) return 1;
00536 return 0;
00537 }
00538
00539 void CcsBuiltinsInit(char **argv);
00540
00541 CpvDeclare(int, cmiArgDebugFlag);
00542 CpvDeclare(char *, displayArgument);
00543 CpvCExtern(int, cpdSuspendStartup);
00544 CpvDeclare(int, cpdSuspendStartup);
00545
00546 void CcsInit(char **argv)
00547 {
00548 CpvInitialize(CkHashtable_c, ccsTab);
00549 CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
00550 CpvInitialize(CcsImplHeader *, ccsReq);
00551 CpvAccess(ccsReq) = NULL;
00552 CmiAssignOnce(&_ccsHandlerIdx,CmiRegisterHandler((CmiHandler)req_fw_handler));
00553 #if CMK_BIGSIM_CHARM
00554 CpvInitialize(int, _bgCcsHandlerIdx);
00555 CpvAccess(_bgCcsHandlerIdx) = 0;
00556 CpvInitialize(int, _bgCcsAck);
00557 CpvAccess(_bgCcsAck) = 0;
00558 #endif
00559 CpvInitialize(char *, displayArgument);
00560 CpvInitialize(int, cpdSuspendStartup);
00561 CpvAccess(displayArgument) = NULL;
00562 CpvAccess(cpdSuspendStartup) = 0;
00563
00564 CcsBuiltinsInit(argv);
00565
00566 CmiAssignOnce(&rep_fw_handler_idx, CmiRegisterHandler((CmiHandler)rep_fw_handler));
00567 #if NODE_0_IS_CONVHOST
00568 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
00569 CmiAssignOnce(&print_fw_handler_idx, CmiRegisterHandler((CmiHandler)print_fw_handler));
00570 #endif
00571 {
00572 int ccs_serverPort=0;
00573 char *ccs_serverAuth=NULL;
00574
00575 if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") |
00576 CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
00577 CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file"))
00578 if (CmiMyPe()==0)
00579 {
00580 CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
00581 CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
00582 }
00583 }
00584 #endif
00585
00586 if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
00587 {
00588 if(CmiMyRank() == 0) CpvAccess(cmiArgDebugFlag) = 1;
00589 if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
00590 {
00591 if (CpvAccess(displayArgument) == NULL)
00592 CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
00593 }
00594 else if (CmiMyPe() == 0)
00595 {
00596
00597 CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
00598 }
00599
00600 if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
00601 if(CmiMyRank() == 0) CpvAccess(cpdSuspendStartup) = 1;
00602 }
00603 }
00604
00605 CcsReleaseMessages();
00606 }
00607
00608 #endif
00609