00001 #include <stdio.h>
00002 #include <stdlib.h>
00003 #include <errno.h>
00004 #include <string.h>
00005
00006 #include "converse.h"
00007 #include "conv-ccs.h"
00008 #include "ccs-server.h"
00009 #include "sockRoutines.h"
00010 #include "queueing.h"
00011
00012 #if CMK_CCS_AVAILABLE
00013
00014
00015
00016
00017
00018
00019
00020 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
00021 if (strlen(name)>=CCS_MAXHANDLER)
00022 CmiAbort("CCS handler names cannot exceed 32 characters");
00023 c->name=strdup(name);
00024 c->fn=NULL;
00025 c->fnOld=NULL;
00026 c->userPtr=NULL;
00027 c->mergeFn=NULL;
00028 c->nCalls=0;
00029 }
00030
00031 static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
00032 c->nCalls++;
00033 if (c->fnOld)
00034 {
00035
00036
00037
00038 char *cmsg = (char *) CmiAlloc(CmiReservedHeaderSize+reqLen);
00039 memcpy(cmsg+CmiReservedHeaderSize, reqData, reqLen);
00040 (c->fnOld)(cmsg);
00041 }
00042 else {
00043 (c->fn)(c->userPtr, reqLen, reqData);
00044 }
00045 }
00046
00047
/* Table of registered CCS handlers (a Converse Cpv variable).  CcsInit
   creates it as a string-keyed hashtable of CcsHandlerRec entries. */
CpvDeclare(CcsHandlerTable, ccsTab);

/* Header of the CCS request currently awaiting a reply, or NULL when there
   is none (see CcsIsRemoteRequest / CcsSendReply). */
