00001
00002
00003
00004
00005
00006
00007
00017 #include <stdlib.h>
00018 #include <unistd.h>
00019 #include <malloc.h>
00020 #include <mpp/shmem.h>
00021 #include "converse.h"
00022
00023
00024
00025
00026
/* Upper bound on the number of PEs supported; it sizes the static
 * head_lock[] and bcast_lock[] arrays declared later in this file. */
#define MAX_PES 2048

/* File-local boolean type; used for the received_f flag of McMsgHdr. */
enum boolean {false = 0, true = 1};
/* Sentinel stored in McDistList.nxt_node to mark "no next message". */
enum {list_empty = -1 };
00034
00035
00036
00037
00038
/* Converse Cpv variable holding the FIFO of messages this PE sent to itself
 * (created with CdsFifo_Create() in McInit). */
CpvDeclare(void*, CmiLocalQueue);
int _Cmi_mype;    /* index of this PE; set from _my_pe() in McInit */
int _Cmi_numpes;  /* total number of PEs; set from _num_pes() in McInit */
int _Cmi_myrank;  /* set to 0 in McInit */
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
/* One link of a cross-PE message list.  A link identifies the next message
 * by the PE that holds it and by its address in that PE's memory. */
typedef struct McDistListS
{
  int nxt_node;                /* PE holding the next message, or list_empty */
  struct McMsgHdrS *nxt_addr;  /* address of the next message on that PE */
  int msg_sz;                  /* size recorded for the next message by McEnqueueRemote */
} McDistList;
00060
/* Header that the machine layer overlays on the start of every message. */
typedef struct McMsgHdrS
{
  McDistList list_node;   /* link to the message that was queued before this one */
  enum {Unknown, Message, BcastMessage } msg_type;  /* BcastMessage marks a broadcast token */
  enum boolean received_f;  /* cleared on enqueue; set remotely by the receiver once fetched */
  union
  {
    struct McMsgHdrS *ptr;  /* in a token: address of the broadcast payload on the sender */
    int count;              /* in a payload: number of destination PEs that have not fetched it yet */
  } bcast;
  int bcast_msg_size;     /* in a token: size of the broadcast payload */
  int handler;            /* not referenced by the code in this file */
} McMsgHdr;
00074
00075
00076
00077
00078
/* Forward declarations of the file-private machine-layer routines. */
static void McInit();            /* sets PE globals and calls McInitList */
static void McInitList();        /* creates the queues and resets list head and locks */
static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe);  /* links msg onto dst_pe's list */
static void McRetrieveRemote(void);    /* fetches every message linked onto this PE's list */
static void McCleanUpInTransit(void);  /* frees sent messages that have been fetched */
00084
00085
00086
00087
00088
00089
00090
/* Initial capacity (in elements) of a queue created by McQueueCreate. */
#define BLK_LEN 512

/* Growable circular buffer of void* elements. */
typedef struct McQueueS
{
  void **blk;            /* element storage */
  unsigned int blk_len;  /* capacity of blk, in elements */
  unsigned int first;    /* index in blk of the front element */
  unsigned int len;      /* number of elements currently stored */
} McQueue;

static McQueue *McQueueCreate(void);
static void McQueueAddToBack(McQueue *queue, void *element);
static void *McQueueRemoveFromFront(McQueue *queue);  /* returns NULL when empty */
static void *McQueueRemoveFromBack(McQueue *queue);   /* returns NULL when empty */
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
/* Messages handed to McEnqueueRemote that the destination has not yet
 * flagged as received; McCleanUpInTransit frees them once received_f is set.
 * The _tmp queue is scratch space used while filtering. */
static McQueue *in_transit_queue;
static McQueue *in_transit_tmp_queue;

/* Scratch queue used by McRetrieveRemote to reverse the order of the
 * messages it pulled off the list before appending them to received_queue. */
static McQueue *tmp_queue;

/* Messages fetched from other PEs, waiting to be returned by CmiGetNonLocal. */
static McQueue *received_queue;

/* Broadcast tokens fetched by McRetrieveRemote; they are freed at the end
 * of that function. */
static McQueue *received_token_queue;

