00001
00005
00006
00007 #ifndef _TRACE_UTILIZATION_H
00008 #define _TRACE_UTILIZATION_H
00009
00010 #include <stdio.h>
00011 #include <errno.h>
00012 #include <deque>
00013
00014 #include "charm++.h"
00015
00016
00017 #include "trace.h"
00018 #include "envelope.h"
00019 #include "register.h"
00020 #include "trace-common.h"
00021 #include "ckcallback-ccs.h"
00022
00023 #include "TraceUtilization.decl.h"
00024
// Sentinel entry-point indices. The negative values are parenthesized so the
// macros expand safely inside larger expressions.
#define INVALIDEP     (-2)
#define TRACEON_EP    (-3)   // initial value of TraceUtilization::execEp
// Extra per-bin slots added on top of the registered entry-method count when
// TraceUtilization sizes its per-bin storage.
#define NUM_DUMMY_EPS 9

// Time-binning parameters. BIN_SIZE is the width of one bin (presumably in
// seconds -- confirm against the timer used by the callers) and BIN_PER_SEC
// is its reciprocal; the two must be kept in sync by hand.
#define BIN_PER_SEC   1000
#define BIN_SIZE      0.001
// Number of bins held in the circular cpuTime buffer (bin index is taken
// modulo NUM_BINS).
#define NUM_BINS      32768

// Field types and constants for the compressed utilization format.
// NOTE(review): these are consumed outside this header; meanings are inferred
// from the names only -- confirm against the compression routines.
#define numBins_T      int
#define numProcs_T     int
#define entriesInBin_T short
#define ep_T           short
#define utilization_T  unsigned char
#define other_EP       10000
00043
00044
// Proxy to the TraceUtilizationBOC instance. It is assigned in the
// TraceUtilizationInit constructor (via CProxy_TraceUtilizationBOC::ckNew()),
// and element [0] is the target of the CCS request callback registered there.
00045 extern CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
00046
00047
00048
// Callback registered by TraceUtilizationInit with
// CcdCallOnConditionKeep(CcdPERIODIC_1second, ...), passing NULL as the
// user pointer. Defined elsewhere.
// NOTE(review): the double argument is presumably the current time supplied
// by the Ccd scheduler -- confirm against the definition.
00049 void collectUtilizationData(void *, double);
00050
00051
00052
00054 class TraceUtilizationInit : public Chare {
00055 public:
00056 TraceUtilizationInit(CkArgMsg *m) {
00057 delete m;
00058 CkPrintf("[%d] TraceUtilizationInit creating traceUtilizationGroupProxy");
00059 fflush(stdout);
00060
00061 traceUtilizationGroupProxy = CProxy_TraceUtilizationBOC::ckNew();
00062
00063 CkPrintf("Trace Summary now listening in for CCS Client\n");
00064 CcsRegisterHandler("CkPerfSumDetail compressed", CkCallback(CkIndex_TraceUtilizationBOC::ccsRequestSumDetailCompressed(NULL), traceUtilizationGroupProxy[0]));
00065
00066 CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
00067 CcdCallOnConditionKeep(CcdPERIODIC_1second, collectUtilizationData, (void *)NULL);
00068
00069 }
00070 };
00071
00072
00073
00074
00075
00076
/**
 * A cursor over a raw byte buffer used to serialize and deserialize values
 * of arbitrary trivially-copyable types at byte offsets.
 *
 * The object does NOT own its memory: copies are shallow and the destructor
 * releases nothing. A buffer obtained through compressedBuffer(int) must be
 * released explicitly with freeBuf() on exactly one of the copies.
 * All accesses go through memcpy, so offsets need not be aligned. No bounds
 * checking is performed.
 */
class compressedBuffer {
 public:
  char* buf;  ///< Start of the underlying storage (may be NULL).
  int pos;    ///< Cursor: byte offset used by push/pop/peek.

  /// Creates an empty cursor with no storage attached.
  compressedBuffer() : buf(NULL), pos(0) {}

  /// Allocates `bytes` bytes with malloc(); release them with freeBuf().
  /// The allocation result is not checked here.
  compressedBuffer(int bytes) : buf((char*)malloc(bytes)), pos(0) {}

  /// Wraps caller-provided storage without taking ownership.
  compressedBuffer(void *buffer) : buf((char*)buffer), pos(0) {}

