00001 #if CMK_FAULT_EVAC
00002
00003 #include <stdio.h>
00004 #include <stdlib.h>
00005 #include <string.h>
00006 #include "charm++.h"
00007 #include "ck.h"
00008 #include "ckevacuation.h"
00009
/* Debug trace hook: DEBUGC(stmt) currently expands to nothing.
   Change the replacement text to `x` to turn the traces on. */
#define DEBUGC(x) //x

/* Converse handler indices used with CmiSetHandler for the evacuation
   broadcast (_ckEvacBcast) and its acknowledgement (_ckAckEvac).
   NOTE(review): registration happens outside this listing. */
int _ckEvacBcastIdx = 0;
int _ckAckEvacIdx = 0;

/* Acknowledgements still expected by this PE: set in CkClearAllArrayElements,
   counted down in _ckAckEvac. */
int numValidProcessors = 0;

/* Wall-clock time recorded when this PE decided to evacuate. */
double evacTime = 0.0;

/* Elements found on this PE by the most recent evacuation pass. */
int remainingElements = 0;

/* Set by _ckEvacBcast: -1 once the evacuating PE reports it is empty,
   otherwise the PE that is still evacuating. */
int allowMessagesOnly = 0;

/* Arrival time of an evacuation message carrying remainingElements == -1. */
double firstRecv = 0.0;
00024
00025
00026
00027 void _ckEvacBcast(struct evacMsg *msg){
00028 if(msg->remainingElements == -1){
00029 firstRecv = CmiWallTimer();
00030 return;
00031 }
00032 printf("[%d]<%.6f> Processor %d is being evacuated \n",CkMyPe(),CmiWallTimer(),msg->pe);
00033 fprintf(stderr,"[%d] <%.6f> Processor %d is being evacuated \n",CkMyPe(),CmiWallTimer(),msg->pe);
00034 CpvAccess(_validProcessors)[msg->pe] = 0;
00035 set_avail_vector(CpvAccess(_validProcessors));
00036 if(msg->pe == CpvAccess(serializer)){
00037 CpvAccess(serializer) = getNextSerializer();
00038 }
00039
00040
00041
00042
00043
00044
00045 int numGroups = CkpvAccess(_groupIDTable)->size();
00046 int i;
00047 CkElementInformHome inform;
00048 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(inform););
00049
00050 if(msg->remainingElements == 0){
00051 struct evacMsg reply;
00052 reply.pe = CkMyPe();
00053
00054 CmiSetHandler(&reply,_ckAckEvacIdx);
00055 CmiSyncSend(msg->pe,sizeof(struct evacMsg),(char *)&reply);
00056 allowMessagesOnly = -1;
00057 }else{
00058 allowMessagesOnly = msg->pe;
00059 }
00060 }
00061
00062
00063
00064
00065
00066
00067
00068
00069 void _ckAckEvac(struct evacMsg *msg){
00070 numValidProcessors--;
00071 if(numValidProcessors == 0){
00072 set_avail_vector(CpvAccess(_validProcessors));
00073 printf("[%d] <%.6f> Reply from all processors took %.6lf s \n",CkMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
00074
00075
00076 }
00077 }
00078
00079
00080 void CkAnnounceEvac(int remain){
00081
00082 struct evacMsg msg;
00083 msg.pe = CkMyPe();
00084 msg.remainingElements = remain;
00085 CmiSetHandler(&msg,_ckEvacBcastIdx);
00086 CmiSyncBroadcast(sizeof(struct evacMsg),(char *)&msg);
00087 }
00088
00089
00090 void CkStopScheduler(){
00091 if(remainingElements > 0){
00092 return;
00093 }
00094
00095
00096
00097
00098 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
00099 for(int i=0;i<numNodeGroups;i++){
00100 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
00101 obj->doneEvacuate();
00102 }
00103 int thisPE = CkMyPe();
00104 printf("[%d] Stopping Scheduler \n", thisPE);
00105
00106 CpvAccess(_validProcessors)[thisPE]=0;
00107 }
00108
00109 void CkEmmigrateElement(void *arg){
00110 CkLocRec *rec = (CkLocRec *)arg;
00111 const CkArrayIndex &idx = rec->getIndex();
00112 int targetPE=getNextPE(idx);
00113
00114
00115 rec->AsyncMigrate(true);
00116 rec->migrateMe(targetPE);
00117 CkEvacuatedElement();
00118
00119 }
00120
00121 void CkEvacuatedElement(){
00122 if(!CpvAccess(_validProcessors)[CkMyPe()]){
00123 return;
00124 }
00125 if(!CkpvAccess(startedEvac)){
00126 return;
00127 }
00128 remainingElements=0;
00129
00130
00131
00132 int numGroups = CkpvAccess(_groupIDTable)->size();
00133 int i;
00134 CkElementEvacuate evac;
00135 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(evac););
00136
00137 CmiAssert(remainingElements >= 0);
00138 DEBUGC(printf("[%d] remaining elements %d \n",CkMyPe(),remainingElements));
00139 if(remainingElements == 0){
00140 printf("[%d] Processor empty in %.6lfs \n",CkMyPe(),CmiWallTimer()-evacTime);
00141 CpvAccess(_validProcessors)[CkMyPe()] = 0;
00142 CkAnnounceEvac(0);
00143 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
00144 for(int i=0;i<numNodeGroups;i++){
00145 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
00146 obj->doneEvacuate();
00147 }
00148 }
00149 }
00150
00151 int evacuate;
00152 void CkClearAllArrayElements();
00153
00154 void CkDecideEvacPe(){
00155 if(evacuate > 0){
00156 return;
00157 }
00158 evacuate = 1;
00159 evacTime = CmiWallTimer();
00160 CkClearAllArrayElements();
00161 }
00162
00163
00164
00165 int numEvacuated;
00166
00167
00168
00169
00170 void CkClearAllArrayElements(){
00171 if(evacuate != 1){
00172 return;
00173 }
00174 evacuate=2;
00175 remainingElements=0;
00176 numEvacuated=0;
00177
00178 printf("[%d] <%.6lf> Start Evacuation \n",CkMyPe(),evacTime);
00179 CkpvAccess(startedEvac)=1;
00180
00181 if(CkMyPe() == CpvAccess(serializer)){
00182 CpvAccess(serializer) = getNextSerializer();
00183 }
00184
00185
00186
00187
00188 int numGroups = CkpvAccess(_groupIDTable)->size();
00189 int i;
00190 CkElementEvacuate evac;
00191 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(evac););
00192
00193
00194
00195
00196
00197 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
00198 for(i=0;i<numNodeGroups;i++){
00199 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
00200 obj->evacuate();
00201 }
00202
00203 DEBUGC(printf("[%d] remaining elements %d number Evacuated %d \n",CkMyPe(),remainingElements,numEvacuated));
00204 numValidProcessors = CkNumValidPes()-1;
00205 CkAnnounceEvac(remainingElements);
00206 if(remainingElements == 0){
00207
00208
00209
00210
00211 printf("[%d] Processor empty in %.6lfs \n",CkMyPe(),CmiWallTimer()-evacTime);
00212 CpvAccess(_validProcessors)[CkMyPe()] = 0;
00213 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
00214 for(int i=0;i<numNodeGroups;i++){
00215 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
00216 obj->doneEvacuate();
00217 }
00218 }
00219 }
00220
00221 void CkClearAllArrayElementsCPP(){
00222 CkClearAllArrayElements();
00223 }
00224
00225 void CkElementEvacuate::addLocation(CkLocation &loc){
00226 CkLocMgr *locMgr = loc.getManager();
00227 CkLocRec *rec = loc.getLocalRecord();
00228 const CkArrayIndex &i = loc.getIndex();
00229 int targetPE=getNextPE(i);
00230 if(rec->isAsyncEvacuate()){
00231 numEvacuated++;
00232 printf("[%d]<%.6lf> START to emigrate array element \n",CkMyPe(),CmiWallTimer());
00233 rec->AsyncMigrate(true);
00234 locMgr->emigrate(rec,targetPE);
00235 printf("[%d]<%.6lf> emigrated array element \n",CkMyPe(),CmiWallTimer());
00236 }else{
00237
00238
00239
00240
00241 std::vector<CkMigratable *>list;
00242 locMgr->migratableList(rec,list);
00243 DEBUGC(printf("[%d] ArrayElement not ready to Evacuate number of migratable %d \n",CkMyPe(),list.size()));
00244 for(int i=0;i<list.size();i++){
00245 if(list[i]->isAsyncEvacuate()){
00246 DEBUGC(printf("[%d] possible TCharm element decides to migrate \n",CkMyPe()));
00247
00248 rec->AsyncMigrate(true);
00249 locMgr->emigrate(rec,targetPE);
00250 numEvacuated++;
00251 }
00252 }
00253
00254
00255
00256 }
00257 }
00258
00259 void CkElementInformHome::addLocation(CkLocation &loc){
00260 const CkArrayIndex &i = loc.getIndex();
00261 CkLocMgr *locMgr = loc.getManager();
00262 locMgr->informHome(i,CkMyPe());
00263 }
00264
00265
00266
00267
00268
00269
00270
00271 int getNextPE(const CkArrayIndex &i){
00272 if (i.nInts==1) {
00273
00274 int ans= (i.data()[0])%CkNumPes();
00275 while(!CpvAccess(_validProcessors)[ans] || ans == CkMyPe()){
00276 ans = (ans +1 )%CkNumPes();
00277 }
00278 return ans;
00279 }else{
00280
00281 unsigned int hash=(i.hash()+739)%1280107;
00282 int ans = (hash % CkNumPes());
00283 while(!CpvAccess(_validProcessors)[ans] || ans == CkMyPe()){
00284 ans = (ans +1 )%CkNumPes();
00285 }
00286 return ans;
00287
00288 }
00289
00290 }
00291
00292
00293
00294
00295
00296 int getNextSerializer(){
00297 int currentSerializer = CpvAccess(serializer);
00298 int nextSerializer = (currentSerializer+1)%CkNumPes();
00299
00300 while(!(CpvAccess(_validProcessors)[nextSerializer])){
00301 nextSerializer = (nextSerializer + 1)%CkNumPes();
00302 if(nextSerializer == currentSerializer){
00303 CkAbort("All processors are invalid ");
00304 }
00305 }
00306 return nextSerializer;
00307 }
00308
00309 int CkNumValidPes(){
00310 #if CMK_BIGSIM_CHARM
00311 return CkNumPes();
00312 #else
00313 int count=0;
00314 for(int i=0;i<CkNumPes();i++){
00315 if(CpvAccess(_validProcessors)[i]){
00316 count++;
00317 }
00318 }
00319 return count;
00320 #endif
00321 }
00322
00323
00324 void processRaiseEvacFile(char *raiseEvacFile){
00325 FILE *fp = fopen(raiseEvacFile,"r");
00326 if(fp == NULL){
00327 printf("Could not open raiseevac file %s. Ignoring raiseevac \n",raiseEvacFile);
00328 return;
00329 }
00330 char line[100];
00331 while(fgets(line,99,fp)!=0){
00332 int pe,faultTime;
00333 sscanf(line,"%d %d",&pe,&faultTime);
00334 if(pe == CkMyPe()){
00335 printf("[%d] Processor to be evacuated after %ds\n",CkMyPe(),faultTime);
00336 CcdCallFnAfter((CcdVoidFn)CkDecideEvacPe, 0, faultTime*1000);
00337 }
00338 }
00339 fclose(fp);
00340 }
00341
00342 #endif
00343