arch/exemplar/machine.c

Go to the documentation of this file.
00001 /*****************************************************************************
00002  * $Source: /cvsroot/charm/src/arch/exemplar/machine.c,v $
00003  * $Author: gioachin $
00004  * $Date: 2004-10-06 01:28:55 $
00005  * $Revision: 2.42 $
00006  *****************************************************************************/
00007 
00013 
00014 #include <spp_prog_model.h>
00015 #include <memory.h>
00016 #include <cps.h>
00017 #include <math.h>
00018 #include "converse.h"
00019 
00020 
00021 #define BLK_LEN         512
00022 typedef struct {
00023   mem_sema_t sema;
00024   void     **blk;
00025   unsigned int blk_len;
00026   unsigned int first;
00027   unsigned int len;
00028   unsigned int maxlen;
00029 } McQueue;
00030 
00031 static McQueue *McQueueCreate(void);
00032 static void McQueueAddToBack(McQueue *queue, void *element);
00033 static void *McQueueRemoveFromFront(McQueue *queue);
00034 static McQueue **MsgQueue;
00035 #define g_malloc(s)  memory_class_malloc(s,NEAR_SHARED_MEM)
00036 
00037 
00038 CpvDeclare(void*, CmiLocalQueue);
00039 node_private int _Cmi_numpes;
00040 
00041 static node_private barrier_t barrier;
00042 
00043 static void threadInit();
00044 
00045 /***********************************************************************
00046  *
00047  * Abort function:
00048  *
00049  ************************************************************************/
00050 
/*
 * Report a fatal error and terminate the program with exit status 1.
 *
 * message: text to report.  It is passed to CmiError as an argument of a
 *          "%s" format rather than as the format itself, so a '%' inside
 *          the message is printed literally.
 *          NOTE(review): assumes CmiError takes a printf-style format
 *          string -- confirm against converse.h.
 *
 * Does not return.
 */
void CmiAbort(const char *message)
{
  CmiError("%s", message);
  exit(1);
}
00056 
00057 
00058 
00059 void *CmiSvAlloc(size)
00060 int size;
00061 {
00062   char *res;
00063   res = (char *) memory_class_malloc(size+2*sizeof(int),NEAR_SHARED_MEM);
00064   if (res==0) CmiAbort("Memory allocation failed.");
00065   ((int *)res)[0]=size;
00066   ((int *)res)[1]=(-1);
00067   return (void *)(res+2*sizeof(int));
00068 }
00069 
/*
 * Release a block by stepping back over a two-int header and handing the
 * resulting address to free().  The offset matches the header written by
 * CmiSvAlloc, so `blk` is presumably a pointer returned by that function.
 */
void CmiSvFree(char *blk)
{
  char *base = blk - 2*sizeof(int);

  free(base);
}
00075 
00076 
00077 
/*
 * Idle notification hook.  This machine layer has nothing to do here.
 *
 * The original definition relied on the implicit-int rule and fell off the
 * end without returning a value; the return type is now spelled out and a
 * definite 0 is returned.
 * NOTE(review): kept as int to stay compatible with the original
 * definition -- confirm against the declaration in converse.h.
 */
