00001
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#if CMK_HAS_MALLOC_H
#include <malloc.h>
#endif
#include <getopt.h>
#include <time.h>

#include <infiniband/verbs.h>
00033
/* Path MTU requested when each RC queue pair is moved to RTR
   (see initInfiOtherNodeData). Note: external linkage, not static. */
enum ibv_mtu mtu = IBV_MTU_2048;
/* System page size; filled in by allocateInfiBufferPool(). */
static int page_size;
/* Packet geometry, set in CmiMachineInit(): packetSize = 4 * mtu_size and
   dataSize = packetSize - sizeof(struct infiPacketHeader), i.e. the payload
   bytes carried per packet (DeliverViaNetwork splits messages by dataSize). */
static int mtu_size;
static int packetSize;
static int dataSize;
/* When rdma is non-zero, messages larger than rdmaThreshold bytes are sent
   through EnqueueRdmaPacket() instead of being split into data packets. */
static int rdma;
static int rdmaThreshold;
/* Allocator tuning values set in CmiMachineInit(); their consumers are not in
   this part of the file. NOTE(review): presumably used by the chunk-pool
   allocator — confirm. */
static int firstBinSize;
static int blockAllocRatio;
static int blockThreshold;


/* Number of receive buffers posted to the shared receive queue and the
   number of send tokens (max outstanding sends); both set in CmiMachineInit(). */
static int maxRecvBuffers;
static int maxTokens;

/* Number of send packets pre-allocated onto context->infiPacketFreeList. */
static int sendPacketPoolSize;

static double _startTime=0;
static int regCount;

/* Statistics counters (updated only when CMK_IBVERBS_STATS is enabled). */
static int pktCount;
static int msgCount;
static int minTokensLeft;


static double regTime;

static double processBufferedTime;
static int processBufferedCount;

/* Compile-time feature switches for this machine layer. */
#define CMK_IBVERBS_STATS 0
#define CMK_IBVERBS_TOKENS_FLOW 1
#define CMK_IBVERBS_INCTOKENS 0 //never turn this on
#define CMK_IBVERBS_DEBUG 0
#define CMI_DIRECT_DEBUG 0
/* Maximum number of work completions fetched per ibv_poll_cq() call. */
#define WC_LIST_SIZE 32


/* Memory-registration statistics (CMK_IBVERBS_STATS only). */
#if CMK_IBVERBS_STATS
static int numReg=0;
static int numUnReg=0;
static int numCurReg=0;
static int numAlloc=0;
static int numFree=0;
static int numMultiSendUnreg=0;
static int numMultiSend=0;
static int numMultiSendFree=0;
#endif


/* Token-increase heuristics; INCTOKENS_FRACTION is used by EnqueuePacket()
   when CMK_IBVERBS_INCTOKENS is on. INCTOKENS_INCREASE is not used in this
   part of the file. */
#define INCTOKENS_FRACTION 0.04
#define INCTOKENS_INCREASE .50


/* SMP builds use per-thread multi-pool buffer management. */
#if CMK_SMP
#define THREAD_MULTI_POOL 1
#endif

#if THREAD_MULTI_POOL
#include "pcqueue.h"
PCQueue **queuePool;
void infi_CmiFreeDirect(void *ptr);
static inline void fillBufferPools();
#endif
00098
/* Packet code for barrier packets. It is the next bit after the codes defined
   with INFIPACKETCODE_DATA..INFIDUMMYPACKET. */
#define INFIBARRIERPACKET 128

/* Payload of an increase-tokens acknowledgement. NOTE(review): the single
   field's meaning is not visible in this part of the file. */
struct infiIncTokenAckPacket{
    int a;
};

/* Idle state placeholder: this layer keeps no per-thread idle state
   (CmiNotifyGetState() returns NULL). */
typedef struct {
    char none;
} CmiIdleState;
00108
00109
00110
00111
00112
00113
00114 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
00115
00116 static void CmiNotifyStillIdle(CmiIdleState *s);
00117
00118
00119 static void CmiNotifyBeginIdle(CmiIdleState *s)
00120 {
00121 CmiNotifyStillIdle(s);
00122 }
00123
00124 void CmiNotifyIdle(void) {
00125 CmiNotifyStillIdle(NULL);
00126 }
00127
00128
00129
00130
00131
00132
00133
00134
/* Packet codes carried in infiPacketHeader.code. They are distinct bits:
   EnqueuePacket() ORs INFIPACKETCODE_INCTOKENS into an existing code. */
#define INFIPACKETCODE_DATA 1
#define INFIPACKETCODE_INCTOKENS 2
#define INFIRDMA_START 4
#define INFIRDMA_ACK 8
#define INFIDIRECT_REQUEST 16
#define INFIPACKETCODE_INCTOKENSACK 32
#define INFIDUMMYPACKET 64

/* Header sent as the first scatter/gather element of every packet
   (newPacket() points elemList[0] at it). It goes on the wire, so its layout
   must be identical on all nodes — do not reorder fields. */
struct infiPacketHeader{
    char code;      /* INFIPACKETCODE_* / INFIRDMA_* bits */
    int nodeNo;     /* sending node (set from _Cmi_mynode) */
#if CMK_IBVERBS_DEBUG
    int psn;        /* per-destination debug sequence number */
#endif
};
00150
00151
00152
00153
/* infiRdmaPacket.type values: a regular Converse message vs. a CmiDirect transfer. */
#define INFI_MESG 1
#define INFI_DIRECT 2

/* Payload of an INFIRDMA_START packet (see EnqueueRdmaPacket). It describes
   the sender's registered message buffer so the peer can access it remotely.
   The whole struct is sent over the wire, including pointers that are only
   meaningful on the sending node. */
struct infiRdmaPacket{
    int fromNodeNo;
    int type;                  /* INFI_MESG or INFI_DIRECT */
    struct ibv_mr key;         /* copy of the sender's memory region */
    struct ibv_mr *keyPtr;     /* sender-local pointer to that region */
    int remoteSize;            /* size of the sender's buffer */
    char *remoteBuf;           /* address of the sender's buffer */
    void *localBuffer;
    OutgoingMsg ogm;           /* sender-local outgoing message */
    /* NOTE(review): presumably links into context->bufferedRdmaAcks /
       bufferedRdmaRequests — confirm. */
    struct infiRdmaPacket *next,*prev;
};
00168
00169
/* infiBuffer.type values. */
#define BUFFER_RECV 1
#define BUFFER_RDMA 2
/* One registered buffer. For receive pools, buf is a slice of a single large
   registered allocation and key is that allocation's memory region
   (see allocateInfiBufferPool). */
struct infiBuffer{
    int type;
    char *buf;
    int size;
    struct ibv_mr *key;
};



/* A set of numBuffers buffers, chainable through next. */
struct infiBufferPool{
    int numBuffers;
    struct infiBuffer *buffers;
    struct infiBufferPool *next;
};
00191
00192
00193
00194
00195
00196
00197
/* One outgoing packet. Packets are recycled through
   context->infiPacketFreeList (see FreeInfiPacket/MallocInfiPacket).
   Note: infiPacket is a pointer typedef. */
typedef struct infiPacketStruct {
    char *buf;                       /* payload start (not owned unless dummy/RDMA packet) */
    int size;                        /* payload size, -1 when free */
    struct infiPacketHeader header;  /* sent as elemList[0] */
    struct ibv_mr *keyHeader;        /* MR covering this packet (for the header SGE) */
    struct OtherNodeStruct *destNode;
    struct infiPacketStruct *next;   /* free-list link */
    OutgoingMsg ogm;                 /* message this packet belongs to, if any */
    struct ibv_sge elemList[2];      /* [0] header, [1] payload (set in EnqueuePacket) */
    struct ibv_send_wr wr;           /* prebuilt send WR; wr_id points back at the packet */
}* infiPacket;
00209
00210
00211
00212
00213
00214
00215
00216
00217
/* Capacity of one buffered-broadcast pool segment. */
#define BCASTLIST_SIZE 50

/* One buffered broadcast entry. NOTE(review): producers/consumers are not in
   this part of the file; field meanings are inferred from names — confirm. */
struct infiBufferedBcastStruct{
    char *msg;
    int size;
    int broot;
    int asm_rank;
    int valid;
};

/* Doubly linked segment of up to BCASTLIST_SIZE buffered broadcasts;
   the list head is context->bufferedBcastList (NULL after CmiMachineInit). */
typedef struct infiBufferedBcastPoolStruct{
    struct infiBufferedBcastPoolStruct *next,*prev;
    struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
    int count;

} *infiBufferedBcastPool;
00234
00235
00236
00237
00238
00239
00240
00241
00242
/* Process-wide InfiniBand state; a single instance is created by
   CmiMachineInit() and reached through the file-scope `context` pointer. */
struct infiContext {
    struct ibv_context *context;   /* opened device */

    /* select() arguments used by processAsyncEvents() to poll async_fd. */
    fd_set asyncFds;
    struct timeval tmo;

    int ibPort;                    /* HCA port number (1, see CmiMachineInit) */

    struct ibv_pd *pd;             /* protection domain for all registrations */
    struct ibv_cq *sendCq;
    struct ibv_cq *recvCq;
    struct ibv_srq *srq;           /* shared receive queue (multi-node runs only) */

    /* Per-node QP handles indexed by node number; setup-time only, freed in
       infiPostInitialRecvs(). */
    struct ibv_qp **qp;



    /* This node's (lid, qpn, psn) for each peer, exported by copyInfiAddr();
       freed in infiPostInitialRecvs(). */
    struct infiAddr *localAddr;

    infiPacket infiPacketFreeList;         /* recycled send packets */

    struct infiBufferPool *recvBufferPool; /* buffers posted to srq */

    struct infiPacketHeader header;        /* template copied into new packets */

    int srqSize;
    int sendCqSize,recvCqSize;
    int tokensLeft;                        /* global send tokens remaining */

    infiBufferedBcastPool bufferedBcastList;

    struct infiRdmaPacket *bufferedRdmaAcks;

    struct infiRdmaPacket *bufferedRdmaRequests;


    /* NOTE(review): presumably a re-entrancy guard for the buffered-bcast
       processing — confirm. */
    int insideProcessBufferedBcasts;
};

static struct infiContext *context;
00283
00284
00285
00286
/* Connection endpoint triple exchanged between nodes: LID, QP number and
   initial packet sequence number (24-bit, see createLocalQps). */
struct infiAddr {
    int lid,qpn,psn;
};

/* Receive-side parse states; INFI_HEADER_DATA is the initial state set by
   initInfiOtherNodeData(). */
enum { INFI_HEADER_DATA=21,INFI_DATA};

/* Per-peer connection data, created by initInfiOtherNodeData(). */
struct infiOtherNodeData{
    struct ibv_qp *qp ;   /* RC queue pair to this peer */
    int state;            /* INFI_HEADER_DATA / INFI_DATA */
    int totalTokens;      /* used by the CMK_IBVERBS_INCTOKENS heuristic */
    int tokensLeft;
    int nodeNo;

    int postedRecvs;
    int broot;
#if CMK_IBVERBS_DEBUG
    int psn;              /* last debug psn sent to this peer */
    int recvPsn;
#endif
};
00314
00315
00316
00317
00318
00319
struct infiCmiChunkHeaderStruct;

/* Per-allocation metadata reachable from a chunk's header. */
typedef struct infiCmiChunkMetaDataStruct {
    struct ibv_mr *key;                      /* memory region covering the buffer */
    int poolIdx;                             /* pool index, or INFIMULTIPOOL */
    void *nextBuf;
    struct infiCmiChunkHeaderStruct *owner;
    int count;

#if THREAD_MULTI_POOL
    int parentPe;
#endif
} infiCmiChunkMetaData;