  /// Re-targets the cursor at `buffer` and rewinds it to offset 0.
  void init(void *buffer) {
    buf = (char*)buffer;
    pos = 0;
  }

  /// Address of the byte at the current cursor position.
  inline void * currentPtr() const {
    return (void*)(buf + pos);
  }

  /// Returns the T stored at byte `offset`; the cursor is not moved.
  template <typename T>
  T read(int offset) const {
    T v;
    memcpy(&v, buf + offset, sizeof(T));
    return v;
  }

  /// Stores `v` at byte `offset`; the cursor is not moved.
  template <typename T>
  void write(T v, int offset) {
    memcpy(buf + offset, &v, sizeof(T));
  }

  /// Adds one to the T stored at byte `offset`.
  template <typename T>
  void increment(int offset) {
    T temp = read<T>(offset);
    temp++;
    write<T>(temp, offset);
  }

  /// Adds `v` to the T stored at byte `offset`.
  template <typename T>
  void accumulate(T v, int offset) {
    T temp = read<T>(offset);
    temp += v;
    write<T>(temp, offset);
  }

  /// Writes `v` at the cursor and advances it by sizeof(T).
  /// @return the offset at which the value was written.
  template <typename T>
  int push(T v) {
    const int oldpos = pos;
    write<T>(v, pos);
    pos += sizeof(T);
    return oldpos;
  }

  /// Reads a T at the cursor and advances the cursor by sizeof(T).
  template <typename T>
  T pop() {
    T temp = read<T>(pos);
    pos += sizeof(T);
    return temp;
  }

  /// Reads a T at the cursor without advancing it.
  template <typename T>
  T peek() const {
    return read<T>(pos);
  }

  /// Reads the T that follows a T0 located at the cursor, without advancing.
  template <typename T0, typename T>
  T peekSecond() const {
    return read<T>(pos + sizeof(T0));
  }

  /// Number of bytes between the start of the buffer and the cursor.
  int datalength() const {
    return pos;
  }

  /// Start of the underlying storage.
  void * buffer() const {
    return (void*) buf;
  }

  /// Releases storage with free() and detaches this object from it, so a
  /// repeated call is harmless. Other shallow copies still dangle.
  void freeBuf() {
    free(buf);
    buf = NULL;
  }

  /// Intentionally does not free: the buffer is shared between copies.
  ~compressedBuffer() {
  }
};
00189
00190
00191
// Free functions operating on compressedBuffer; all are defined elsewhere.
// NOTE(review): the descriptions below are inferred from names and
// signatures only -- confirm against the definitions.