int CmiNotifyIdle(void)
{
  return 0;
}
00081 
00082 CmiNodeLock CmiCreateLock(void)
00083 {
00084   CmiNodeLock *plock = (CmiNodeLock *)malloc(sizeof(CmiNodeLock));
00085   cps_mutex_alloc(*plock);
00086   return *plock;
00087 }
00088 
00089 int CmiProbeLock(CmiNodeLock lock)
00090 {
00091   if(cps_mutex_trylock(lock) == 0){
00092     cps_mutex_unlock(lock);
00093     return 1;
00094   } else {
00095     return 0;
00096   }
00097 }
00098 
00099 
00100 int CmiAsyncMsgSent(msgid)
00101 CmiCommHandle msgid;
00102 {
00103    return 1;
00104 }
00105 
00106 typedef struct {
00107    CmiStartFn fn;
00108    int usched;
00109 } USER_PARAMETERS;
00110 
00111 
00112 static node_private int Cmi_argc;
00113 static node_private char** Cmi_argv;
00114 static node_private USER_PARAMETERS Cmi_param;
00115 static node_private CmiStartFn Cmi_fn;
00116 static node_private int Cmi_usched;
00117 static node_private int Cmi_initret;
00118 
00119 void ConverseInit(int argc, char** argv, CmiStartFn fn, int usched, int initret)
00120 {
00121     int i;
00122     spawn_sym_t request;
00123 
00124     /* figure out number of processors required */
00125     i =  0;
00126     _Cmi_numpes = 0; 
00127     CmiGetArgInt(argv,"+p",&_Cmi_numpes);
00128     if (_Cmi_numpes <= 0)
00129       CmiAbort("Invalid number of processors\n");
00130 
00131     Cmi_argc = argc;
00132     Cmi_argv = argv;
00133     Cmi_fn = fn;
00134     Cmi_usched = usched;
00135     Cmi_initret = initret;
00136 
00137     request.node = CPS_SAME_NODE;
00138     request.min  = _Cmi_numpes;
00139     request.max  = _Cmi_numpes;
00140     request.threadscope = CPS_THREAD_PARALLEL;
00141    
00142     if(cps_barrier_alloc(&barrier)!=0)
00143       CmiAbort("Cannot Alocate Barrier\n");
00144 
00145     MsgQueue=(McQueue **)g_malloc(_Cmi_numpes*sizeof(McQueue *));
00146     if (MsgQueue == (McQueue **)0) {
00147         CmiAbort("Cannot Allocate Memory...\n");
00148     }
00149     for(i=0; i<_Cmi_numpes; i++) MsgQueue[i] = McQueueCreate();
00150 
00151     if (cps_ppcall(&request, threadInit ,(void *) 0) < 0) {
00152         CmiAbort("Cannot create threads...\n");
00153     } 
00154     cps_barrier_free(&barrier);
00155 
00156 }
00157 
00158 void ConverseExit(void)
00159 {
00160    ConverseCommonExit();
00161    cps_barrier(&barrier,&_Cmi_numpes);
00162 }
00163 
00164 static void threadInit(arg)
00165 void *arg;
00166 {
00167     char **argv=CmiCopyArgs(Cmi_argv);
00168     CpvInitialize(void*, CmiLocalQueue);
00169 
00170     ConverseCommonInit(argv);
00171     neighbour_init(CmiMyPe());
00172     CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00173     CmiSpanTreeInit();
00174     CmiTimerInit();
00175     if (Cmi_initret==0) {
00176       Cmi_fn(CmiGetArgc(argv), argv);
00177       if (Cmi_usched==0) CsdScheduler(-1);
00178       ConverseExit();
00179     }
00180 }
00181 
00182 void *CmiGetNonLocal(void)
00183 {
00184      int *buf, *msg;
00185      int size;
00186      msg = McQueueRemoveFromFront(MsgQueue[CmiMyPe()]);
00187      if(msg==0)
00188        return 0;
00189      size = *(msg-1);
00190      if((buf = (int *) CmiAlloc(size))==0)
00191        CmiAbort("Cannot allocate memory!\n");
00192      memcpy(buf, msg, size);
00193      free(msg-1);
00194      return buf;
00195 }
00196 
00197 
00198 void CmiSyncSendFn(int destPE, int size, char *msg)
00199 {
00200    int *buf;
00201 
00202    buf=(int *)g_malloc(size+sizeof(int));
00203    if(buf==(void *)0) {
00204      CmiAbort("Cannot allocate memory!\n");
00205    }
00206    *buf++ = size;
00207    memcpy((char *)buf,(char *)msg,size);
00208    McQueueAddToBack(MsgQueue[destPE],buf); 
00209    CQdCreate(CpvAccess(cQdState), 1);
00210 }
00211 
00212 
00213 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
00214 {
00215     CmiSyncSendFn(destPE, size, msg); 
00216     return 0;
00217 }
00218 
00219 
00220 void CmiFreeSendFn(int destPE, int size, char *msg)
00221 {
00222   if (CmiMyPe()==destPE) {
00223     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00224     CQdCreate(CpvAccess(cQdState), 1);
00225   } else {
00226     CmiSyncSendFn(destPE, size, msg);
00227     CmiFree(msg);
00228   }
00229 }
00230 
/*
 * Send a copy of the message to every PE except the caller, one
 * CmiSyncSendFn call per destination.
 */
void CmiSyncBroadcastFn(int size, char *msg)
{
       int pe;

       for (pe = 0; pe < CmiNumPes(); pe++) {
         if (CmiMyPe() == pe)
           continue;
         CmiSyncSendFn(pe, size, msg);
       }
}
00237 
00238 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00239 {
00240     CmiSyncBroadcastFn(size, msg);
00241     return 0;
00242 }
00243 
/*
 * Broadcast to all other PEs with CmiSyncBroadcastFn, then release the
 * caller's buffer with CmiFree.
 */
void CmiFreeBroadcastFn(int size, char *msg)
{
    CmiSyncBroadcastFn(size, msg);
    CmiFree(msg);
}
00249 
/*
 * Send a copy of the message to every PE, the caller included, one
 * CmiSyncSendFn call per destination.
 */