/*
 * Metadata pointer of the chunk whose user data starts at m. The chunk header
 * sits immediately before the user data, hence index [-1].
 * The argument is parenthesized so that an expression such as
 * METADATAFIELD(base + off) casts the whole address, not just `base`.
 */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)

typedef struct {
    int size;
    void *startBuf;
    int count;
} infiCmiChunkPool;

#define INFINUMPOOLS 20
#define INFIMAXPERPOOL 400
/* poolIdx marker for individually registered buffers; infi_unregAndFreeMeta()
   deregisters and frees only metadata carrying this marker. */
#define INFIMULTIPOOL 0xDEAFB00D

#if THREAD_MULTI_POOL
static infiCmiChunkPool **infiCmiChunkPools;

#else
static infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
#endif

static void initInfiCmiChunkPools();
00357
00358
00359 static inline infiPacket newPacket(){
00360 infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
00361 pkt->size = -1;
00362 pkt->header = context->header;
00363 pkt->next = NULL;
00364 pkt->destNode = NULL;
00365 pkt->keyHeader = METADATAFIELD(pkt)->key;
00366 pkt->ogm=NULL;
00367 CmiAssert(pkt->keyHeader!=NULL);
00368 pkt->buf=NULL;
00369
00370 pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
00371 pkt->elemList[0].length = sizeof(struct infiPacketHeader);
00372 pkt->elemList[0].lkey = pkt->keyHeader->lkey;
00373
00374 pkt->wr.wr_id = (uint64_t)pkt;
00375 pkt->wr.sg_list = &(pkt->elemList[0]);
00376 pkt->wr.num_sge = 2;
00377 pkt->wr.opcode = IBV_WR_SEND;
00378 pkt->wr.send_flags = IBV_SEND_SIGNALED;
00379 pkt->wr.next = NULL;
00380
00381 return pkt;
00382 };
00383
/*
 * FreeInfiPacket(pkt): resets pkt (size -1, no message, no buffer) and pushes
 * it onto context->infiPacketFreeList. The argument is evaluated once.
 *
 * MallocInfiPacket(pkt): assigns to the lvalue pkt a packet popped from
 * context->infiPacketFreeList, or a new one from newPacket() when the list is
 * empty.
 *
 * Both are statement macros wrapped in do/while(0), so they are safe inside
 * if/else. Their internal locals use suffixed names so they cannot shadow a
 * caller variable (the old `p` made MallocInfiPacket(p) a silent no-op).
 */
#define FreeInfiPacket(pkt) do { \
    infiPacket freedPkt_ = (pkt); \
    freedPkt_->size = -1; \
    freedPkt_->ogm = NULL; \
    freedPkt_->buf = NULL; \
    freedPkt_->next = context->infiPacketFreeList; \
    context->infiPacketFreeList = freedPkt_; \
} while (0)

#define MallocInfiPacket(pkt) do { \
    infiPacket takenPkt_ = context->infiPacketFreeList; \
    if (takenPkt_ == NULL) { \
        takenPkt_ = newPacket(); \
    } else { \
        context->infiPacketFreeList = takenPkt_->next; \
    } \
    (pkt) = takenPkt_; \
} while (0)
00398
00399
00400
00401 void infi_unregAndFreeMeta(void *md)
00402 {
00403 if(md!=NULL && (((infiCmiChunkMetaData *)md)->poolIdx == INFIMULTIPOOL))
00404 {
00405 int unregstat=ibv_dereg_mr(((infiCmiChunkMetaData*)md)->key);
00406 CmiAssert(unregstat==0);
00407 free(((infiCmiChunkMetaData *)md));
00408 #if CMK_IBVERBS_STATS
00409 numUnReg++;
00410 numCurReg--;
00411 numMultiSendUnreg++;
00412 #endif
00413 }
00414 }
00415
00416
00417
00418 static inline int pollSendCq(const int toBuffer);
00419
00420 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
00421 static uint16_t getLocalLid(struct ibv_context *context, int port);
00422 static int checkQp(struct ibv_qp *qp){
00423 struct ibv_qp_attr attr;
00424 struct ibv_qp_init_attr init_attr;
00425
00426 ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP ,&init_attr);
00427 if(attr.cur_qp_state != IBV_QPS_RTS){
00428 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
00429 return 0;
00430 }
00431 return 1;
00432 }
00433 static void checkAllQps(){
00434 int i;
00435 for(i=0;i<_Cmi_numnodes;i++){
00436 if(i != _Cmi_mynode){
00437 if(!checkQp(nodes[i].infiData->qp)){
00438 pollSendCq(0);
00439 CmiAssert(0);
00440 }
00441 }
00442 }
00443 }
00444
00445 #if CMK_IBVERBS_FAST_START
00446 static void send_partial_init();
00447 #endif
00448
00449 static void CmiMachineInit(char **argv){
00450 struct ibv_device **devList;
00451 struct ibv_device *dev;
00452 int ibPort;
00453 int i;
00454 int calcMaxSize;
00455 infiPacket *pktPtrs;
00456 struct infiRdmaPacket **rdmaPktPtrs;
00457
00458 MACHSTATE(3,"CmiMachineInit {");
00459 MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
00460 MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
00461
00462
00463
00464 devList = ibv_get_device_list(NULL);
00465 CmiAssert(devList != NULL);
00466
00467 dev = *devList;
00468 CmiAssert(dev != NULL);
00469
00470 ibPort=1;
00471
00472 MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
00473
00474 context = (struct infiContext *)malloc(sizeof(struct infiContext));
00475
00476 MACHSTATE1(3,"context allocated %p",context);
00477
00478
00479 context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
00480
00481 MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
00482
00483 context->ibPort = ibPort;
00484
00485 context->context = ibv_open_device(dev);
00486 CmiAssert(context->context != NULL);
00487
00488 MACHSTATE1(3,"device opened %p",context->context);
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498 context->pd = ibv_alloc_pd(context->context);
00499 CmiAssert(context->pd != NULL);
00500 MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
00501
00502
00503
00504
00505
00506
00507
00508 #if CMK_IBVERBS_FAST_START
00509 send_partial_init();
00510 #endif
00511
00512
00513 context->header.nodeNo = _Cmi_mynode;
00514
00515 mtu_size=1200;
00516 packetSize = mtu_size*4;
00517 dataSize = packetSize-sizeof(struct infiPacketHeader);
00518
00519 calcMaxSize=8000;
00520
00521
00522
00523
00524
00525
00526
00527 maxRecvBuffers=calcMaxSize;
00528 maxTokens = maxRecvBuffers;
00529
00530 context->tokensLeft=maxTokens;
00531 context->qp=NULL;
00532
00533 if(_Cmi_numnodes > 1){
00534 #if !CMK_IBVERBS_FAST_START
00535
00536 ChMessage msg;
00537 ctrl_sendone_nolock("barrier",NULL,0,NULL,0);
00538 ChMessage_recv(Cmi_charmrun_fd,&msg);
00539 #endif
00540 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
00541 }
00542
00543 if (Cmi_charmrun_fd == -1) return;
00544
00545
00546 rdma=1;
00547
00548 rdmaThreshold=22000;
00549 firstBinSize = 120;
00550 CmiAssert(rdmaThreshold > firstBinSize);
00551
00552
00553
00554 blockAllocRatio=32;
00555 blockThreshold=8;
00556
00557
00558
00559 #if !THREAD_MULTI_POOL
00560 initInfiCmiChunkPools();
00561 #endif
00562
00563
00564 sendPacketPoolSize = maxTokens/2;
00565 if(sendPacketPoolSize > 2000){
00566 sendPacketPoolSize = 2000;
00567 }
00568
00569 context->infiPacketFreeList=NULL;
00570 pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
00571
00572
00573 #if !THREAD_MULTI_POOL
00574 for(i=0;i<sendPacketPoolSize;i++){
00575 MallocInfiPacket(pktPtrs[i]);
00576 }
00577
00578 for(i=0;i<sendPacketPoolSize;i++){
00579 FreeInfiPacket(pktPtrs[i]);
00580 }
00581 free(pktPtrs);
00582 #endif
00583
00584 context->bufferedBcastList=NULL;
00585 context->bufferedRdmaAcks = NULL;
00586 context->bufferedRdmaRequests = NULL;
00587 context->insideProcessBufferedBcasts=0;
00588
00589
00590 if(rdma){
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608 }
00609
00610
00611 #if CMK_IBVERBS_STATS
00612 regCount =0;
00613 regTime = 0;
00614
00615 pktCount=0;
00616 msgCount=0;
00617
00618 processBufferedCount=0;
00619 processBufferedTime=0;
00620
00621 minTokensLeft = maxTokens;
00622 #endif
00623
00624
00625
00626 MACHSTATE(3,"} CmiMachineInit");
00627 }
00628
/*
 * Second-stage communication setup. When THREAD_MULTI_POOL is enabled,
 * CmiMachineInit() skips chunk-pool initialization, and it is done here
 * together with filling the buffer pools. Otherwise this does nothing.
 * argv is unused.
 */
