00001
00002
00003 #if ! defined(_WIN32) || defined(__CYGWIN__)
00004 #include "unistd.h"
00005 #endif
00006
00007 #include "pose.h"
00008 #include "pose.def.h"
00009
00010 CpvDeclare(int, stateRecovery);
00011 CpvDeclare(eventID, theEventID);
00012
00013 void POSEreadCmdLine();
00014 #ifdef POSE_COMM_ON
00015 extern int com_debug;
00016 #endif
00017 double busyWait;
00018 double sim_timer;
00019 int POSE_inactDetect;
00020 int totalNumPosers;
00021 POSE_TimeType POSE_endtime;
00022 POSE_TimeType POSE_GlobalClock;
00023 POSE_TimeType POSE_GlobalTS;
00024 POSE_Config pose_config;
00025 #ifdef POSE_COMM_ON
00026 ComlibInstanceHandle POSE_commlib_insthndl;
00027 #endif
00028 int _POSE_SEQUENTIAL;
00029 int seqCheckpointInProgress;
00030 POSE_TimeType seqLastCheckpointGVT;
00031 double seqLastCheckpointTime;
00032 double seqStartTime;
00033 CkQ<Skipped_Event> POSE_Skipped_Events;
00034 int poseIndexOfStopEvent;
00035
00036 const eventID& GetEventID() {
00037
00038
00039 CpvAccess(theEventID).incEventID();
00040 CkAssert(CpvAccess(theEventID).getPE()>=0);
00041 return(CpvAccess(theEventID));
00042 }
00043
00044
00045 void POSE_init()
00046 {
00047 POSE_init(1, POSE_UnsetTS);
00048 }
00049
00050 void POSE_init(int ET)
00051 {
00052 POSE_init(0, ET);
00053 }
00054
00055 void POSE_init(int IDflag, int ET)
00056 {
00057 CkPrintf("Initializing POSE... \n");
00058 POSEreadCmdLine();
00059 if (pose_config.checkpoint_gvt_interval) {
00060 CkPrintf("POSE checkpointing interval set to %lld GVT ticks\n", pose_config.checkpoint_gvt_interval);
00061 }
00062 if (pose_config.checkpoint_time_interval) {
00063 CkPrintf("POSE checkpointing interval set to %d seconds\n", pose_config.checkpoint_time_interval);
00064 }
00065 if (pose_config.dop) {
00066 CkPrintf("POSE DOP analysis enabled...deleting dop log files...\n");
00067 char fName[32];
00068 for (int i = 0; i < CkNumPes(); i++) {
00069 sprintf(fName, "dop%d.log", i);
00070 unlink(fName);
00071 }
00072 sprintf(fName, "dop_mod.out");
00073 unlink(fName);
00074 sprintf(fName, "dop_sim.out");
00075 unlink(fName);
00076 }
00077 POSE_inactDetect = IDflag;
00078 totalNumPosers = 0;
00079 POSE_endtime = ET;
00080 #ifdef SEQUENTIAL_POSE
00081 _POSE_SEQUENTIAL = 1;
00082 #else
00083 _POSE_SEQUENTIAL = 0;
00084 #endif
00085 #ifndef CMK_OPTIMIZE
00086 traceRegisterUserEvent("Forward Execution", 10);
00087 traceRegisterUserEvent("Cancellation", 20);
00088 traceRegisterUserEvent("Cancel Spawn", 30);
00089 traceRegisterUserEvent("Rollback", 40);
00090 traceRegisterUserEvent("Commit", 50);
00091 traceRegisterUserEvent("OptSync", 60);
00092 #endif
00093 #ifndef SEQUENTIAL_POSE
00094 #ifdef POSE_COMM_ON
00095
00096 POSE_commlib_insthndl = CkGetComlibInstance();
00097
00098 StreamingStrategy *strategy = new StreamingStrategy(COMM_TIMEOUT,COMM_MAXMSG);
00099
00100
00101
00102 POSE_commlib_insthndl.setStrategy(strategy);
00103
00104
00105 CkPrintf("Simulation run with StreamingStrategy(%d,%d) for communication optimization...\n", COMM_TIMEOUT, COMM_MAXMSG);
00106
00107 #endif
00108
00109 MemPoolID = CProxy_MemoryPool::ckNew();
00110
00111 TempMemID = CProxy_TimePool::ckNew();
00112 #endif
00113
00114 #ifndef CMK_OPTIMIZE
00115 theLocalStats = CProxy_localStat::ckNew();
00116 CProxy_globalStat::ckNew(&theGlobalStats);
00117 #endif
00118 #ifndef SEQUENTIAL_POSE
00119
00120 ThePVT = CProxy_PVT::ckNew();
00121 TheGVT = CProxy_GVT::ckNew();
00122
00123 if(pose_config.lb_on)
00124 {
00125
00126 TheLBG = CProxy_LBgroup::ckNew();
00127 TheLBstrategy = CProxy_LBstrategy::ckNew();
00128 CkPrintf("Load balancing is ON.\n");
00129 }
00130 #endif
00131 CProxy_pose::ckNew(&POSE_Coordinator_ID, 0);
00132
00133 #ifdef POSE_COMM_ON
00134 POSE_Objects_RO = CProxy_sim::ckNew();
00135 POSE_Objects = POSE_Objects_RO;
00136 #else
00137 POSE_Objects = CProxy_sim::ckNew();
00138 #endif
00139
00140
00141
00142
00143
00144
00145
00146 #ifdef SEQUENTIAL_POSE
00147 if (CkNumPes() > 1) CkAbort("ERROR: Cannot run a sequential simulation on more than one processor!\n");
00148 CkPrintf("NOTE: POSE running in sequential simulation mode!\n");
00149 int fnIdx = CkIndex_pose::stop();
00150 CkStartQD(fnIdx, &POSE_Coordinator_ID);
00151 POSE_GlobalClock = 0;
00152 POSE_GlobalTS = 0;
00153 seqCheckpointInProgress = 0;
00154 seqLastCheckpointGVT = 0;
00155 seqLastCheckpointTime = seqStartTime = 0.0;
00156 poseIndexOfStopEvent = -1;
00157 #else
00158
00159
00160
00161
00162 #endif
00163 CkPrintf("POSE initialization complete.\n");
00164 if (POSE_inactDetect) CkPrintf("Using Inactivity Detection for termination.\n");
00165 else
00166 #if USE_LONG_TIMESTAMPS
00167 CkPrintf("Using endTime of %lld for termination.\n", POSE_endtime);
00168 #else
00169 CkPrintf("Using endTime of %d for termination.\n", POSE_endtime);
00170 #endif
00171 sim_timer = CmiWallTimer();
00172 }
00173
00174 void POSE_startTimer() {
00175 CkPrintf("Starting simulation...\n");
00176 sim_timer = CmiWallTimer();
00177 }
00178
00180 void POSE_useID()
00181 {
00182 CkPrintf("WARNING: POSE_useID obsolete. See POSE_init params.\n");
00183 }
00184
00186 void POSE_useET(POSE_TimeType et)
00187 {
00188 CkPrintf("WARNING: POSE_useET obsolete. See POSE_init params.\n");
00189 }
00190
00192 void POSE_registerCallBack(CkCallback cb)
00193 {
00194 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00195 callBack *cbm = new callBack;
00196 cbm->callback = cb;
00197 POSE_Coordinator.registerCallBack(cbm);
00198 }
00199
00201 void POSE_stop()
00202 {
00203 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00204 POSE_Coordinator.stop();
00205 }
00206
00208 void POSE_exit()
00209 {
00210 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00211 POSE_Coordinator.exit();
00212 }
00213
00215 void setPoseIndexOfStopEvent(int index) {
00216 poseIndexOfStopEvent = index;
00217 }
00218
00220 void POSE_prepExit(void *param, void *msg)
00221 {
00222 CkReductionMsg *m = (CkReductionMsg *)msg;
00223 long long *finalBasicStats = ((long long*)m->getData());
00224 CkPrintf("Final basic stats: Commits: %lld Rollbacks: %lld\n", finalBasicStats[0], finalBasicStats[1]);
00225 delete m;
00226 #ifdef SEQUENTIAL_POSE
00227 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00228 POSE_Coordinator.prepExit();
00229 #else
00230 CProxy_GVT g(TheGVT);
00231 g.sumGVTIterationCounts();
00232 #endif
00233 }
00234
00236 void POSE_sumGVTIterations(void *param, void *msg) {
00237 CkReductionMsg *m = (CkReductionMsg *)msg;
00238 CkPrintf("Final basic stats: GVT iterations: %d\n", *((int*)m->getData()));
00239 delete m;
00240 CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00241 POSE_Coordinator.prepExit();
00242 }
00243
00245 void POSE_set_busy_wait(double n) { busyWait = n; }
00246
00248 void POSE_busy_wait()
00249 {
00250 double start = CmiWallTimer();
00251 while (CmiWallTimer() - start < busyWait) ;
00252 }
00253
00255 void POSE_busy_wait(double n)
00256 {
00257 double start = CmiWallTimer();
00258 while (CmiWallTimer() - start < n) ;
00259 }
00260
00262 void pose::registerCallBack(callBack *cbm)
00263 {
00264 callBackSet = 1;
00265 cb = cbm->callback;
00266 }
00267
00269 void pose::stop(void)
00270 {
00271 #ifdef SEQUENTIAL_POSE
00272
00273 if (poseIndexOfStopEvent >= 0) {
00274 POSE_Objects[poseIndexOfStopEvent].invokeStopEvent();
00275 CkStartQD(CkIndex_pose::stop(), &POSE_Coordinator_ID);
00276
00277 } else if (seqCheckpointInProgress) {
00278 POSE_Objects[0].SeqBeginCheckpoint();
00279 } else {
00280 #if USE_LONG_TIMESTAMPS
00281 CkPrintf("Sequential Endtime Approximation: %lld\n", POSE_GlobalClock);
00282 #else
00283 CkPrintf("Sequential Endtime Approximation: %d\n", POSE_GlobalClock);
00284 #endif
00285
00286 POSE_Objects.Terminate();
00287 }
00288 #endif
00289
00290 }
00291
00293 void pose::prepExit(void)
00294 {
00295 #ifndef CMK_OPTIMIZE
00296 if(pose_config.stats)
00297 {
00298 CProxy_localStat stats(theLocalStats);
00299 CkPrintf("%d PE Simulation finished at %f. Gathering stats...\n",
00300 CkNumPes(), CmiWallTimer() - sim_timer);
00301 stats.SendStats();
00302 }
00303 else
00304 {
00305 CkPrintf("%d PE Simulation finished at %f.\n", CkNumPes(),
00306 CmiWallTimer() - sim_timer);
00307 POSE_exit();
00308 }
00309 #else
00310 CkPrintf("%d PE Simulation finished at %f.\n", CkNumPes(),
00311 CmiWallTimer() - sim_timer);
00312 POSE_exit();
00313 #endif
00314 }
00315
00317 void pose::exit(void)
00318 {
00319 if (callBackSet)
00320 cb.send();
00321 else
00322 CkExit();
00323 }
00324
00325
00326 void _registerseqpose(void)
00327 {
00328 _registerpose();
00329 }
00330
00331 void POSEreadCmdLine()
00332 {
00333 char **argv = CkGetArgv();
00334 CmiArgGroup("Charm++","POSE");
00335 pose_config.stats=CmiGetArgFlagDesc(argv, "+stats_pose",
00336 "Gather timing information and other statistics");
00337
00338
00339
00340
00341
00342
00343 pose_config.trace=CmiGetArgFlagDesc(argv, "+trace_pose",
00344 "Traces key POSE operations like Forward Execution, Rollback, Cancellation, Fossil Collection, etc. via user events for display in projections");
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355 pose_config.dop=CmiGetArgFlagDesc(argv, "+dop_pose",
00356 "Critical path analysis by measuring degree of parallelism");
00357 pose_config.dopSkipCalcs=CmiGetArgFlagDesc(argv, "+dop_pose_skip_calcs",
00358 "Records degree of parallelism logs but doesn't perform end-of-simulation calculations");
00359 if (pose_config.dopSkipCalcs) {
00360 pose_config.dop = true;
00361 }
00362
00363 CmiGetArgIntDesc(argv, "+memman_pose", &pose_config.max_usage , "Coarse memory management: Restricts forward execution of objects with over <max_usage>/<checkpoint store_rate> checkpoints; default to 10");
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380 pose_config.lb_on=CmiGetArgFlagDesc(argv, "+lb_on_pose", "Use load balancing");
00381 CmiGetArgIntDesc(argv, "+lb_skip_pose", &pose_config.lb_skip , "Load balancing skip N; default 51");
00382 CmiGetArgIntDesc(argv, "+lb_threshold_pose", &pose_config.lb_threshold , "Load balancing threshold N; default 4000");
00383 CmiGetArgIntDesc(argv, "+lb_diff_pose", &pose_config.lb_diff , "Load balancing min diff between min and max load PEs; default 2000");
00384 CmiGetArgIntDesc(argv, "+checkpoint_rate_pose", &pose_config.store_rate , "Sets checkpoint to 1 for every <rate> events. Default to 1. ");
00385 CmiGetArgIntDesc(argv, "+checkpoint_gvt_pose", &pose_config.checkpoint_gvt_interval,
00386 "Checkpoint approximately every <gvt #> of GVT ticks; default = 0 = no checkpointing; overrides +checkpoint_time_pose");
00387 if (pose_config.checkpoint_gvt_interval < 0) {
00388 CmiAbort("+checkpoint_gvt_pose value must be >= 0; 0 = no checkpointing\n");
00389 }
00390 CmiGetArgIntDesc(argv, "+checkpoint_time_pose", &pose_config.checkpoint_time_interval,
00391 "Checkpoint approximately every <time> seconds; default = 0 = no checkpointing; overridden by checkpoint_gvt_pose");
00392 if (pose_config.checkpoint_time_interval < 0) {
00393 CmiAbort("+checkpoint_time_pose value must be >= 0; 0 = no checkpointing\n");
00394 }
00395 if ((pose_config.checkpoint_gvt_interval > 0) && (pose_config.checkpoint_time_interval > 0)) {
00396 CmiPrintf("WARNING: checkpoint GVT and time values both set; ignoring time value\n");
00397 pose_config.checkpoint_time_interval = 0;
00398 }
00399
00400 CmiGetArgIntDesc(argv, "+lb_gvt_pose", &pose_config.lb_gvt_interval,
00401 "Load balancing approximately every <gvt #> of GVT ticks; default = 0 = no lb");
00402 if (pose_config.lb_gvt_interval < 0) {
00403 CmiAbort("+lb_gvt_pose value must be >= 0; 0 = no load balancing\n");
00404 }
00405
00406
00407 CmiGetArgIntDesc(argv, "+leash_specwindow_pose", &pose_config.spec_window , "Sets speculative window behavior.");
00408 CmiGetArgIntDesc(argv, "+leash_min_pose", &pose_config.min_leash , "Sets speculative window behavior minimum leash. Default 10.");
00409 CmiGetArgIntDesc(argv, "+leash_max_pose", &pose_config.max_leash , "Sets speculative window behavior maximum leash. Default 100.");
00410 CmiGetArgIntDesc(argv, "+leash_flex_pose", &pose_config.max_leash , "Sets speculative window behavior leash flex. Default 10.");
00411 if(pose_config.deterministic= CmiGetArgFlagDesc(argv, "+deterministic_pose", "sorts events of same timestamp by event id for repeatable behavior "))
00412 {
00413 CkPrintf("WARNING: deterministic_pose: enter at your own risk, though this feature is hopefully not broken anymore\n");
00414 }
00415 }