/* Broadcast payloads sent by this PE; McCleanUpInTransit frees a payload
 * once its bcast.count reaches 0.  The _tmp queue is scratch space. */
static McQueue *broadcast_queue;
static McQueue *broadcast_tmp_queue;

/* Head of this PE's incoming message list.  Other PEs read and overwrite it
 * with shmem_get/shmem_put in McEnqueueRemote. */
static McDistList head;

/* my_lock points at head_lock[_Cmi_mype] (set in McInitList). */
static long *my_lock;
static long head_lock[MAX_PES];   /* head_lock[p] is taken around accesses to PE p's head */
static long bcast_lock[MAX_PES];  /* bcast_lock[p] is taken around bcast.count updates on PE p */

/* Round x up to the next multiple of 8 (for non-negative x). */
#define ALIGN8(x) (8*(((x)+7)/8))
00152
/*
 * XOR together the first `size` bytes of `msg`, starting from the seed 0xff,
 * and return the result.  A size of zero (or less) yields the seed itself.
 */
int McChecksum(char *msg, int size)
{
  int acc = 0xff;
  int pos = 0;

  while (pos < size) {
    acc ^= msg[pos];
    pos++;
  }
  return acc;
}
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174 void CmiSyncSendFn(int dest_pe, int size, char *msg)
00175 {
00176 McMsgHdr *dup_msg;
00177
00178 dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00179 memcpy(dup_msg,msg,size);
00180 dup_msg->msg_type = Message;
00181
00182 McRetrieveRemote();
00183
00184 if (dest_pe == _Cmi_mype)
00185 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dup_msg);
00186 else
00187 {
00188 McEnqueueRemote(dup_msg,ALIGN8(size),dest_pe);
00189 }
00190 CQdCreate(CpvAccess(cQdState), 1);
00191 }
00192
00193 CmiCommHandle CmiAsyncSendFn(int dest_pe, int size, char *msg)
00194 {
00195 CmiSyncSendFn(dest_pe, size, msg);
00196 return 0;
00197 }
00198
00199 void CmiFreeSendFn(int dest_pe, int size, char *msg)
00200 {
00201
00202 McRetrieveRemote();
00203 ((McMsgHdr *)msg)->msg_type = Message;
00204
00205 if (dest_pe == _Cmi_mype)
00206 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
00207 else
00208 {
00209 McEnqueueRemote(msg,size,dest_pe);
00210 }
00211 CQdCreate(CpvAccess(cQdState), 1);
00212 }
00213
00214 void CmiSyncBroadcastFn(int size, char *msg)
00215 {
00216 int i;
00217 McMsgHdr *dup_msg;
00218 McMsgHdr bcast_msg_tok;
00219 McMsgHdr *dup_tok;
00220 int hdr_size;
00221
00222
00223
00224
00225 dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00226 memcpy(dup_msg,msg,size);
00227 dup_msg->bcast.count = _Cmi_numpes - 1;
00228
00229
00230
00231
00232
00233
00234 bcast_msg_tok.msg_type = BcastMessage;
00235 bcast_msg_tok.bcast.ptr = dup_msg;
00236 bcast_msg_tok.bcast_msg_size = size;
00237
00238 hdr_size = ALIGN8(sizeof(McMsgHdr));
00239
00240
00241
00242
00243
00244 for(i=0; i<_Cmi_numpes; i++)
00245 if (i != _Cmi_mype)
00246 {
00247 dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
00248 memcpy(dup_tok,&bcast_msg_tok,hdr_size);
00249 McEnqueueRemote(dup_tok,hdr_size,i);
00250 }
00251
00252
00253
00254
00255
00256 McQueueAddToBack(broadcast_queue,dup_msg);
00257 CQdCreate(CpvAccess(cQdState), _Cmi_numpes-1);
00258 }
00259
00260 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
00261 {
00262 CmiSyncBroadcastFn(size,msg);
00263 return 0;
00264 }
00265
/*
 * Broadcast `msg` to all other PEs and then release the caller's buffer.
 * The broadcast works on its own copy, so `msg` can be freed immediately.
 */
