libs/ck-libs/pose/pose.C

Go to the documentation of this file.
00001 
00002 #include "pose.h"
00003 #include "pose.def.h"
00004 
00005 CpvDeclare(int, stateRecovery);
00006 CpvDeclare(eventID, theEventID);  // per-PE event ID; incremented and returned by GetEventID()
00007 
00008 void POSEreadCmdLine();  // forward declaration; defined later in this file and called from POSE_init
00009 #ifdef POSE_COMM_ON
00010 extern int comm_debug;
00011 #endif
00012 double busyWait;  // duration used by POSE_busy_wait(); assigned by POSE_set_busy_wait()
00013 double sim_timer;  // CmiWallTimer() reading taken in POSE_init/POSE_startTimer; pose::prepExit prints the time elapsed since then
00014 int POSE_inactDetect;  // copy of POSE_init's IDflag; non-zero means inactivity detection is reported as the termination mode
00015 POSE_TimeType POSE_endtime;  // copy of POSE_init's ET argument
00016 POSE_TimeType POSE_GlobalClock;  // zeroed by POSE_init and printed by pose::stop in SEQUENTIAL_POSE builds
00017 POSE_TimeType POSE_GlobalTS;  // zeroed by POSE_init in SEQUENTIAL_POSE builds
00018 POSE_Config pose_config;  // runtime options; filled in by POSEreadCmdLine()
00019 ComlibInstanceHandle POSE_commlib_insthndl;  // assigned in POSE_init only when POSE_COMM_ON is defined (non-sequential build)
00020 
00021 
00022 const eventID& GetEventID() {
00023   //CpvStaticDeclare(eventID, theEventID);  // initializes to [0.pe]
00024   //  for each pe called on
00025   CpvAccess(theEventID).incEventID();
00026   CkAssert(CpvAccess(theEventID).getPE()>=0);
00027   return(CpvAccess(theEventID));
00028  }
00029 
00030 // Main initialization for all of POSE
00031 void POSE_init() // use inactivity detection by default
00032 {
00033   POSE_init(1, POSE_UnsetTS);
00034 }
00035 
00036 void POSE_init(int ET) // a single parameter specifies endtime
00037 {
00038   POSE_init(0, ET);
00039 }
00040 
00041 void POSE_init(int IDflag, int ET) // can specify both
00042 {
00043   CkPrintf("Initializing POSE...  \n");
00044   POSEreadCmdLine();
00045   POSE_inactDetect = IDflag;
00046   POSE_endtime = ET;
00047 #ifndef CMK_OPTIMIZE
00048   traceRegisterUserEvent("Forward Execution", 10);
00049   traceRegisterUserEvent("Cancellation", 20);
00050   traceRegisterUserEvent("Cancel Spawn", 30);
00051   traceRegisterUserEvent("Rollback", 40);
00052   traceRegisterUserEvent("Commit", 50);
00053   traceRegisterUserEvent("OptSync", 60);
00054 #endif
00055 #ifndef SEQUENTIAL_POSE
00056 #ifdef POSE_COMM_ON
00057   // Create the communication library for POSE
00058   POSE_commlib_insthndl = CkGetComlibInstance();
00059   // Create the communication strategy for POSE
00060   StreamingStrategy *strategy = new StreamingStrategy(COMM_TIMEOUT,COMM_MAXMSG);
00061   //MeshStreamingStrategy *strategy = new MeshStreamingStrategy(COMM_TIMEOUT,COMM_MAXMSG);
00062   //PrioStreaming *strategy = new PrioStreaming(COMM_TIMEOUT,COMM_MAXMSG);
00063   //Register the strategy
00064   POSE_commlib_insthndl.setStrategy(strategy);
00065   //comm_debug=1;
00066   //CkPrintf("Simulation run with PrioStreaming(%d,%d) for communication optimization...\n", COMM_TIMEOUT, COMM_MAXMSG);
00067   CkPrintf("Simulation run with StreamingStrategy(%d,%d) for communication optimization...\n", COMM_TIMEOUT, COMM_MAXMSG);
00068   //CkPrintf("Simulation run with MeshStreaming(%d,%d) for communication optimization...\n", COMM_TIMEOUT, COMM_MAXMSG);
00069 #endif
00070   // Create a MemoryPool with global handle for memory recycling 
00071   MemPoolID = CProxy_MemoryPool::ckNew();
00072   // Create a Temporal Memory Manager
00073   TempMemID = CProxy_TimePool::ckNew();
00074 #endif
00075   // Initialize statistics collection if desired
00076 #ifndef CMK_OPTIMIZE
00077   theLocalStats = CProxy_localStat::ckNew();
00078   CProxy_globalStat::ckNew(&theGlobalStats);
00079 #endif
00080 #ifndef SEQUENTIAL_POSE
00081   // Initialize global handles to GVT and PVT
00082   ThePVT = CProxy_PVT::ckNew(); 
00083   TheGVT = CProxy_GVT::ckNew();
00084   // Start off using normal forward execution
00085   if(pose_config.lb_on)
00086     {
00087       // Initialize the load balancer
00088       TheLBG = CProxy_LBgroup::ckNew();
00089       TheLBstrategy = CProxy_LBstrategy::ckNew();
00090       CkPrintf("Load balancing is ON.\n");
00091     }
00092 #endif
00093   CProxy_pose::ckNew(&POSE_Coordinator_ID, 0);
00094   // Create array to hold all POSE objects
00095 #ifdef POSE_COMM_ON  
00096   POSE_Objects_RO = CProxy_sim::ckNew(); 
00097   POSE_Objects = POSE_Objects_RO;
00098 #else
00099   POSE_Objects = CProxy_sim::ckNew(); 
00100 #endif
00101   //#ifndef SEQUENTIAL_POSE
00102   //#ifdef POSE_COMM_ON
00103   // Make POSE_Objects use the comm lib
00104   //  ComlibDelegateProxy(&POSE_Objects);
00105   //#endif
00106   //#endif
00107 
00108 #ifdef SEQUENTIAL_POSE
00109   if (CkNumPes() > 1) CkAbort("ERROR: Cannot run a sequential simulation on more than one processor!\n");
00110   CkPrintf("NOTE: POSE running in sequential simulation mode!\n");
00111   int fnIdx = CkIndex_pose::stop();
00112   CkStartQD(fnIdx, &POSE_Coordinator_ID);
00113   POSE_GlobalClock = 0;
00114   POSE_GlobalTS = 0;
00115 #else
00116   /*  CkPrintf("WARNING: Charm Quiescence termination enabled!\n");
00117   int fnIdx = CkIndex_pose::stop();
00118   CkStartQD(fnIdx, &POSE_Coordinator_ID);
00119   */
00120 #endif  
00121   CkPrintf("POSE initialization complete.\n");
00122   if (POSE_inactDetect) CkPrintf("Using Inactivity Detection for termination.\n");
00123   else 
00124 #if USE_LONG_TIMESTAMPS
00125     CkPrintf("Using endTime of %lld for termination.\n", POSE_endtime);
00126 #else
00127     CkPrintf("Using endTime of %d for termination.\n", POSE_endtime);
00128 #endif
00129   sim_timer = CmiWallTimer(); 
00130 }
00131 
00132 void POSE_startTimer() {
00133   CkPrintf("Starting simulation...\n"); 
00134   sim_timer = CmiWallTimer(); 
00135 }
00136 
00138 void POSE_useID() 
00139 {
00140   CkPrintf("WARNING: POSE_useID obsolete. See POSE_init params.\n");
00141 }
00142 
00144 void POSE_useET(POSE_TimeType et) 
00145 {
00146   CkPrintf("WARNING: POSE_useET obsolete. See POSE_init params.\n");
00147 }
00148 
00150 void POSE_registerCallBack(CkCallback cb)
00151 {
00152   CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00153   callBack *cbm = new callBack;
00154   cbm->callback = cb;
00155   POSE_Coordinator.registerCallBack(cbm);
00156 }
00157 
00159 void POSE_stop()
00160 {
00161   CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00162   POSE_Coordinator.stop();
00163 }
00164 
00166 void POSE_exit()
00167 {
00168   CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00169   POSE_Coordinator.exit();
00170 }
00171 
00173 void POSE_prepExit(void *param, void *msg)
00174 {
00175   CkReductionMsg *m=(CkReductionMsg *)msg;
00176   delete m;
00177   CProxy_pose POSE_Coordinator(POSE_Coordinator_ID);
00178   POSE_Coordinator.prepExit();
00179 }
00180 
00182 void POSE_set_busy_wait(double n) { busyWait = n; }
00183 
00185 void POSE_busy_wait()
00186 {
00187   double start = CmiWallTimer();
00188   while (CmiWallTimer() - start < busyWait) ;
00189 }
00190 
00192 void POSE_busy_wait(double n)
00193 {
00194   double start = CmiWallTimer();
00195   while (CmiWallTimer() - start < n) ;
00196 }
00197 
00199 void pose::registerCallBack(callBack *cbm) 
00200 {
00201   callBackSet = 1;
00202   cb = cbm->callback;
00203 }
00204 
00206 void pose::stop(void) 
00207 { 
00208 #ifdef SEQUENTIAL_POSE
00209 #if USE_LONG_TIMESTAMPS
00210   CkPrintf("Sequential Endtime Approximation: %lld\n", POSE_GlobalClock);
00211 #else
00212   CkPrintf("Sequential Endtime Approximation: %d\n", POSE_GlobalClock);
00213 #endif
00214   // Call sequential termination here, when done it calls prepExit
00215   POSE_Objects.Terminate();
00216 #endif
00217   // prepExit();
00218 }
00219 
00221 void pose::prepExit(void) 
00222 {
00223 #ifndef CMK_OPTIMIZE
00224   if(pose_config.stats)
00225     {
00226       CProxy_localStat stats(theLocalStats);
00227       CkPrintf("%d PE Simulation finished at %f. Gathering stats...\n", 
00228                CkNumPes(), CmiWallTimer() - sim_timer);
00229       stats.SendStats();
00230     }
00231   else
00232     {
00233       CkPrintf("%d PE Simulation finished at %f.\n", CkNumPes(), 
00234                CmiWallTimer() - sim_timer);
00235       POSE_exit();
00236     }
00237 #else
00238   CkPrintf("%d PE Simulation finished at %f.\n", CkNumPes(), 
00239            CmiWallTimer() - sim_timer);
00240   POSE_exit();
00241 #endif  
00242 }
00243 
00245 void pose::exit(void) 
00246 { 
00247   if (callBackSet)
00248     cb.send(); // need to make callback here
00249   else
00250     CkExit();
00251 }
00252 
00253 // this is a HACK to get module seqpose working
00254 void _registerseqpose(void)
00255 {
00256   _registerpose();
00257 }
00258 
00259 void POSEreadCmdLine()
00260 {
00261   char **argv = CkGetArgv();
00262   CmiArgGroup("Charm++","POSE");
00263   pose_config.stats=CmiGetArgFlagDesc(argv, "+stats_pose",
00264                         "Gather timing information and other statistics");
00265   /*  semantic meaning for these still to be determined
00266   CmiGetArgIntDesc(argv, "+start_proj_pose",&pose_config.start_proj,
00267                         "GVT to initiate projections tracing");
00268   CmiGetArgIntDesc(argv, "+end_proj_pose",&pose_config.end_proj,
00269                         "GVT to end projections tracing");
00270   */
00271   pose_config.trace=CmiGetArgFlagDesc(argv, "+trace_pose",
00272                         "Traces key POSE operations like Forward Execution, Rollback, Cancellation, Fossil Collection, etc. via user events for display in projections");
00273 
00274   pose_config.dop=CmiGetArgFlagDesc(argv, "+dop_pose",
00275                         "Critical path analysis by measuring degree of parallelism");
00276 
00277   CmiGetArgIntDesc(argv, "+memman_pose", &pose_config.max_usage , "Coarse memory management: Restricts forward execution of objects with over <max_usage>/<checkpoint store_rate> checkpoints; default to 10");
00278   /*
00279   pose_config.msg_pool=CmiGetArgFlagDesc(argv, "+pose_msgpool",  "Store and reuse pools of messages under a certain size default 1000");
00280   CmiGetArgIntDesc(argv, "+msgpoolsize_pose", &pose_config.msg_pool_size , "Store and reuse pools of messages under a certain size default 1000");
00281 
00282   CmiGetArgIntDesc(argv, "+msgpoolmax_pose", &pose_config.max_pool_msg_size , "Store and reuse pools of messages under a certain size");
00283   char *strat;
00284   CmiGetArgStringDesc(argv, "+commlib_strat_pose", &strat , "Use commlib with strat in {stream|mesh|prio}");
00285   if(strcmp("stream",strat)==0)
00286     pose_config.commlib_strat=stream;
00287   if(strcmp("mesh",strat)==0)
00288     pose_config.commlib_strat=mesh;
00289   if(strcmp("prio",strat)==0)
00290     pose_config.commlib_strat=prio;
00291   CmiGetArgIntDesc(argv, "+commlib_timeout-pose", &pose_config.commlib_timeout , "Use commlib with timeout N; default 1ms");
00292   CmiGetArgIntDesc(argv, "+commlib_maxmsg_pose", &pose_config.commlib_maxmsg , "Use commlib with max msg N;  default 5");
00293   */
00294   pose_config.lb_on=CmiGetArgFlagDesc(argv, "+lb_on_pose", "Use load balancing");
00295   CmiGetArgIntDesc(argv, "+lb_skip_pose", &pose_config.lb_skip , "Load balancing skip N; default 51");
00296   CmiGetArgIntDesc(argv, "+lb_threshold_pose", &pose_config.lb_threshold , "Load balancing threshold N; default 4000");
00297   CmiGetArgIntDesc(argv, "+lb_diff_pose", &pose_config.lb_diff , "Load balancing  min diff between min and max load PEs; default 2000");
00298   CmiGetArgIntDesc(argv, "+checkpoint_rate_pose", &pose_config.store_rate , " Set checkpoint to 1 for every <rate> events. Default to 1. ");
00299   /* max_iteration seems to be defunct */
00300   //  CmiGetArgIntDesc(argv, "+FEmax_pose", &pose_config.max_iter , "Sets max events executed in single forward execution step.  Default to 100.");
00301   CmiGetArgIntDesc(argv, "+leash_specwindow_pose", &pose_config.spec_window , "Sets speculative window behavior.");
00302   CmiGetArgIntDesc(argv, "+leash_min_pose", &pose_config.min_leash , "Sets speculative window behavior minimum leash. Default 10.");
00303   CmiGetArgIntDesc(argv, "+leash_max_pose", &pose_config.max_leash , "Sets speculative window behavior maximum leash. Default 100.");
00304   CmiGetArgIntDesc(argv, "+leash_flex_pose", &pose_config.max_leash , "Sets speculative window behavior leash flex. Default 10.");
00305   if(pose_config.deterministic= CmiGetArgFlagDesc(argv, "+deterministic_pose",  "sorts events of same timestamp by event id for repeatable behavior "))
00306     {
00307       CkPrintf("enter at your own risk, this feature is broken\n");
00308     }
00309 
00310 
00311 }  

Generated on Sun Jun 29 13:29:25 2008 for Charm++ by  doxygen 1.5.1