arch/origin-pthreads/machine.c

Go to the documentation of this file.
00001 /*****************************************************************************
00002  * $Source: /cvsroot/charm/src/arch/origin-pthreads/machine.c,v $
00003  * $Author: gioachin $
00004  * $Date: 2004-10-06 01:28:55 $
00005  * $Revision: 1.20 $
00006  *****************************************************************************/
00007 
00014 #include <errno.h>
00015 #include <pthread.h>
00016 #include <sched.h>
00017 #include <time.h>
00018 
00019 #include <stdio.h>
00020 #include <sys/types.h>
00021 #include <sys/time.h>
00022 #include <limits.h>
00023 #include <unistd.h>
00024 
00025 #include "converse.h"
00026 
00027 #define BLK_LEN  512
00028 
00029 typedef struct {
00030   pthread_mutex_t mutex;
00031   pthread_cond_t cond;
00032   int waiting;
00033   CdsFifo q;
00034 } McQueue;
00035 
00036 static McQueue *McQueueCreate(void);
00037 static void McQueueAddToBack(McQueue *queue, void *element);
00038 static void *McQueueRemoveFromFront(McQueue *queue);
00039 static McQueue **MsgQueue;
00040 
00041 CpvDeclare(void*, CmiLocalQueue);
00042 
00043 int Cmi_argc;
00044 int _Cmi_numpes;
00045 int Cmi_usched;
00046 int Cmi_initret;
00047 CmiStartFn Cmi_startFn;
00048 
00049 pthread_key_t perThreadKey;
00050 
00051 static void *threadInit(void *arg);
00052 
00053 pthread_mutex_t memory_mutex;
00054 
00055 void CmiMemLock() {pthread_mutex_lock(&memory_mutex);}
00056 void CmiMemUnlock() {pthread_mutex_unlock(&memory_mutex);}
00057 
/*
 * Node barrier state, all protected by barrier_mutex.
 *   barrier            - number of threads that have arrived in the current round
 *   barrier_generation - incremented each time a round completes, so a waiter
 *                        can tell a real release from a spurious wakeup
 */
int barrier;
pthread_cond_t barrier_cond;
pthread_mutex_t barrier_mutex;
static unsigned int barrier_generation;

/*
 * Block the caller until `expected` threads have entered the barrier.
 * The last thread to arrive resets the arrival count, advances the
 * generation and wakes everybody else.
 */
static void barrierWait(int expected)
{
  unsigned int my_generation;

  pthread_mutex_lock(&barrier_mutex);
  my_generation = barrier_generation;
  barrier++;
  if (barrier != expected) {
    /*
     * pthread_cond_wait may return without a matching broadcast, so the
     * wait must sit in a loop.  The arrival count is reset to 0 on release
     * and therefore cannot be the predicate; the generation number is.
     */
    while (my_generation == barrier_generation)
      pthread_cond_wait(&barrier_cond, &barrier_mutex);
  } else {
    barrier = 0;
    barrier_generation++;
    pthread_cond_broadcast(&barrier_cond);
  }
  pthread_mutex_unlock(&barrier_mutex);
}

/* Barrier across CmiNumPes() threads. */
void CmiNodeBarrier(void)
{
  barrierWait(CmiNumPes());
}