void CmiFreeBroadcastFn(int size, char *msg)
{
  CmiSyncBroadcastFn(size, msg);

  CmiFree(msg);
}
00271
00272 void CmiSyncBroadcastAllFn(int size, char *msg)
00273 {
00274 int i;
00275 CmiSyncBroadcastFn(size,msg);
00276 CmiSyncSendFn(_Cmi_mype, size, msg);
00277 }
00278
00279 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
00280 {
00281 CmiSyncBroadcastAllFn(size,msg);
00282 return 0;
00283 }
00284
/*
 * Broadcast `msg` to every PE (including this one) and then release the
 * caller's buffer; the broadcast works on copies.
 */
void CmiFreeBroadcastAllFn(int size, char *msg)
{
  CmiSyncBroadcastAllFn(size, msg);

  CmiFree(msg);
}
00290
00291
00292 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg)
00293 {
00294 int i;
00295 McMsgHdr *dup_msg;
00296 McMsgHdr bcast_msg_tok;
00297 McMsgHdr *dup_tok;
00298 int hdr_size;
00299 int n_remote_pes;
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309 n_remote_pes = 0;
00310 for (i=0; i < npes; i++)
00311 {
00312 if (pes[i] == _Cmi_mype)
00313 CmiSyncSendFn(_Cmi_mype, size, msg);
00314 else
00315 n_remote_pes++;
00316 }
00317 if (n_remote_pes == 0)
00318 return;
00319
00320
00321
00322
00323 dup_msg = (McMsgHdr *)CmiAlloc(ALIGN8(size));
00324 memcpy(dup_msg,msg,size);
00325 dup_msg->bcast.count = n_remote_pes;
00326
00327
00328
00329 bcast_msg_tok.msg_type = BcastMessage;
00330 bcast_msg_tok.bcast.ptr = dup_msg;
00331 bcast_msg_tok.bcast_msg_size = size;
00332
00333 hdr_size = ALIGN8(sizeof(McMsgHdr));
00334
00335
00336
00337
00338
00339 for(i=0; i<npes; i++)
00340 if (pes[i] != _Cmi_mype)
00341 {
00342 dup_tok = (McMsgHdr *)CmiAlloc(ALIGN8(hdr_size));
00343 memcpy(dup_tok,&bcast_msg_tok,hdr_size);
00344 McEnqueueRemote(dup_tok,hdr_size,pes[i]);
00345 CQdCreate(CpvAccess(cQdState), 1);
00346 }
00347
00348
00349
00350
00351
00352 McQueueAddToBack(broadcast_queue,dup_msg);
00353 }
00354
00355 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg)
00356 {
00357 CmiSyncListSendFn(npes, pes, size, msg);
00358 return 0;
00359 }
00360
/*
 * Send `msg` to the listed PEs and then release the caller's buffer; the
 * list send works on copies.
 */
void CmiFreeListSendFn(int npes, int *pes, int size, char *msg)
{
  CmiSyncListSendFn(npes, pes, size, msg);

  CmiFree(msg);
}
00366
/* Self-addressed message used by CmiSyncMulticastFn to postpone a multicast
 * when CmiLookupGroup returns no PE list for the group. */
typedef struct {
  char header[CmiMsgHeaderSizeBytes];  /* space for the Converse message header */
  CmiGroup grp;      /* target group of the postponed multicast */
  int size;          /* size of user_msg in bytes */
  char *user_msg;    /* CmiAlloc'ed copy of the user's message */
} McMultiMsg;