// Presumably builds a compressed buffer from bins not yet sent, limited by
// `max` (default 10000).
00192 compressedBuffer compressAvailableNewSumDetail(int max=10000);
// Presumably merges one bin from each of the numSrcBuf source buffers into
// destBuffer, weighting by the processor counts supplied.
00193 void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuffer);
00194
// Has the (nMsg, msgs) shape of a custom reduction function -- presumably
// registered as a reducer for the compressed data.
00195 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs);
// Debugging / validation helpers. They take the buffer by value, which copies
// only the pointer and cursor (compressedBuffer copies are shallow).
00196 void printCompressedBuf(compressedBuffer b);
00197 compressedBuffer fakeCompressedMessage();
00198 compressedBuffer emptyCompressedBuffer();
00199 void sanityCheckCompressedBuf(compressedBuffer b);
00200 bool isCompressedBufferSane(compressedBuffer b);
00201 double averageUtilizationInBuffer(compressedBuffer b);
00202
00203
00204
00205
00206
00207 class TraceUtilization : public Trace {
00208 public:
00209 int execEp;
00210 unsigned int epInfoSize;
00211 double start;
00212
00213 double *cpuTime;
00214 int lastBinUsed;
00215 unsigned int numBinsSent;
00216 unsigned int previouslySentBins;
00217
00218
00219 TraceUtilization() {
00220 execEp = TRACEON_EP;
00221 cpuTime = NULL;
00222 lastBinUsed = -1;
00223 numBinsSent = 0;
00224 }
00225
00226
00228 void initMem(){
00229 int _numEntries=_entryTable.size();
00230 epInfoSize = _numEntries + NUM_DUMMY_EPS + 1;
00231
00232 cpuTime = new double[NUM_BINS*epInfoSize];
00233 _MEMCHECK(cpuTime);
00234
00235 if(CkMyPe() == 0)
00236 writeSts();
00237
00238 }
00239
00240 void writeSts(void);
00241
00242 void creation(envelope *e, int epIdx, int num=1) {}
00243
00244 void beginExecute(envelope *e, void *obj);
00245 void beginExecute(CmiObjId *tid);
00246 void beginExecute(int event,int msgType,int ep,int srcPe, int mlen=0, CmiObjId *idx=NULL, void *obj=NULL);
00247 void endExecute(void);
00248 void beginIdle(double currT) {}
00249 void endIdle(double currT) {}
00250 void beginPack(void){}
00251 void endPack(void) {}
00252 void beginUnpack(void) {}
00253 void endUnpack(void) {}
00254 void beginComputation(void) { initMem(); }
00255 void endComputation(void) {}
00256 void traceClearEps() {}
00257 void traceWriteSts() {}
00258 void traceClose() {}
00259
00260 void addEventType(int eventType);
00261
00262 int cpuTimeEntriesAvailable() const { return lastBinUsed+1; }
00263 int cpuTimeEntriesSentSoFar() const { return numBinsSent; }
00264 void incrementNumCpuTimeEntriesSent(int n) { numBinsSent += n; }
00265
00266
00267 double sumUtilization(int startBin, int endBin);
00268
00269
00270 void updateCpuTime(int epIdx, double startTime, double endTime){
00271
00272
00273
00274 if (epIdx >= epInfoSize) {
00275 CkPrintf("WARNING: epIdx=%d >= epInfoSize=%d\n", (int)epIdx, (int)epInfoSize );
00276 return;
00277 }
00278
00279 int startingBinIdx = (int)(startTime/BIN_SIZE);
00280 int endingBinIdx = (int)(endTime/BIN_SIZE);
00281
00282 if (startingBinIdx == endingBinIdx) {
00283 addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
00284 } else if (startingBinIdx < endingBinIdx) {
00285 addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*BIN_SIZE - startTime);
00286 while(++startingBinIdx < endingBinIdx)
00287 addToCPUtime(startingBinIdx, epIdx, BIN_SIZE);
00288 addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*BIN_SIZE);
00289 }
00290 }
00291
00292
00294 inline void zeroIfNecessary(unsigned int interval){
00295 for(unsigned int i=lastBinUsed+1; i<= interval; i++){
00296
00297 for(unsigned int j=0;j<epInfoSize;j++){
00298 cpuTime[(i%NUM_BINS)*epInfoSize+j] = 0.0;
00299 }
00300 }
00301 lastBinUsed = interval;
00302 }
00303
00304 UInt getEpInfoSize() { return epInfoSize; }
00305
00307 inline double getCPUtime(unsigned int interval, unsigned int ep) {
00308 CkAssert(ep < epInfoSize);
00309 if(cpuTime != NULL && interval <= lastBinUsed)
00310 return cpuTime[(interval%NUM_BINS)*epInfoSize+ep];
00311 else {
00312 CkPrintf("getCPUtime called with invalid options: cpuTime=%p lastBinUsed=%d interval=%d ep=%d\n", cpuTime, lastBinUsed, (int)interval, (int)ep);
00313 return 0.0;
00314 }
00315 }
00316
00317 inline void addToCPUtime(unsigned int interval, unsigned int ep, double val){
00318
00319 zeroIfNecessary(interval);
00320
00321 cpuTime[(interval%NUM_BINS)*epInfoSize+ep] += val;
00322 }
00323
00324
00325 inline double getUtilization(int interval, int ep){
00326 return getCPUtime(interval, ep) * 100.0 * (double)BIN_PER_SEC;
00327 }
00328
00329
00330 compressedBuffer compressNRecentSumDetail(int desiredBinsToSend);
00331
00332
00333 };
00334
00335
00336
00337
00338
00339
00340 class TraceUtilizationBOC : public CBase_TraceUtilizationBOC {
00341
00342 std::deque<CkReductionMsg *> storedSumDetailResults;
00343
00344 public:
00345 TraceUtilizationBOC() {}
00346 TraceUtilizationBOC(CkMigrateMessage* msg) {}
00347 ~TraceUtilizationBOC() {}
00348
00349
00351 void ccsRequestSumDetailCompressed(CkCcsRequestMsg *m);
00352 void collectSumDetailData();
00353 void sumDetailDataCollected(CkReductionMsg *);
00354
00355 };
00356
00357
00358
00359
00360
00361 #endif
00362