CpvStaticDeclare(CcsImplHeader*,ccsReq);
00051
00052 void CcsRegisterHandler(const char *name, CmiHandler fn) {
00053 CcsHandlerRec cp;
00054 initHandlerRec(&cp,name);
00055 cp.fnOld=fn;
00056 *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
00057 }
00058 void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
00059 CcsHandlerRec cp;
00060 initHandlerRec(&cp,name);
00061 cp.fn=fn;
00062 cp.userPtr=ptr;
00063 *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
00064 }
00065 CcsHandlerRec *CcsGetHandler(const char *name) {
00066 return CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
00067 }
00068 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
00069 CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
00070 if (rec==NULL) {
00071 CmiAbort("CCS: Unknown CCS handler name.\n");
00072 }
00073 rec->mergeFn=newMerge;
00074 rec->redID=CmiGetGlobalReduction();
00075 }
00076
00077 void * CcsMerge_concat(int *size,void *local,void **remote,int n) {
00078 CcsImplHeader *hdr;
00079 int total = *size;
00080 void *reply;
00081 char *ptr;
00082 int i;
00083 for (i=0; i<n; ++i) {
00084 hdr = (CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize);
00085 total += ChMessageInt(hdr->len);
00086 }
00087 reply = CmiAlloc(total);
00088 memcpy(reply, local, *size);
00089 ((CcsImplHeader*)(((char*)reply)+CmiReservedHeaderSize))->len = ChMessageInt_new(total-CmiReservedHeaderSize-sizeof(CcsImplHeader));
00090 CmiFree(local);
00091 ptr = ((char*)reply)+*size;
00092 for (i=0; i<n; ++i) {
00093 int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiReservedHeaderSize))->len);
00094 memcpy(ptr, ((char*)remote[i])+CmiReservedHeaderSize+sizeof(CcsImplHeader), len);
00095 ptr += len;
00096 }
00097 *size = total;
00098 return reply;
00099 }
00100
00101 #define SIMPLE_REDUCTION(name, dataType, loop) \
00102 void * CcsMerge_##name(int *size,void *local,void **remote,int n) { \
00103 int i, m; \
00104 CcsImplHeader *hdrLocal = (CcsImplHeader*)(((char*)local)+CmiReservedHeaderSize); \
00105 int lenLocal = ChMessageInt(hdrLocal->len); \
00106 int nElem = lenLocal / sizeof(dataType); \
00107 dataType *ret = (dataType *) (hdrLocal+1); \
00108 CcsImplHeader *hdr; \
00109 for (m=0; m<n; ++m) { \
00110 int len; \
00111 dataType *value; \
00112 hdr = (CcsImplHeader*)(((char*)remote[m])+CmiReservedHeaderSize); \
00113 len = ChMessageInt(hdr->len); \
00114 value = (dataType *)(hdr+1); \
00115 CmiAssert(lenLocal == len); \
00116 for (i=0; i<nElem; ++i) loop; \
00117 } \
00118 return local; \
00119 }
00120
00121 SIMPLE_REDUCTION(logical_and, int, ret[i]=(ret[i]&&value[i])?1:0)
00122 SIMPLE_REDUCTION(logical_or, int, ret[i]=(ret[i]||value[i])?1:0)
00123 SIMPLE_REDUCTION(bitvec_and, int, ret[i]&=value[i])
00124 SIMPLE_REDUCTION(bitvec_or, int, ret[i]|=value[i])
00125
00126
00127 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
00128 SIMPLE_REDUCTION(nameBase##_int, int, loop) \
00129 SIMPLE_REDUCTION(nameBase##_float, float, loop) \
00130 SIMPLE_REDUCTION(nameBase##_double, double, loop)
00131
00132 SIMPLE_POLYMORPH_REDUCTION(sum, ret[i]+=value[i])
00133 SIMPLE_POLYMORPH_REDUCTION(product, ret[i]*=value[i])
00134 SIMPLE_POLYMORPH_REDUCTION(max, if (ret[i]<value[i]) ret[i]=value[i])
00135 SIMPLE_POLYMORPH_REDUCTION(min, if (ret[i]>value[i]) ret[i]=value[i])
00136
00137 #undef SIMPLE_REDUCTION
00138 #undef SIMPLE_POLYMORPH_REDUCTION
00139
/**
 * Report whether CCS support is available.
 *
 * @return Always 1 in this build (this definition is only compiled when
 *         CMK_CCS_AVAILABLE is set).
 */
int CcsEnabled(void)
{
  const int enabled = 1;
  return enabled;
}
00144
00145 int CcsIsRemoteRequest(void)
00146 {
00147 return CpvAccess(ccsReq)!=NULL;
00148 }
00149
00150 void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
00151 {
00152 *pip = CpvAccess(ccsReq)->attr.ip;
00153 *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
00154 }
00155
/* Converse handler index of rep_fw_handler; assigned in CcsInit by
   CmiRegisterHandler and used when forwarding replies to PE 0. */
int rep_fw_handler_idx;
00157
00158 CcsDelayedReply CcsDelayReply(void)
00159 {
00160 CcsDelayedReply ret;
00161 int len = sizeof(CcsImplHeader);
00162 if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
00163 len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
00164 ret.hdr = (CcsImplHeader*)malloc(len);
00165 memcpy(ret.hdr, CpvAccess(ccsReq), len);
00166 CpvAccess(ccsReq)=NULL;
00167 return ret;
00168 }
00169
00170 void CcsSendReply(int replyLen, const void *replyData)
00171 {
00172 if (CpvAccess(ccsReq)==NULL)
00173 CmiAbort("CcsSendReply: reply already sent!\n");
00174 CpvAccess(ccsReq)->len = ChMessageInt_new(1);
00175 CcsReply(CpvAccess(ccsReq),replyLen,replyData);
00176 CpvAccess(ccsReq) = NULL;
00177 }
00178
00179 void CcsSendReplyNoError(int replyLen, const void *replyData) {
00180 if (CpvAccess(ccsReq)==NULL) return;
00181 CcsSendReply(replyLen, replyData);
00182 }
00183
00184 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
00185 {
00186 CcsImplHeader *h = d.hdr;
00187 h->len=ChMessageInt_new(1);
00188 CcsReply(h,replyLen,replyData);
00189 free(h);
00190 }
00191
00192 void CcsNoReply()
00193 {
00194 if (CpvAccess(ccsReq)==NULL) return;
00195 CpvAccess(ccsReq)->len = ChMessageInt_new(0);
00196 CcsReply(CpvAccess(ccsReq),0,NULL);
00197 CpvAccess(ccsReq) = NULL;
00198 }
00199
00200 void CcsNoDelayedReply(CcsDelayedReply d)
00201 {
00202 CcsImplHeader *h = d.hdr;
00203 h->len = ChMessageInt_new(0);
00204 CcsReply(h,0,NULL);
00205 free(h);
00206 }
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219 void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
00220 {
00221 char *cmsg;
00222 int reqLen=ChMessageInt(hdr->len);
00223
00224 char *handlerStr=hdr->handler;
00225 CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
00226 if (fn==NULL) {
00227 CmiPrintf("CCS: Unknown CCS handler name '%s' requested. Ignoring...\n",
00228 hdr->handler);
00229 CpvAccess(ccsReq)=hdr;
00230 CcsSendReply(0,NULL);
00231 return;
00232
00233 }
00234
00235
00236 CpvAccess(ccsReq)=hdr;
00237 #if CMK_CHARMDEBUG
00238 if (conditionalPipe[1]!=0 && _conditionalDelivery==0) {
00239
00240 int bytes;
00241 if (4==read(conditionalPipe[0], &bytes, 4)) {
00242 char *buf = malloc(bytes);
00243 read(conditionalPipe[0], buf, bytes);
00244 CcsSendReply(bytes,buf);
00245 free(buf);
00246 } else {
00247
00248 CpdEndConditionalDeliver_master();
00249 }
00250 }
00251 else
00252 #endif
00253 {
00254 callHandlerRec(fn,reqLen,reqData);
00255
00256
00257 if (CpvAccess(ccsReq)!=NULL)
00258 CcsSendReply(0,NULL);
00259 }
00260 }
00261
#if ! NODE_0_IS_CONVHOST || CMK_BIGSIM_CHARM

/* Messages held back until CcsReleaseMessages() enqueues them.  The array is
   allocated on first use; CcsReleaseMessages sets the count to -1 once the
   buffer has been flushed, after which buffering is an error. */
static char **bufferedMessages = NULL;
static int CcsNumBufferedMsgs = 0;
#define CCS_MAX_NUM_BUFFERED_MSGS 100

/**
 * Store a CCS message for later delivery.
 *
 * @param msg  Message to keep; the pointer is stored, not copied.
 *
 * Aborts if buffering has already been closed, if the fixed capacity of
 * CCS_MAX_NUM_BUFFERED_MSGS is exhausted, or if the array cannot be
 * allocated.  The capacity test is an unconditional check rather than a
 * CmiAssert so that an overflow can never write past the array.
 */
void CcsBufferMessage(char *msg) {
  CmiPrintf("Buffering CCS message\n");
  if (CcsNumBufferedMsgs < 0) CmiAbort("Why is a CCS message being buffered now???");
  if (CcsNumBufferedMsgs >= CCS_MAX_NUM_BUFFERED_MSGS)
    CmiAbort("CCS: too many buffered messages.\n");
  if (bufferedMessages == NULL) {
    bufferedMessages = malloc(sizeof(char*)*CCS_MAX_NUM_BUFFERED_MSGS);
    if (bufferedMessages == NULL)
      CmiAbort("CCS: out of memory while buffering a message.\n");
  }
  bufferedMessages[CcsNumBufferedMsgs] = msg;
  CcsNumBufferedMsgs ++;
}
#endif
00279
00280
/* Converse handler index of req_fw_handler, assigned in CcsInit.  It is 0
   until then, which CcsImpl_ccs2converse takes to mean that messages must be
   buffered rather than returned for sending. */
int _ccsHandlerIdx = 0;
00282
#if CMK_BIGSIM_CHARM
/* BigSim-only state; both are initialized to 0 in CcsInit. */
CpvDeclare(int, _bgCcsHandlerIdx);
CpvDeclare(int, _bgCcsAck);

/**
 * BigSim replacement for the request-forwarding handler.
 *
 * While the _bgCcsAck counter is below BgNodeSize() the message is buffered.
 * Otherwise the destination PE is taken from the CCS header (-1 maps to 0;
 * for values below -1 the first entry of the PE list following the header is
 * used), the BigSim message header fields are reset, and the message is
 * placed in the in-buffer of node destPE/CmiNumPes().
 */
static void bg_req_fw_handler(char *msg) {
  if (CpvAccess(_bgCcsAck) < BgNodeSize()) {
    CcsBufferMessage(msg);
    return;
  }

  int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
  CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
  CmiBlueGeneMsgHeader *bgHdr = (CmiBlueGeneMsgHeader*)msg;
  int destPE = (int)ChMessageInt(hdr->pe);

  if (destPE == -1) {
    destPE = 0;
  } else if (destPE < -1) {
    /* The PE list starts right after the CCS header. */
    ChMessageInt_t *pes_nbo = (ChMessageInt_t *)(msg+offset);
    destPE = ChMessageInt(pes_nbo[0]);
  }

  bgHdr->tID = 0;
  bgHdr->n = 0;
  bgHdr->flag = 0;
  bgHdr->t = 0;
  bgHdr->hID = CpvAccess(_bgCcsHandlerIdx);

  addBgNodeInbuffer(msg, destPE/CmiNumPes());
}
#define req_fw_handler bg_req_fw_handler
#endif
/* Request-forwarding handler registered by CcsInit.  Its definition is not
   in this part of the file; under CMK_BIGSIM_CHARM the preceding #define
   turns this into a redeclaration of bg_req_fw_handler. */
extern void req_fw_handler(char *msg);
00318
00319 void CcsReleaseMessages() {
00320 #if ! NODE_0_IS_CONVHOST || CMK_BIGSIM_CHARM
00321 #if CMK_BIGSIM_CHARM
00322 if (CpvAccess(_bgCcsAck) == 0 || CpvAccess(_bgCcsAck) < BgNodeSize()) return;
00323 #endif
00324 if (CcsNumBufferedMsgs > 0) {
00325 int i;
00326
00327 for (i=0; i<CcsNumBufferedMsgs; ++i) {
00328 CmiSetHandler(bufferedMessages[i], _ccsHandlerIdx);
00329 CsdEnqueue(bufferedMessages[i]);
00330 }
00331 free(bufferedMessages);
00332 bufferedMessages = NULL;
00333 CcsNumBufferedMsgs = -1;
00334 }
00335 #endif
00336 }
00337
00338
00339
00340 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
00341 {
00342 int reqLen=ChMessageInt(hdr->len);
00343 int destPE = ChMessageInt(hdr->pe);
00344 int len;
00345 char *msg;
00346 if (destPE < -1) reqLen -= destPE*sizeof(int);
00347 len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+reqLen;
00348 msg=(char *)CmiAlloc(len);
00349 memcpy(msg+CmiReservedHeaderSize,hdr,sizeof(CcsImplHeader));
00350 memcpy(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader),data,reqLen);
00351 if (ret_len!=NULL) *ret_len=len;
00352 if (_ccsHandlerIdx != 0) {
00353 CmiSetHandler(msg, _ccsHandlerIdx);
00354 return msg;
00355 } else {
00356 #if NODE_0_IS_CONVHOST
00357 CmiAbort("Why do we need to buffer messages when node 0 is Convhost?");
00358 #else
00359 CcsBufferMessage(msg);
00360 return NULL;
00361 #endif
00362 }
00363 }
00364
00365
00366
00367 static void rep_fw_handler(char *msg)
00368 {
00369 int len;
00370 char *r=msg+CmiReservedHeaderSize;
00371 CcsImplHeader *hdr=(CcsImplHeader *)r;
00372 r+=sizeof(CcsImplHeader);
00373 len=ChMessageInt(hdr->len);
00374 CcsImpl_reply(hdr,len,r);
00375 CmiFree(msg);
00376 }
00377
00378 #if NODE_0_IS_CONVHOST
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00418 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
00419 {
00420 const int repPE=0;
00421 rep->len=ChMessageInt_new(repLen);
00422 if (CmiMyPe()==repPE) {
00423
00424 CcsServer_sendReply(rep,repLen,repData);
00425 } else {
00426
00427 int len=CmiReservedHeaderSize+
00428 sizeof(CcsImplHeader)+repLen;
00429 char *msg=CmiAlloc(len);
00430 char *r=msg+CmiReservedHeaderSize;
00431 *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
00432 memcpy(r,repData,repLen);
00433 CmiSetHandler(msg,rep_fw_handler_idx);
00434 CmiSyncSendAndFree(repPE,len,msg);
00435 }
00436 }
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00451 void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
00452 {
00453 char *msg;
00454 int len,repPE=ChMessageInt(hdr->pe);
00455 if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
00456 #if ! CMK_BIGSIM_CHARM
00457
00458 if (repPE==-CmiNumPes()) CmiPrintf("Invalid processor index in CCS request: are you trying to do a broadcast instead?");
00459 else CmiPrintf("Invalid processor index in CCS request.");
00460 CpvAccess(ccsReq)=hdr;
00461 CcsSendReply(0,NULL);
00462 return;
00463 #endif
00464 }
00465
00466 msg=CcsImpl_ccs2converse(hdr,reqData,&len);
00467 if (repPE >= 0) {
00468
00469
00470 CmiSyncSendAndFree(repPE%CmiNumPes(),len,msg);
00471 } else if (repPE == -1) {
00472
00473
00474 CmiSyncSendAndFree(0,len,msg);
00475 } else {
00476
00477 int firstPE = ChMessageInt(*(ChMessageInt_t*)reqData);
00478
00479
00480 CmiSyncSendAndFree(firstPE%CmiNumPes(),len,msg);
00481 }
00482 }
00483
00484
00485
00486
00487
00488
00489
00490 #include <signal.h>
00491 #include "ccs-server.c"
00492 #include "ccs-auth.c"
00493
00494
00495 void CcsServerCheck(void)
00496 {
00497 while (1==skt_select1(CcsServer_fd(),0)) {
00498 CcsImplHeader hdr;
00499 void *data;
00500
00501 if (CcsServer_recvRequest(&hdr,&data))
00502 {
00503
00504 if (! check_stdio_header(&hdr)) {
00505 CcsImpl_netRequest(&hdr,data);
00506 }
00507 free(data);
00508 }
00509 }
00510 }
00511
00512 #endif
00513
00514 int _isCcsHandlerIdx(int hIdx) {
00515 if (hIdx==_ccsHandlerIdx) return 1;
00516 if (hIdx==rep_fw_handler_idx) return 1;
00517 return 0;
00518 }
00519
/* Defined elsewhere; called from CcsInit with the program arguments. */
void CcsBuiltinsInit(char **argv);

/* Debugger-related settings filled in by CcsInit:
   cmiArgDebugFlag   - set to 1 when "+cpd" is given;
   displayArgument   - value of "+DebugDisplay", NULL if absent;
   cpdSuspendStartup - set to 1 when "+DebugSuspend" is given with "+cpd". */
CpvDeclare(int, cmiArgDebugFlag);
CpvDeclare(char *, displayArgument);
CpvDeclare(int, cpdSuspendStartup);
00525
00526 void CcsInit(char **argv)
00527 {
00528 CpvInitialize(CkHashtable_c, ccsTab);
00529 CpvAccess(ccsTab) = CkCreateHashtable_string(sizeof(CcsHandlerRec),5);
00530 CpvInitialize(CcsImplHeader *, ccsReq);
00531 CpvAccess(ccsReq) = NULL;
00532 _ccsHandlerIdx = CmiRegisterHandler((CmiHandler)req_fw_handler);
00533 #if CMK_BIGSIM_CHARM
00534 CpvInitialize(int, _bgCcsHandlerIdx);
00535 CpvAccess(_bgCcsHandlerIdx) = 0;
00536 CpvInitialize(int, _bgCcsAck);
00537 CpvAccess(_bgCcsAck) = 0;
00538 #endif
00539 CpvInitialize(int, cmiArgDebugFlag);
00540 CpvInitialize(char *, displayArgument);
00541 CpvInitialize(int, cpdSuspendStartup);
00542 CpvAccess(cmiArgDebugFlag) = 0;
00543 CpvAccess(displayArgument) = NULL;
00544 CpvAccess(cpdSuspendStartup) = 0;
00545
00546 CcsBuiltinsInit(argv);
00547
00548 rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
00549 #if NODE_0_IS_CONVHOST
00550 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
00551 print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
00552 #endif
00553 {
00554 int ccs_serverPort=0;
00555 char *ccs_serverAuth=NULL;
00556
00557 if (CmiGetArgFlagDesc(argv,"++server", "Create a CCS server port") |
00558 CmiGetArgIntDesc(argv,"++server-port",&ccs_serverPort, "Listen on this TCP/IP port number") |
00559 CmiGetArgStringDesc(argv,"++server-auth",&ccs_serverAuth, "Use this CCS authentication file"))
00560 if (CmiMyPe()==0)
00561 {
00562 CcsServer_new(NULL,&ccs_serverPort,ccs_serverAuth);
00563 CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)CcsServerCheck,NULL);
00564 }
00565 }
00566 #endif
00567
00568 if (CmiGetArgFlagDesc(argv, "+cpd", "Used *only* in conjunction with parallel debugger"))
00569 {
00570 CpvAccess(cmiArgDebugFlag) = 1;
00571 if (CmiGetArgStringDesc(argv, "+DebugDisplay",&(CpvAccess(displayArgument)), "X display for gdb used only in cpd mode"))
00572 {
00573 if (CpvAccess(displayArgument) == NULL)
00574 CmiPrintf("WARNING> NULL parameter for +DebugDisplay\n***");
00575 }
00576 else if (CmiMyPe() == 0)
00577 {
00578
00579 CmiPrintf("WARNING> x term for gdb needs to be specified as +DebugDisplay by debugger\n***\n");
00580 }
00581
00582 if (CmiGetArgFlagDesc(argv, "+DebugSuspend", "Suspend execution at beginning of program")) {
00583 CpvAccess(cpdSuspendStartup) = 1;
00584 }
00585 }
00586
00587 CcsReleaseMessages();
00588 }
00589
00590 #endif
00591