/* Handler index registered for McMulticastWaitFn (see CmiMulticastInit). */
CpvDeclare(int,McMulticastWaitHandler);
00375
00376 void CmiSyncMulticastFn(CmiGroup grp, int size, char *msg)
00377 {
00378 int npes;
00379 int *pes;
00380 McMultiMsg multi_msg;
00381
00382
00383
00384
00385
00386
00387
00388
00389 CmiLookupGroup(grp, &npes, &pes);
00390 if (pes != 0)
00391 CmiSyncListSendFn( npes, pes, size, msg);
00392 else
00393 {
00394 multi_msg.grp = grp;
00395 multi_msg.size = size;
00396 multi_msg.user_msg = (char *) CmiAlloc(ALIGN8(size));
00397 memcpy(multi_msg.user_msg,msg,size);
00398
00399 CmiSetHandler(&multi_msg,CpvAccess(McMulticastWaitHandler));
00400 CmiSyncSendFn(CmiMyPe(),sizeof(McMultiMsg),(char *)&multi_msg);
00401 }
00402 }
00403
00404 void McMulticastWaitFn(McMultiMsg *msg)
00405 {
00406 CmiFreeMulticastFn(msg->grp,msg->size,msg->user_msg);
00407 }
00408
00409 void CmiMulticastInit(void)
00410 {
00411 CpvInitialize(int,McMulticastWaitHandler);
00412 CpvAccess(McMulticastWaitHandler) = CmiRegisterHandler(McMulticastWaitFn);
00413 }
00414
00415 CmiCommHandle CmiAsyncMulticastFn(CmiGroup grp, int size, char *msg)
00416 {
00417 CmiSyncMulticastFn(grp, size, msg);
00418 return 0;
00419 }
00420
00421 void CmiFreeMulticastFn(CmiGroup grp, int size, char *msg)
00422 {
00423 CmiSyncMulticastFn(grp, size, msg);
00424 CmiFree(msg);
00425 }
00426
00427
00428
00429
00430
00431
00432
00433
/*
 * Report `message` through CmiError and terminate with globalexit(1).
 *
 * CmiError takes a printf-style format (it is called with "%s:%d ..."
 * elsewhere in this file), so the caller's text is passed as an argument to
 * a fixed "%s" format rather than as the format itself; otherwise a '%' in
 * the message would be interpreted as a conversion specifier.
 */
void CmiAbort(const char *message)
{
  CmiError("%s", message);
  globalexit(1);
}
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449 void *CmiGetNonLocal()
00450 {
00451 McRetrieveRemote();
00452
00453 return (void *)McQueueRemoveFromFront(received_queue);
00454 }
00455
00456 void
00457 ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
00458 {
00459 McInit();
00460 CthInit(argv);
00461 ConverseCommonInit(argv);
00462 for(argc=0;argv[argc];argc++);
00463 if (initret==0)
00464 {
00465 fn(argc,argv);
00466 if (usched==0) CsdScheduler(-1);
00467 ConverseExit();
00468 }
00469 }
00470
/*
 * Shut down Converse on this PE: optionally announce the end of the program
 * on PE 0 (only in the debug/web/convhost configurations), run the common
 * exit code, and leave via localexit(0).
 */
void ConverseExit()
{
#if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
  if (0 == CmiMyPe())
    CmiPrintf("End of program\n");
#endif

  ConverseCommonExit();
  localexit(0);
}
00481
/*
 * Idle hook: use the spare time to free sent messages that their
 * destinations have already fetched.
 */
void CmiNotifyIdle(void)
{
  McCleanUpInTransit();
}
00487
00488
00489
00490
00491
00492 static void McInit(void)
00493 {
00494 CpvInitialize(void *, CmiLocalQueue);
00495 CpvAccess(CmiLocalQueue) = CdsFifo_Create();
00496 _Cmi_mype = _my_pe();
00497 _Cmi_numpes = _num_pes();
00498 _Cmi_myrank = 0;
00499
00500 McInitList();
00501 }
00502
00503 static void McInitList(void)
00504 {
00505 int i;
00506
00507 received_queue = McQueueCreate();
00508 tmp_queue = McQueueCreate();
00509 received_token_queue = McQueueCreate();
00510 broadcast_queue = McQueueCreate();
00511 broadcast_tmp_queue = McQueueCreate();
00512 in_transit_tmp_queue = McQueueCreate();
00513 in_transit_queue = McQueueCreate();
00514
00515 head.nxt_node = list_empty;
00516 head.nxt_addr = NULL;
00517 head.msg_sz = 0;
00518 if (_Cmi_numpes > MAX_PES)
00519 {
00520 CmiPrintf("Not enough processors allocated in machine.c.\n");
00521 CmiPrintf("Change MAX_PES in t3e/machine.c to at least %d and recompile Converse\n",
00522 _Cmi_numpes);
00523 }
00524 for(i=0; i < _Cmi_numpes; i++)
00525 {
00526 head_lock[i] = 0;
00527 bcast_lock[i] = 0;
00528 }
00529 my_lock = &(head_lock[_Cmi_mype]);
00530 barrier();
00531 shmem_clear_lock(my_lock);
00532 shmem_clear_lock(&bcast_lock[_Cmi_mype]);
00533 }
00534
/*
 * Make `msg` (a buffer beginning with a McMsgHdr, `msg_sz` bytes as recorded
 * for the receiver) visible to PE `dst_pe` by pushing it onto the front of
 * that PE's incoming list.
 *
 * The message body stays in this PE's memory; only the remote list head is
 * changed.  The message is also recorded in in_transit_queue so that
 * McCleanUpInTransit can free it after the receiver sets received_f.
 */
