00001
00002
00003 #if ! defined(_WIN32)
00004 #include "unistd.h"
00005 #endif
00006
00007 #include "pose.h"
00008 #include "pose.def.h"
00009
00010 CpvDeclare(int, stateRecovery);
00011 CpvDeclare(eventID, theEventID);
00012
00013 void POSEreadCmdLine();
00014 double busyWait;
00015 double sim_timer;
00016 int POSE_inactDetect;
00017 int totalNumPosers;
00018 POSE_TimeType POSE_endtime;
00019 POSE_TimeType POSE_GlobalClock;
00020 POSE_TimeType POSE_GlobalTS;
00021 POSE_Config pose_config;
00022 int _POSE_SEQUENTIAL;
00023 int seqCheckpointInProgress;
00024 POSE_TimeType seqLastCheckpointGVT;
00025 double seqLastCheckpointTime;
00026 double seqStartTime;
00027 CkQ<Skipped_Event> POSE_Skipped_Events;
00028 int poseIndexOfStopEvent;
00029
00030 const eventID& GetEventID() {
00031
00032
00033 CpvAccess(theEventID).incEventID();
00034 CkAssert(CpvAccess(theEventID).getPE()>=0);
00035 return(CpvAccess(theEventID));
00036 }
00037
00038
00039 void POSE_init()
00040 {
00041 POSE_init(1, POSE_UnsetTS);
00042 }
00043
00044 void POSE_init(int ET)
00045 {
00046 POSE_init(0, ET);
00047 }
00048
00049 void POSE_init(int IDflag, int ET)
00050 {
00051 CkPrintf("Initializing POSE... \n");
00052 POSEreadCmdLine();
00053 if (pose_config.checkpoint_gvt_interval) {
00054 CkPrintf("POSE checkpointing interval set to %lld GVT ticks\n", pose_config.checkpoint_gvt_interval);
00055 }
00056 if (pose_config.checkpoint_time_interval) {
00057 CkPrintf("POSE checkpointing interval set to %d seconds\n", pose_config.checkpoint_time_interval);
00058 }
00059 if (pose_config.dop) {
00060 CkPrintf("POSE DOP analysis enabled...deleting dop log files...\n");
00061 char fName[32];
00062 for (int i = 0; i < CkNumPes(); i++) {
00063 sprintf(fName, "dop%d.log", i);
00064 unlink(fName);
00065 }
00066 sprintf(fName, "dop_mod.out");
00067 unlink(fName);
00068 sprintf(fName, "dop_sim.out");
00069 unlink(fName);
00070 }
00071 POSE_inactDetect = IDflag;
00072 totalNumPosers = 0;
00073 POSE_endtime = ET;
00074 #ifdef SEQUENTIAL_POSE
00075 _POSE_SEQUENTIAL = 1;
00076 #else
00077 _POSE_SEQUENTIAL = 0;
00078 #endif
00079 #if !CMK_TRACE_DISABLED
00080 traceRegisterUserEvent("Forward Execution", 10);
00081 traceRegisterUserEvent("Cancellation", 20);
00082 traceRegisterUserEvent("Cancel Spawn", 30);
00083 traceRegisterUserEvent("Rollback", 40);
00084 traceRegisterUserEvent("Commit", 50);
00085 traceRegisterUserEvent("OptSync", 60);
00086 #endif
00087 #ifndef SEQUENTIAL_POSE
00088
00089 MemPoolID = CProxy_MemoryPool::ckNew();
00090
00091 TempMemID = CProxy_TimePool::ckNew();
00092 #endif
00093
00094 #if !CMK_TRACE_DISABLED
00095 theLocalStats = CProxy_localStat::ckNew();
00096 CProxy_globalStat::ckNew(&theGlobalStats);
00097 #endif
00098 #ifndef SEQUENTIAL_POSE
00099
00100 ThePVT = CProxy_PVT::ckNew();
00101 TheGVT = CProxy_GVT::ckNew();
00102
00103 if(pose_config.lb_on)
00104 {
00105
00106 TheLBG = CProxy_LBgroup::ckNew();
00107 TheLBstrategy = CProxy_LBstrategy::ckNew();
00108 CkPrintf("Load balancing is ON.\n");
00109 }
00110 #endif
00111 CProxy_pose::ckNew(&POSE_Coordinator_ID, 0);
00112
00113 POSE_Objects = CProxy_sim::ckNew();
00114
00115 #ifdef SEQUENTIAL_POSE
00116 if (CkNumPes() > 1) CkAbort("ERROR: Cannot run a sequential simulation on more than one processor!\n");
00117 CkPrintf("NOTE: POSE running in sequential simulation mode!\n");
00118 int fnIdx = CkIndex_pose::stop();
00119 CkStartQD(fnIdx, &POSE_Coordinator_ID);
00120 POSE_GlobalClock = 0;
00121 POSE_GlobalTS = 0;
00122 seqCheckpointInProgress = 0;
00123 seqLastCheckpointGVT = 0;
00124 seqLastCheckpointTime = seqStartTime = 0.0;
00125 poseIndexOfStopEvent = -1;
00126 #else
00127
00128
00129
00130
00131 #endif
00132 CkPrintf("POSE initialization complete.\n");
00133 if (POSE_inactDetect) CkPrintf("Using Inactivity Detection for termination.\n");
00134 else
00135 #if USE_LONG_TIMESTAMPS
00136 CkPrintf("Using endTime of %lld for termination.\n", POSE_endtime);
00137 #else
00138 CkPrintf("Using endTime of %d for termination.\n", POSE_endtime);
00139 #endif
00140 sim_timer = CmiWallTimer();
00141 }
00142
00143 void POSE_startTimer() {
00144 CkPrintf("Starting simulation...\n");
00145 sim_timer = CmiWallTimer();
00146 }
00147
00149 void POSE_useID()
00150 {
00151 CkPrintf("WARNING: POSE_useID obsolete. See POSE_init params.\n");
00152 }
00153
00155 void POSE_useET(POSE_TimeType et)
00156 {
00157 CkPrintf("WARNING: POSE_useET obsolete. See POSE_init params.\n");
00158 }
00159
00161 void POSE_registerCallBack(CkCallback cb)
00162 {
00163 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00164 callBack *cbm = new callBack;
00165 cbm->callback = cb;
00166 POSE_Coordinator.registerCallBack(cbm);
00167 }
00168
00170 void POSE_stop()
00171 {
00172 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00173 POSE_Coordinator.stop();
00174 }
00175
00177 void POSE_exit()
00178 {
00179 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00180 POSE_Coordinator.exit();
00181 }
00182
00184 void setPoseIndexOfStopEvent(int index) {
00185 poseIndexOfStopEvent = index;
00186 }
00187
00189 void POSE_prepExit(void *param, void *msg)
00190 {
00191 CkReductionMsg *m = (CkReductionMsg *)msg;
00192 long long *finalBasicStats = ((long long*)m->getData());
00193 CkPrintf("Final basic stats: Commits: %lld Rollbacks: %lld\n", finalBasicStats[0], finalBasicStats[1]);
00194 delete m;
00195 #ifdef SEQUENTIAL_POSE
00196 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00197 POSE_Coordinator.prepExit();
00198 #else
00199 CProxy_GVT g(TheGVT);
00200 g.sumGVTIterationCounts();
00201 #endif
00202 }
00203
00205 void POSE_sumGVTIterations(void *param, void *msg) {
00206 CkReductionMsg *m = (CkReductionMsg *)msg;
00207 CkPrintf("Final basic stats: GVT iterations: %d\n", *((int*)m->getData()));
00208 delete m;
00209 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00210 POSE_Coordinator.prepExit();
00211 }
00212
00214 void POSE_set_busy_wait(double n) { busyWait = n; }
00215
00217 void POSE_busy_wait()
00218 {
00219 double start = CmiWallTimer();
00220 while (CmiWallTimer() - start < busyWait) ;
00221 }
00222
00224 void POSE_busy_wait(double n)
00225 {
00226 double start = CmiWallTimer();
00227 while (CmiWallTimer() - start < n) ;
00228 }
00229
00231 void pose::registerCallBack(callBack *cbm)
00232 {
00233 callBackSet = 1;
00234 cb = cbm->callback;
00235 }
00236
00238 void pose::stop(void)
00239 {
00240 #ifdef SEQUENTIAL_POSE
00241
00242 if (poseIndexOfStopEvent >= 0) {
00243 POSE_Objects[poseIndexOfStopEvent].invokeStopEvent();
00244 CkStartQD(CkIndex_pose::stop(), &POSE_Coordinator_ID);
00245
00246 } else if (seqCheckpointInProgress) {
00247 POSE_Objects[0].SeqBeginCheckpoint();
00248 } else {
00249 #if USE_LONG_TIMESTAMPS
00250 CkPrintf("Sequential Endtime Approximation: %lld\n", POSE_GlobalClock);
00251 #else
00252 CkPrintf("Sequential Endtime Approximation: %d\n", POSE_GlobalClock);
00253 #endif
00254
00255 POSE_Objects.Terminate();
00256 }
00257 #endif
00258
00259 }
00260
00262 void pose::prepExit(void)
00263 {
00264 #if !CMK_TRACE_DISABLED
00265 if(pose_config.stats)
00266 {
00267 CProxy_localStat stats(theLocalStats);
00268 CkPrintf("%d PE Simulation finished at %f. Gathering stats...\n",
00269 CkNumPes(), CmiWallTimer() - sim_timer);
00270 stats.SendStats();
00271 }
00272 else
00273 {
00274 CkPrintf("%d PE Simulation finished at %f.\n", CkNumPes(),
00275 CmiWallTimer() - sim_timer);
00276 POSE_exit();
00277 }
00278 #else
00279 CkPrintf("%d PE Simulation finished at %f.\n", CkNumPes(),
00280 CmiWallTimer() - sim_timer);
00281 POSE_exit();
00282 #endif
00283 }
00284
00286 void pose::exit(void)
00287 {
00288 if (callBackSet)
00289 cb.send();
00290 else
00291 CkExit();
00292 }
00293
00294
00295 void _registerseqpose(void)
00296 {
00297 _registerpose();
00298 }
00299
00300 void POSEreadCmdLine()
00301 {
00302 char **argv = CkGetArgv();
00303 CmiArgGroup("Charm++","POSE");
00304 pose_config.stats=CmiGetArgFlagDesc(argv, "+stats_pose",
00305 "Gather timing information and other statistics");
00306
00307
00308
00309
00310
00311
00312 pose_config.trace=CmiGetArgFlagDesc(argv, "+trace_pose",
00313 "Traces key POSE operations like Forward Execution, Rollback, Cancellation, Fossil Collection, etc. via user events for display in projections");
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324 pose_config.dop=CmiGetArgFlagDesc(argv, "+dop_pose",
00325 "Critical path analysis by measuring degree of parallelism");
00326 pose_config.dopSkipCalcs=CmiGetArgFlagDesc(argv, "+dop_pose_skip_calcs",
00327 "Records degree of parallelism logs but doesn't perform end-of-simulation calculations");
00328 if (pose_config.dopSkipCalcs) {
00329 pose_config.dop = true;
00330 }
00331
00332 CmiGetArgIntDesc(argv, "+memman_pose", &pose_config.max_usage , "Coarse memory management: Restricts forward execution of objects with over <max_usage>/<checkpoint store_rate> checkpoints; default to 10");
00333
00334
00335
00336
00337
00338
00339 pose_config.lb_on=CmiGetArgFlagDesc(argv, "+lb_on_pose", "Use load balancing");
00340 CmiGetArgIntDesc(argv, "+lb_skip_pose", &pose_config.lb_skip , "Load balancing skip N; default 51");
00341 CmiGetArgIntDesc(argv, "+lb_threshold_pose", &pose_config.lb_threshold , "Load balancing threshold N; default 4000");
00342 CmiGetArgIntDesc(argv, "+lb_diff_pose", &pose_config.lb_diff , "Load balancing min diff between min and max load PEs; default 2000");
00343 CmiGetArgIntDesc(argv, "+checkpoint_rate_pose", &pose_config.store_rate , "Sets checkpoint to 1 for every <rate> events. Default to 1. ");
00344 CmiGetArgIntDesc(argv, "+checkpoint_gvt_pose", &pose_config.checkpoint_gvt_interval,
00345 "Checkpoint approximately every <gvt #> of GVT ticks; default = 0 = no checkpointing; overrides +checkpoint_time_pose");
00346 if (pose_config.checkpoint_gvt_interval < 0) {
00347 CmiAbort("+checkpoint_gvt_pose value must be >= 0; 0 = no checkpointing\n");
00348 }
00349 CmiGetArgIntDesc(argv, "+checkpoint_time_pose", &pose_config.checkpoint_time_interval,
00350 "Checkpoint approximately every <time> seconds; default = 0 = no checkpointing; overridden by checkpoint_gvt_pose");
00351 if (pose_config.checkpoint_time_interval < 0) {
00352 CmiAbort("+checkpoint_time_pose value must be >= 0; 0 = no checkpointing\n");
00353 }
00354 if ((pose_config.checkpoint_gvt_interval > 0) && (pose_config.checkpoint_time_interval > 0)) {
00355 CmiPrintf("WARNING: checkpoint GVT and time values both set; ignoring time value\n");
00356 pose_config.checkpoint_time_interval = 0;
00357 }
00358
00359 CmiGetArgIntDesc(argv, "+lb_gvt_pose", &pose_config.lb_gvt_interval,
00360 "Load balancing approximately every <gvt #> of GVT ticks; default = 0 = no lb");
00361 if (pose_config.lb_gvt_interval < 0) {
00362 CmiAbort("+lb_gvt_pose value must be >= 0; 0 = no load balancing\n");
00363 }
00364
00365
00366 CmiGetArgIntDesc(argv, "+leash_specwindow_pose", &pose_config.spec_window , "Sets speculative window behavior.");
00367 CmiGetArgIntDesc(argv, "+leash_min_pose", &pose_config.min_leash , "Sets speculative window behavior minimum leash. Default 10.");
00368 CmiGetArgIntDesc(argv, "+leash_max_pose", &pose_config.max_leash , "Sets speculative window behavior maximum leash. Default 100.");
00369 CmiGetArgIntDesc(argv, "+leash_flex_pose", &pose_config.max_leash , "Sets speculative window behavior leash flex. Default 10.");
00370 if ((pose_config.deterministic= CmiGetArgFlagDesc(argv, "+deterministic_pose", "sorts events of same timestamp by event id for repeatable behavior ")))
00371 {
00372 CkPrintf("WARNING: deterministic_pose: enter at your own risk, though this feature is hopefully not broken anymore\n");
00373 }
00374 }