00001
00002
00003
00004
00005 #include <stdio.h>
00006 #include <stdlib.h>
00007 #include <errno.h>
00008 #include <string.h>
00009 #include <limits.h>
00010
00011 #include "converse.h"
00012 #include "ckhashtable.h"
00013 #include "pup.h"
00014 #include "pup_toNetwork.h"
00015 #include "debug-conv++.h"
00016 #include "conv-ccs.h"
00017 #include "sockRoutines.h"
00018 #include "queueing.h"
00019 #include "ccs-builtins.h"
00020
00021 #ifdef __MINGW_H
00022 #include "process.h"
00023 #endif
00024
00025 #if CMK_CCS_AVAILABLE
00026
00027 void ccs_getinfo(char *msg);
00028
00029
00030
00031
00032
00033
00034
00035 typedef struct killPortStruct{
00036 skt_ip_t ip;
00037 unsigned int port;
00038 struct killPortStruct *next;
00039 } killPortStruct;
00040
00041 static killPortStruct *killList=NULL;
00042
00043 static void ccs_killport(char *msg)
00044 {
00045 killPortStruct *oldList=killList;
00046 int port=ChMessageInt(*(ChMessageInt_t *)(msg+CmiReservedHeaderSize));
00047 skt_ip_t ip;
00048 unsigned int connPort;
00049 CcsCallerId(&ip,&connPort);
00050 killList=(killPortStruct *)malloc(sizeof(killPortStruct));
00051 killList->ip=ip;
00052 killList->port=port;
00053 killList->next=oldList;
00054 CmiFree(msg);
00055 }
00056
00057 static int noMoreErrors(SOCKET skt, int c, const char *m) {return -1;}
00058 void CcsImpl_kill(void)
00059 {
00060 skt_set_abort(noMoreErrors);
00061 while (killList!=NULL)
00062 {
00063 SOCKET fd=skt_connect(killList->ip,killList->port,20);
00064 if (fd!=INVALID_SOCKET) {
00065 skt_sendN(fd,"die\n",strlen("die\n")+1);
00066 skt_close(fd);
00067 }
00068 killList=killList->next;
00069 }
00070 }
00071
00072
00073
00074
00075
00076
00077 #include <signal.h>
00078
00079 static void ccs_killpe(char *msg) {
00080 #if CMK_HAS_KILL
00081 kill(getpid(), 9);
00082 #else
00083 CmiAbort("ccs_killpe() not supported!");
00084 #endif
00085 }
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110 static void CpdListBoundsCheck(CpdListAccessor *l,int &lo,int &hi)
00111 {
00112 if (l->checkBoundary()) {
00113 int len=l->getLength();
00114 if (lo<0) lo=0;
00115 if (hi>len) hi=len;
00116 }
00117 }
00118
00119 typedef CkHashtableTslow<const char *,CpdListAccessor *> CpdListTable_t;
00120 CpvStaticDeclare(CpdListTable_t *,cpdListTable);
00121
00125 static CpdListAccessor *CpdListLookup(const char *path)
00126 {
00127 CpdListAccessor *acc=CpvAccess(cpdListTable)->get(path);
00128 if (acc==NULL) {
00129 CmiError("CpdListAccessor> Unrecognized list path '%s'\n",path);
00130 return NULL;
00131 }
00132 return acc;
00133 }
00134
00140 static CpdListAccessor *CpdListLookup(const ChMessageInt_t *lenAndPath)
00141 {
00142 static const int CpdListMaxLen=80;
00143 int len=ChMessageInt(lenAndPath[0]);
00144 const char *path=(const char *)(lenAndPath+1);
00145 char pathBuf[CpdListMaxLen+1];
00146 if ((len<0) || (len>CpdListMaxLen)) {
00147 CmiError("CpdListAccessor> Invalid list path length %d!\n",len);
00148 return NULL;
00149 }
00150 strncpy(pathBuf,path,len);
00151 pathBuf[len]=0;
00152 return CpdListLookup(pathBuf);
00153 }
00154
00155
00156
00157
00158 static void CpdList_ccs_list_len(char *msg)
00159 {
00160 const ChMessageInt_t *req=(const ChMessageInt_t *)(msg+CmiReservedHeaderSize);
00161 CpdListAccessor *acc=CpdListLookup(req);
00162 if (acc!=NULL) {
00163 ChMessageInt_t reply=ChMessageInt_new(acc->getLength());
00164 CcsSendReply(sizeof(reply),(void *)&reply);
00165 }
00166 CmiFree(msg);
00167 }
00168
00169
00170
00171
00172
00173
00174
00175
00176 static CpdListAccessor *CpdListHeader_ccs_list_items(char *msg,
00177 CpdListItemsRequest &h)
00178 {
00179 int msgLen=CmiSize((void *)msg)-CmiReservedHeaderSize;
00180 CpdListAccessor *ret=NULL;
00181 const ChMessageInt_t *req=(const ChMessageInt_t *)(msg+CmiReservedHeaderSize);
00182 h.lo=ChMessageInt(req[0]);
00183 h.hi=ChMessageInt(req[1]);
00184 h.extraLen=ChMessageInt(req[2]);
00185 if (h.extraLen>=0
00186 && ((int)(3*sizeof(ChMessageInt_t)+h.extraLen))<msgLen) {
00187 h.extra=(void *)(req+3);
00188 ret=CpdListLookup((ChMessageInt_t *)(h.extraLen+(char *)h.extra));
00189 if (ret!=NULL) CpdListBoundsCheck(ret,h.lo,h.hi);
00190 }
00191 return ret;
00192 }
00193
00194
00195
00196 static void pupCpd(PUP::er &p, CpdListAccessor *acc, CpdListItemsRequest &req)
00197 {
00198 p.syncComment(PUP::sync_begin_array,"CpdList");
00199 acc->pup(p,req);
00200 p.syncComment(PUP::sync_end_array);
00201 }
00202
00203 static void CpdList_ccs_list_items_txt(char *msg)
00204 {
00205 CpdListItemsRequest req;
00206 CpdListAccessor *acc=CpdListHeader_ccs_list_items(msg,req);
00207 if(acc == NULL) CmiPrintf("ccs-builtins> Null Accessor--bad list name (txt)\n");
00208 if (acc!=NULL) {
00209 int bufLen;
00210 {
00211 PUP::sizerText p; pupCpd(p,acc,req); bufLen=p.size();
00212 }
00213 char *buf=new char[bufLen];
00214 {
00215 PUP::toText p(buf); pupCpd(p,acc,req);
00216 if (p.size()!=bufLen)
00217 CmiError("ERROR! Sizing/packing length mismatch for %s list pup function!\n",
00218 acc->getPath());
00219 }
00220 CcsSendReply(bufLen,(void *)buf);
00221 delete[] buf;
00222 }
00223 CmiFree(msg);
00224 }
00225
00226 static void CpdList_ccs_list_items_set(char *msg)
00227 {
00228 CpdListItemsRequest req;
00229 CpdListAccessor *acc=CpdListHeader_ccs_list_items(msg,req);
00230 if(acc == NULL) CmiPrintf("ccs-builtins> Null Accessor--bad list name (set)\n");
00231 else {
00232 PUP_toNetwork_unpack p(req.extra);
00233 pupCpd(p,acc,req);
00234 if (p.size()!=req.extraLen)
00235 CmiPrintf("Size mismatch during ccs_list_items.set: client sent %d bytes, but %d bytes used!\n",
00236 req.extraLen,p.size());
00237 }
00238 CmiFree(msg);
00239 }
00240
00242 void CpdMachineArchitecture(char *msg) {
00243 char reply[8];
00244 reply[0]=CHARMDEBUG_MAJOR;
00245 reply[1]=CHARMDEBUG_MINOR;
00246
00247 reply[2] = 0;
00248 if (sizeof(char*) == 4) reply[2] = 1;
00249 else if (sizeof(char*) == 8) reply[2] = 2;
00250
00251 reply[3] = 0;
00252 int value = 1;
00253 char firstByte = *((char*)&value);
00254 if (firstByte == 1) reply[3] = 1;
00255 else reply[3] = 2;
00256
00257 #if CMK_BIGSIM_CHARM
00258 reply[3] |= 4;
00259 #endif
00260
00261 reply[4] = sizeof(int);
00262
00263 reply[5] = sizeof(long);
00264 #if CMK_LONG_LONG_DEFINED
00265
00266 reply[6] = sizeof(long long);
00267 #else
00268
00269
00270
00271 reply[6] = 0;
00272 #endif
00273
00274 reply[7] = sizeof(bool);
00275 CcsSendReply(8, (void*)reply);
00276 CmiFree(msg);
00277 }
00278
00279 static void CpdList_ccs_list_items_fmt(char *msg)
00280 {
00281 CpdListItemsRequest req;
00282 CpdListAccessor *acc=CpdListHeader_ccs_list_items(msg,req);
00283 if (acc!=NULL) {
00284 int bufLen;
00285 {
00286 PUP_toNetwork_sizer ps;
00287 PUP_fmt p(ps);
00288 pupCpd(p,acc,req);
00289 bufLen=ps.size();
00290 }
00291 char *buf=new char[bufLen];
00292 {
00293 PUP_toNetwork_pack pp(buf);
00294 PUP_fmt p(pp);
00295 pupCpd(p,acc,req);
00296 if (pp.size()!=bufLen)
00297 CmiError("ERROR! Sizing/packing length mismatch for %s list pup function (%d sizing, %d packing)\n",
00298 acc->getPath(),bufLen,pp.size());
00299 }
00300 CcsSendReply(bufLen,(void *)buf);
00301 delete[] buf;
00302 }
00303 CmiFree(msg);
00304 }
00305
00306
00307
00308
00309
00310 class CpdList_introspect : public CpdListAccessor {
00311 CpdListTable_t *tab;
00312 public:
00313 CpdList_introspect(CpdListTable_t *tab_) :tab(tab_) { }
00314 virtual const char *getPath(void) const { return "converse/lists";}
00315 virtual size_t getLength(void) const {
00316 size_t len=0;
00317 CkHashtableIterator *it=tab->iterator();
00318 while (NULL!=it->next()) len++;
00319 delete it;
00320 return len;
00321 }
00322 virtual void pup(PUP::er &p,CpdListItemsRequest &req) {
00323 CkHashtableIterator *it=tab->iterator();
00324 void *objp;
00325 int curObj=0;
00326 while (NULL!=(objp=it->next())) {
00327 if (curObj>=req.lo && curObj<req.hi) {
00328 CpdListAccessor *acc=*(CpdListAccessor **)objp;
00329 char *pathName=(char *)acc->getPath();
00330 beginItem(p,curObj);
00331 p.comment("name");
00332 p(pathName,strlen(pathName));
00333 }
00334 curObj++;
00335 }
00336 }
00337 };
00338
00339
00340
00341
00342 #endif
00343
00344
00345
00346
00347 void CpdListRegister(CpdListAccessor *acc)
00348 #if CMK_CCS_AVAILABLE
00349 {
00350 CpvAccess(cpdListTable)->put(acc->getPath())=acc;
00351 }
00352 #else
00353 { }
00354 #endif
00355
00356 void CpdListRegister_c(const char *path,
00357 CpdListLengthFn_c len,void *lenParam,
00358 CpdListItemsFn_c items,void *itemsParam,int checkBoundary)
00359 #if CMK_CCS_AVAILABLE
00360 {
00361 CpdListRegister(new CpdListAccessor_c(path,
00362 len,lenParam,items,itemsParam,checkBoundary!=0?true:false));
00363 }
00364 #else
00365 { }
00366 #endif
00367
00368 #if CMK_CCS_AVAILABLE
00369
00370
00371
00372
00373 static void CpdListInit(void) {
00374 CpvInitialize(CpdListTable_t *,cpdListTable);
00375 CpvAccess(cpdListTable)=new CpdListTable_t(31,0.5,
00376 CkHashFunction_string,CkHashCompare_string);
00377 CpdListRegister(new CpdList_introspect(CpvAccess(cpdListTable)));
00378
00379 CcsRegisterHandler("ccs_list_len",(CmiHandler)CpdList_ccs_list_len);
00380 CcsRegisterHandler("ccs_list_items.txt",(CmiHandler)CpdList_ccs_list_items_txt);
00381 CcsRegisterHandler("ccs_list_items.fmt",(CmiHandler)CpdList_ccs_list_items_fmt);
00382 CcsRegisterHandler("ccs_list_items.set",(CmiHandler)CpdList_ccs_list_items_set);
00383 CcsRegisterHandler("debug/converse/arch",(CmiHandler)CpdMachineArchitecture);
00384 }
00385
00386 #if CMK_WEB_MODE
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408 #if 0
00409 # define WEBDEBUG(x) CmiPrintf x
00410 #else
00411 # define WEBDEBUG(x)
00412 #endif
00413
00414 #define WEB_INTERVAL 1000
00415 #define MAXFNS 20
00416
00417 typedef struct {
00418 char hdr[CmiReservedHeaderSize];
00419 int fromPE;
00420 int perfData[MAXFNS];
00421 } CWeb_CollectedData;
00422
00423
00424 static int hasApplet=0;
00425 static CcsDelayedReply appletReply;
00426
00427 typedef int (*CWebFunction)(void);
00428 static CWebFunction CWebPerformanceFunctionArray[MAXFNS];
00429 static int CWebNoOfFns;
00430 static int CWeb_ReduceIndex;
00431 static int CWeb_CollectIndex;
00432
00433
00434
00435 static int collectedCount;
00436 static CWeb_CollectedData **collectedValues;
00437
00438 static void CWeb_Deliver(void)
00439 {
00440 int i,j;
00441
00442 if (hasApplet) {
00443 WEBDEBUG(("CWeb_Deliver to applet\n"));
00444
00445 char *reply=(char *)malloc(6+14*CmiNumPes()*CWebNoOfFns);
00446 sprintf(reply,"perf");
00447
00448 for(i=0; i<CmiNumPes(); i++){
00449 for (j=0;j<CWebNoOfFns;j++)
00450 {
00451 char buf[20];
00452 sprintf(buf," %d",collectedValues[i]->perfData[j]);
00453 strcat(reply,buf);
00454 }
00455 }
00456 CcsSendDelayedReply(appletReply,strlen(reply) + 1, reply);
00457 free(reply);
00458 hasApplet=0;
00459 }
00460 else
00461 WEBDEBUG(("CWeb_Deliver (NO APPLET)\n"));
00462
00463
00464 for(i = 0; i < CmiNumPes(); i++){
00465 CmiFree(collectedValues[i]);
00466 collectedValues[i] = 0;
00467 }
00468 collectedCount = 0;
00469 }
00470
00471
00472
00473 static void CWeb_Reduce(void *msg){
00474 CWeb_CollectedData *cur,*prev;
00475 int src;
00476 if(CmiMyPe() != 0){
00477 CmiAbort("CWeb performance data sent to wrong processor...\n");
00478 }
00479 WEBDEBUG(("CWeb_Reduce"));
00480 cur=(CWeb_CollectedData *)msg;
00481 src=cur->fromPE;
00482 prev = collectedValues[src];
00483 collectedValues[src] = cur;
00484 if(prev == 0) collectedCount++;
00485 else CmiFree(prev);
00486
00487 if(collectedCount == CmiNumPes()){
00488 CWeb_Deliver();
00489 }
00490 }
00491
00492
00493
00494
00495 static void CWeb_Collect(void)
00496 {
00497 CWeb_CollectedData *msg;
00498 int i;
00499
00500 WEBDEBUG(("CWeb_Collect on %d\n",CmiMyPe()));
00501 msg = (CWeb_CollectedData *)CmiAlloc(sizeof(CWeb_CollectedData));
00502 msg->fromPE = CmiMyPe();
00503
00504
00505 for(i = 0; i < CWebNoOfFns; i++)
00506 msg->perfData[i] = CWebPerformanceFunctionArray[i] ();
00507
00508
00509 CmiSetHandler(msg, CWeb_ReduceIndex);
00510 CmiSyncSendAndFree(0, sizeof(CWeb_CollectedData), msg);
00511
00512
00513 CcdCallFnAfter((CcdVoidFn)CWeb_Collect, 0, WEB_INTERVAL);
00514 }
00515
00516 void CWebPerformanceRegisterFunction(CWebFunction fn)
00517 {
00518 if (CmiMyRank()!=0) return;
00519 if (CWebNoOfFns>=MAXFNS) CmiAbort("Registered too many CWebPerformance functions!");
00520 CWebPerformanceFunctionArray[CWebNoOfFns] = fn;
00521 CWebNoOfFns++;
00522 }
00523
00524
00525
00526
00527 static void CWebHandler(void){
00528 if(CcsIsRemoteRequest()) {
00529 static int startedCollection=0;
00530
00531 WEBDEBUG(("CWebHandler request on %d\n",CmiMyPe()));
00532 hasApplet=1;
00533 appletReply=CcsDelayReply();
00534
00535 if(startedCollection == 0){
00536 WEBDEBUG(("Starting data collection on every processor\n"));
00537 int i;
00538 startedCollection=1;
00539 collectedCount=0;
00540 collectedValues = (CWeb_CollectedData **)malloc(sizeof(void *) * CmiNumPes());
00541 for(i = 0; i < CmiNumPes(); i++)
00542 collectedValues[i] = 0;
00543
00544
00545 for(i = 0; i < CmiNumPes(); i++){
00546 char *msg = (char *)CmiAlloc(CmiReservedHeaderSize);
00547 CmiSetHandler(msg, CWeb_CollectIndex);
00548 CmiSyncSendAndFree(i, CmiReservedHeaderSize,msg);
00549 }
00550 }
00551 }
00552 }
00553
00558 struct CWebModeStats {
00559 public:
00560 double beginTime;
00561 double startTime;
00562 double usedTime;
00563 int PROCESSING;
00564 };
00565 CpvStaticDeclare(CWebModeStats *,cwebStats);
00566
00567
00568
00569 static void usageReset(CWebModeStats *stats,double curWallTime)
00570 {
00571 stats->beginTime=curWallTime;
00572 stats->usedTime = 0.;
00573 }
00574
00575
00576
00577 static void usageStart(CWebModeStats *stats,double curWallTime)
00578 {
00579 stats->startTime = curWallTime;
00580 stats->PROCESSING = 1;
00581 }
00582
00583
00584
00585 static void usageStop(CWebModeStats *stats,double curWallTime)
00586 {
00587 stats->usedTime += curWallTime - stats->startTime;
00588 stats->PROCESSING = 0;
00589 }
00590
00591
00592
00593
00594
00595 static void initUsage()
00596 {
00597 CpvInitialize(CWebModeStats *, cwebStats);
00598 CWebModeStats *stats=new CWebModeStats;
00599 CpvAccess(cwebStats)=stats;
00600 usageReset(stats,CmiWallTimer());
00601 usageStart(stats,CmiWallTimer());
00602 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,(CcdVoidFn)usageStart,stats);
00603 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)usageStop,stats);
00604 }
00605
00606 static int getUsage(void)
00607 {
00608 int usage = 0;
00609 double time = CmiWallTimer();
00610 CWebModeStats *stats=CpvAccess(cwebStats);
00611 double totalTime = time - stats->beginTime;
00612
00613 if(stats->PROCESSING)
00614 {
00615 usageStop(stats,time); usageStart(stats,time);
00616 }
00617 if(totalTime > 0.)
00618 usage = (int)(0.5 + 100 *stats->usedTime/totalTime);
00619 usageReset(stats,time);
00620
00621 return usage;
00622 }
00623
00624 static int getSchedQlen(void)
00625 {
00626 return(CqsLength((Queue)CpvAccess(CsdSchedQueue)));
00627 }
00628
00629 #endif
00630
00631 #if ! CMK_WEB_MODE
00632 static void CWeb_Invalid(void)
00633 {
00634 CmiAbort("Invalid web mode handler invoked!\n");
00635 }
00636 #endif
00637
00638 void CWebInit(void)
00639 {
00640 #if CMK_WEB_MODE
00641 CcsRegisterHandler("perf_monitor", (CmiHandler)CWebHandler);
00642
00643 CmiAssignOnce(&CWeb_CollectIndex, CmiRegisterHandler((CmiHandler)CWeb_Collect));
00644 CmiAssignOnce(&CWeb_ReduceIndex, CmiRegisterHandler((CmiHandler)CWeb_Reduce));
00645
00646 initUsage();
00647 CWebPerformanceRegisterFunction(getUsage);
00648 CWebPerformanceRegisterFunction(getSchedQlen);
00649 #else
00650
00651
00652 CmiRegisterHandler((CmiHandler)CWeb_Invalid);
00653 CmiRegisterHandler((CmiHandler)CWeb_Invalid);
00654 #endif
00655 }
00656
00657
00658 void CcsBuiltinsInit(char **argv)
00659 {
00660 CcsRegisterHandler("ccs_getinfo",(CmiHandler)ccs_getinfo);
00661 CcsRegisterHandler("ccs_killport",(CmiHandler)ccs_killport);
00662 CcsRegisterHandler("ccs_killpe",(CmiHandler)ccs_killpe);
00663 CWebInit();
00664 CpdListInit();
00665 }
00666
00667
00668 #endif
00669
00670 void PUP_fmt::fieldHeader(typeCode_t typeCode,int nItems) {
00671
00672 lengthLen_t ll;
00673 if (nItems==1) ll=lengthLen_single;
00674 else if (nItems<256) ll=lengthLen_byte;
00675 else ll=lengthLen_int;
00676
00677 byte intro=(((int)ll)<<4)+(int)typeCode;
00678 p(intro);
00679
00680 switch(ll) {
00681 case lengthLen_single: break;
00682 case lengthLen_byte: {
00683 byte l=nItems;
00684 p(l);
00685 } break;
00686 case lengthLen_int: {
00687 p(nItems);
00688 } break;
00689 case lengthLen_long: CmiAbort("Should not have reached here!"); break;
00690 };
00691 }
00692
00693 void PUP_fmt::comment(const char *message) {
00694 size_t nItems=strlen(message);
00695 fieldHeader(typeCode_comment,nItems);
00696 p((char *)message,nItems);
00697 }
00698 void PUP_fmt::synchronize(unsigned int m) {
00699 fieldHeader(typeCode_sync,1);
00700 p(m);
00701 }
00702 void PUP_fmt::bytes(void *ptr,size_t n,size_t itemSize,PUP::dataType t) {
00703 if(itemSize > INT_MAX || n > INT_MAX || itemSize*n > INT_MAX)
00704 CmiAbort("Ccs does not support messages greater than INT_MAX...\n");
00705 switch(t) {
00706 case PUP::Tchar:
00707 case PUP::Tuchar:
00708 case PUP::Tbyte:
00709 fieldHeader(typeCode_byte,n);
00710 p.bytes(ptr,n,itemSize,t);
00711 break;
00712 case PUP::Tshort: case PUP::Tint:
00713 case PUP::Tushort: case PUP::Tuint:
00714 case PUP::Tbool:
00715 fieldHeader(typeCode_int,n);
00716 p.bytes(ptr,n,itemSize,t);
00717 break;
00718
00719 case PUP::Tlong: case PUP::Tlonglong:
00720 case PUP::Tulong: case PUP::Tulonglong:
00721 fieldHeader(typeCode_long,n);
00722 p.bytes(ptr,n,itemSize,t);
00723 break;
00724 case PUP::Tfloat:
00725 fieldHeader(typeCode_float,n);
00726 p.bytes(ptr,n,itemSize,t);
00727 break;
00728 case PUP::Tdouble: case PUP::Tlongdouble:
00729 fieldHeader(typeCode_double,n);
00730 p.bytes(ptr,n,itemSize,t);
00731 break;
00732 case PUP::Tpointer:
00733 fieldHeader(typeCode_pointer,n);
00734 p.bytes(ptr,n,itemSize,t);
00735 break;
00736 default: CmiAbort("Unrecognized type code in PUP_fmt::bytes");
00737 };
00738 }
00739
00740