static void McEnqueueRemote(void *msg, int msg_sz, int dst_pe)
{
  McDistList tmp_link;    /* new head value: points at msg on this PE */
  McDistList *msg_link;   /* msg's own link; receives the old remote head */

  /* Free whatever earlier sends have already been fetched. */
  McCleanUpInTransit();

  /* Track the message until the receiver flags it as received. */
  McQueueAddToBack(in_transit_queue,msg);

  msg_link = &(((McMsgHdr *)msg)->list_node);
  ((McMsgHdr *)msg)->received_f = false;

  /* Link that will become the destination's new list head. */
  tmp_link.nxt_node = _Cmi_mype;
  tmp_link.nxt_addr = msg;
  tmp_link.msg_sz = msg_sz;

  /* The head swap below must be exclusive per destination PE. */
  shmem_set_lock(&(head_lock[dst_pe]));

  /* Copy the destination's current head into our message's link ...
   * (the length is given in int-sized units -- assumes shmem_get/shmem_put
   * count elements of that size on this platform; TODO confirm) */
  shmem_get(msg_link, &head, sizeof(McDistList)/sizeof(int), dst_pe);

  /* ... and make the destination's head point at our message. */
  shmem_put(&head, &tmp_link, sizeof(McDistList)/sizeof(int),dst_pe);

#ifdef DEBUG
  printf("[%d] Adding Message to pe %d\n",_Cmi_mype,dst_pe);
  printf("[%d] nxt_node = %d\n",_Cmi_mype,tmp_link.nxt_node);
  printf("[%d] nxt_addr = %x\n",_Cmi_mype,tmp_link.nxt_addr);
  printf("[%d] msg_sz = %x\n",_Cmi_mype,tmp_link.msg_sz);
  printf("[%d] Old Message is now at %x\n",_Cmi_mype,msg_link);
  printf("[%d] nxt_node = %d\n",_Cmi_mype,msg_link->nxt_node);
  printf("[%d] nxt_addr = %x\n",_Cmi_mype,msg_link->nxt_addr);
  printf("[%d] msg_sz = %x\n",_Cmi_mype,msg_link->msg_sz);
#endif

  shmem_clear_lock(&(head_lock[dst_pe]));
}
00598
00599 static void McRetrieveRemote(void)
00600 {
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611 McDistList list_head;
00612 McDistList *cur_node;
00613 McMsgHdr *cur_msg;
00614 int received_f;
00615 enum boolean bcast_msg;
00616 McMsgHdr *bcast_ptr;
00617
00618
00619
00620 if (head.nxt_node == list_empty)
00621 return;
00622
00623
00624 shmem_set_lock(my_lock);
00625
00626
00627 list_head = head;
00628 head.nxt_node = list_empty;
00629 head.nxt_addr = NULL;
00630 head.msg_sz = 0;
00631 shmem_clear_lock(my_lock);
00632
00633
00634
00635
00636
00637 cur_node = &list_head;
00638 received_f = true;
00639
00640 while (cur_node->nxt_node != list_empty)
00641 {
00642 cur_msg = (McMsgHdr *)CmiAlloc(ALIGN8(cur_node->msg_sz));
00643 if (cur_msg ==NULL)
00644 {
00645 CmiError("%s:%d Cannot Allocate Memory\n",__FILE__,__LINE__);
00646 globalexit(1);
00647 }
00648
00649 shmem_get(cur_msg, cur_node->nxt_addr,
00650 ALIGN8(cur_node->msg_sz)/8, cur_node->nxt_node);
00651
00652
00653
00654
00655
00656 if (cur_msg->msg_type == BcastMessage)
00657 {
00658
00659 bcast_msg = true;
00660 bcast_ptr = (McMsgHdr *)CmiAlloc(ALIGN8(cur_msg->bcast_msg_size));
00661 shmem_set_lock(&(bcast_lock[cur_node->nxt_node]));
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671 shmem_get(bcast_ptr,cur_msg->bcast.ptr,
00672 ALIGN8(cur_msg->bcast_msg_size)/8,cur_node->nxt_node);
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682 bcast_ptr->bcast.count--;
00683
00684 shmem_put(&(cur_msg->bcast.ptr->bcast.count),
00685 &bcast_ptr->bcast.count,1,cur_node->nxt_node);
00686 shmem_clear_lock(&(bcast_lock[cur_node->nxt_node]));
00687 }
00688 else bcast_msg = false;
00689
00690
00691 shmem_put(&(cur_node->nxt_addr->received_f),&received_f,
00692 1, cur_node->nxt_node);
00693
00694
00695 if (bcast_msg)
00696 {
00697 McQueueAddToBack(received_token_queue,cur_msg);
00698 McQueueAddToBack(tmp_queue,bcast_ptr);
00699 }
00700 else
00701 McQueueAddToBack(tmp_queue,cur_msg);
00702
00703
00704 cur_node = &(cur_msg->list_node);
00705 }
00706
00707
00708 while ((cur_msg = McQueueRemoveFromBack(tmp_queue)) != NULL) {
00709 McQueueAddToBack(received_queue,cur_msg);
00710 }
00711
00712
00713 while ((cur_msg = McQueueRemoveFromBack(received_token_queue)) != NULL) {
00714 CmiFree(cur_msg);
00715 }
00716 return;
00717 }
00718
00719 static void McCleanUpInTransit(void)
00720 {
00721 McMsgHdr *msg;
00722 McQueue *swap_ptr;
00723
00724
00725
00726 while ((msg = (McMsgHdr *)McQueueRemoveFromFront(broadcast_queue))
00727 != NULL)
00728 {
00729 if (msg->bcast.count == 0)
00730 {
00731
00732
00733
00734 CmiFree(msg);
00735 }
00736 else
00737 {
00738 McQueueAddToBack(broadcast_tmp_queue,msg);
00739 }
00740 }
00741
00742
00743
00744
00745 swap_ptr = broadcast_tmp_queue;
00746 broadcast_tmp_queue = broadcast_queue;
00747 broadcast_queue = swap_ptr;
00748
00749
00750
00751
00752
00753 while ((msg = (McMsgHdr *)McQueueRemoveFromFront(in_transit_queue))
00754 != NULL)
00755 {
00756 if (msg->received_f)
00757 {
00758 CmiFree(msg);
00759 }
00760 else
00761 {
00762 McQueueAddToBack(in_transit_tmp_queue,msg);
00763 }
00764 }
00765
00766
00767
00768
00769 swap_ptr = in_transit_tmp_queue;
00770 in_transit_tmp_queue = in_transit_queue;
00771 in_transit_queue = swap_ptr;
00772 #ifdef DEBUG
00773 CmiPrintf("[%d] done in_transit_queue = %d, tmp_queue = %d\n",
00774 _Cmi_mype,in_transit_queue->len,in_transit_tmp_queue->len);
00775 #endif
00776 }
00777
00778
00779
00780
00781
00782
00783
/*
 * Allocate storage for `len` queue elements (void* each).
 * Reports the failure and aborts the program if the allocation fails.
 */