void CmiCommunicationInit(char **argv)
{
    (void)argv;
#if THREAD_MULTI_POOL
    initInfiCmiChunkPools();
    fillBufferPools();
#endif
}
00636
00637
00638
00639
00640 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
00641 int myLid;
00642 int i;
00643
00644
00645
00646 myLid = getLocalLid(context->context,ibPort);
00647
00648 MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
00649
00650 context->sendCqSize = maxTokens+2;
00651 context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
00652 CmiAssert(context->sendCq != NULL);
00653
00654 MACHSTATE1(3,"sendCq created %p",context->sendCq);
00655
00656
00657 context->recvCqSize = maxRecvBuffers;
00658 context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
00659
00660 MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
00661 CmiAssert(context->recvCq != NULL);
00662
00663
00664
00665 context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
00666
00667 if(numNodes > 1)
00668 {
00669 context->srqSize = (maxRecvBuffers+2);
00670 struct ibv_srq_init_attr srqAttr = {
00671 .attr = {
00672 .max_wr = context->srqSize,
00673 .max_sge = 1
00674 }
00675 };
00676 context->srq = ibv_create_srq(context->pd,&srqAttr);
00677 CmiAssert(context->srq != NULL);
00678
00679 struct ibv_qp_init_attr initAttr = {
00680 .qp_type = IBV_QPT_RC,
00681 .send_cq = context->sendCq,
00682 .recv_cq = context->recvCq,
00683 .srq = context->srq,
00684 .sq_sig_all = 0,
00685 .qp_context = NULL,
00686 .cap = {
00687 .max_send_wr = maxTokens,
00688 .max_send_sge = 2,
00689 },
00690 };
00691 struct ibv_qp_attr attr;
00692
00693 attr.qp_state = IBV_QPS_INIT;
00694 attr.pkey_index = 0;
00695 attr.port_num = ibPort;
00696 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
00697
00698
00699
00700
00701
00702 for( i=1;i<numNodes;i++){
00703 int n = (myNode + i)%numNodes;
00704 if(n == myNode){
00705 }else{
00706 localAddr[n].lid = myLid;
00707 context->qp[n] = ibv_create_qp(context->pd,&initAttr);
00708
00709 MACHSTATE2(3,"qp[%d] created %p",n,context->qp[n]);
00710 CmiAssert(context->qp[n] != NULL);
00711
00712 ibv_modify_qp(context->qp[n], &attr,
00713 IBV_QP_STATE |
00714 IBV_QP_PKEY_INDEX |
00715 IBV_QP_PORT |
00716 IBV_QP_ACCESS_FLAGS);
00717
00718 localAddr[n].qpn = context->qp[n]->qp_num;
00719 localAddr[n].psn = lrand48() & 0xffffff;
00720 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",n,localAddr[n].lid,localAddr[n].qpn,localAddr[n].psn);
00721 }
00722 }
00723 }
00724 MACHSTATE(3,"qps created");
00725 };
00726
00727 void copyInfiAddr(ChInfiAddr *qpList){
00728 int qpListIdx=0;
00729 int i;
00730 MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
00731 for(i=0;i<_Cmi_numnodes;i++){
00732 if(i == _Cmi_mynode){
00733 }else{
00734 qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
00735 qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
00736 qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);
00737 qpListIdx++;
00738 }
00739 }
00740 }
00741
00742
00743 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
00744 struct ibv_port_attr attr;
00745
00746 if (ibv_query_port(dev_context, port, &attr))
00747 return 0;
00748
00749 return attr.lid;
00750 }
00751
00752
00753
00754 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
00755 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
00756
00757
00758
00759
00760 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
00761 struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
00762 int err;
00763 ret->state = INFI_HEADER_DATA;
00764 ret->qp = context->qp[node];
00765
00766
00767 ret->nodeNo = node;
00768
00769 #if CMK_IBVERBS_DEBUG
00770 ret->psn = 0;
00771 ret->recvPsn = 0;
00772 #endif
00773
00774 struct ibv_qp_attr attr = {
00775 .qp_state = IBV_QPS_RTR,
00776 .path_mtu = mtu,
00777 .dest_qp_num = addr[1],
00778 .rq_psn = addr[2],
00779 .max_dest_rd_atomic = 1,
00780 .min_rnr_timer = 31,
00781 .ah_attr = {
00782 .is_global = 0,
00783 .dlid = addr[0],
00784 .sl = 0,
00785 .src_path_bits = 0,
00786 .port_num = context->ibPort
00787 }
00788 };
00789
00790 MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
00791 MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
00792
00793 if (err = ibv_modify_qp(ret->qp, &attr,
00794 IBV_QP_STATE |
00795 IBV_QP_AV |
00796 IBV_QP_PATH_MTU |
00797 IBV_QP_DEST_QPN |
00798 IBV_QP_RQ_PSN |
00799 IBV_QP_MAX_DEST_RD_ATOMIC |
00800 IBV_QP_MIN_RNR_TIMER)) {
00801 MACHSTATE1(3,"ERROR %d",err);
00802 CmiAbort("failed to change qp state to RTR");
00803 }
00804
00805 MACHSTATE(3,"qp state changed to RTR");
00806
00807 attr.qp_state = IBV_QPS_RTS;
00808 attr.timeout = 26;
00809 attr.retry_cnt = 20;
00810 attr.rnr_retry = 7;
00811 attr.sq_psn = context->localAddr[node].psn;
00812 attr.max_rd_atomic = 1;
00813
00814
00815 if (ibv_modify_qp(ret->qp, &attr,
00816 IBV_QP_STATE |
00817 IBV_QP_TIMEOUT |
00818 IBV_QP_RETRY_CNT |
00819 IBV_QP_RNR_RETRY |
00820 IBV_QP_SQ_PSN |
00821 IBV_QP_MAX_QP_RD_ATOMIC)) {
00822 fprintf(stderr, "Failed to modify QP to RTS\n");
00823 exit(1);
00824 }
00825 MACHSTATE(3,"qp state changed to RTS");
00826
00827 MACHSTATE(3,"} initInfiOtherNodeData");
00828 return ret;
00829 }
00830
00831
00832 void infiPostInitialRecvs(){
00833
00834 int numPosts;
00835
00836
00837
00838
00839
00840
00841 if(_Cmi_numnodes > 1){
00842 numPosts = maxRecvBuffers;
00843 }else{
00844 numPosts = 0;
00845 }
00846 if(numPosts > 0){
00847 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
00848 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
00849 }
00850
00851
00852 if (context->qp) {
00853 free(context->qp);
00854 context->qp = NULL;
00855 }
00856 free(context->localAddr);
00857 context->localAddr= NULL;
00858 }
00859
00860 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
00861 int numBuffers;
00862 int i;
00863 int bigSize;
00864 char *bigBuf;
00865 struct infiBufferPool *ret;
00866 struct ibv_mr *bigKey;
00867
00868 MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
00869
00870 page_size = sysconf(_SC_PAGESIZE);
00871 ret = malloc(sizeof(struct infiBufferPool));
00872 ret->next = NULL;
00873 numBuffers=ret->numBuffers = numRecvs;
00874
00875 ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
00876
00877 bigSize = numBuffers*sizePerBuffer;
00878 bigBuf=malloc(bigSize);
00879 bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
00880 #if CMK_IBVERBS_STATS
00881 numCurReg++;
00882 numReg++;
00883 #endif
00884
00885 CmiAssert(bigKey != NULL);
00886
00887 for(i=0;i<numBuffers;i++){
00888 struct infiBuffer *buffer = &(ret->buffers[i]);
00889 buffer->type = BUFFER_RECV;
00890 buffer->size = sizePerBuffer;
00891
00892
00893 buffer->buf = &bigBuf[i*sizePerBuffer];
00894 buffer->key = bigKey;
00895
00896 if(buffer->key == NULL){
00897 MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
00898 CmiAssert(buffer->key != NULL);
00899 }
00900 }
00901 return ret;
00902 };
00903
00904
00905
00909 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
00910 int j,err;
00911 struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
00912 struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
00913 struct ibv_recv_wr *bad_wr;
00914
00915 int startBufferIdx=0;
00916 MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
00917 for(j=0;j<numRecvs;j++){
00918
00919
00920 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
00921 sgElements[j].length = sizePerBuffer;
00922 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
00923
00924 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
00925 workRequests[j].sg_list = &sgElements[j];
00926 workRequests[j].num_sge = 1;
00927 if(j != numRecvs-1){
00928 workRequests[j].next = &workRequests[j+1];
00929 }
00930
00931 }
00932 workRequests[numRecvs-1].next = NULL;
00933 MACHSTATE(3,"About to call ibv_post_srq_recv");
00934 if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
00935 CmiAssert(0);
00936 }
00937
00938 free(workRequests);
00939 free(sgElements);
00940 }
00941
00942
00943
00944
static inline void CommunicationServer_nolock(int toBuffer);

/*
 * Machine-layer exit hook. It releases nothing. When CMK_IBVERBS_STATS is
 * enabled, it prints this node's registration, traffic and token counters.
 */
void CmiMachineExit()
{
#if CMK_IBVERBS_STATS
    int tokensAtExit = context->tokensLeft;

    printf("[%d] numReg %d numUnReg %d numCurReg %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",
           _Cmi_mynode, numReg, numUnReg, numCurReg,
           msgCount, pktCount, packetSize, CmiTimer(),
           processBufferedCount, processBufferedTime,
           maxTokens, tokensAtExit);
#endif
}
static void ServiceCharmrun_nolock();
00955
00956 static void CmiNotifyStillIdle(CmiIdleState *s) {
00957 #if CMK_SMP
00958 CmiCommLock();
00959
00960 #endif
00961
00962
00963 CommunicationServer_nolock(0);
00964 #if CMK_SMP
00965 CmiCommUnlock();
00966 #endif
00967 }
00968
00969 static inline void increaseTokens(OtherNode node);
00970
00971 static inline int pollRecvCq(const int toBuffer);
00972 static inline int pollSendCq(const int toBuffer);
00973
00974
/*
 * Waits until at least one global send token is available.
 *
 * With CMK_IBVERBS_TOKENS_FLOW on, it repeatedly calls
 * CommunicationServer_nolock(1) while context->tokensLeft is zero. Otherwise
 * it returns immediately. infiData is not used.
 */
static inline void getFreeTokens(struct infiOtherNodeData *infiData){
    (void)infiData;
#if CMK_IBVERBS_TOKENS_FLOW
    if (context->tokensLeft != 0) {
        return;
    }
    MACHSTATE(3,"GET FREE TOKENS {{{");
    do {
        CommunicationServer_nolock(1);
    } while (context->tokensLeft == 0);
    MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
#endif
}
00991
00992
00999 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
01000 int incTokens=0;
01001 int retval;
01002 #if CMK_IBVERBS_DEBUG
01003 packet->header.psn = (++node->infiData->psn);
01004 #endif
01005
01006
01007
01008 packet->elemList[1].addr = (uintptr_t)packet->buf;
01009 packet->elemList[1].length = size;
01010 packet->elemList[1].lkey = dataKey->lkey;
01011
01012
01013 packet->destNode = node;
01014
01015 #if CMK_IBVERBS_STATS
01016 pktCount++;
01017 #endif
01018
01019 getFreeTokens(node->infiData);
01020
01021 #if CMK_IBVERBS_INCTOKENS
01022 if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
01023 packet->header.code |= INFIPACKETCODE_INCTOKENS;
01024 incTokens=1;
01025 }
01026 #endif
01027
01028
01029
01030
01031
01032
01033 struct ibv_send_wr *bad_wr=NULL;
01034 if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
01035 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
01036 CmiAssert(0);
01037 }
01038 #if CMK_IBVERBS_TOKENS_FLOW
01039 context->tokensLeft--;
01040 #if CMK_IBVERBS_STATS
01041 if(context->tokensLeft < minTokensLeft){
01042 minTokensLeft = context->tokensLeft;
01043 }
01044 #endif
01045 #endif
01046
01047
01048
01049
01050
01051
01052 #if CMK_IBVERBS_INCTOKENS
01053 if(incTokens){
01054 increaseTokens(node);
01055 }
01056 #endif
01057
01058
01059 #if CMK_IBVERBS_DEBUG
01060 MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
01061 #else
01062 MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
01063 #endif
01064
01065 };
01066
01067
01068 static void inline EnqueueDummyPacket(OtherNode node,int size){
01069 infiPacket packet;
01070 MallocInfiPacket(packet);
01071 packet->size = size;
01072 packet->buf = CmiAlloc(size);
01073
01074 packet->header.code = INFIDUMMYPACKET;
01075
01076 struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
01077
01078 MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
01079 EnqueuePacket(node,packet,size,key);
01080 }
01081
01082
01083
01084
01085
01086
01087 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
01088 infiPacket packet;
01089 MallocInfiPacket(packet);
01090 packet->size = size;
01091 packet->buf=data;
01092
01093
01094 packet->header.code = INFIPACKETCODE_DATA;
01095
01096 ogm->refcount++;
01097 packet->ogm = ogm;
01098
01099 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
01100 CmiAssert(key != NULL);
01101
01102 EnqueuePacket(node,packet,size,key);
01103 };
01104
01105 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
01106 static inline void processAllBufferedMsgs();
01107
01108 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
01109 int size; char *data;
01110
01111
01112
01113 ogm->refcount++;
01114 size = ogm->size;
01115 data = ogm->data;
01116
01117 #if CMK_IBVERBS_STATS
01118 msgCount++;
01119 #endif
01120
01121 MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
01122
01123
01124 DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
01125
01126 CmiMsgHeaderSetLength(ogm->data,ogm->size);
01127
01128 if(rdma && size > rdmaThreshold){
01129 EnqueueRdmaPacket(ogm,node);
01130 }else{
01131
01132 while(size > dataSize){
01133 EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
01134 size -= dataSize;
01135 data += dataSize;
01136 }
01137 if(size > 0){
01138 EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
01139 }
01140 }
01141
01142 processAllBufferedMsgs();
01143
01144 ogm->refcount--;
01145 MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
01146 }
01147
01148
01149 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
01150 infiPacket packet;
01151
01152 ogm->refcount++;
01153
01154 MallocInfiPacket(packet);
01155
01156 {
01157 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
01158
01159
01160 packet->size = sizeof(struct infiRdmaPacket);
01161 packet->buf = (char *)rdmaPacket;
01162
01163 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
01164
01165 CmiAssert(key!=NULL);
01166
01167 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
01168
01169 packet->header.code = INFIRDMA_START;
01170 packet->header.nodeNo = _Cmi_mynode;
01171 packet->ogm = NULL;
01172
01173 rdmaPacket->type = INFI_MESG;
01174 rdmaPacket->ogm = ogm;
01175 rdmaPacket->key = *key;
01176 rdmaPacket->keyPtr = key;
01177 rdmaPacket->remoteBuf = ogm->data;
01178 rdmaPacket->remoteSize = ogm->size;
01179
01180
01181 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
01182
01183 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
01184 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
01185 }
01186 }
01187
01188
01189
static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
static inline void processSendWC(struct ibv_wc *sendWC);
static unsigned int _count=0;
/* errno comes from <errno.h>. ISO C (7.5) makes declaring it by hand
   undefined behavior, because errno is usually a thread-local macro. */