/* Barrier across CmiNumPes()+1 threads. */
void CmiNodeAllBarrier(void)
{
  barrierWait(CmiNumPes()+1);
}
00087 
00088 CmiNodeLock CmiCreateLock(void)
00089 {
00090   pthread_mutex_t *lock;
00091   lock = (pthread_mutex_t *) CmiAlloc(sizeof(pthread_mutex_t));
00092   pthread_mutex_init(lock, (pthread_mutexattr_t *) 0);
00093   return lock;
00094 }
00095 
00096 void CmiLock(CmiNodeLock lock)
00097 {
00098   pthread_mutex_lock(lock);
00099 }
00100 
00101 void CmiUnlock(CmiNodeLock lock)
00102 {
00103   pthread_mutex_unlock(lock);
00104 }
00105 
00106 int CmiTryLock(CmiNodeLock lock)
00107 {
00108   return pthread_mutex_trylock(lock);
00109 }
00110 
00111 void CmiDestroyLock(CmiNodeLock lock)
00112 {
00113   pthread_mutex_destroy(lock);
00114 }
00115 
00116 int CmiMyPe()
00117 {
00118   int mype = (size_t) pthread_getspecific(perThreadKey);
00119   return mype;
00120 }
00121 
00122 /***********************************************************************
00123  *
00124  * Abort function:
00125  *
00126  ************************************************************************/
00127 
/*
 * Report `message` through CmiError and terminate the process with abort().
 * The message is passed as an argument to a fixed "%s" format so that any
 * '%' characters in it are printed literally rather than interpreted as
 * conversion specifications.
 */
void CmiAbort(const char *message)
{
  CmiError("%s", message);
  abort();
}
00133 
00134 int CmiAsyncMsgSent(CmiCommHandle msgid)
00135 {
00136   return 1;
00137 }
00138 
00139 
/* Start-up record handed to threadInit for one PE. */
typedef struct {
  char **argv;   /* this PE's copy of the argument vector (from CmiCopyArgs) */
  int    mype;   /* PE number assigned to the thread */
} USER_PARAMETERS;
00144 
00145 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
00146 {
00147   int i;
00148   USER_PARAMETERS *usrparam;
00149   pthread_t *aThread;
00150  
00151   _Cmi_numpes = 0; 
00152   Cmi_usched = usched;
00153   Cmi_initret = initret;
00154   Cmi_startFn = fn;
00155 
00156   CmiGetArgInt(argv,"+p",&_Cmi_numpes);
00157   if (_Cmi_numpes <= 0)
00158   {
00159     CmiError("Error: requested number of processors is invalid %d\n",
00160               _Cmi_numpes);
00161     abort();
00162   }
00163 
00164 
00165   pthread_mutex_init(&memory_mutex, (pthread_mutexattr_t *) 0);
00166 
00167   MsgQueue=(McQueue **)CmiAlloc(_Cmi_numpes*sizeof(McQueue *));
00168   for(i=0; i<_Cmi_numpes; i++) 
00169     MsgQueue[i] = McQueueCreate();
00170 
00171   pthread_key_create(&perThreadKey, (void *) 0);
00172   barrier = 0;
00173   pthread_cond_init(&barrier_cond, (pthread_condattr_t *) 0);
00174   pthread_mutex_init(&barrier_mutex, (pthread_mutexattr_t *) 0);
00175 
00176   /* suggest to IRIX that we actually use the right number of processors */
00177   pthread_setconcurrency(_Cmi_numpes);
00178 
00179   Cmi_argc = CmiGetArgc(argv);
00180   aThread = (pthread_t *) CmiAlloc(sizeof(pthread_t) * _Cmi_numpes);
00181   for(i=1; i<_Cmi_numpes; i++) {
00182     usrparam = (USER_PARAMETERS *) CmiAlloc(sizeof(USER_PARAMETERS));
00183     usrparam->argv = CmiCopyArgs(argv);
00184     usrparam->mype = i;
00185 
00186     pthread_create(&aThread[i],(pthread_attr_t *)0,threadInit,(void *)usrparam);
00187   }
00188   usrparam = (USER_PARAMETERS *) CmiAlloc(sizeof(USER_PARAMETERS));
00189   usrparam->argv = CmiCopyArgs(argv);
00190   usrparam->mype = 0;
00191   threadInit(usrparam);
00192 }
00193 
00194 void CmiTimerInit(void);
00195 
00196 static void *threadInit(void *arg)
00197 {
00198   USER_PARAMETERS *usrparam;
00199   usrparam = (USER_PARAMETERS *) arg;
00200 
00201 
00202   pthread_setspecific(perThreadKey, (void *) usrparam->mype);
00203 
00204   CthInit(usrparam->argv);
00205   ConverseCommonInit(usrparam->argv);
00206   CpvInitialize(void*, CmiLocalQueue);
00207   CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00208   CmiTimerInit();
00209   if (Cmi_initret==0) {
00210     Cmi_startFn(Cmi_argc, usrparam->argv);
00211     if (Cmi_usched==0) CsdScheduler(-1);
00212     ConverseExit();
00213   }
00214   return (void *) 0;
00215 }
00216 
00217 
/*
 * Shut down this PE: run the common Converse exit processing, then wait at
 * the node barrier.
 */