static void **McQueueAllocBlock(unsigned int len)
{
  void **storage = (void **)malloc(len*sizeof(void *));

  if (!storage) {
    CmiError("Cannot Allocate Memory!\n");
    abort();
  }
  return storage;
}
00795
/*
 * Copy the `len` elements of the full circular buffer `srcblk`, whose front
 * element is at index `first`, into `destblk` in queue order: first
 * srcblk[first..len-1], then the wrapped part srcblk[0..first-1].
 */
static void
McQueueSpillBlock(void **srcblk, void **destblk,
                  unsigned int first, unsigned int len)
{
  unsigned int out = 0;
  unsigned int in;

  for (in = first; in < len; in++)
    destblk[out++] = srcblk[in];

  for (in = 0; in < first; in++)
    destblk[out++] = srcblk[in];
}
00803
00804 static McQueue * McQueueCreate(void)
00805 {
00806 McQueue *queue;
00807
00808 queue = (McQueue *) malloc(sizeof(McQueue));
00809 if(queue==(McQueue *)0) {
00810 CmiError("Cannot Allocate Memory!\n");
00811 abort();
00812 }
00813 queue->blk = McQueueAllocBlock(BLK_LEN);
00814 queue->blk_len = BLK_LEN;
00815 queue->first = 0;
00816 queue->len = 0;
00817 return queue;
00818 }
00819
00820 int inside_comm = 0;
00821
00822 static void McQueueAddToBack(McQueue *queue, void *element)
00823 {
00824 inside_comm = 1;
00825 if(queue->len==queue->blk_len) {
00826 void **blk;
00827
00828 queue->blk_len *= 3;
00829 blk = McQueueAllocBlock(queue->blk_len);
00830 McQueueSpillBlock(queue->blk, blk, queue->first, queue->len);
00831 free(queue->blk);
00832 queue->blk = blk;
00833 queue->first = 0;
00834 }
00835 #ifdef DEBUG
00836 CmiPrintf("[%d] Adding %x\n",_Cmi_mype,element);
00837 #endif
00838 queue->blk[(queue->first+queue->len++)%queue->blk_len] = element;
00839 inside_comm = 0;
00840 }
00841
00842 static void * McQueueRemoveFromBack(McQueue *queue)
00843 {
00844 void *element;
00845 element = (void *) 0;
00846 if(queue->len) {
00847 element = queue->blk[(queue->first+queue->len-1)%queue->blk_len];
00848 queue->len--;
00849 }
00850 return element;
00851 }
00852
00853 static void * McQueueRemoveFromFront(McQueue *queue)
00854 {
00855 void *element;
00856 element = (void *) 0;
00857 if(queue->len) {
00858 element = queue->blk[queue->first++];
00859 queue->first = (queue->first+queue->blk_len)%queue->blk_len;
00860 queue->len--;
00861 }
00862 return element;
00863 }
00864
00865
00866
00867
/* Timer state: seconds per clock tick and the tick counts at start-up. */
static double clocktick;
static long inittime_wallclock;
static long inittime_virtual;

