00001
00002
00003
00004
00005
00006
00007
00013
00014 #include <spp_prog_model.h>
00015 #include <memory.h>
00016 #include <cps.h>
00017 #include <math.h>
00018 #include "converse.h"
00019
00020
00021 #define BLK_LEN 512
00022 typedef struct {
00023 mem_sema_t sema;
00024 void **blk;
00025 unsigned int blk_len;
00026 unsigned int first;
00027 unsigned int len;
00028 unsigned int maxlen;
00029 } McQueue;
00030
00031 static McQueue *McQueueCreate(void);
00032 static void McQueueAddToBack(McQueue *queue, void *element);
00033 static void *McQueueRemoveFromFront(McQueue *queue);
00034 static McQueue **MsgQueue;
00035 #define g_malloc(s) memory_class_malloc(s,NEAR_SHARED_MEM)
00036
00037
00038 CpvDeclare(void*, CmiLocalQueue);
00039 node_private int _Cmi_numpes;
00040
00041 static node_private barrier_t barrier;
00042
00043 static void threadInit();
00044
00045
00046
00047
00048
00049
00050
00051 void CmiAbort(const char *message)
00052 {
00053 CmiError(message);
00054 exit(1);
00055 }
00056
00057
00058
00059 void *CmiSvAlloc(size)
00060 int size;
00061 {
00062 char *res;
00063 res = (char *) memory_class_malloc(size+2*sizeof(int),NEAR_SHARED_MEM);
00064 if (res==0) CmiAbort("Memory allocation failed.");
00065 ((int *)res)[0]=size;
00066 ((int *)res)[1]=(-1);
00067 return (void *)(res+2*sizeof(int));
00068 }
00069
00070 void CmiSvFree(blk)
00071 char *blk;
00072 {
00073 free(blk-2*sizeof(int));
00074 }
00075
00076
00077
00078 CmiNotifyIdle()
00079 {
00080 }
00081
00082 CmiNodeLock CmiCreateLock(void)
00083 {
00084 CmiNodeLock *plock = (CmiNodeLock *)malloc(sizeof(CmiNodeLock));
00085 cps_mutex_alloc(*plock);
00086 return *plock;
00087 }
00088
00089 int CmiProbeLock(CmiNodeLock lock)
00090 {
00091 if(cps_mutex_trylock(lock) == 0){
00092 cps_mutex_unlock(lock);
00093 return 1;
00094 } else {
00095 return 0;
00096 }
00097 }
00098
00099
00100 int CmiAsyncMsgSent(msgid)
00101 CmiCommHandle msgid;
00102 {
00103 return 1;
00104 }
00105
00106 typedef struct {
00107 CmiStartFn fn;
00108 int usched;
00109 } USER_PARAMETERS;
00110
00111
00112 static node_private int Cmi_argc;
00113 static node_private char** Cmi_argv;
00114 static node_private USER_PARAMETERS Cmi_param;
00115 static node_private CmiStartFn Cmi_fn;
00116 static node_private int Cmi_usched;
00117 static node_private int Cmi_initret;
00118
00119 void ConverseInit(int argc, char** argv, CmiStartFn fn, int usched, int initret)
00120 {
00121 int i;
00122 spawn_sym_t request;
00123
00124
00125 i = 0;
00126 _Cmi_numpes = 0;
00127 CmiGetArgInt(argv,"+p",&_Cmi_numpes);
00128 if (_Cmi_numpes <= 0)
00129 CmiAbort("Invalid number of processors\n");
00130
00131 Cmi_argc = argc;
00132 Cmi_argv = argv;
00133 Cmi_fn = fn;
00134 Cmi_usched = usched;
00135 Cmi_initret = initret;
00136
00137 request.node = CPS_SAME_NODE;
00138 request.min = _Cmi_numpes;
00139 request.max = _Cmi_numpes;
00140 request.threadscope = CPS_THREAD_PARALLEL;
00141
00142 if(cps_barrier_alloc(&barrier)!=0)
00143 CmiAbort("Cannot Alocate Barrier\n");
00144
00145 MsgQueue=(McQueue **)g_malloc(_Cmi_numpes*sizeof(McQueue *));
00146 if (MsgQueue == (McQueue **)0) {
00147 CmiAbort("Cannot Allocate Memory...\n");
00148 }
00149 for(i=0; i<_Cmi_numpes; i++) MsgQueue[i] = McQueueCreate();
00150
00151 if (cps_ppcall(&request, threadInit ,(void *) 0) < 0) {
00152 CmiAbort("Cannot create threads...\n");
00153 }
00154 cps_barrier_free(&barrier);
00155
00156 }
00157
00158 void ConverseExit(void)
00159 {
00160 ConverseCommonExit();
00161 cps_barrier(&barrier,&_Cmi_numpes);
00162 }
00163
00164 static void threadInit(arg)
00165 void *arg;
00166 {
00167 char **argv=CmiCopyArgs(Cmi_argv);
00168 CpvInitialize(void*, CmiLocalQueue);
00169
00170 ConverseCommonInit(argv);
00171 neighbour_init(CmiMyPe());
00172 CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00173 CmiSpanTreeInit();
00174 CmiTimerInit();
00175 if (Cmi_initret==0) {
00176 Cmi_fn(CmiGetArgc(argv), argv);
00177 if (Cmi_usched==0) CsdScheduler(-1);
00178 ConverseExit();
00179 }
00180 }
00181
00182 void *CmiGetNonLocal(void)
00183 {
00184 int *buf, *msg;
00185 int size;
00186 msg = McQueueRemoveFromFront(MsgQueue[CmiMyPe()]);
00187 if(msg==0)
00188 return 0;
00189 size = *(msg-1);
00190 if((buf = (int *) CmiAlloc(size))==0)
00191 CmiAbort("Cannot allocate memory!\n");
00192 memcpy(buf, msg, size);
00193 free(msg-1);
00194 return buf;
00195 }
00196
00197
00198 void CmiSyncSendFn(int destPE, int size, char *msg)
00199 {
00200 int *buf;
00201
00202 buf=(int *)g_malloc(size+sizeof(int));
00203 if(buf==(void *)0) {
00204 CmiAbort("Cannot allocate memory!\n");
00205 }
00206 *buf++ = size;
00207 memcpy((char *)buf,(char *)msg,size);
00208 McQueueAddToBack(MsgQueue[destPE],buf);
00209 CQdCreate(CpvAccess(cQdState), 1);
00210 }
00211
00212
00213 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
00214 {
00215 CmiSyncSendFn(destPE, size, msg);
00216 return 0;
00217 }
00218
00219
00220 void CmiFreeSendFn(int destPE, int size, char *msg)
00221 {
00222 if (CmiMyPe()==destPE) {
00223 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00224 CQdCreate(CpvAccess(cQdState), 1);
00225 } else {
00226 CmiSyncSendFn(destPE, size, msg);
00227 CmiFree(msg);
00228 }
00229 }
00230
00231 void CmiSyncBroadcastFn(int size, char *msg)
00232 {
00233 int i;
00234 for(i=0; i<CmiNumPes(); i++)
00235 if (CmiMyPe() != i) CmiSyncSendFn(i,size,msg);
00236 }
00237
00238 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00239 {
00240 CmiSyncBroadcastFn(size, msg);
00241 return 0;
00242 }
00243
00244 void CmiFreeBroadcastFn(int size, char *msg)
00245 {
00246 CmiSyncBroadcastFn(size,msg);
00247 CmiFree(msg);
00248 }
00249
00250 void CmiSyncBroadcastAllFn(int size, char *msg)
00251 {
00252 int i;
00253 for(i=0; i<CmiNumPes(); i++) CmiSyncSendFn(i,size,msg);
00254 }
00255
00256
00257 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00258 {
00259 CmiSyncBroadcastAllFn(size, msg);
00260 return 0;
00261 }
00262
00263
00264 void CmiFreeBroadcastAllFn(int size, char *msg)
00265 {
00266 int i;
00267 for(i=0; i<CmiNumPes(); i++)
00268 if (CmiMyPe() != i) CmiSyncSendFn(i,size,msg);
00269 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00270 CQdCreate(CpvAccess(cQdState), 1);
00271 }
00272
00273
00274
00275 void CmiNodeBarrier()
00276 {
00277 if(cps_barrier(&barrier,&_Cmi_numpes)!=0)
00278 CmiAbort("Error in Barrier\n");
00279 }
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292 static void ** AllocBlock(unsigned int len)
00293 {
00294 void ** blk;
00295
00296 blk=(void **)g_malloc(len*sizeof(void *));
00297 if(blk==(void **)0)
00298 {
00299 CmiAbort("Cannot Allocate Memory!\n");
00300 }
00301 return blk;
00302 }
00303
00304 static void SpillBlock(void **srcblk, void **destblk, int first, int len)
00305 {
00306 memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
00307 memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
00308 }
00309
00310 static McQueue * McQueueCreate(void)
00311 {
00312 McQueue *queue;
00313 int one = 1;
00314
00315 queue = (McQueue *) g_malloc(sizeof(McQueue));
00316 if(queue==(McQueue *)0)
00317 {
00318 CmiError("Cannot Allocate Memory!\n");
00319 exit(1);
00320 }
00321 m_init32(&(queue->sema), &one);
00322 queue->blk = AllocBlock(BLK_LEN);
00323 queue->blk_len = BLK_LEN;
00324 queue->first = 0;
00325 queue->len = 0;
00326 queue->maxlen = 0;
00327 return queue;
00328 }
00329
00330 static
00331 void
00332 McQueueAddToBack(queue, element)
00333 McQueue *queue;
00334 void *element;
00335 {
00336 m_lock(&(queue->sema));
00337 if(queue->len==queue->blk_len)
00338 {
00339 void **blk;
00340
00341 queue->blk_len *= 3;
00342 blk = AllocBlock(queue->blk_len);
00343 SpillBlock(queue->blk, blk, queue->first, queue->len);
00344 free(queue->blk);
00345 queue->blk = blk;
00346 queue->first = 0;
00347 }
00348 queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
00349 if(queue->len>queue->maxlen)
00350 queue->maxlen = queue->len;
00351 m_unlock(&(queue->sema));
00352 }
00353
00354
00355 static
00356 void *
00357 McQueueRemoveFromFront(queue)
00358 McQueue *queue;
00359 {
00360 void *element;
00361 m_lock(&(queue->sema));
00362 element = (void *) 0;
00363 if(queue->len)
00364 {
00365 element = queue->blk[queue->first++];
00366 queue->first = (queue->first+queue->blk_len)%queue->blk_len;
00367 queue->len--;
00368 }
00369 m_unlock(&(queue->sema));
00370 return element;
00371 }
00372