/* Call counter used to throttle processAsyncEvents(). */
static int _countAsync=0;
01195 static inline void processAsyncEvents(){
01196 struct ibv_async_event event;
01197 int ready;
01198 _countAsync++;
01199 if(_countAsync < 1){
01200 return;
01201 }
01202 _countAsync=0;
01203 FD_SET(context->context->async_fd,&context->asyncFds);
01204 CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
01205 ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
01206
01207 if(ready==0){
01208 return;
01209 }
01210 if(ready == -1){
01211
01212 return;
01213 }
01214
01215 if (ibv_get_async_event(context->context, &event)){
01216 return;
01217 CmiAbort("get async event failed");
01218 }
01219 printf("[%d] async event %d \n",_Cmi_mynode, event.event_type);
01220 ibv_ack_async_event(&event);
01221
01222
01223 }
01224
01225 static void pollCmiDirectQ();
01226
01227 static inline void CommunicationServer_nolock(int toBuffer) {
01228 int processed;
01229 if(_Cmi_numnodes <= 1){
01230 pollCmiDirectQ();
01231 return;
01232 }
01233 MACHSTATE(2,"CommServer_nolock{");
01234
01235
01236
01237
01238
01239 pollCmiDirectQ();
01240
01241 processed = pollRecvCq(toBuffer);
01242
01243
01244 processed += pollSendCq(toBuffer);
01245
01246 if(toBuffer == 0){
01247
01248 processAllBufferedMsgs();
01249 }
01250
01251
01252
01253
01254 MACHSTATE(2,"} CommServer_nolock ne");
01255
01256 }
01257
01258
01259
01260
01261
01262
01263
01264
01265
01266
01267
01268
01269
01270
01271
01272
01273
01274
01275
01276
01277
01278
01279
01280
01281
01282
01283
01284
01285
01286
01287
01288
01289
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308
01309
01310
01311
01312
01313
01314
01315
01316
01317
01318
01319
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329 static inline int pollRecvCq(const int toBuffer){
01330 int i;
01331 int ne;
01332 struct ibv_wc wc[WC_LIST_SIZE];
01333
01334 MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
01335 ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
01336
01337
01338 if(ne != 0){
01339 MACHSTATE1(3,"pollRecvCq ne %d",ne);
01340 }
01341
01342 for(i=0;i<ne;i++){
01343 if(wc[i].status != IBV_WC_SUCCESS){
01344 CmiAssert(0);
01345 }
01346 switch(wc[i].opcode){
01347 case IBV_WC_RECV:
01348 processRecvWC(&wc[i],toBuffer);
01349 break;
01350 default:
01351 CmiAbort("Wrong type of work completion object in recvq");
01352 break;
01353 }
01354
01355 }
01356 MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
01357 return ne;
01358
01359 }
01360
01361 static inline void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
01362
01363 static inline int pollSendCq(const int toBuffer){
01364 int i;
01365 int ne;
01366 struct ibv_wc wc[WC_LIST_SIZE];
01367
01368 ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
01369
01370
01371
01372 for(i=0;i<ne;i++){
01373 if(wc[i].status != IBV_WC_SUCCESS){
01374 printf("[%d] wc[%d] status %d wc[i].opcode %d\n",_Cmi_mynode,i,wc[i].status,wc[i].opcode);
01375 #if CMK_IBVERBS_STATS
01376 printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
01377 #endif
01378 CmiAssert(0);
01379 }
01380 switch(wc[i].opcode){
01381 case IBV_WC_SEND:{
01382
01383 processSendWC(&wc[i]);
01384
01385 break;
01386 }
01387 case IBV_WC_RDMA_READ:
01388 {
01389
01390 processRdmaWC(&wc[i],1);
01391 break;
01392 }
01393 case IBV_WC_RDMA_WRITE:
01394 {
01395
01396
01397 break;
01398 }
01399 default:
01400 CmiAbort("Wrong type of work completion object in recvq");
01401 break;
01402 }
01403
01404 }
01405 return ne;
01406 }
01407
01408
01409
01410
01411
01412
01413 int CheckSocketsReady(int withDelayMs)
01414 {
01415 int nreadable;
01416 CMK_PIPE_DECL(withDelayMs);
01417
01418 CmiStdoutAdd(CMK_PIPE_SUB);
01419 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
01420
01421 nreadable=CMK_PIPE_CALL();
01422 ctrlskt_ready_read = 0;
01423 dataskt_ready_read = 0;
01424 dataskt_ready_write = 0;
01425
01426 if (nreadable == 0) {
01427 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
01428 return nreadable;
01429 }
01430 if (nreadable==-1) {
01431 CMK_PIPE_CHECKERR();
01432 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
01433 return CheckSocketsReady(0);
01434 }
01435 CmiStdoutCheck(CMK_PIPE_SUB);
01436 if (Cmi_charmrun_fd!=-1)
01437 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
01438 MACHSTATE(1,"} CheckSocketsReady")
01439 return nreadable;
01440 }
01441
01442
01443
01444
01445
01446 static void ServiceCharmrun_nolock()
01447 {
01448 int again = 1;
01449 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
01450 while (again)
01451 {
01452 again = 0;
01453 CheckSocketsReady(0);
01454 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
01455 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
01456 }
01457 MACHSTATE(2,"} ServiceCharmrun_nolock end")
01458 }
01459
01460
01461
01462 static void CommunicationServer(int sleepTime, int where){
01463 if( where == COMM_SERVER_FROM_INTERRUPT){
01464 #if CMK_IMMEDIATE_MSG
01465 CmiHandleImmediate();
01466 #endif
01467 return;
01468 }
01469 #if CMK_SMP
01470 if(where == COMM_SERVER_FROM_WORKER){
01471 return;
01472 }
01473 if(where == COMM_SERVER_FROM_SMP){
01474 #endif
01475 ServiceCharmrun_nolock();
01476 #if CMK_SMP
01477 }
01478 CmiCommLock();
01479 #endif
01480 CommunicationServer_nolock(0);
01481 #if CMK_SMP
01482 CmiCommUnlock();
01483 #endif
01484
01485
01486 #if CMK_IMMEDIATE_MSG
01487 if (where == COMM_SERVER_FROM_SMP) {
01488 CmiHandleImmediate();
01489 }
01490 #endif
01491 }
01492
01493
01494 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);
01495
01496
01497 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);
01498
01499 static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
01500 char *newmsg;
01501
01502 MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
01503
01504 OtherNode node = &nodes[nodeNo];
01505 newmsg = node->asm_msg;
01506
01508
01509 switch(node->infiData->state){
01510 case INFI_HEADER_DATA:
01511 {
01512 int size;
01513 int rank, srcpe, seqno, magic, i;
01514 unsigned int broot;
01515 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
01516 size = CmiMsgHeaderGetLength(msg);
01517 MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
01518
01519
01520
01521
01522 if(len > size){
01523 CmiPrintf("size: %d, len:%d.\n", size, len);
01524 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
01525 }
01526 newmsg = (char *)CmiAlloc(size);
01527 _MEMCHECK(newmsg);
01528 memcpy(newmsg, msg, len);
01529 node->asm_rank = rank;
01530 node->asm_total = size;
01531 node->asm_fill = len;
01532 node->asm_msg = newmsg;
01533 node->infiData->broot = broot;
01534 if(len == size){
01535
01536 node->infiData->state = INFI_HEADER_DATA;
01537 }else{
01538
01539 node->infiData->state = INFI_DATA;
01540 }
01541 break;
01542 }
01543 case INFI_DATA:
01544 {
01545 if(node->asm_fill + len < node->asm_total && len != dataSize){
01546 CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
01547 CmiAbort("packet in the middle does not have expected length");
01548 }
01549 if(node->asm_fill+len > node->asm_total){
01550 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
01551 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
01552 }
01553
01554 memcpy(newmsg + node->asm_fill,msg,len);
01555 node->asm_fill += len;
01556 if(node->asm_fill == node->asm_total){
01557 node->infiData->state = INFI_HEADER_DATA;
01558 }else{
01559 node->infiData->state = INFI_DATA;
01560 }
01561 break;
01562 }
01563 }
01566
01567 if(node->infiData->state == INFI_HEADER_DATA){
01568 int total_size = node->asm_total;
01569 node->asm_msg = NULL;
01570 handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
01571 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
01572 }
01573
01574 };
01575
01576 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
01577 #if CMK_BROADCAST_SPANNING_TREE
01578 if (rank == DGRAM_BROADCAST
01579 #if CMK_NODE_QUEUE_AVAILABLE
01580 || rank == DGRAM_NODEBROADCAST
01581 #endif
01582 ){
01583 if(toBuffer){
01584 insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
01585 }else{
01586 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
01587 }
01588 }
01589 #elif CMK_BROADCAST_HYPERCUBE
01590 if (rank == DGRAM_BROADCAST
01591 #if CMK_NODE_QUEUE_AVAILABLE
01592 || rank == DGRAM_NODEBROADCAST
01593 #endif
01594 ){
01595 if(toBuffer){
01596 insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
01597 }else{
01598 SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
01599 }
01600 }
01601 #endif
01602
01603 switch (rank) {
01604 case DGRAM_BROADCAST: {
01605 int i;
01606 for (i=1; i<_Cmi_mynodesize; i++){
01607 CmiPushPE(i, CopyMsg(newmsg, total_size));
01608 }
01609 CmiPushPE(0, newmsg);
01610 break;
01611 }
01612 #if CMK_NODE_QUEUE_AVAILABLE
01613 case DGRAM_NODEBROADCAST:
01614 case DGRAM_NODEMESSAGE: {
01615 CmiPushNode(newmsg);
01616 break;
01617 }
01618 #endif
01619 default:
01620 {
01621
01622 CmiPushPE(rank, newmsg);
01623 }
01624 }
01625 if(!toBuffer){
01626
01627 processAllBufferedMsgs();
01628
01629 }
01630 }
01631
01632
01633 static inline void increasePostedRecvs(int nodeNo);
01634 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
01635 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
01636
01637
01638
01639
01640 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
01641 struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;
01642 struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
01643 int nodeNo = header->nodeNo;
01644 #if CMK_IBVERBS_DEBUG
01645 OtherNode node = &nodes[nodeNo];
01646 #endif
01647
01648 int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
01649 #if CMK_IBVERBS_DEBUG
01650
01651
01652
01653
01654
01655
01656
01657
01658 MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
01659 #else
01660 MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);
01661 #endif
01662
01663 if(header->code & INFIPACKETCODE_DATA){
01664
01665 processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
01666 }
01667 if(header->code & INFIDUMMYPACKET){
01668 MACHSTATE(3,"Dummy packet");
01669 }
01670 if(header->code & INFIBARRIERPACKET){
01671 MACHSTATE(3,"Barrier packet");
01672 CmiAbort("Should not receive Barrier packet in normal polling loop. Your Barrier is broken");
01673 }
01674
01675 #if CMK_IBVERBS_INCTOKENS
01676 if(header->code & INFIPACKETCODE_INCTOKENS){
01677 increasePostedRecvs(nodeNo);
01678 }
01679 #endif
01680 if(rdma && header->code & INFIRDMA_START){
01681 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
01682
01683
01684 struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
01685 struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
01686 *copyPacket = *rdmaPacket;
01687 copyPacket->fromNodeNo = nodeNo;
01688 MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
01689 context->bufferedRdmaRequests = copyPacket;
01690 copyPacket->next = tmp;
01691 copyPacket->prev = NULL;
01692 if(tmp != NULL){
01693 tmp->prev = copyPacket;
01694 }
01695
01696
01697
01698 }
01699 if(rdma && header->code & INFIRDMA_ACK){
01700 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
01701 processRdmaAck(rdmaPacket);
01702 }
01703
01704
01705
01706
01707 {
01708 struct ibv_sge list = {
01709 .addr = (uintptr_t) buffer->buf,
01710 .length = buffer->size,
01711 .lkey = buffer->key->lkey
01712 };
01713
01714 struct ibv_recv_wr wr = {
01715 .wr_id = (uint64_t)buffer,
01716 .sg_list = &list,
01717 .num_sge = 1,
01718 .next = NULL
01719 };
01720 struct ibv_recv_wr *bad_wr;
01721
01722 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
01723 CmiAssert(0);
01724 }
01725 }
01726
01727 };
01728
01729
01730
01731
01732 static inline void processSendWC(struct ibv_wc *sendWC){
01733
01734 infiPacket packet = (infiPacket )sendWC->wr_id;
01735 #if CMK_IBVERBS_TOKENS_FLOW
01736
01737 context->tokensLeft++;
01738 #endif
01739
01740 MACHSTATE2(3,"Packet send complete node %d tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
01741 if(packet->ogm != NULL){
01742 packet->ogm->refcount--;
01743 if(packet->ogm->refcount == 0){
01744 GarbageCollectMsg(packet->ogm);
01745 }
01746 }else{
01747 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code == INFIDUMMYPACKET){
01748 if (packet->buf) CmiFree(packet->buf);
01749 }
01750 }
01751
01752 FreeInfiPacket(packet);
01753 };
01754
01755
01756
01757
01758 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
01759 int nodeNo = fromNodeNo;
01760 OtherNode node = &nodes[nodeNo];
01761 struct infiRdmaPacket *rdmaPacket;
01762
01763 getFreeTokens(node->infiData);
01764 #if CMK_IBVERBS_TOKENS_FLOW
01765
01766 context->tokensLeft--;
01767 #if CMK_IBVERBS_STATS
01768 if(context->tokensLeft < minTokensLeft){
01769 minTokensLeft = context->tokensLeft;
01770 }
01771 #endif
01772 #endif
01773
01774 struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
01775
01776
01777
01778 if(isBuffered){
01779 rdmaPacket = _rdmaPacket;
01780 }else{
01781 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
01782 *rdmaPacket = *_rdmaPacket;
01783 }
01784
01785
01786 rdmaPacket->fromNodeNo = fromNodeNo;
01787 rdmaPacket->localBuffer = (void *)buffer;
01788
01789 buffer->type = BUFFER_RDMA;
01790 buffer->size = rdmaPacket->remoteSize;
01791
01792 buffer->buf = (char *)CmiAlloc(rdmaPacket->remoteSize);
01793
01794
01795 buffer->key = METADATAFIELD(buffer->buf)->key;
01796
01797
01798 MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
01799 MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
01800
01801
01802 {
01803 struct ibv_sge list = {
01804 .addr = (uintptr_t )buffer->buf,
01805 .length = buffer->size,
01806 .lkey = buffer->key->lkey
01807 };
01808
01809 struct ibv_send_wr *bad_wr;
01810 struct ibv_send_wr wr = {
01811 .wr_id = (uint64_t )rdmaPacket,
01812 .sg_list = &list,
01813 .num_sge = 1,
01814 .opcode = IBV_WR_RDMA_READ,
01815 .send_flags = IBV_SEND_SIGNALED,
01816 .wr.rdma = {
01817 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
01818 .rkey = rdmaPacket->key.rkey
01819 }
01820 };
01822 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
01823 CmiAssert(0);
01824 }
01825 }
01826
01827 };
01828
01829 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket);
01830 static inline void processDirectWC(struct infiRdmaPacket *rdmaPacket);
01831
01832 static inline void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
01833
01834 #if CMK_IBVERBS_STATS
01835 double _startRegTime;
01836 #endif
01837
01838 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
01839
01840
01841
01842
01843
01844 struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
01845
01846
01847
01848
01849
01850
01851
01852 {
01853 int size;
01854 int rank, srcpe, seqno, magic, i;
01855 unsigned int broot;
01856 char *msg = buffer->buf;
01857 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
01858 size = CmiMsgHeaderGetLength(msg);
01859
01860 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
01861 }
01862 MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
01863
01864
01865 free(buffer);
01866
01867 OtherNode node=&nodes[rdmaPacket->fromNodeNo];
01868
01869
01870 #if CMK_IBVERBS_TOKENS_FLOW
01871
01872 context->tokensLeft++;
01873 #endif
01874
01875
01876 if(toBuffer){
01877 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
01878 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
01879 context->bufferedRdmaAcks = rdmaPacket;
01880 rdmaPacket->next = tmp;
01881 rdmaPacket->prev = NULL;
01882 if(tmp != NULL){
01883 tmp->prev = rdmaPacket;
01884 }
01885 }else{
01886 EnqueueRdmaAck(rdmaPacket);
01887 free(rdmaPacket);
01888 }
01889 }
01890
01891 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
01892 infiPacket packet;
01893 OtherNode node=&nodes[rdmaPacket->fromNodeNo];
01894
01895
01896 MallocInfiPacket(packet);
01897 {
01898 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
01899 *ackPacket = *rdmaPacket;
01900 packet->size = sizeof(struct infiRdmaPacket);
01901 packet->buf = (char *)ackPacket;
01902 packet->header.code = INFIRDMA_ACK;
01903 packet->ogm=NULL;
01904
01905 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
01906
01907
01908 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
01909 }
01910 };
01911
01912
01913 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
01914 MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
01915 rdmaPacket->ogm->refcount--;
01916 GarbageCollectMsg(rdmaPacket->ogm);
01917 }
01918
01919
01920
01921
01922
01923
01924
01925
01926
01927 static inline infiBufferedBcastPool createBcastPool(){
01928 int i;
01929 infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
01930 ret->count = 0;
01931 ret->next = ret->prev = NULL;
01932 for(i=0;i<BCASTLIST_SIZE;i++){
01933 ret->bcastList[i].valid = 0;
01934 }
01935 return ret;
01936 };
01937
01938
01939
01940
01941
01942
01943
01944
01945 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
01946 if(context->bufferedBcastList == NULL){
01947 context->bufferedBcastList = createBcastPool();
01948 }else{
01949 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
01950 infiBufferedBcastPool tmp;
01951 tmp = createBcastPool();
01952 context->bufferedBcastList->prev = tmp;
01953 tmp->next = context->bufferedBcastList;
01954 context->bufferedBcastList = tmp;
01955 }
01956 }
01957 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
01958 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
01959 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
01960 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
01961 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
01962
01963 MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
01964
01965 context->bufferedBcastList->count++;
01966 }
01967
01968
01969
01970
01971
01972 static inline void processBufferedBcast(){
01973 infiBufferedBcastPool start;
01974
01975 if(context->bufferedBcastList == NULL){
01976 return;
01977 }
01978 start = context->bufferedBcastList;
01979 if(context->insideProcessBufferedBcasts==1){
01980 return;
01981 }
01982 context->insideProcessBufferedBcasts=1;
01983
01984 while(start->next != NULL){
01985 start = start->next;
01986 }
01987
01988 while(start != NULL){
01989 int i=0;
01990 infiBufferedBcastPool tmp;
01991 if(start->count != 0){
01992 MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
01993 }
01994 for(i=0;i<start->count;i++){
01995 if(start->bcastList[i].valid == 0){
01996 continue;
01997 }
01998 start->bcastList[i].valid=0;
01999 MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
02000 #if CMK_BROADCAST_SPANNING_TREE
02001 if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
02002 #if CMK_NODE_QUEUE_AVAILABLE
02003 || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
02004 #endif
02005 ){
02006 SendSpanningChildren(NULL, 0, start->bcastList[i].size,start->bcastList[i].msg, start->bcastList[i].broot,start->bcastList[i].asm_rank);
02007 CmiFree(start->bcastList[i].msg);
02008 }
02009 #elif CMK_BROADCAST_HYPERCUBE
02010 if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
02011 #if CMK_NODE_QUEUE_AVAILABLE
02012 || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
02013 #endif
02014 ){
02015 SendHypercube(NULL, 0,start->bcastList[i].size,start->bcastList[i].msg ,start->bcastList[i].broot,start->bcastList[i].asm_rank);
02016 CmiFree(start->bcastList[i].msg);
02017 }
02018 #endif
02019 }
02020 if(start->count != 0){
02021 MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
02022 }
02023
02024 tmp = start;
02025 start = start->prev;
02026 free(tmp);
02027 if(start != NULL){
02028
02029 start->next = NULL;
02030 }
02031 }
02032
02033 context->bufferedBcastList = NULL;
02034
02035
02036 context->insideProcessBufferedBcasts=0;
02037 MACHSTATE(2,"processBufferedBcast done ");
02038 };
02039
02040
02041 static inline void processBufferedRdmaAcks(){
02042 struct infiRdmaPacket *start = context->bufferedRdmaAcks;
02043 if(start == NULL){
02044 return;
02045 }
02046 while(start->next != NULL){
02047 start = start->next;
02048 }
02049 while(start != NULL){
02050 struct infiRdmaPacket *rdmaPacket=start;
02051 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
02052 EnqueueRdmaAck(rdmaPacket);
02053 start = start->prev;
02054 free(rdmaPacket);
02055 }
02056 context->bufferedRdmaAcks=NULL;
02057 }
02058
02059
02060
02061 static inline void processBufferedRdmaRequests(){
02062 struct infiRdmaPacket *start = context->bufferedRdmaRequests;
02063 if(start == NULL){
02064 return;
02065 }
02066
02067
02068 while(start->next != NULL){
02069 start = start->next;
02070 }
02071 while(start != NULL){
02072 struct infiRdmaPacket *rdmaPacket=start;
02073 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
02074 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
02075 start = start->prev;
02076 }
02077
02078 context->bufferedRdmaRequests=NULL;
02079 }
02080
02081
02082
02083
02084
/*
 * Flush all deferred work: buffered broadcasts, then buffered RDMA acks,
 * then buffered RDMA requests. With CMK_IBVERBS_STATS the call count and
 * wall time are accumulated.
 */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
  const double began = CmiWallTimer();
  processBufferedCount++;
