libs/ck-libs/collide/threadCollide.C

Go to the documentation of this file.
00001 /*
00002  * threadCollide: threaded interface to Collision detection library
00003  * Orion Sky Lawlor, olawlor@acm.org, 7/19/2001
00004  */
00005 #include "collidecharm_impl.h"
00006 #include "collidecharm.h"
00007 #include "collidec.h"
00008 #include "tcharm.h"
00009 #include "collide.decl.h"
00010 
// Set COLLIDE_TRACE to 1 to print per-processor debugging messages.
// TRACE(x) streams x (prefixed by the processor number) to ckout; the
// expansion already ends in ';', so call sites omit the semicolon.
#define COLLIDE_TRACE 0
#if COLLIDE_TRACE
#  define TRACE(x) ckout<<"["<<CkMyPe()<<"] "<<x<<endl;
#else
#  define TRACE(x) /* empty */
#endif

/* TCharm Semaphore ID for collision startup */
#define COLLIDE_TCHARM_SEMAID 0x0C0771DE /* _COLLIDE */

class threadCollide;
00022 
/**
 * Message carrying the collision records that one processor (src) has
 * gathered for the receiving processor.  colls is the message's
 * variable-size array; listToMessage() allocates it with nColls entries.
 */
class threadCollisions : public CMessage_threadCollisions {
public:
	int src; /* processor number of source group */
	int nColls; /* number of collision records below */
	Collision *colls; /* points into message body */
};
00032 
00037 class threadCollideMgr : public CBase_threadCollideMgr
00038 {
00039         //Map chunk number to contributor (big, but fast indexing scheme)
00040         CkVec<threadCollide *> contrib;
00041         inline threadCollide *lookup(int chunkNo) {
00042                 threadCollide *ret=contrib[chunkNo];
00043 #ifndef CMK_OPTIMIZE
00044                 if (ret==NULL) CkAbort("threadCollideMgr can't find contributor");
00045 #endif
00046                 return ret;
00047         }
00048         
00049         //Temporarily store collisions to be sent to each processor:
00050         CkVec<CollisionList> toPE;
00051         
00052         //Temporarily store collisions sent in from each processor:
00053         CkVec<threadCollisions *> fromPE;
00054         
00055         //Counter for collisions from remote processors:
00056         int nRemote;
00057         
00058         //Cached version of CkMyPe
00059         int myPe;
00060         
00061 public:
00062         threadCollideMgr(void) 
00063                 :toPE(CkNumPes()), fromPE(CkNumPes())
00064         {
00065                 for (int p=0;p<CkNumPes();p++) fromPE[p]=0;
00066                 nRemote=0;
00067                 myPe=CkMyPe();
00068         }
00069         
00071         void registerContributor(threadCollide *chunk,int chunkNo) 
00072         {
00073                 while (contrib.size()<=chunkNo) contrib.push_back(0);
00074                 contrib[chunkNo]=chunk;
00075         }
00076         void unregisterContributor(int chunkNo) {
00077                 contrib[chunkNo]=NULL;
00078         }
00079         
00082         void collisions(ArrayElement *src,int step,CollisionList &colls);
00083         
00086         void sendRemote(CkReductionMsg *m);
00087         
00089         void remoteCollisions(threadCollisions *m);
00090         
00092         void sift(int nColl,const Collision *colls);
00093 };
00094 
00099 class threadCollide : public TCharmClient1D {
00100         typedef TCharmClient1D super;
00101         // Outgoing collision requests:
00102         CollideHandle collide;
00103         // Incoming collision lists:
00104         CProxy_threadCollideMgr mgr;
00105 protected:
00106         virtual void setupThreadPrivate(CthThread th) {}
00107 public:
00108         growableBufferT<Collision> colls; //Accumulated Collisions
00109         
00110         threadCollide(const CProxy_TCharm &threads,
00111                 const CProxy_threadCollideMgr &mgr_,
00112                 const CollideHandle &collide_) 
00113                 :super(threads), mgr(mgr_), collide(collide_)
00114         {
00115                 arriving();
00117                 thread->semaPut(COLLIDE_TCHARM_SEMAID,this);
00118         }
00119         threadCollide(CkMigrateMessage *m) :super(m) {}
00120         
00121         void arriving(void) {
00122                 CollideRegister(collide,thisIndex);
00123                 mgr.ckLocalBranch()->registerContributor(this,thisIndex);
00124         }
00125         void pup(PUP::er &p) {
00126                 super::pup(p);
00127                 p|mgr;
00128                 p|collide;
00129         }
00130         void ckJustMigrated(void) {
00131                 super::ckJustMigrated();
00132                 arriving();
00133         }
00134         void leaving(void) {
00135                 CollideUnregister(collide,thisIndex);
00136                 mgr.ckLocalBranch()->unregisterContributor(thisIndex);
00137         }
00138         ~threadCollide() {
00139                 leaving();
00140         }
00141         inline const CkArrayID &getArrayID(void) const {return thisArrayID;}
00142         
00143         
00145         void contribute(int n,const bbox3d *boxes,const int *prio) 
00146         {
00147                 CollideBoxesPrio(collide,thisIndex,n,boxes,prio);
00148                 thread->suspend(); //Will be resumed by call to resultsDone()
00149         }
00150         
00152         void resultsDone(void) {
00153                 thread->resume();
00154         }
00155 };
00156 
00157 
00160 void threadCollideMgr::collisions(ArrayElement *src,int step,
00161                                   CollisionList &colls) {
00162   // Do a fake reduction, so we'll know when all voxels have reported:
00163   src->contribute(0,0,CkReduction::sum_int,
00164                   CkCallback(CkIndex_threadCollideMgr::sendRemote(0),thisProxy));
00165   
00166   // Split out this voxel's contribution
00167   int i=0, n=colls.size();
00168   static int count=0;
00169   
00170   TRACE("Voxel contributes "<<n<<" collisions")
00171     
00172     //printf("COLLIDE: Total collisions contributed so far: %d\n", count+=n);
00173     for (i=0;i<n;i++) {
00174       const Collision &c=colls[i];
00175       toPE[c.A.pe].push_back(c);
00176       if (c.B.pe!=c.A.pe) { //Report collision to both processors
00177         Collision cB(c.B,c.A); //Swap so B is listed first
00178         toPE[c.B.pe].push_back(cB);
00179       }
00180     }
00181 }
00182 
00185 threadCollisions *listToMessage(CollisionList &l) 
00186 {
00187         int n=l.size();
00188         Collision *c=l.detachBuffer();
00189         threadCollisions *m=new (n,0) threadCollisions;
00190         m->nColls=n;
00191         for (int i=0;i<n;i++) m->colls[i]=c[i];
00192         free(c);
00193         return m;
00194 }
00195 
00198 void threadCollideMgr::sendRemote(CkReductionMsg *m) {
00199         // FIXME: optimize this all-to-all
00200         int p,n=CkNumPes();
00201         for (p=0;p<n;p++) { // Loop over destination processors:
00202                 TRACE("Sending "<<toPE[p].size()<<" collisions to "<<p)
00203                 threadCollisions *m=listToMessage(toPE[p]);
00204                 m->src=myPe;
00205                 if (p==myPe) /* local */
00206                         remoteCollisions(m);
00207                 else /* remote */
00208                         thisProxy[p].remoteCollisions(m);
00209         }
00210 }
00211 
00213 void threadCollideMgr::remoteCollisions(threadCollisions *m) {
00214         /*
00215         Subtle: to guarantee that matching collisions are presented 
00216         in the same order everywhere, we order each chunk's collisions
00217         by reporting (source) processor.  We do this without a sort by
00218         buffering, then "sifting" each collision in the proper order.
00219         */
00220         
00221         // Just buffer this message 
00222         // (FIXME: if it's the one we're waiting for, sift it right away)
00223         if (fromPE[m->src]!=NULL) 
00224                 CkAbort("threadCollideMgr::remoteCollisions unexpected message");
00225         fromPE[m->src]=m;
00226         
00227         // See if we're done yet
00228         if (++nRemote==CkNumPes()) 
00229         {       
00230                 // Sift out our collisions to each array element
00231                 TRACE("Sifting collisions out to each array element")
00232                 int p,n=CkNumPes();
00233                 for (p=0;p<n;p++) {
00234                         sift(fromPE[p]->nColls,fromPE[p]->colls);
00235                         delete fromPE[p]; fromPE[p]=NULL;
00236                 }
00237                 
00238                 // Get ready for the next step
00239                 nRemote=0;
00240                 
00241                 // Tell all our array elements that the results are now in
00242                 for (int i=0;i<contrib.size();i++)
00243                         if (contrib[i])
00244                                 contrib[i]->resultsDone();
00245         }
00246 }
00247 
00249 void threadCollideMgr::sift(int nColl,const Collision *colls) 
00250 {
00251         for (int i=0;i<nColl;i++) {
00252                 const Collision &c=colls[i];
00253 #ifndef CMK_OPTIMIZE
00254                 if (c.A.pe!=myPe) CkAbort("Should only have local collisions now");
00255 #endif
00256                 lookup(c.A.chunk)->colls.push_back(c);
00257                 if (c.A.pe==c.B.pe && c.A.chunk!=c.B.chunk) 
00258                 { //Report this collision to both local chunks:
00259                         Collision cB(c.B,c.A); //Swap so B is listed first
00260                         lookup(c.B.chunk)->colls.push_back(cB);
00261                 }
00262                         
00263         }
00264 }
00265 
00266 /*************** API Routines *****************/
//Declare this at the start of every API routine:
// Expands to TCHARM_API_TRACE(routineName,"collide"), presumably TCharm's
// per-API-call tracing hook tagged with this library's name; confirm in tcharm.h.
#define COLLIDEAPI(routineName) TCHARM_API_TRACE(routineName,"collide")
00269 
00270 
00271 int TCHARMLIB_Get_rank(TCharm *tc,int mpi_comm) {
00272         // FIXME: call AMPI_Get_rank if given a real AMPI communicator
00273         return tc->getElement();
00274 }
00275 CkArrayOptions TCHARMLIB_Bound_array(TCharm *tc,int mpi_comm) {
00276         // FIXME: bind to AMPI if given a real AMPI communicator
00277         CkArrayOptions opts(tc->getNumElements());
00278         opts.bindTo(tc->getProxy());
00279         return opts;
00280 }
00281 
00282 CDECL collide_t COLLIDE_Init(int mpi_comm,
00283         const double *gridStart,const double *gridSize)
00284 {
00285         COLLIDEAPI("COLLIDE_Init");
00286         TCharm *tc=TCharm::get();
00287         if (tc==NULL) CkAbort("Must call COLLIDE_Init from driver");
00288         int rank=TCHARMLIB_Get_rank(tc,mpi_comm);
00289         if (rank==0) { // I am the master: I must create the array
00290           CkArrayOptions opts(TCHARMLIB_Bound_array(tc,mpi_comm));
00291           CProxy_threadCollideMgr client=
00292             CProxy_threadCollideMgr::ckNew();
00293           CollideGrid3d gridMap(*(vector3d *)gridStart, *(vector3d *)gridSize);
00294           CollideHandle collide=
00295             CollideCreate(gridMap,client);
00296           CProxy_threadCollide::ckNew(tc->getProxy(),client,collide,opts);
00297           // As array elements are created, they will
00298           //  do tc->semaPut(COLLIDE_TCHARM_SEMAID,this);
00299         }
00300         // Block until the collision objects are all created:
00301         threadCollide *coll=(threadCollide *)tc->semaGet(COLLIDE_TCHARM_SEMAID);
00302         // hideous: extract the groupID's "idx" to use as a "collide_t"
00303         CkGroupID g=coll->getArrayID();
00304         collide_t c=g.idx;
00305         return c;
00306 }
00307 FORTRAN_AS_C_RETURN(int,COLLIDE_INIT,COLLIDE_Init,collide_init,
00308         (int *comm,double *s,double *e), (*comm,s,e))
00309 
00310 threadCollide *COLLIDE_Lookup(collide_t c) {
00311         CkGroupID g; g.idx=c;
00312         CProxy_threadCollide coll(g);
00313         threadCollide *ret=coll[TCharm::get()->getElement()].ckLocal();
00314 #ifndef CMK_OPTIMIZE
00315         if (ret==NULL) CkAbort("COLLIDE can't find its collision array element.");
00316 #endif  
00317         return ret;
00318 }
00319 
00320 CDECL void COLLIDE_Boxes(collide_t c,int nBox,const double *boxes)
00321 {
00322         COLLIDEAPI("COLLIDE_Boxes");
00323         COLLIDE_Lookup(c)->contribute(nBox,(const bbox3d *)boxes,NULL);
00324 }
00325 FORTRAN_AS_C(COLLIDE_BOXES,COLLIDE_Boxes,collide_boxes,
00326         (int *c,int *n,double *box),(*c,*n,box))
00327 
00328 CDECL void COLLIDE_Boxes_prio(collide_t c,int nBox,const double *boxes,const int *prio)
00329 {
00330         COLLIDEAPI("COLLIDE_Boxes_prio");
00331         COLLIDE_Lookup(c)->contribute(nBox,(const bbox3d *)boxes,prio);
00332 }
00333 FORTRAN_AS_C(COLLIDE_BOXES_PRIO,COLLIDE_Boxes_prio,collide_boxes_prio,
00334         (int *c,int *n,double *box,int *prio),(*c,*n,box,prio))
00335 
00336 CDECL int COLLIDE_Count(collide_t c) {
00337         COLLIDEAPI("COLLIDE_Count");
00338         return COLLIDE_Lookup(c)->colls.size();
00339 }
00340 FORTRAN_AS_C_RETURN(int,COLLIDE_COUNT,COLLIDE_Count,collide_count,
00341         (int *c),(*c))
00342 
00343 static void getCollisionList(collide_t c,int *out,int indexBase) {
00344         growableBufferT<Collision> &colls=COLLIDE_Lookup(c)->colls;
00345         int i,n=colls.size();
00346         Collision *in=colls.detachBuffer();
00347         for (i=0;i<n;i++) {
00348                 out[3*i+0]=in[i].A.number+indexBase;
00349                 out[3*i+1]=in[i].B.chunk+indexBase;
00350                 out[3*i+2]=in[i].B.number+indexBase;
00351         }
00352         free(in);
00353 }
00354 
00355 CDECL void COLLIDE_List(collide_t c,int *out) {
00356         COLLIDEAPI("COLLIDE_List");
00357         getCollisionList(c,out,0);
00358 }
00359 FDECL void FTN_NAME(COLLIDE_LIST,collide_list)(collide_t *c,int *out) {
00360         COLLIDEAPI("COLLIDE_List");
00361         getCollisionList(*c,out,1);
00362 }
00363 
00364 CDECL void COLLIDE_Destroy(collide_t c) {
00365         COLLIDEAPI("COLLIDE_Destroy");
00366         /* FIXME: delete entire array */
00367 }
00368 FORTRAN_AS_C(COLLIDE_DESTROY,COLLIDE_Destroy,collide_destroy,
00369         (int *c),(*c))
00370 
00371 #include "collide.def.h"

Generated on Sun Jun 29 13:29:19 2008 for Charm++ by  doxygen 1.5.1