void CmiSyncBroadcastAllFn(int size, char *msg)
{
    int pe = 0;

    while (pe < CmiNumPes()) {
        CmiSyncSendFn(pe, size, msg);
        pe++;
    }
}
00255 
00256 
00257 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00258 {
00259     CmiSyncBroadcastAllFn(size, msg);
00260     return 0; 
00261 }
00262 
00263 
00264 void CmiFreeBroadcastAllFn(int size, char *msg)
00265 {
00266     int i;
00267     for(i=0; i<CmiNumPes(); i++)
00268        if (CmiMyPe() != i) CmiSyncSendFn(i,size,msg);
00269     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00270     CQdCreate(CpvAccess(cQdState), 1);
00271 }
00272 
00273 
00274 
00275 void CmiNodeBarrier()
00276 {
00277    if(cps_barrier(&barrier,&_Cmi_numpes)!=0)
00278      CmiAbort("Error in Barrier\n");
00279 }
00280 
00281 
00282 
00283 
00284 
00285 /* ****************************************************************** */
00286 /*    The following internal functions implement FIFO queues for      */
00287 /*    messages. These queues are shared among threads                 */
00288 /* ****************************************************************** */
00289 
00290 
00291 
/*
 * Allocate an array of `len` pointer slots with g_malloc.
 * Aborts through CmiAbort if the allocation fails; otherwise returns the
 * new array, whose contents are not initialised here.
 */
static void ** AllocBlock(unsigned int len)
{
        void **slots = (void **)g_malloc(len*sizeof(void *));

        if (slots == (void **)0) {
                CmiAbort("Cannot Allocate Memory!\n");
        }
        return slots;
}
00303 
/*
 * Copy a ring buffer of `len` slots into `destblk` in rotated order:
 * entries first..len-1 of `srcblk` land at the start of `destblk`,
 * followed by entries 0..first-1.
 *
 * srcblk:  source array of at least `len` slots.
 * destblk: destination array of at least `len` slots; must not overlap
 *          the source, since memcpy is used.
 * first:   index in srcblk of the entry that becomes destblk[0].
 * len:     number of slots to copy.
 */
static void SpillBlock(void **srcblk, void **destblk, int first, int len)
{
        int tail = len - first;   /* entries from `first` to the end */

        memcpy(destblk, srcblk + first, tail*sizeof(void *));
        memcpy(destblk + tail, srcblk, first*sizeof(void *));
}
00309 
00310 static McQueue * McQueueCreate(void)
00311 {
00312         McQueue *queue;
00313         int one = 1;
00314 
00315         queue = (McQueue *) g_malloc(sizeof(McQueue));
00316         if(queue==(McQueue *)0)
00317         {
00318                 CmiError("Cannot Allocate Memory!\n");
00319                 exit(1);
00320         }
00321         m_init32(&(queue->sema), &one);
00322         queue->blk = AllocBlock(BLK_LEN);
00323         queue->blk_len = BLK_LEN;
00324         queue->first = 0;
00325         queue->len = 0;
00326         queue->maxlen = 0;
00327         return queue;
00328 }
00329 
00330 static 
00331 void
00332 McQueueAddToBack(queue, element)
00333 McQueue *queue;
00334 void  *element;
00335 {
00336         m_lock(&(queue->sema));
00337         if(queue->len==queue->blk_len)
00338         {
00339                 void **blk;
00340 
00341                 queue->blk_len *= 3;
00342                 blk = AllocBlock(queue->blk_len);
00343                 SpillBlock(queue->blk, blk, queue->first, queue->len);
00344                 free(queue->blk);
00345                 queue->blk = blk;
00346                 queue->first = 0;
00347         }
00348         queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
00349         if(queue->len>queue->maxlen)
00350                 queue->maxlen = queue->len;
00351         m_unlock(&(queue->sema));
00352 }
00353 
00354 
00355 static
00356 void *
00357 McQueueRemoveFromFront(queue)
00358 McQueue *queue;
00359 {
00360         void *element;
00361         m_lock(&(queue->sema));
00362         element = (void *) 0;
00363         if(queue->len)
00364         {
00365                 element = queue->blk[queue->first++];
00366                 queue->first = (queue->first+queue->blk_len)%queue->blk_len;
00367                 queue->len--;
00368         }
00369         m_unlock(&(queue->sema));
00370         return element;
00371 }
00372 

Generated on Sun Jun 29 13:29:05 2008 for Charm++ by  doxygen 1.5.1