#endif

  processBufferedBcast();
  processBufferedRdmaAcks();
  processBufferedRdmaRequests();

#if CMK_IBVERBS_STATS
  processBufferedTime += CmiWallTimer() - began;
#endif
}
02098
02099
02100
02101
02102
02103 static inline void increaseTokens(OtherNode node){
02104 int err;
02105 int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
02106 if(node->infiData->totalTokens + increase > maxTokens){
02107 increase = maxTokens-node->infiData->totalTokens;
02108 }
02109 node->infiData->totalTokens += increase;
02110 node->infiData->tokensLeft += increase;
02111 MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
02112
02113 int currentCqSize = context->sendCqSize;
02114 if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
02115 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",_Cmi_mynode,increase,currentCqSize, node->infiData->totalTokens);
02116 CmiAssert(0);
02117 }
02118 context->sendCqSize+= increase;
02119 };
02120
02121 static void increasePostedRecvs(int nodeNo){
02122 OtherNode node = &nodes[nodeNo];
02123 int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;
02124 int recvIncrease = tokenIncrease;
02125 if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
02126 tokenIncrease = maxTokens - node->infiData->postedRecvs;
02127 }
02128 if(tokenIncrease+context->srqSize > maxRecvBuffers){
02129 recvIncrease = maxRecvBuffers-context->srqSize;
02130 }
02131 node->infiData->postedRecvs+= recvIncrease;
02132 context->srqSize += recvIncrease;
02133 MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
02134
02135 int currentCqSize = context->recvCqSize;
02136 if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
02137 CmiAssert(0);
02138 }
02139 context->recvCqSize += tokenIncrease;
02140 if(recvIncrease > 0){
02141
02142 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
02143 newPool->next = context->recvBufferPool;
02144 context->recvBufferPool = newPool;
02145 postInitialRecvs(newPool,recvIncrease,packetSize);
02146 }
02147
02148 };
02149
02150
02151
02152
02153
02154
02155
02156
02157
02164 static void initInfiCmiChunkPools(){
02165 int i,j;
02166 int size = firstBinSize;
02167 int nodeSize;
02168
02169 #if THREAD_MULTI_POOL
02170 nodeSize = CmiMyNodeSize() + 1;
02171 infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
02172 for(i = 0; i < nodeSize; i++){
02173 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
02174 }
02175 for(j = 0; j < nodeSize; j++){
02176 size = firstBinSize;
02177 for(i=0;i<INFINUMPOOLS;i++){
02178 infiCmiChunkPools[j][i].size = size;
02179 infiCmiChunkPools[j][i].startBuf = NULL;
02180 infiCmiChunkPools[j][i].count = 0;
02181 size *= 2;
02182 }
02183 }
02184
02185
02186 queuePool = malloc(sizeof(PCQueue *) * nodeSize);
02187 for(i = 0; i < nodeSize; i++){
02188 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
02189 }
02190 for(i = 0; i < nodeSize; i++)
02191 for(j = 0; j < nodeSize; j++)
02192 queuePool[i][j] = PCQueueCreate();
02193
02194 #else
02195
02196 size = firstBinSize;
02197 for(i=0;i<INFINUMPOOLS;i++){
02198 infiCmiChunkPools[i].size = size;
02199 infiCmiChunkPools[i].startBuf = NULL;
02200 infiCmiChunkPools[i].count = 0;
02201 size *= 2;
02202 }
02203 #endif
02204
02205 }
02206
02207
02208
02209
02210 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
02211 infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
02212 char *res=msg-sizeof(infiCmiChunkHeader);
02213 metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
02214 #if CMK_IBVERBS_STATS
02215 numCurReg++;
02216 numReg++;
02217 numMultiSend++;
02218 #endif
02219 CmiAssert(metaData->key!=NULL);
02220 metaData->owner = NULL;
02221 metaData->poolIdx = INFIMULTIPOOL;
02222
02223 return metaData;
02224 };
02225
02226
02227 #if THREAD_MULTI_POOL
02228
02229
02230 static inline void fillBufferPools(){
02231 int nodeSize, poolIdx, thread;
02232 infiCmiChunkMetaData *metaData;
02233 infiCmiChunkHeader *hdr;
02234 int allocSize;
02235 int count=1;
02236 int i;
02237 struct ibv_mr *key;
02238 void *res;
02239
02240
02241 nodeSize = CmiMyNodeSize() + 1;
02242
02243
02244 for(thread = 0; thread < nodeSize; thread++){
02245 for(poolIdx = 0; poolIdx < INFINUMPOOLS; poolIdx++){
02246 allocSize = infiCmiChunkPools[thread][poolIdx].size;
02247 if(poolIdx < blockThreshold){
02248 count = blockAllocRatio;
02249 }else{
02250 count = 1;
02251 }
02252 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
02253 hdr = res;
02254 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
02255 CmiAssert(key != NULL);
02256 #if CMK_IBVERBS_STATS
02257 numCurReg++;
02258 numReg++;
02259 #endif
02260 res += sizeof(infiCmiChunkHeader);
02261 for(i=0;i<count;i++){
02262 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
02263 metaData->key = key;
02264 metaData->owner = hdr;
02265 metaData->poolIdx = poolIdx;
02266 metaData->parentPe = thread;
02267 if(i == 0){
02268 metaData->owner->metaData->count = count;
02269 metaData->nextBuf = NULL;
02270 }else{
02271 void *startBuf = res - sizeof(infiCmiChunkHeader);
02272 metaData->nextBuf = infiCmiChunkPools[thread][poolIdx].startBuf;
02273 infiCmiChunkPools[thread][poolIdx].startBuf = startBuf;
02274 infiCmiChunkPools[thread][poolIdx].count++;
02275 }
02276 if(i != count-1){
02277 res += (allocSize+sizeof(infiCmiChunkHeader));
02278 }
02279 }
02280 }
02281 }
02282 }
02283
02284 static inline void *getInfiCmiChunkThread(int dataSize){
02285
02286
02287 int ratio = dataSize/firstBinSize;
02288 int poolIdx=0;
02289 void *res;
02290 int i,j,nodeSize;
02291 void *pointer;
02292
02293
02294 MACHSTATE1(2,"Rank=%d",CmiMyRank());
02295 MACHSTATE1(3,"INFI_ALLOC %d",CmiMyRank());
02296
02297 while(ratio > 0){
02298 ratio = ratio >> 1;
02299 poolIdx++;
02300 }
02301 MACHSTATE1(2,"This is %d",CmiMyRank());
02302 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
02303
02304
02305 nodeSize = CmiMyNodeSize() + 1;
02306 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL){
02307 MACHSTATE1(3,"Disposing memory %d",CmiMyRank());
02308 for(i = 0; i < nodeSize; i++){
02309 if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
02310 for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
02311 pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
02312 infi_CmiFreeDirect(pointer);
02313 }
02314 }
02315 }
02316 }
02317
02318 if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
02319 infiCmiChunkMetaData *metaData;
02320 infiCmiChunkHeader *hdr;
02321 int allocSize;
02322 int count=1;
02323 int i;
02324 struct ibv_mr *key;
02325 void *origres;
02326
02327
02328 if(poolIdx < INFINUMPOOLS ){
02329 allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
02330 }else{
02331 allocSize = dataSize;
02332 }
02333
02334 if(poolIdx < blockThreshold){
02335 count = blockAllocRatio;
02336 }
02337 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
02338 _MEMCHECK(res);
02339 hdr = res;
02340
02341 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
02342 CmiAssert(key != NULL);
02343 #if CMK_IBVERBS_STATS
02344 numCurReg++;
02345 numReg++;
02346 #endif
02347
02348 origres = (res += sizeof(infiCmiChunkHeader));
02349
02350 for(i=0;i<count;i++){
02351 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
02352 _MEMCHECK(metaData);
02353 metaData->key = key;
02354 metaData->owner = hdr;
02355 metaData->poolIdx = poolIdx;
02356 metaData->parentPe = CmiMyRank();
02357
02358 if(i == 0){
02359 metaData->owner->metaData->count = count;
02360 metaData->nextBuf = NULL;
02361 }else{
02362 void *startBuf = res - sizeof(infiCmiChunkHeader);
02363 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
02364 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
02365 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
02366
02367 }
02368 if(i != count-1){
02369 res += (allocSize+sizeof(infiCmiChunkHeader));
02370 }
02371 }
02372
02373
02374 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
02375
02376 return origres;
02377 }
02378 if(poolIdx < INFINUMPOOLS){
02379 infiCmiChunkMetaData *metaData;
02380
02381 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
02382 res += sizeof(infiCmiChunkHeader);
02383
02384 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
02385 metaData = METADATAFIELD(res);
02386
02387 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
02388 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
02389
02390 metaData->nextBuf = NULL;
02391
02392
02393 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
02394 return res;
02395 }
02396
02397 CmiAssert(0);
02398
02399
02400 };
02401 #else
02402 static inline void *getInfiCmiChunk(int dataSize){
02403
02404
02405 int ratio = dataSize/firstBinSize;
02406 int poolIdx=0;
02407 char *res;
02408 #if CMK_IBVERBS_STATS
02409 if(numAlloc>10000 && numAlloc%1000==0)
02410 {
02411 printf("[%d] numReg %d numUnReg %d numCurReg %d numAlloc %d numFree %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,numReg, numUnReg, numCurReg, numAlloc, numFree, msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
02412
02413 }
02414 #endif
02415 while(ratio > 0){
02416 ratio = ratio >> 1;
02417 poolIdx++;
02418 }
02419 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
02420 if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
02421 infiCmiChunkMetaData *metaData;
02422 infiCmiChunkHeader *hdr;
02423 int allocSize;
02424 int count=1;
02425 int i;
02426 struct ibv_mr *key;
02427 void *origres;
02428
02429
02430 if(poolIdx < INFINUMPOOLS ){
02431 allocSize = infiCmiChunkPools[poolIdx].size;
02432 }else{
02433 allocSize = dataSize;
02434 }
02435
02436 if(poolIdx < blockThreshold){
02437 count = blockAllocRatio;
02438 }
02439 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
02440 hdr = (infiCmiChunkHeader *)res;
02441
02442 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
02443 CmiAssert(key != NULL);
02444 #if CMK_IBVERBS_STATS
02445 numCurReg++;
02446 numReg++;
02447 #endif
02448 origres = (res += sizeof(infiCmiChunkHeader));
02449
02450 for(i=0;i<count;i++){
02451 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
02452 metaData->key = key;
02453 metaData->owner = hdr;
02454 metaData->poolIdx = poolIdx;
02455
02456 if(i == 0){
02457 metaData->owner->metaData->count = count;
02458 metaData->nextBuf = NULL;
02459 }else{
02460 void *startBuf = res - sizeof(infiCmiChunkHeader);
02461 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
02462 infiCmiChunkPools[poolIdx].startBuf = startBuf;
02463 infiCmiChunkPools[poolIdx].count++;
02464
02465 }
02466 if(i != count-1){
02467 res += (allocSize+sizeof(infiCmiChunkHeader));
02468 }
02469 }
02470
02471
02472 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
02473
02474 return origres;
02475 }
02476 if(poolIdx < INFINUMPOOLS){
02477 infiCmiChunkMetaData *metaData;
02478
02479 res = infiCmiChunkPools[poolIdx].startBuf;
02480 res += sizeof(infiCmiChunkHeader);
02481
02482 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
02483 metaData = METADATAFIELD(res);
02484
02485 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
02486 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
02487
02488 metaData->nextBuf = NULL;
02489
02490
02491 infiCmiChunkPools[poolIdx].count--;
02492 return res;
02493 }
02494
02495 CmiAssert(0);
02496
02497
02498 };
02499 #endif
02500
02501
02502 void * infi_CmiAlloc(int size){
02503 char *res;
02504 #if CMK_IBVERBS_STATS
02505 numAlloc++;
02506 #endif
02507 if (Cmi_charmrun_fd == -1) return malloc(size);
02508 #if THREAD_MULTI_POOL
02509 res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
02510 res -= sizeof(CmiChunkHeader);
02511
02512 return res;
02513 #else
02514 #if CMK_SMP
02515 CmiMemLock();
02516 #endif
02517
02518 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
02519
02520 res = (char*)getInfiCmiChunk(size-sizeof(CmiChunkHeader));
02521 res -= sizeof(CmiChunkHeader);
02522 #if CMK_SMP
02523 CmiMemUnlock();
02524 #endif
02525
02526
02527
02528
02529 return res;
02530 #endif
02531 }
02532
02533 #if THREAD_MULTI_POOL
02534
02535 void infi_CmiFreeDirect(void *ptr){
02536 int size;
02537 int parentPe;
02538 void *freePtr = ptr;
02539 #if CMK_IBVERBS_STATS
02540 numFree++;
02541 #endif
02542
02543
02544 size = SIZEFIELD (ptr);
02545
02546 infiCmiChunkMetaData *metaData;
02547 int poolIdx;
02548
02549 freePtr = ptr - sizeof(infiCmiChunkHeader);
02550 metaData = METADATAFIELD(ptr);
02551 poolIdx = metaData->poolIdx;
02552 infiCmiChunkPool *pool = infiCmiChunkPools[CmiMyRank()] + poolIdx;
02553 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
02554
02555 if(poolIdx < INFINUMPOOLS && pool->count < INFIMAXPERPOOL &&
02556 pool->count < ((1 << INFINUMPOOLS) >> poolIdx) ){
02557 metaData->nextBuf = pool->startBuf;
02558 pool->startBuf = freePtr;
02559 pool->count++;
02560 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,pool->startBuf,pool->count);
02561 }else{
02562 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
02563 metaData->owner->metaData->count--;
02564 if(metaData->owner->metaData == metaData){
02565
02566 if(metaData->owner->metaData->count == 0){
02567
02568 int unregstat=ibv_dereg_mr(metaData->key);
02569 #if CMK_IBVERBS_STATS
02570 numUnReg++;
02571 numCurReg--;
02572 #endif
02573
02574 CmiAssert(unregstat==0);
02575 free(freePtr);
02576 free(metaData);
02577 }
02578
02579
02580 }else{
02581 if(metaData->owner->metaData->count == 0){
02582
02583 freePtr = metaData->owner;
02584 int unregstat=ibv_dereg_mr(metaData->key);
02585 #if CMK_IBVERBS_STATS
02586 numUnReg++;
02587 numCurReg--;
02588 #endif
02589
02590 CmiAssert(unregstat==0);
02591 free(metaData->owner->metaData);
02592 free(freePtr);
02593 }
02594 free(metaData);
02595 }
02596 }
02597 }
02598
02599
02600 void infi_CmiFree(void *ptr){
02601
02602 int i,j;
02603 int size;
02604 int parentPe;
02605 int nodeSize;
02606 void *pointer;
02607 void *freePtr = ptr;
02608 nodeSize = CmiMyNodeSize() + 1;
02609
02610 MACHSTATE(3,"Freeing");
02611
02612 ptr += sizeof(CmiChunkHeader);
02613 size = SIZEFIELD (ptr);
02614
02615 infiCmiChunkMetaData *metaData;
02616 int poolIdx;
02617
02618 freePtr = ptr - sizeof(infiCmiChunkHeader);
02619 metaData = METADATAFIELD(ptr);
02620 poolIdx = metaData->poolIdx;
02621
02622 if(poolIdx == INFIMULTIPOOL){
02626 #if CMK_IBVERBS_STATS
02627 numMultiSendFree++;
02628 #endif
02629 return;
02630 }
02631
02632
02633
02634 parentPe = metaData->parentPe;
02635 if(parentPe != CmiMyRank()){
02636 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
02637 return;
02638 }
02639
02640
02641 infi_CmiFreeDirect(ptr);
02642
02643 }
02644
02645 #else
02646 void infi_CmiFree(void *ptr){
02647 int size;
02648 void *freePtr = ptr;
02649 #if CMK_IBVERBS_STATS
02650 numFree++;
02651 #endif
02652
02653 if (Cmi_charmrun_fd == -1) return free(ptr);
02654 #if CMK_SMP
02655 CmiMemLock();
02656 #endif
02657 ptr += sizeof(CmiChunkHeader);
02658 size = SIZEFIELD (ptr);
02659
02660 infiCmiChunkMetaData *metaData;
02661 int poolIdx;
02662
02663 freePtr = (char*)ptr - sizeof(infiCmiChunkHeader);
02664 metaData = METADATAFIELD(ptr);
02665 poolIdx = metaData->poolIdx;
02666 if(poolIdx == INFIMULTIPOOL){
02670 #if CMK_IBVERBS_STATS
02671 numMultiSendFree++;
02672 #endif
02673 return;
02674 }
02675 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
02676
02677 if(poolIdx < INFINUMPOOLS &&
02678 infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL &&
02679 infiCmiChunkPools[poolIdx].count < ((1 << INFINUMPOOLS) >> poolIdx) ){
02680 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
02681 infiCmiChunkPools[poolIdx].startBuf = freePtr;
02682 infiCmiChunkPools[poolIdx].count++;
02683
02684 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
02685 }else{
02686 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
02687 metaData->owner->metaData->count--;
02688 if(metaData->owner->metaData == metaData){
02689
02690 if(metaData->owner->metaData->count == 0){
02691
02692 int unregstat=ibv_dereg_mr(metaData->key);
02693 #if CMK_IBVERBS_STATS
02694 numUnReg++;
02695 numCurReg--;
02696 #endif
02697
02698 CmiAssert(unregstat==0);
02699 free(freePtr);
02700 free(metaData);
02701 }
02702
02703
02704 }else{
02705 if(metaData->owner->metaData->count == 0){
02706
02707 freePtr = metaData->owner;
02708 int unregstat=ibv_dereg_mr(metaData->key);
02709 #if CMK_IBVERBS_STATS
02710 numUnReg++;
02711 numCurReg--;
02712 #endif
02713
02714 CmiAssert(unregstat==0);
02715 free(metaData->owner->metaData);
02716 free(freePtr);
02717 }
02718 free(metaData);
02719 }
02720 }
02721 #if CMK_SMP
02722 CmiMemUnlock();
02723 #endif
02724
02725
02726
02727 }
02728 #endif
02729
02730
02731
02732
02733
02734
02735
02736
02737 struct infiDirectRequestPacket{
02738 int senderProc;
02739 int handle;
02740 struct ibv_mr senderKey;
02741 void *senderBuf;
02742 int senderBufSize;
02743 };
02744
02745 #include "cmidirect.h"
02746
02747 #define MAXHANDLES 512
02748
02749 struct infiDirectHandleStruct;
02750
02751
02752 typedef struct directPollingQNodeStruct {
02753 struct infiDirectHandleStruct *handle;
02754 struct directPollingQNodeStruct *next;
02755 double *lastDouble;
02756 } directPollingQNode;
02757
02758 typedef struct infiDirectHandleStruct{
02759 int id;
02760 void *buf;
02761 int size;
02762 struct ibv_mr *key;
02763 void (*callbackFnPtr)(void *);
02764 void *callbackData;
02765
02766 struct infiDirectUserHandle userHandle;
02767 struct infiRdmaPacket *rdmaPacket;
02768 directPollingQNode pollingQNode;
02769 } infiDirectHandle;
02770
02771 typedef struct infiDirectHandleTableStruct{
02772 infiDirectHandle handles[MAXHANDLES];
02773 struct infiDirectHandleTableStruct *next;
02774 } infiDirectHandleTable;
02775
02776
02777
02778
02779 directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;
02780
02781 static infiDirectHandleTable **sendHandleTable=NULL;
02782 static infiDirectHandleTable **recvHandleTable=NULL;
02783
02784 static int *recvHandleCount=NULL;
02785
02786 void addHandleToPollingQ(infiDirectHandle *handle){
02787
02788 directPollingQNode *newNode = &(handle->pollingQNode);
02789 newNode->handle = handle;
02790 newNode->next = NULL;
02791 if(headDirectPollingQ==NULL){
02792
02793 headDirectPollingQ = newNode;
02794 tailDirectPollingQ = newNode;
02795 }else{
02796 tailDirectPollingQ->next = newNode;
02797 tailDirectPollingQ = newNode;
02798 }
02799 };
02800
02801
02802
02803
02804
02805
02806
02807
02808
02809
02810
02811
02812
02813
02814
02815
02816
02817
02818 static inline infiDirectHandleTable **createHandleTable(){
02819 infiDirectHandleTable **table = malloc(_Cmi_numnodes*sizeof(infiDirectHandleTable *));
02820 int i;
02821 for(i=0;i<_Cmi_numnodes;i++){
02822 table[i] = NULL;
02823 }
02824 return table;
02825 }
02826
02827 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
02828 *tableIdx = handle/MAXHANDLES;
02829 *idx = handle%MAXHANDLES;
02830 };
02831
/*
 * Write initialValue into the last sizeof(double) bytes of recvBuf.
 *
 * recvBufSize need not be a multiple of sizeof(double), so that slot may be
 * misaligned.  memcpy stores the bytes without a misaligned double store.
 * Callers must pass recvBufSize >= sizeof(double).
 */
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
  size_t offset = (size_t)recvBufSize - sizeof(double);
  memcpy((char *)recvBuf + offset, &initialValue, sizeof initialValue);
}
02839
02840
02844 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
02845 int newHandle;
02846 int tableIdx,idx;
02847 int i;
02848 infiDirectHandleTable *table;
02849 struct infiDirectUserHandle userHandle;
02850
02851 CmiAssert(recvBufSize > sizeof(double));
02852
02853 if(recvHandleTable == NULL){
02854 recvHandleTable = createHandleTable();
02855 recvHandleCount = malloc(sizeof(int)*_Cmi_numnodes);
02856 for(i=0;i<_Cmi_numnodes;i++){
02857 recvHandleCount[i] = -1;
02858 }
02859 }
02860 if(recvHandleTable[senderNode] == NULL){
02861 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
02862 recvHandleTable[senderNode]->next = NULL;
02863 }
02864
02865 newHandle = ++recvHandleCount[senderNode];
02866 CmiAssert(newHandle >= 0);
02867
02868 calcHandleTableIdx(newHandle,&tableIdx,&idx);
02869
02870 table = recvHandleTable[senderNode];
02871 for(i=0;i<tableIdx;i++){
02872 if(table->next ==NULL){
02873 table->next = malloc(sizeof(infiDirectHandleTable));
02874 table->next->next = NULL;
02875 }
02876 table = table->next;
02877 }
02878 table->handles[idx].id = newHandle;
02879 table->handles[idx].buf = recvBuf;
02880 table->handles[idx].size = recvBufSize;
02881 #if CMI_DIRECT_DEBUG
02882 CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNode(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
02883 #endif
02884 table->handles[idx].callbackFnPtr = callbackFnPtr;
02885 table->handles[idx].callbackData = callbackData;
02886 table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
02887 CmiAssert(table->handles[idx].key != NULL);
02888 #if CMK_IBVERBS_STATS
02889 numCurReg++;
02890 numReg++;
02891 #endif
02892
02893
02894
02895
02896 userHandle.handle = newHandle;
02897 userHandle.recverNode = _Cmi_mynode;
02898 userHandle.senderNode = senderNode;
02899 userHandle.recverBuf = recvBuf;
02900 userHandle.recverBufSize = recvBufSize;
02901 memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
02902 userHandle.initialValue = initialValue;
02903
02904 table->handles[idx].userHandle = userHandle;
02905
02906 initializeLastDouble(recvBuf,recvBufSize,initialValue);
02907
02908 {
02909 int index = table->handles[idx].size - sizeof(double);
02910 table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
02911 }
02912
02913 addHandleToPollingQ(&(table->handles[idx]));
02914
02915
02916
02917 return userHandle;
02918 }
02919
02920
02921
02922
02923 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
02924 int tableIdx,idx;
02925 int i;
02926 int handle = userHandle->handle;
02927 int recverNode = userHandle->recverNode;
02928
02929 infiDirectHandleTable *table;
02930
02931 if(sendHandleTable == NULL){
02932 sendHandleTable = createHandleTable();
02933 }
02934 if(sendHandleTable[recverNode] == NULL){
02935 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
02936 sendHandleTable[recverNode]->next = NULL;
02937 }
02938
02939 CmiAssert(handle >= 0);
02940 calcHandleTableIdx(handle,&tableIdx,&idx);
02941
02942 table = sendHandleTable[recverNode];
02943 for(i=0;i<tableIdx;i++){
02944 if(table->next ==NULL){
02945 table->next = malloc(sizeof(infiDirectHandleTable));
02946 table->next->next = NULL;
02947 }
02948 table = table->next;
02949 }
02950
02951 table->handles[idx].id = handle;
02952 table->handles[idx].buf = sendBuf;
02953
02954 table->handles[idx].size = sendBufSize;
02955 #if CMI_DIRECT_DEBUG
02956 CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
02957 #endif
02958 table->handles[idx].callbackFnPtr = NULL;
02959 table->handles[idx].callbackData = NULL;
02960 table->handles[idx].key = ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
02961 CmiAssert(table->handles[idx].key != NULL);
02962 #if CMK_IBVERBS_STATS
02963 numCurReg++;
02964 numReg++;
02965 #endif
02966 table->handles[idx].userHandle = *userHandle;
02967 CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
02968
02969 table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
02970 table->handles[idx].rdmaPacket->type = INFI_DIRECT;
02971 table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
02972
02973
02974
02975
02976
02977
02978
02979
02980
02981 MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
02982 };
02983
02984
02985
02986
02987
02988
02989
02990
02991 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
02992 int handle = userHandle->handle;
02993 int recverNode = userHandle->recverNode;
02994 if(recverNode == _Cmi_mynode){
02995
02996
02997
02998
02999 infiDirectHandleTable *senderTable;
03000 infiDirectHandleTable *recverTable;
03001
03002 int tableIdx,idx,i;
03003
03004
03005
03006 calcHandleTableIdx(handle,&tableIdx,&idx);
03007 CmiAssert(sendHandleTable!= NULL);
03008 senderTable = sendHandleTable[_Cmi_mynode];
03009 CmiAssert(senderTable != NULL);
03010 for(i=0;i<tableIdx;i++){
03011 senderTable = senderTable->next;
03012 }
03013
03015 recverTable = recvHandleTable[recverNode];
03016 CmiAssert(recverTable != NULL);
03017 for(i=0;i< tableIdx;i++){
03018 recverTable = recverTable->next;
03019 }
03020
03021 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
03022 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
03023 #if CMI_DIRECT_DEBUG
03024 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
03025 #endif
03026
03027
03028
03029
03030 }else{
03031 infiPacket packet;
03032 int tableIdx,idx;
03033 int i;
03034 OtherNode node;
03035 infiDirectHandleTable *table;
03036
03037 calcHandleTableIdx(handle,&tableIdx,&idx);
03038
03039 table = sendHandleTable[recverNode];
03040 CmiAssert(table != NULL);
03041 for(i=0;i<tableIdx;i++){
03042 table = table->next;
03043 }
03044
03045
03046 #if CMI_DIRECT_DEBUG
03047 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
03048 #endif
03049
03050
03051 {
03052
03053 OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
03054 struct ibv_sge list = {
03055 .addr = (uintptr_t )table->handles[idx].buf,
03056 .length = table->handles[idx].size,
03057 .lkey = table->handles[idx].key->lkey
03058 };
03059
03060 struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
03061
03062 struct ibv_send_wr *bad_wr;
03063 struct ibv_send_wr wr = {
03064 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
03065 .sg_list = &list,
03066 .num_sge = 1,
03067 .opcode = IBV_WR_RDMA_WRITE,
03068 .send_flags = IBV_SEND_SIGNALED,
03069
03070 .wr.rdma = {
03071 .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
03072 .rkey = remoteKey->rkey
03073 }
03074 };
03076 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
03077 CmiAssert(0);
03078 }
03079 }
03080
03081
03082
03083
03084
03085
03086
03087
03088 }
03089
03090 };
03091
03092
03093 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
03094 initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
03095 }
03096
03097
03098 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
03099 int handle = userHandle->handle;
03100 int tableIdx,idx,i;
03101 infiDirectHandleTable *table;
03102 calcHandleTableIdx(handle,&tableIdx,&idx);
03103
03104 table = recvHandleTable[userHandle->senderNode];
03105 CmiAssert(table != NULL);
03106 for(i=0;i<tableIdx;i++){
03107 table = table->next;
03108 }
03109 #if CMI_DIRECT_DEBUG
03110 CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
03111 #endif
03112 addHandleToPollingQ(&(table->handles[idx]));
03113
03114
03115 }
03116
03117
/* Fully re-arm a receive handle: reset its buffer's last double to the
   initial value, then re-queue the handle for polling. */