/*
 * Record the start values of the real-time clock (_rtc) and of the CPU
 * usage counter (cpused), and compute the tick length in seconds from
 * sysconf(_SC_CLK_TCK).
 */
void CmiTimerInit()
{
  long ticks_per_sec;

  inittime_wallclock = _rtc();
  inittime_virtual = cpused();

  ticks_per_sec = sysconf(_SC_CLK_TCK);
  clocktick = 1.0 / ticks_per_sec;
}
00878
00879 double CmiWallTimer()
00880 {
00881 long now;
00882
00883 now = _rtc();
00884 return (clocktick * (now - inittime_wallclock));
00885 }
00886
00887 double CmiCpuTimer()
00888 {
00889 long now;
00890
00891 now = cpused();
00892 return (clocktick * (now - inittime_virtual));
00893 }
00894
00895 double CmiTimer()
00896 {
00897 long now;
00898
00899 now = _rtc();
00900 return (clocktick * (now - inittime_wallclock));
00901 }
00902
00903
00904
00905
00906
/* Flag recording whether the memory "lock" is currently held (1) or not (0). */
static volatile int memflag;

/* Mark the memory lock as held. */
void CmiMemLock()
{
  memflag = 1;
}

/* Mark the memory lock as released. */
void CmiMemUnlock()
{
  memflag = 0;
}
00910