00001 #include <stdio.h>
00002 #include <stdlib.h>
00003 #include <string.h>
00004 #include "charm++.h"
00005 #include "ck.h"
00006 #include "ckevacuation.h"
00007
00008
00009 #define DEBUGC(x)
00010
00011
00012
00013
00014
00015
00016
00017
// Converse handler index that CkAnnounceEvac() stores into the messages it
// broadcasts. Presumably registered for _ckEvacBcast elsewhere -- the
// registration is not visible in this file.
00018 int _ckEvacBcastIdx;
// Converse handler index stored into the acknowledgement that _ckEvacBcast()
// sends back. Presumably registered for _ckAckEvac elsewhere.
00019 int _ckAckEvacIdx;
// Countdown of acknowledgements: set to CkNumValidPes()-1 by
// CkClearAllArrayElements() and decremented once per _ckAckEvac() call.
00020 int numValidProcessors;
00021
// CmiWallTimer() value captured in CkDecideEvacPe(); the elapsed-time
// printouts in this file subtract it from the current time.
00022 double evacTime;
00023
// Reset to 0 at the start of CkClearAllArrayElements() and
// CkEvacuatedElement(); nothing in the visible code increments it.
00024 int remainingElements;
// Written by _ckEvacBcast(): the evacuating PE while that PE still reports
// remaining elements, -1 once it reports none. Not read in the visible code.
00025 int allowMessagesOnly;
00026
00027
// CmiWallTimer() value recorded by _ckEvacBcast() when it receives a message
// whose remainingElements field is -1.
00028 double firstRecv;
00029
00030
00031
00032 void _ckEvacBcast(struct evacMsg *msg){
00033 if(msg->remainingElements == -1){
00034 firstRecv = CmiWallTimer();
00035 return;
00036 }
00037 printf("[%d]<%.6f> Processor %d is being evacuated \n",CkMyPe(),CmiWallTimer(),msg->pe);
00038 fprintf(stderr,"[%d] <%.6f> Processor %d is being evacuated \n",CkMyPe(),CmiWallTimer(),msg->pe);
00039 CpvAccess(_validProcessors)[msg->pe] = 0;
00040 set_avail_vector(CpvAccess(_validProcessors));
00041 if(msg->pe == CpvAccess(serializer)){
00042 CpvAccess(serializer) = getNextSerializer();
00043 }
00044
00045
00046
00047
00048
00049
00050 int numGroups = CkpvAccess(_groupIDTable)->size();
00051 int i;
00052 CkElementInformHome inform;
00053 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(inform););
00054
00055 if(msg->remainingElements == 0){
00056 struct evacMsg reply;
00057 reply.pe = CkMyPe();
00058
00059 CmiSetHandler(&reply,_ckAckEvacIdx);
00060 CmiSyncSend(msg->pe,sizeof(struct evacMsg),(char *)&reply);
00061 allowMessagesOnly = -1;
00062 }else{
00063 allowMessagesOnly = msg->pe;
00064 }
00065 }
00066
00067
00068
00069
00070
00071
00072
00073
00074 void _ckAckEvac(struct evacMsg *msg){
00075 numValidProcessors--;
00076 if(numValidProcessors == 0){
00077 set_avail_vector(CpvAccess(_validProcessors));
00078 printf("[%d] <%.6f> Reply from all processors took %.6lf s \n",CkMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
00079
00080
00081 }
00082 }
00083
00084
00085 void CkAnnounceEvac(int remain){
00086
00087 struct evacMsg msg;
00088 msg.pe = CkMyPe();
00089 msg.remainingElements = remain;
00090 CmiSetHandler(&msg,_ckEvacBcastIdx);
00091 CmiSyncBroadcast(sizeof(struct evacMsg),(char *)&msg);
00092 }
00093
00094
00095 void CkStopScheduler(){
00096 if(remainingElements > 0){
00097 return;
00098 }
00099
00100
00101
00102
00103 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
00104 for(int i=0;i<numNodeGroups;i++){
00105 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
00106 obj->doneEvacuate();
00107 }
00108 int thisPE = CkMyPe();
00109 printf("[%d] Stopping Scheduler \n", thisPE);
00110
00111 CpvAccess(_validProcessors)[thisPE]=0;
00112 }
00113
00114 void CkEmmigrateElement(void *arg){
00115 CkLocRec_local *rec = (CkLocRec_local *)arg;
00116 CkLocMgr *mgr = rec->getLocMgr();
00117 const CkArrayIndex &idx = rec->getIndex();
00118 int targetPE=getNextPE(idx);
00119
00120
00121 rec->AsyncMigrate(CmiTrue);
00122 mgr->emigrate(rec,targetPE);
00123 CkEvacuatedElement();
00124
00125 }
00126
00127 void CkEvacuatedElement(){
00128 if(!CpvAccess(_validProcessors)[CkMyPe()]){
00129 return;
00130 }
00131 if(!CkpvAccess(startedEvac)){
00132 return;
00133 }
00134 remainingElements=0;
00135
00136
00137
00138 int numGroups = CkpvAccess(_groupIDTable)->size();
00139 int i;
00140 CkElementEvacuate evac;
00141 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(evac););
00142
00143 CmiAssert(remainingElements >= 0);
00144 DEBUGC(printf("[%d] remaining elements %d \n",CkMyPe(),remainingElements));
00145 if(remainingElements == 0){
00146 printf("[%d] Processor empty in %.6lfs \n",CkMyPe(),CmiWallTimer()-evacTime);
00147 CpvAccess(_validProcessors)[CkMyPe()] = 0;
00148 CkAnnounceEvac(0);
00149 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
00150 for(int i=0;i<numNodeGroups;i++){
00151 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
00152 obj->doneEvacuate();
00153 }
00154 }
00155 }
00156
// Evacuation progress flag for this processor as used in this file: not
// explicitly initialised here, set to 1 by CkDecideEvacPe(), and advanced
// from 1 to 2 by CkClearAllArrayElements().
00157 int evacuate;
// Forward declaration so CkDecideEvacPe() can call the function before its
// definition further down.
00158 extern "C" void CkClearAllArrayElements();
00159
00160 void CkDecideEvacPe(){
00161 if(evacuate > 0){
00162 return;
00163 }
00164 evacuate = 1;
00165 evacTime = CmiWallTimer();
00166 CkClearAllArrayElements();
00167 }
00168
00169
00170
// Number of emigrations performed by CkElementEvacuate::addLocation();
// reset to 0 at the start of CkClearAllArrayElements().
00171 int numEvacuated;
00172
00173
00174
00175
00176 extern "C"
00177 void CkClearAllArrayElements(){
00178 if(evacuate != 1){
00179 return;
00180 }
00181 evacuate=2;
00182 remainingElements=0;
00183 numEvacuated=0;
00184
00185 printf("[%d] <%.6lf> Start Evacuation \n",CkMyPe(),evacTime);
00186 CkpvAccess(startedEvac)=1;
00187
00188 if(CkMyPe() == CpvAccess(serializer)){
00189 CpvAccess(serializer) = getNextSerializer();
00190 }
00191
00192
00193
00194
00195 int numGroups = CkpvAccess(_groupIDTable)->size();
00196 int i;
00197 CkElementEvacuate evac;
00198 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(evac););
00199
00200
00201
00202
00203
00204 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
00205 for(i=0;i<numNodeGroups;i++){
00206 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
00207 obj->evacuate();
00208 }
00209
00210 DEBUGC(printf("[%d] remaining elements %d number Evacuated %d \n",CkMyPe(),remainingElements,numEvacuated));
00211 numValidProcessors = CkNumValidPes()-1;
00212 CkAnnounceEvac(remainingElements);
00213 if(remainingElements == 0){
00214
00215
00216
00217
00218 printf("[%d] Processor empty in %.6lfs \n",CkMyPe(),CmiWallTimer()-evacTime);
00219 CpvAccess(_validProcessors)[CkMyPe()] = 0;
00220 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
00221 for(int i=0;i<numNodeGroups;i++){
00222 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
00223 obj->doneEvacuate();
00224 }
00225 }
00226 }
00227
00228 void CkClearAllArrayElementsCPP(){
00229 CkClearAllArrayElements();
00230 }
00231
00232 void CkElementEvacuate::addLocation(CkLocation &loc){
00233 CkLocMgr *locMgr = loc.getManager();
00234 CkLocRec_local *rec = loc.getLocalRecord();
00235 const CkArrayIndex &i = loc.getIndex();
00236 int targetPE=getNextPE(i);
00237 if(rec->isAsyncEvacuate()){
00238 numEvacuated++;
00239 printf("[%d]<%.6lf> START to emigrate array element \n",CkMyPe(),CmiWallTimer());
00240 rec->AsyncMigrate(CmiTrue);
00241 locMgr->emigrate(rec,targetPE);
00242 printf("[%d]<%.6lf> emigrated array element \n",CkMyPe(),CmiWallTimer());
00243 }else{
00244
00245
00246
00247
00248 CkVec<CkMigratable *>list;
00249 locMgr->migratableList(rec,list);
00250 DEBUGC(printf("[%d] ArrayElement not ready to Evacuate number of migratable %d \n",CkMyPe(),list.size()));
00251 for(int i=0;i<list.size();i++){
00252 if(list[i]->isAsyncEvacuate()){
00253 DEBUGC(printf("[%d] possible TCharm element decides to migrate \n",CkMyPe()));
00254
00255 rec->AsyncMigrate(CmiTrue);
00256 locMgr->emigrate(rec,targetPE);
00257 numEvacuated++;
00258 }
00259 }
00260
00261
00262
00263 }
00264 }
00265
00266 void CkElementInformHome::addLocation(CkLocation &loc){
00267 const CkArrayIndex &i = loc.getIndex();
00268 CkLocMgr *locMgr = loc.getManager();
00269 locMgr->informHome(i,CkMyPe());
00270 }
00271
00272
00273
00274
00275
00276
00277
00278 int getNextPE(const CkArrayIndex &i){
00279 if (i.nInts==1) {
00280
00281 int ans= (i.data()[0])%CkNumPes();
00282 while(!CpvAccess(_validProcessors)[ans] || ans == CkMyPe()){
00283 ans = (ans +1 )%CkNumPes();
00284 }
00285 return ans;
00286 }else{
00287
00288 unsigned int hash=(i.hash()+739)%1280107;
00289 int ans = (hash % CkNumPes());
00290 while(!CpvAccess(_validProcessors)[ans] || ans == CkMyPe()){
00291 ans = (ans +1 )%CkNumPes();
00292 }
00293 return ans;
00294
00295 }
00296
00297 }
00298
00299
00300
00301
00302
00303 int getNextSerializer(){
00304 int currentSerializer = CpvAccess(serializer);
00305 int nextSerializer = (currentSerializer+1)%CkNumPes();
00306
00307 while(!(CpvAccess(_validProcessors)[nextSerializer])){
00308 nextSerializer = (nextSerializer + 1)%CkNumPes();
00309 if(nextSerializer == currentSerializer){
00310 CkAbort("All processors are invalid ");
00311 }
00312 }
00313 return nextSerializer;
00314 }
00315
00316 int CkNumValidPes(){
00317 #if CMK_BIGSIM_CHARM
00318 return CkNumPes();
00319 #else
00320 int count=0;
00321 for(int i=0;i<CkNumPes();i++){
00322 if(CpvAccess(_validProcessors)[i]){
00323 count++;
00324 }
00325 }
00326 return count;
00327 #endif
00328 }
00329
00330
00331 void processRaiseEvacFile(char *raiseEvacFile){
00332 FILE *fp = fopen(raiseEvacFile,"r");
00333 if(fp == NULL){
00334 printf("Could not open raiseevac file %s. Ignoring raiseevac \n",raiseEvacFile);
00335 return;
00336 }
00337 char line[100];
00338 while(fgets(line,99,fp)!=0){
00339 int pe,faultTime;
00340 sscanf(line,"%d %d",&pe,&faultTime);
00341 if(pe == CkMyPe()){
00342 printf("[%d] Processor to be evacuated after %ds\n",CkMyPe(),faultTime);
00343 CcdCallFnAfter((CcdVoidFn)CkDecideEvacPe, 0, faultTime*1000);
00344 }
00345 }
00346 fclose(fp);
00347 }