void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
  CmiDirect_readyMark(userHandle);
  CmiDirect_readyPollQ(userHandle);
}
03138
03139
03140 static int receivedDirectMessage(infiDirectHandle *handle){
03141
03142
03143 if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
03144 return 0;
03145 }else{
03146 (*(handle->callbackFnPtr))(handle->callbackData);
03147 return 1;
03148 }
03149
03150 }
03151
03152
03153 static void pollCmiDirectQ(){
03154 directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
03155 while(ptr != NULL){
03156 if(receivedDirectMessage(ptr->handle)){
03157 #if CMI_DIRECT_DEBUG
03158 CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNode(),ptr->handle->userHandle.recverBuf);
03159 #endif
03160 directPollingQNode *delPtr = ptr;
03162 if(prevPtr == NULL){
03164 if(headDirectPollingQ == tailDirectPollingQ){
03166 headDirectPollingQ = tailDirectPollingQ = NULL;
03167 }else{
03168 headDirectPollingQ = headDirectPollingQ->next;
03169 }
03170 }else{
03171 if(ptr == tailDirectPollingQ){
03173 tailDirectPollingQ = prevPtr;
03174 }
03175 prevPtr->next = ptr->next;
03176 }
03177 ptr = ptr->next;
03178
03179 }else{
03180 prevPtr = ptr;
03181 ptr = ptr->next;
03182 }
03183 }
03184 }
03185
03186
03187
03188
03189
03190
03191
03192
03193
03194
03195
03196
03197
03198
03199
03200
03201
03202
03203
03204
03205
03206
03207
03208
03209
03210
03211
03212
03213
03214
03215
03216
03217
03218
03219
03220
03221
03222
03223
03224
03225
03226
03227
03228
03229
03230
03231
03232
03233
03234
03235
03236
03237
03238
03239
03240
03241 #if 0
03242
03243
03244
03245 static void sendBarrierMessage(int pe)
03246 {
03247
03248 int size=32;
03249 OtherNode node = nodes + pe;
03250 infiPacket packet;
03251 MallocInfiPacket(packet);
03252 packet->size = size;
03253 packet->buf = CmiAlloc(size);
03254 packet->header.code = INFIBARRIERPACKET;
03255 struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
03256 MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
03257
03258 EnqueuePacket(node,packet,size,key);
03259 }
03260
03261 static void recvBarrierMessage()
03262 {
03263 int i;
03264 int ne;
03265
03266 struct ibv_wc wc[1];
03267 struct ibv_wc *recvWC;
03268
03269 int toBuffer=1;
03270 int barrierReached=0;
03271 struct infiBuffer *buffer = NULL;
03272 struct infiPacketHeader *header = NULL;
03273 int nodeNo=-1;
03274 int len=-1;
03275 while(!barrierReached)
03276 {
03277
03278 ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
03279
03280 if(ne != 0){
03281 MACHSTATE1(3,"recvBarrier ne %d",ne);
03282 }
03283 pollSendCq(1);
03284 for(i=0;i<ne;i++){
03285 if(wc[i].status != IBV_WC_SUCCESS){
03286 CmiAssert(0);
03287 }
03288 switch(wc[i].opcode){
03289 case IBV_WC_RECV:
03290 recvWC=&wc[i];
03291 buffer = (struct infiBuffer *) recvWC->wr_id;
03292 header = (struct infiPacketHeader *)buffer->buf;
03293 nodeNo = header->nodeNo;
03294 len = recvWC->byte_len-sizeof(struct infiPacketHeader);
03295
03296 if(header->code & INFIPACKETCODE_DATA){
03297 processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
03298 }
03299 if(header->code & INFIDUMMYPACKET){
03300 MACHSTATE(3,"Dummy packet");
03301 }
03302 if(header->code & INFIBARRIERPACKET){
03303 MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);
03304
03305 barrierReached=1;
03306
03307
03308
03309 }
03310 if(rdma && header->code & INFIRDMA_START){
03311 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
03312
03313
03314 struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
03315 struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
03316 *copyPacket = *rdmaPacket;
03317 copyPacket->fromNodeNo = nodeNo;
03318 MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
03319 context->bufferedRdmaRequests = copyPacket;
03320 copyPacket->next = tmp;
03321 copyPacket->prev = NULL;
03322 if(tmp != NULL){
03323 tmp->prev = copyPacket;
03324 }
03325
03326
03327
03328 }
03329 if(rdma && header->code & INFIRDMA_ACK){
03330 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
03331 processRdmaAck(rdmaPacket);
03332 }
03333 {
03334 struct ibv_sge list = {
03335 .addr = (uintptr_t) buffer->buf,
03336 .length = buffer->size,
03337 .lkey = buffer->key->lkey
03338 };
03339
03340 struct ibv_recv_wr wr = {
03341 .wr_id = (uint64_t)buffer,
03342 .sg_list = &list,
03343 .num_sge = 1,
03344 .next = NULL
03345 };
03346 struct ibv_recv_wr *bad_wr;
03347
03348 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
03349 CmiAssert(0);
03350 }
03351 }
03352
03353 break;
03354 default:
03355 CmiAbort("Wrong type of work completion object in recvq");
03356 break;
03357 }
03358 }
03359 }
03360
03361
03362 }
03363
03364
03365
03366 int CmiBarrier()
03367 {
03368 int len, size, i;
03369 int status;
03370 int count = 0;
03371 OtherNode node;
03372 int numnodes = CmiNumNodes();
03373 if (CmiMyRank() == 0) {
03374
03375 if (CmiMyNode() != 0) {
03376 sendBarrierMessage(0);
03377 }
03378
03379 if (CmiMyNode() == 0)
03380 {
03381 for (count = 1; count < numnodes; count ++)
03382 {
03383 recvBarrierMessage();
03384 }
03385
03386 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
03387 int p = i;
03388 if (p > numnodes - 1) break;
03389
03390 sendBarrierMessage(p);
03391 }
03392 }
03393
03394 if (CmiMyNode() != 0)
03395 {
03396 recvBarrierMessage();
03397 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
03398 int p = CmiMyNode();
03399 p = BROADCAST_SPANNING_FACTOR*p + i;
03400 if (p > numnodes - 1) break;
03401 p = p%numnodes;
03402
03403 sendBarrierMessage(p);
03404 }
03405 }
03406 }
03407 CmiNodeAllBarrier();
03408 processAllBufferedMsgs();
03409
03410 }
03411
03412
/*
 * Barrier in which only node 0 waits (currently compiled out).
 *
 * Rank 0 of each node other than 0 sends one barrier packet to node 0.
 * Node 0 waits for CmiNumNodes()-1 such packets.  All ranks then meet in
 * CmiNodeAllBarrier() and process buffered messages.  Always returns 0 (the
 * original fell off the end of an int function).
 */
int CmiBarrierZero()
{
  int i;

  if (CmiMyRank() == 0) {
    if (CmiMyNode()) {
      sendBarrierMessage(0);
    }
    else {
      for (i=0; i<CmiNumNodes()-1; i++) {
        recvBarrierMessage();
      }
    }
  }
  CmiNodeAllBarrier();
  processAllBufferedMsgs();
  return 0;
}
03431
03432
03433 #endif