00001
00002
00003
00004
00005 #include <stdio.h>
00006 #include <stdlib.h>
00007 #include <errno.h>
00008 #include <string.h>
00009
00010 #include "converse.h"
00011 #include "ckhashtable.h"
00012 #include "pup.h"
00013 #include "pup_toNetwork.h"
00014 #include "debug-conv++.h"
00015 #include "conv-ccs.h"
00016 #include "sockRoutines.h"
00017 #include "queueing.h"
00018 #include "ccs-builtins.h"
00019
00020 #ifdef __MINGW_H
00021 #include "process.h"
00022 #endif
00023
00024 #if CMK_CCS_AVAILABLE
00025
00026 void ccs_getinfo(char *msg);
00027
00028
00029
00030
00031
00032
00033
00034 typedef struct killPortStruct{
00035 skt_ip_t ip;
00036 unsigned int port;
00037 struct killPortStruct *next;
00038 } killPortStruct;
00039
00040 static killPortStruct *killList=NULL;
00041
00042 static void ccs_killport(char *msg)
00043 {
00044 killPortStruct *oldList=killList;
00045 int port=ChMessageInt(*(ChMessageInt_t *)(msg+CmiReservedHeaderSize));
00046 skt_ip_t ip;
00047 unsigned int connPort;
00048 CcsCallerId(&ip,&connPort);
00049 killList=(killPortStruct *)malloc(sizeof(killPortStruct));
00050 killList->ip=ip;
00051 killList->port=port;
00052 killList->next=oldList;
00053 CmiFree(msg);
00054 }
00055
/* Socket abort callback installed by CcsImpl_kill: ignores both arguments
   and always returns -1. */