void ConverseExit(void)
{
  ConverseCommonExit();

  CmiNodeBarrier();
}
00223 
00224 
/* Does nothing in this machine layer. */
void CmiDeclareArgs(void)
{
  /* empty */
}
00228 
00229 
00230 void CmiNotifyIdle()
00231 {
00232   McQueue *queue = MsgQueue[CmiMyPe()];
00233   struct timespec ts;
00234   pthread_mutex_lock(&(queue->mutex));
00235   if(CdsFifo_Empty(queue->q)){
00236     queue->waiting++;
00237     ts.tv_sec = (time_t) 0;
00238     ts.tv_nsec = 10000000L;
00239     pthread_cond_timedwait(&(queue->cond), &(queue->mutex), &ts);
00240     queue->waiting--;
00241   }
00242   pthread_mutex_unlock(&(queue->mutex));
00243   return;
00244 }
00245 
00246 void *CmiGetNonLocal()
00247 {
00248   return McQueueRemoveFromFront(MsgQueue[CmiMyPe()]);
00249 }
00250 
00251 
00252 void CmiSyncSendFn(int destPE, int size, char *msg)
00253 {
00254   char *buf;
00255 
00256   buf=(void *)CmiAlloc(size);
00257   memcpy(buf,msg,size);
00258   McQueueAddToBack(MsgQueue[destPE],buf); 
00259   CQdCreate(CpvAccess(cQdState), 1);
00260 }
00261 
00262 
00263 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
00264 {
00265   CmiSyncSendFn(destPE, size, msg); 
00266   return 0;
00267 }
00268 
00269 
00270 void CmiFreeSendFn(int destPE, int size, char *msg)
00271 {
00272   if (CmiMyPe()==destPE) {
00273     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00274   } else {
00275     McQueueAddToBack(MsgQueue[destPE],msg); 
00276   }
00277   CQdCreate(CpvAccess(cQdState), 1);
00278 }
00279 
00280 void CmiSyncBroadcastFn(int size, char *msg)
00281 {
00282   int i;
00283   for(i=0; i<_Cmi_numpes; i++)
00284     if (CmiMyPe() != i) CmiSyncSendFn(i,size,msg);
00285 }
00286 
00287 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00288 {
00289   CmiSyncBroadcastFn(size, msg);
00290   return 0;
00291 }
00292 
/*
 * Broadcast copies of `msg` to all other PEs via CmiSyncBroadcastFn, then
 * release the caller's buffer with CmiFree.
 */
void CmiFreeBroadcastFn(int size, char *msg)
{
  CmiSyncBroadcastFn(size, msg);

  CmiFree(msg);
}
00298 
/*
 * Send a copy of `msg` (size bytes) to every PE, the caller included, using
 * CmiSyncSendFn for each destination.
 */
void CmiSyncBroadcastAllFn(int size, char *msg)
{
  int pe = 0;

  while (pe < CmiNumPes()) {
    CmiSyncSendFn(pe, size, msg);
    pe++;
  }
}
00305 
00306 
00307 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00308 {
00309   CmiSyncBroadcastAllFn(size, msg);
00310   return 0; 
00311 }
00312 
00313 
00314 void CmiFreeBroadcastAllFn(int size, char *msg)
00315 {
00316   int i;
00317   for(i=0; i<CmiNumPes(); i++) {
00318     if(CmiMyPe() != i) {
00319       CmiSyncSendFn(i,size,msg);
00320     }
00321   }
00322   CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00323   CQdCreate(CpvAccess(cQdState), 1);
00324 }
00325 
/* ****************************************************************** */
/*    The following internal functions implement FIFO queues for      */
/*    messages. These queues are shared among threads                 */
/* ****************************************************************** */
00330 
/* Allocate, with CmiAlloc, room for `len` void-pointer slots and return it. */
static void ** AllocBlock(unsigned int len)
{
  return (void **) CmiAlloc(len * sizeof(void *));
}
00338 
/*
 * Copy the `len` slots of srcblk into destblk, rotated so that
 * srcblk[first] lands in destblk[0]: first the slots first..len-1, then the
 * slots 0..first-1.  The two blocks must not overlap (memcpy is used).
 */