static int noMoreErrors(int c,const char *m)
{
  return -1;
}
00057 extern "C" void CcsImpl_kill(void)
00058 {
00059 skt_set_abort(noMoreErrors);
00060 while (killList!=NULL)
00061 {
00062 SOCKET fd=skt_connect(killList->ip,killList->port,20);
00063 if (fd!=INVALID_SOCKET) {
00064 skt_sendN(fd,"die\n",strlen("die\n")+1);
00065 skt_close(fd);
00066 }
00067 killList=killList->next;
00068 }
00069 }
00070
00071
00072
00073
00074
00075
00076 #include <signal.h>
00077
00078 static void ccs_killpe(char *msg) {
00079 #if CMK_HAS_KILL
00080 kill(getpid(), 9);
00081 #else
00082 CmiAbort("ccs_killpe() not supported!");
00083 #endif
00084 }
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109 static void CpdListBoundsCheck(CpdListAccessor *l,int &lo,int &hi)
00110 {
00111 if (l->checkBoundary()) {
00112 int len=l->getLength();
00113 if (lo<0) lo=0;
00114 if (hi>len) hi=len;
00115 }
00116 }
00117
00118 typedef CkHashtableTslow<const char *,CpdListAccessor *> CpdListTable_t;
00119 CpvStaticDeclare(CpdListTable_t *,cpdListTable);
00120
00124 static CpdListAccessor *CpdListLookup(const char *path)
00125 {
00126 CpdListAccessor *acc=CpvAccess(cpdListTable)->get(path);
00127 if (acc==NULL) {
00128 CmiError("CpdListAccessor> Unrecognized list path '%s'\n",path);
00129 return NULL;
00130 }
00131 return acc;
00132 }
00133
00139 static CpdListAccessor *CpdListLookup(const ChMessageInt_t *lenAndPath)
00140 {
00141 static const int CpdListMaxLen=80;
00142 int len=ChMessageInt(lenAndPath[0]);
00143 const char *path=(const char *)(lenAndPath+1);
00144 char pathBuf[CpdListMaxLen+1];
00145 if ((len<0) || (len>CpdListMaxLen)) {
00146 CmiError("CpdListAccessor> Invalid list path length %d!\n",len);
00147 return NULL;
00148 }
00149 strncpy(pathBuf,path,len);
00150 pathBuf[len]=0;
00151 return CpdListLookup(pathBuf);
00152 }
00153
00154
00155
00156
00157 static void CpdList_ccs_list_len(char *msg)
00158 {
00159 const ChMessageInt_t *req=(const ChMessageInt_t *)(msg+CmiReservedHeaderSize);
00160 CpdListAccessor *acc=CpdListLookup(req);
00161 if (acc!=NULL) {
00162 ChMessageInt_t reply=ChMessageInt_new(acc->getLength());
00163 CcsSendReply(sizeof(reply),(void *)&reply);
00164 }
00165 CmiFree(msg);
00166 }
00167
00168
00169
00170
00171
00172
00173
00174
00175 static CpdListAccessor *CpdListHeader_ccs_list_items(char *msg,
00176 CpdListItemsRequest &h)
00177 {
00178 int msgLen=CmiSize((void *)msg)-CmiReservedHeaderSize;
00179 CpdListAccessor *ret=NULL;
00180 const ChMessageInt_t *req=(const ChMessageInt_t *)(msg+CmiReservedHeaderSize);
00181 h.lo=ChMessageInt(req[0]);
00182 h.hi=ChMessageInt(req[1]);
00183 h.extraLen=ChMessageInt(req[2]);
00184 if (h.extraLen>=0
00185 && ((int)(3*sizeof(ChMessageInt_t)+h.extraLen))<msgLen) {
00186 h.extra=(void *)(req+3);
00187 ret=CpdListLookup((ChMessageInt_t *)(h.extraLen+(char *)h.extra));
00188 if (ret!=NULL) CpdListBoundsCheck(ret,h.lo,h.hi);
00189 }
00190 return ret;
00191 }
00192
00193
00194
00195 static void pupCpd(PUP::er &p, CpdListAccessor *acc, CpdListItemsRequest &req)
00196 {
00197 p.syncComment(PUP::sync_begin_array,"CpdList");
00198 acc->pup(p,req);
00199 p.syncComment(PUP::sync_end_array);
00200 }
00201
00202 static void CpdList_ccs_list_items_txt(char *msg)
00203 {
00204 CpdListItemsRequest req;
00205 CpdListAccessor *acc=CpdListHeader_ccs_list_items(msg,req);
00206 if(acc == NULL) CmiPrintf("ccs-builtins> Null Accessor--bad list name (txt)\n");
00207 if (acc!=NULL) {
00208 int bufLen;
00209 {
00210 PUP::sizerText p; pupCpd(p,acc,req); bufLen=p.size();
00211 }
00212 char *buf=new char[bufLen];
00213 {
00214 PUP::toText p(buf); pupCpd(p,acc,req);
00215 if (p.size()!=bufLen)
00216 CmiError("ERROR! Sizing/packing length mismatch for %s list pup function!\n",
00217 acc->getPath());
00218 }
00219 CcsSendReply(bufLen,(void *)buf);
00220 delete[] buf;
00221 }
00222 CmiFree(msg);
00223 }
00224
00225 static void CpdList_ccs_list_items_set(char *msg)
00226 {
00227 CpdListItemsRequest req;
00228 CpdListAccessor *acc=CpdListHeader_ccs_list_items(msg,req);
00229 if(acc == NULL) CmiPrintf("ccs-builtins> Null Accessor--bad list name (set)\n");
00230 else {
00231 PUP_toNetwork_unpack p(req.extra);
00232 pupCpd(p,acc,req);
00233 if (p.size()!=req.extraLen)
00234 CmiPrintf("Size mismatch during ccs_list_items.set: client sent %d bytes, but %d bytes used!\n",
00235 req.extraLen,p.size());
00236 }
00237 CmiFree(msg);
00238 }
00239
00241 void CpdMachineArchitecture(char *msg) {
00242 char reply[8];
00243 reply[0]=CHARMDEBUG_MAJOR;
00244 reply[1]=CHARMDEBUG_MINOR;
00245
00246 reply[2] = 0;
00247 if (sizeof(char*) == 4) reply[2] = 1;
00248 else if (sizeof(char*) == 8) reply[2] = 2;
00249
00250 reply[3] = 0;
00251 int value = 1;
00252 char firstByte = *((char*)&value);
00253 if (firstByte == 1) reply[3] = 1;
00254 else reply[3] = 2;
00255
00256 #if CMK_BIGSIM_CHARM
00257 reply[3] |= 4;
00258 #endif
00259
00260 reply[4] = sizeof(int);
00261
00262 reply[5] = sizeof(long);
00263 #if CMK_LONG_LONG_DEFINED
00264
00265 reply[6] = sizeof(long long);
00266 #else
00267
00268
00269
00270 reply[6] = 0;
00271 #endif
00272
00273 reply[7] = sizeof(bool);
00274 CcsSendReply(8, (void*)reply);
00275 CmiFree(msg);
00276 }
00277
00278 static void CpdList_ccs_list_items_fmt(char *msg)
00279 {
00280 CpdListItemsRequest req;
00281 CpdListAccessor *acc=CpdListHeader_ccs_list_items(msg,req);
00282 if (acc!=NULL) {
00283 int bufLen;
00284 {
00285 PUP_toNetwork_sizer ps;
00286 PUP_fmt p(ps);
00287 pupCpd(p,acc,req);
00288 bufLen=ps.size();
00289 }
00290 char *buf=new char[bufLen];
00291 {
00292 PUP_toNetwork_pack pp(buf);
00293 PUP_fmt p(pp);
00294 pupCpd(p,acc,req);
00295 if (pp.size()!=bufLen)
00296 CmiError("ERROR! Sizing/packing length mismatch for %s list pup function (%d sizing, %d packing)\n",
00297 acc->getPath(),bufLen,pp.size());
00298 }
00299 CcsSendReply(bufLen,(void *)buf);
00300 delete[] buf;
00301 }
00302 CmiFree(msg);
00303 }
00304
00305
00306
00307
00308
00309 class CpdList_introspect : public CpdListAccessor {
00310 CpdListTable_t *tab;
00311 public:
00312 CpdList_introspect(CpdListTable_t *tab_) :tab(tab_) { }
00313 virtual const char *getPath(void) const { return "converse/lists";}
00314 virtual size_t getLength(void) const {
00315 size_t len=0;
00316 CkHashtableIterator *it=tab->iterator();
00317 while (NULL!=it->next()) len++;
00318 delete it;
00319 return len;
00320 }
00321 virtual void pup(PUP::er &p,CpdListItemsRequest &req) {
00322 CkHashtableIterator *it=tab->iterator();
00323 void *objp;
00324 int curObj=0;
00325 while (NULL!=(objp=it->next())) {
00326 if (curObj>=req.lo && curObj<req.hi) {
00327 CpdListAccessor *acc=*(CpdListAccessor **)objp;
00328 char *pathName=(char *)acc->getPath();
00329 beginItem(p,curObj);
00330 p.comment("name");
00331 p(pathName,strlen(pathName));
00332 }
00333 curObj++;
00334 }
00335 }
00336 };
00337
00338
00339
00340
00341 #endif
00342
00343
00344
00345
00346 void CpdListRegister(CpdListAccessor *acc)
00347 #if CMK_CCS_AVAILABLE
00348 {
00349 CpvAccess(cpdListTable)->put(acc->getPath())=acc;
00350 }
00351 #else
00352 { }
00353 #endif
00354
00355 extern "C" void CpdListRegister_c(const char *path,
00356 CpdListLengthFn_c len,void *lenParam,
00357 CpdListItemsFn_c items,void *itemsParam,int checkBoundary)
00358 #if CMK_CCS_AVAILABLE
00359 {
00360 CpdListRegister(new CpdListAccessor_c(path,
00361 len,lenParam,items,itemsParam,checkBoundary!=0?true:false));
00362 }
00363 #else
00364 { }
00365 #endif
00366
00367 #if CMK_CCS_AVAILABLE
00368
00369
00370
00371
00372 static void CpdListInit(void) {
00373 CpvInitialize(CpdListTable_t *,cpdListTable);
00374 CpvAccess(cpdListTable)=new CpdListTable_t(31,0.5,
00375 CkHashFunction_string,CkHashCompare_string);
00376 CpdListRegister(new CpdList_introspect(CpvAccess(cpdListTable)));
00377
00378 CcsRegisterHandler("ccs_list_len",(CmiHandler)CpdList_ccs_list_len);
00379 CcsRegisterHandler("ccs_list_items.txt",(CmiHandler)CpdList_ccs_list_items_txt);
00380 CcsRegisterHandler("ccs_list_items.fmt",(CmiHandler)CpdList_ccs_list_items_fmt);
00381 CcsRegisterHandler("ccs_list_items.set",(CmiHandler)CpdList_ccs_list_items_set);
00382 CcsRegisterHandler("debug/converse/arch",(CmiHandler)CpdMachineArchitecture);
00383 }
00384
00385 #if CMK_WEB_MODE
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407 #if 0
00408 # define WEBDEBUG(x) CmiPrintf x
00409 #else
00410 # define WEBDEBUG(x)
00411 #endif
00412
00413 #define WEB_INTERVAL 1000
00414 #define MAXFNS 20
00415
00416 typedef struct {
00417 char hdr[CmiReservedHeaderSize];
00418 int fromPE;
00419 int perfData[MAXFNS];
00420 } CWeb_CollectedData;
00421
00422
00423 static int hasApplet=0;
00424 static CcsDelayedReply appletReply;
00425
00426 typedef int (*CWebFunction)(void);
00427 static CWebFunction CWebPerformanceFunctionArray[MAXFNS];
00428 static int CWebNoOfFns;
00429 static int CWeb_ReduceIndex;
00430 static int CWeb_CollectIndex;
00431
00432
00433
00434 static int collectedCount;
00435 static CWeb_CollectedData **collectedValues;
00436
00437 static void CWeb_Deliver(void)
00438 {
00439 int i,j;
00440
00441 if (hasApplet) {
00442 WEBDEBUG(("CWeb_Deliver to applet\n"));
00443
00444 char *reply=(char *)malloc(6+14*CmiNumPes()*CWebNoOfFns);
00445 sprintf(reply,"perf");
00446
00447 for(i=0; i<CmiNumPes(); i++){
00448 for (j=0;j<CWebNoOfFns;j++)
00449 {
00450 char buf[20];
00451 sprintf(buf," %d",collectedValues[i]->perfData[j]);
00452 strcat(reply,buf);
00453 }
00454 }
00455 CcsSendDelayedReply(appletReply,strlen(reply) + 1, reply);
00456 free(reply);
00457 hasApplet=0;
00458 }
00459 else
00460 WEBDEBUG(("CWeb_Deliver (NO APPLET)\n"));
00461
00462
00463 for(i = 0; i < CmiNumPes(); i++){
00464 CmiFree(collectedValues[i]);
00465 collectedValues[i] = 0;
00466 }
00467 collectedCount = 0;
00468 }
00469
00470
00471
00472 static void CWeb_Reduce(void *msg){
00473 CWeb_CollectedData *cur,*prev;
00474 int src;
00475 if(CmiMyPe() != 0){
00476 CmiAbort("CWeb performance data sent to wrong processor...\n");
00477 }
00478 WEBDEBUG(("CWeb_Reduce"));
00479 cur=(CWeb_CollectedData *)msg;
00480 src=cur->fromPE;
00481 prev = collectedValues[src];
00482 collectedValues[src] = cur;
00483 if(prev == 0) collectedCount++;
00484 else CmiFree(prev);
00485
00486 if(collectedCount == CmiNumPes()){
00487 CWeb_Deliver();
00488 }
00489 }
00490
00491
00492
00493
00494 static void CWeb_Collect(void)
00495 {
00496 CWeb_CollectedData *msg;
00497 int i;
00498
00499 WEBDEBUG(("CWeb_Collect on %d\n",CmiMyPe()));
00500 msg = (CWeb_CollectedData *)CmiAlloc(sizeof(CWeb_CollectedData));
00501 msg->fromPE = CmiMyPe();
00502
00503
00504 for(i = 0; i < CWebNoOfFns; i++)
00505 msg->perfData[i] = CWebPerformanceFunctionArray[i] ();
00506
00507
00508 CmiSetHandler(msg, CWeb_ReduceIndex);
00509 CmiSyncSendAndFree(0, sizeof(CWeb_CollectedData), msg);
00510
00511
00512 CcdCallFnAfter((CcdVoidFn)CWeb_Collect, 0, WEB_INTERVAL);
00513 }
00514
00515 extern "C" void CWebPerformanceRegisterFunction(CWebFunction fn)
00516 {
00517 if (CmiMyRank()!=0) return;
00518 if (CWebNoOfFns>=MAXFNS) CmiAbort("Registered too many CWebPerformance functions!");
00519 CWebPerformanceFunctionArray[CWebNoOfFns] = fn;
00520 CWebNoOfFns++;
00521 }
00522
00523
00524
00525
00526 static void CWebHandler(void){
00527 if(CcsIsRemoteRequest()) {
00528 static int startedCollection=0;
00529
00530 WEBDEBUG(("CWebHandler request on %d\n",CmiMyPe()));
00531 hasApplet=1;
00532 appletReply=CcsDelayReply();
00533
00534 if(startedCollection == 0){
00535 WEBDEBUG(("Starting data collection on every processor\n"));
00536 int i;
00537 startedCollection=1;
00538 collectedCount=0;
00539 collectedValues = (CWeb_CollectedData **)malloc(sizeof(void *) * CmiNumPes());
00540 for(i = 0; i < CmiNumPes(); i++)
00541 collectedValues[i] = 0;
00542
00543
00544 for(i = 0; i < CmiNumPes(); i++){
00545 char *msg = (char *)CmiAlloc(CmiReservedHeaderSize);
00546 CmiSetHandler(msg, CWeb_CollectIndex);
00547 CmiSyncSendAndFree(i, CmiReservedHeaderSize,msg);
00548 }
00549 }
00550 }
00551 }
00552
00557 struct CWebModeStats {
00558 public:
00559 double beginTime;
00560 double startTime;
00561 double usedTime;
00562 int PROCESSING;
00563 };
00564 CpvStaticDeclare(CWebModeStats *,cwebStats);
00565
00566
00567
00568 static void usageReset(CWebModeStats *stats,double curWallTime)
00569 {
00570 stats->beginTime=curWallTime;
00571 stats->usedTime = 0.;
00572 }
00573
00574
00575
00576 static void usageStart(CWebModeStats *stats,double curWallTime)
00577 {
00578 stats->startTime = curWallTime;
00579 stats->PROCESSING = 1;
00580 }
00581
00582
00583
00584 static void usageStop(CWebModeStats *stats,double curWallTime)
00585 {
00586 stats->usedTime += curWallTime - stats->startTime;
00587 stats->PROCESSING = 0;
00588 }
00589
00590
00591
00592
00593
00594 static void initUsage()
00595 {
00596 CpvInitialize(CWebModeStats *, cwebStats);
00597 CWebModeStats *stats=new CWebModeStats;
00598 CpvAccess(cwebStats)=stats;
00599 usageReset(stats,CmiWallTimer());
00600 usageStart(stats,CmiWallTimer());
00601 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_BUSY,(CcdVoidFn)usageStart,stats);
00602 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)usageStop,stats);
00603 }
00604
00605 static int getUsage(void)
00606 {
00607 int usage = 0;
00608 double time = CmiWallTimer();
00609 CWebModeStats *stats=CpvAccess(cwebStats);
00610 double totalTime = time - stats->beginTime;
00611
00612 if(stats->PROCESSING)
00613 {
00614 usageStop(stats,time); usageStart(stats,time);
00615 }
00616 if(totalTime > 0.)
00617 usage = (int)(0.5 + 100 *stats->usedTime/totalTime);
00618 usageReset(stats,time);
00619
00620 return usage;
00621 }
00622
00623 static int getSchedQlen(void)
00624 {
00625 return(CqsLength((Queue)CpvAccess(CsdSchedQueue)));
00626 }
00627
00628 #endif
00629
00630 #if ! CMK_WEB_MODE
00631 static void CWeb_Invalid(void)
00632 {
00633 CmiAbort("Invalid web mode handler invoked!\n");
00634 }
00635 #endif
00636
00637 void CWebInit(void)
00638 {
00639 #if CMK_WEB_MODE
00640 CcsRegisterHandler("perf_monitor", (CmiHandler)CWebHandler);
00641
00642 CWeb_CollectIndex=CmiRegisterHandler((CmiHandler)CWeb_Collect);
00643 CWeb_ReduceIndex=CmiRegisterHandler((CmiHandler)CWeb_Reduce);
00644
00645 initUsage();
00646 CWebPerformanceRegisterFunction(getUsage);
00647 CWebPerformanceRegisterFunction(getSchedQlen);
00648 #else
00649
00650
00651 CmiRegisterHandler((CmiHandler)CWeb_Invalid);
00652 CmiRegisterHandler((CmiHandler)CWeb_Invalid);
00653 #endif
00654 }
00655
00656
00657 extern "C" void CcsBuiltinsInit(char **argv)
00658 {
00659 CcsRegisterHandler("ccs_getinfo",(CmiHandler)ccs_getinfo);
00660 CcsRegisterHandler("ccs_killport",(CmiHandler)ccs_killport);
00661 CcsRegisterHandler("ccs_killpe",(CmiHandler)ccs_killpe);
00662 CWebInit();
00663 CpdListInit();
00664 }
00665
00666
00667 #endif
00668
00669 void PUP_fmt::fieldHeader(typeCode_t typeCode,int nItems) {
00670
00671 lengthLen_t ll;
00672 if (nItems==1) ll=lengthLen_single;
00673 else if (nItems<256) ll=lengthLen_byte;
00674 else ll=lengthLen_int;
00675
00676 byte intro=(((int)ll)<<4)+(int)typeCode;
00677 p(intro);
00678
00679 switch(ll) {
00680 case lengthLen_single: break;
00681 case lengthLen_byte: {
00682 byte l=nItems;
00683 p(l);
00684 } break;
00685 case lengthLen_int: {
00686 p(nItems);
00687 } break;
00688 };
00689 }
00690
00691 void PUP_fmt::comment(const char *message) {
00692 int nItems=strlen(message);
00693 fieldHeader(typeCode_comment,nItems);
00694 p((char *)message,nItems);
00695 }
00696 void PUP_fmt::synchronize(unsigned int m) {
00697 fieldHeader(typeCode_sync,1);
00698 p(m);
00699 }
00700 void PUP_fmt::bytes(void *ptr,int n,size_t itemSize,PUP::dataType t) {
00701 switch(t) {
00702 case PUP::Tchar:
00703 case PUP::Tuchar:
00704 case PUP::Tbyte:
00705 fieldHeader(typeCode_byte,n);
00706 p.bytes(ptr,n,itemSize,t);
00707 break;
00708 case PUP::Tshort: case PUP::Tint:
00709 case PUP::Tushort: case PUP::Tuint:
00710 case PUP::Tbool:
00711 fieldHeader(typeCode_int,n);
00712 p.bytes(ptr,n,itemSize,t);
00713 break;
00714
00715 case PUP::Tlong: case PUP::Tlonglong:
00716 case PUP::Tulong: case PUP::Tulonglong:
00717 fieldHeader(typeCode_long,n);
00718 p.bytes(ptr,n,itemSize,t);
00719 break;
00720 case PUP::Tfloat:
00721 fieldHeader(typeCode_float,n);
00722 p.bytes(ptr,n,itemSize,t);
00723 break;
00724 case PUP::Tdouble: case PUP::Tlongdouble:
00725 fieldHeader(typeCode_double,n);
00726 p.bytes(ptr,n,itemSize,t);
00727 break;
00728 case PUP::Tpointer:
00729 fieldHeader(typeCode_pointer,n);
00730 p.bytes(ptr,n,itemSize,t);
00731 break;
00732 default: CmiAbort("Unrecognized type code in PUP_fmt::bytes");
00733 };
00734 }
00735
00736