static void
SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
{
  const unsigned int tail = len - first;   /* slots from `first` to the end */
  void **src_tail = &(srcblk[first]);
  void **dest_head = &(destblk[tail]);

  memcpy(destblk, src_tail, tail * sizeof(void *));
  memcpy(dest_head, srcblk, first * sizeof(void *));
}
00345 
00346 McQueue * McQueueCreate(void)
00347 {
00348   McQueue *queue;
00349 
00350   queue = (McQueue *) CmiAlloc(sizeof(McQueue));
00351   pthread_mutex_init(&(queue->mutex), (pthread_mutexattr_t *) 0);
00352   pthread_cond_init(&(queue->cond), (pthread_condattr_t *) 0);
00353   queue->waiting = 0;
00354   queue->q = CdsFifo_Create_len(BLK_LEN);
00355   return queue;
00356 }
00357 
00358 void McQueueAddToBack(McQueue *queue, void *element)
00359 {
00360   pthread_mutex_lock(&(queue->mutex));
00361   CdsFifo_Enqueue(queue->q, element);
00362   if(queue->waiting) {
00363     pthread_cond_broadcast(&(queue->cond));
00364   }
00365   pthread_mutex_unlock(&(queue->mutex));
00366 }
00367 
00368 
00369 void * McQueueRemoveFromFront(McQueue *queue)
00370 {
00371   void *element = 0;
00372   pthread_mutex_lock(&(queue->mutex));
00373   element = CdsFifo_Dequeue(queue->q);
00374   pthread_mutex_unlock(&(queue->mutex));
00375   return element;
00376 }
00377 
00378 /* Timer Routines */
00379 
00380 
00381 CpvStaticDeclare(double,inittime_wallclock);
00382 CpvStaticDeclare(double,inittime_virtual);
00383 
00384 void CmiTimerInit(void)
00385 {
00386   struct timespec temp;
00387   CpvInitialize(double, inittime_wallclock);
00388   CpvInitialize(double, inittime_virtual);
00389   clock_gettime(CLOCK_SGI_CYCLE, &temp);
00390   CpvAccess(inittime_wallclock) = (double) temp.tv_sec +
00391                                   1e-9 * temp.tv_nsec;
00392   CpvAccess(inittime_virtual) = CpvAccess(inittime_wallclock);
00393 }
00394 
00395 double CmiWallTimer(void)
00396 {
00397   struct timespec temp;
00398   double currenttime;
00399 
00400   clock_gettime(CLOCK_SGI_CYCLE, &temp);
00401   currenttime = (double) temp.tv_sec +
00402                 1e-9 * temp.tv_nsec;
00403   return (currenttime - CpvAccess(inittime_wallclock));
00404 }
00405 
00406 double CmiCpuTimer(void)
00407 {
00408   struct timespec temp;
00409   double currenttime;
00410 
00411   clock_gettime(CLOCK_SGI_CYCLE, &temp);
00412   currenttime = (double) temp.tv_sec +
00413                 1e-9 * temp.tv_nsec;
00414   return (currenttime - CpvAccess(inittime_virtual));
00415 }
00416 
/* Same value as CmiCpuTimer(). */
double CmiTimer(void)
{
  double elapsed = CmiCpuTimer();

  return elapsed;
}
00421 

Generated on Sun Jun 29 13:29:05 2008 for Charm++ by  doxygen 1.5.1