00001
00021 #include <stdio.h>
00022 #include <stdlib.h>
00023 #include <unistd.h>
00024 #include <string.h>
00025 #include <sys/types.h>
00026 #include <sys/socket.h>
00027 #include <malloc.h>
00028 #include <getopt.h>
00029 #include <time.h>
00030
00031 #include <infiniband/verbs.h>
00032
/*
 * File-scope configuration and statistics for the ibverbs machine layer.
 * Most of these are assigned once in CmiMachineInit.
 */
/* Path MTU requested when each QP is moved to RTR (initInfiOtherNodeData). */
00033 enum ibv_mtu mtu = IBV_MTU_2048;
/* System page size, cached by allocateInfiBufferPool via sysconf(_SC_PAGESIZE). */
00034 static int page_size;
/* Packet sizing set in CmiMachineInit: mtu_size = 1200,
   packetSize = 4 * mtu_size (also the size of every posted receive buffer),
   dataSize = packetSize - sizeof(struct infiPacketHeader). */
00035 static int mtu_size;
00036 static int packetSize;
00037 static int dataSize;
/* RDMA / allocation tunables set in CmiMachineInit (rdma = 1,
   rdmaThreshold = 22000, firstBinSize = 120, blockAllocRatio = 16,
   blockThreshold = 8). NOTE(review): their consumers are outside this chunk. */
00038 static int rdma;
00039 static int rdmaThreshold;
00040 static int firstBinSize;
00041 static int blockAllocRatio;
00042 static int blockThreshold;
00043
00044
/* Number of receives posted to the shared receive queue. maxTokens is set
   equal to it; it is the initial context->tokensLeft and sizes the send CQ
   (maxTokens + 2) and each QP's send queue. */
00045 static int maxRecvBuffers;
00046 static int maxTokens;
00047
/* Packets pre-allocated onto the free list at init: min(maxTokens / 2, 2000). */
00048 static int sendPacketPoolSize;
00049
/* NOTE(review): not referenced in this chunk. */
00050 static double _startTime=0;
/* Statistics, initialised under CMK_IBVERBS_STATS in CmiMachineInit.
   pktCount is incremented per enqueued packet; minTokensLeft records the
   lowest value reached by context->tokensLeft. */
00051 static int regCount;
00052
00053 static int pktCount;
00054 static int msgCount;
00055 static int minTokensLeft;
00056
00057
00058 static double regTime;
00059
00060 static double processBufferedTime;
00061 static int processBufferedCount;
00062
/* Compile-time feature switches for this machine layer. */
/* When 1, collect counters and print them in CmiMachineExit. */
00063 #define CMK_IBVERBS_STATS 0
/* When 1, each post consumes one context->tokensLeft and getFreeTokens()
   runs the communication server until a token is available. */
00064 #define CMK_IBVERBS_TOKENS_FLOW 1
/* Per-peer token-increase requests (INFIPACKETCODE_INCTOKENS). */
00065 #define CMK_IBVERBS_INCTOKENS 0 //never turn this on
/* When 1, adds PSN counters to packet headers and per-peer data. */
00066 #define CMK_IBVERBS_DEBUG 0
00067 #define CMI_DIRECT_DEBUG 0
/* NOTE(review): presumably a completion-polling batch size; its user is
   outside this chunk. */
00068 #define WC_LIST_SIZE 32
00069
00070
00071
00072
00073
/* With CMK_IBVERBS_INCTOKENS, a token increase is requested when a peer's
   tokensLeft falls below INCTOKENS_FRACTION * totalTokens (or below 2).
   NOTE(review): INCTOKENS_INCREASE is not used in this chunk. */
00074 #define INCTOKENS_FRACTION 0.04
00075 #define INCTOKENS_INCREASE .50
00076
00077
/* When non-zero, infiCmiChunkPools becomes an array of pool arrays,
   initInfiCmiChunkPools runs from CmiCommunicationInit instead of
   CmiMachineInit, and the send packet pool is not pre-filled at init. */
00078 #define THREAD_MULTI_POOL 0
00079
00080 #if THREAD_MULTI_POOL
00081 #include "pcqueue.h"
00082 PCQueue **queuePool;
00083 #endif
00084
/* Packet code bit above the INFIPACKETCODE_* / INFIRDMA_* flags.
   NOTE(review): presumably marks barrier packets; users are outside this chunk. */
00085 #define INFIBARRIERPACKET 128
00086
/* Token-increase acknowledgement payload.
   NOTE(review): the meaning of `a` is not visible in this chunk. */
00087 struct infiIncTokenAckPacket{
00088 int a;
00089 };
00090
/* Idle state for the CmiNotify* hooks. This layer keeps none
   (CmiNotifyGetState returns NULL), so the struct is only a placeholder. */
00091 typedef struct {
00092 char none;
00093 } CmiIdleState;
00094
00095
/* Debug counters, reported by the MACHSTATE trace in CmiMachineExit. */
00096 static int TESTneighbor;
00097 static int TESTfrees;
00098
00099
00100
00101
00102
00103
00104 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
00105
00106 static void CmiNotifyStillIdle(CmiIdleState *s);
00107
00108
00109 static void CmiNotifyBeginIdle(CmiIdleState *s)
00110 {
00111 CmiNotifyStillIdle(s);
00112 }
00113
00114 void CmiNotifyIdle(void) {
00115 CmiNotifyStillIdle(NULL);
00116 }
00117
00118
00119
00120
00121
00122
00123
00124
/* Bit flags carried in infiPacketHeader.code. They are combined with |=,
   e.g. code |= INFIPACKETCODE_INCTOKENS when requesting more tokens. */
00125 #define INFIPACKETCODE_DATA 1
00126 #define INFIPACKETCODE_INCTOKENS 2
00127 #define INFIRDMA_START 4
00128 #define INFIRDMA_ACK 8
00129 #define INFIDIRECT_REQUEST 16
00130 #define INFIPACKETCODE_INCTOKENSACK 32
00131 #define INFIDUMMYPACKET 64
00132
/* Header sent as scatter/gather element 0 of every packet built by newPacket.
   nodeNo is the sender's node (the template header is set to _Cmi_mynode).
   psn is a per-destination sequence number, only under CMK_IBVERBS_DEBUG. */
00133 struct infiPacketHeader{
00134 char code;
00135 int nodeNo;
00136 #if CMK_IBVERBS_DEBUG
00137 int psn;
00138 #endif
00139 };
00140
00141
00142
00143
/* RDMA transfer kinds. NOTE(review): presumably the values of
   infiRdmaPacket.type — confirm against the users of that field. */
00144 #define INFI_MESG 1
00145 #define INFI_DIRECT 2
00146
/* Descriptor for an RDMA transfer. next/prev presumably link it into
   context->bufferedRdmaAcks / bufferedRdmaRequests.
   NOTE(review): field semantics (remote vs local buffer, copied `key` vs
   `keyPtr`) are inferred from names only; they are not used in this chunk. */
00147 struct infiRdmaPacket{
00148 int fromNodeNo;
00149 int type;
00150 struct ibv_mr key;
00151 struct ibv_mr *keyPtr;
00152 int remoteSize;
00153 char *remoteBuf;
00154 void *localBuffer;
00155 OutgoingMsg ogm;
00156 struct infiRdmaPacket *next,*prev;
00157 };
00158
00159
/* Values for infiBuffer.type; allocateInfiBufferPool creates BUFFER_RECV. */
00162 #define BUFFER_RECV 1
00163 #define BUFFER_RDMA 2
/* A buffer `buf` of `size` bytes inside the memory region `key`. Buffers
   carved from one registration (allocateInfiBufferPool) share the same key. */
00164 struct infiBuffer{
00165 int type;
00166 char *buf;
00167 int size;
00168 struct ibv_mr *key;
00169 };
00170
00171
00172
/* An array of numBuffers buffers, linkable through `next`. */
00176 struct infiBufferPool{
00177 int numBuffers;
00178 struct infiBuffer *buffers;
00179 struct infiBufferPool *next;
00180 };
00181
00182
00183
00184
00185
00186
00187
/* Send packet descriptor.
   - header is posted as SGE 0 (elemList[0]), registered via keyHeader.
   - buf is the payload, posted as SGE 1 (elemList[1]) by EnqueuePacket.
   - size is -1 while the packet is on context->infiPacketFreeList, linked by next.
   - destNode is set when the packet is enqueued.
   - wr is the prepared send work request; wr.wr_id holds the packet's own address. */
00188 typedef struct infiPacketStruct {
00189 char *buf;
00190 int size;
00191 struct infiPacketHeader header;
00192 struct ibv_mr *keyHeader;
00193 struct OtherNodeStruct *destNode;
00194 struct infiPacketStruct *next;
00195 OutgoingMsg ogm;
00196 struct ibv_sge elemList[2];
00197 struct ibv_send_wr wr;
00198 }* infiPacket;
00199
00200
00201
00202
00203
00204
00205
00206
00207
/* Number of entries in one infiBufferedBcastPoolStruct. */
00208 #define BCASTLIST_SIZE 50
00209
/* One buffered broadcast message (msg, size).
   NOTE(review): broot/asm_rank/valid semantics are not visible in this chunk. */
00210 struct infiBufferedBcastStruct{
00211 char *msg;
00212 int size;
00213 int broot;
00214 int asm_rank;
00215 int valid;
00216 };
00217
/* Doubly linked block of up to BCASTLIST_SIZE buffered broadcasts;
   context->bufferedBcastList points to such a block (NULL after init).
   NOTE(review): `count` is presumably the number of used entries. */
00218 typedef struct infiBufferedBcastPoolStruct{
00219 struct infiBufferedBcastPoolStruct *next,*prev;
00220 struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
00221 int count;
00222
00223 } *infiBufferedBcastPool;
00224
00225
00226
00227
00228
00229
00230
00231
00232
/* Process-wide verbs state, allocated in CmiMachineInit. */
00233 struct infiContext {
/* Device handle returned by ibv_open_device. */
00234 struct ibv_context *context;
00235
/* NOTE(review): asyncFds/tmo are not used in this chunk. */
00236 fd_set asyncFds;
00237 struct timeval tmo;
00238
/* HCA port used for all QPs (1). */
00239 int ibPort;
00240
/* Protection domain; send/recv completion queues; shared receive queue. */
00241 struct ibv_pd *pd;
00242 struct ibv_cq *sendCq;
00243 struct ibv_cq *recvCq;
00244 struct ibv_srq *srq;
00245
/* Per-node QP table (index = node number), filled by createLocalQps and
   freed by infiPostInitialRecvs. */
00246 struct ibv_qp **qp;
00247
00248
00249
/* Per-node local addressing (lid, qpn, psn), filled by createLocalQps,
   copied out by copyInfiAddr, freed by infiPostInitialRecvs. */
00250 struct infiAddr *localAddr;
00251
/* Free list of send packets (see MallocInfiPacket / FreeInfiPacket). */
00252 infiPacket infiPacketFreeList;
00253
/* Pool whose buffers were posted to the SRQ by infiPostInitialRecvs. */
00254 struct infiBufferPool *recvBufferPool;
00255
/* Header template copied into every new packet (nodeNo = _Cmi_mynode). */
00256 struct infiPacketHeader header;
00257
/* Queue sizes: srqSize = maxRecvBuffers + 2, sendCqSize = maxTokens + 2,
   recvCqSize = maxRecvBuffers. */
00258 int srqSize;
00259 int sendCqSize,recvCqSize;
/* Global send tokens; decremented per post under CMK_IBVERBS_TOKENS_FLOW. */
00260 int tokensLeft;
00261
00262 infiBufferedBcastPool bufferedBcastList;
00263
00264 struct infiRdmaPacket *bufferedRdmaAcks;
00265
00266 struct infiRdmaPacket *bufferedRdmaRequests;
00267
00268
/* NOTE(review): presumably a re-entrancy guard for broadcast processing. */
00269 int insideProcessBufferedBcasts;
00270 };
00271
/* The single machine-layer context for this process. */
00272 static struct infiContext *context;
00273
00274
00275
00276
/* QP addressing info for one connection: local LID, QP number and
   initial 24-bit packet sequence number. */
00280 struct infiAddr {
00281 int lid,qpn,psn;
00282 };
00283
/* Values for infiOtherNodeData.state; INFI_HEADER_DATA is the initial state. */
00288 enum { INFI_HEADER_DATA=21,INFI_DATA};
00289
/* Per-peer connection data, created by initInfiOtherNodeData. */
00290 struct infiOtherNodeData{
/* Copy of context->qp[nodeNo]. */
00291 struct ibv_qp *qp ;
00292 int state;
/* Per-peer token counts, consulted by the CMK_IBVERBS_INCTOKENS logic. */
00293 int totalTokens;
00294 int tokensLeft;
00295 int nodeNo;
00296
/* NOTE(review): postedRecvs/broot are not used in this chunk. */
00297 int postedRecvs;
00298 int broot;
00299 #if CMK_IBVERBS_DEBUG
/* Debug sequence counters; psn is incremented for each packet sent. */
00300 int psn;
00301 int recvPsn;
00302 #endif
00303 };
00304
00305
00306
00307
00308
00309
00310 struct infiCmiChunkHeaderStruct;
00311
/* Metadata attached to CmiAlloc'd chunks (reached via METADATAFIELD).
   key is the memory region whose lkey newPacket uses for the packet header.
   NOTE(review): poolIdx/nextBuf/owner/count presumably manage pooled reuse —
   their users are outside this chunk. */
00312 typedef struct infiCmiChunkMetaDataStruct {
00313 struct ibv_mr *key;
00314 int poolIdx;
00315 void *nextBuf;
00316 struct infiCmiChunkHeaderStruct *owner;
00317 int count;
00318
00319 #if THREAD_MULTI_POOL
00320 int parentPe;
00321 #endif
00322 } infiCmiChunkMetaData;
00323
00324
00325
00326
/*
 * Chunk metadata of the CmiAlloc'd buffer `m`. The infiCmiChunkHeader is
 * expected to sit immediately before the user pointer, hence index [-1].
 * `m` is parenthesised so that e.g. METADATAFIELD(p + off) casts the whole
 * expression rather than just `p`.
 */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)

/* One size class of the chunk allocator: chunk size, head buffer, count. */
typedef struct {
  int size;
  void *startBuf;
  int count;
} infiCmiChunkPool;

/* Number of size-class pools and per-pool cap. */
#define INFINUMPOOLS 20
#define INFIMAXPERPOOL 100
/* Negative sentinel pool index; parenthesised so it composes safely. */
#define INFIMULTIPOOL (-5)

#if THREAD_MULTI_POOL
infiCmiChunkPool **infiCmiChunkPools;

#else
infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
#endif

static void initInfiCmiChunkPools();
00348
00349 static inline infiPacket newPacket(){
00350 infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
00351 pkt->size = -1;
00352 pkt->header = context->header;
00353 pkt->next = NULL;
00354 pkt->destNode = NULL;
00355 pkt->keyHeader = METADATAFIELD(pkt)->key;
00356 pkt->ogm=NULL;
00357 CmiAssert(pkt->keyHeader!=NULL);
00358
00359 pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
00360 pkt->elemList[0].length = sizeof(struct infiPacketHeader);
00361 pkt->elemList[0].lkey = pkt->keyHeader->lkey;
00362
00363 pkt->wr.wr_id = (uint64_t)pkt;
00364 pkt->wr.sg_list = &(pkt->elemList[0]);
00365 pkt->wr.num_sge = 2;
00366 pkt->wr.opcode = IBV_WR_SEND;
00367 pkt->wr.send_flags = IBV_SEND_SIGNALED;
00368 pkt->wr.next = NULL;
00369
00370 return pkt;
00371 };
00372
/*
 * Return packet `pkt` to context->infiPacketFreeList, resetting size to -1
 * and clearing ogm.
 *
 * Expands to a single statement (do { } while (0)), so it is safe inside
 * if/else. The argument is evaluated exactly once and parenthesised, so
 * FreeInfiPacket(&x) works.
 */
#define FreeInfiPacket(pkt) do { \
    infiPacket freeInfiPkt_ = (pkt); \
    freeInfiPkt_->size = -1; \
    freeInfiPkt_->ogm = NULL; \
    freeInfiPkt_->next = context->infiPacketFreeList; \
    context->infiPacketFreeList = freeInfiPkt_; \
  } while (0)

/*
 * Pop a packet from context->infiPacketFreeList into the lvalue `pkt`,
 * allocating a fresh one with newPacket() when the list is empty.
 *
 * The temporary has a macro-specific name. The previous `infiPacket p` local
 * shadowed a caller variable named `p`, so MallocInfiPacket(p) assigned the
 * temporary to itself and left the caller's variable unchanged.
 */
#define MallocInfiPacket(pkt) do { \
    infiPacket mallocInfiPkt_ = context->infiPacketFreeList; \
    if (mallocInfiPkt_ == NULL) { \
      mallocInfiPkt_ = newPacket(); \
    } else { \
      context->infiPacketFreeList = mallocInfiPkt_->next; \
    } \
    (pkt) = mallocInfiPkt_; \
  } while (0)
00386
00387
00388
00389
00390
/* Forward declarations; the definitions appear elsewhere in this file. */
/* Polls the send completion queue (used by checkAllQps). */
00391 static inline int pollSendCq(const int toBuffer);
00392
/* Creates the CQs, the SRQ and one QP per remote node, recording local
   addressing in localAddr. */
00393 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
/* Returns the LID of `port`, or 0 if the port query fails. */
00394 static uint16_t getLocalLid(struct ibv_context *context, int port);
00395 static int checkQp(struct ibv_qp *qp){
00396 struct ibv_qp_attr attr;
00397 struct ibv_qp_init_attr init_attr;
00398
00399 ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP ,&init_attr);
00400 if(attr.cur_qp_state != IBV_QPS_RTS){
00401 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
00402 return 0;
00403 }
00404 return 1;
00405 }
00406 static void checkAllQps(){
00407 int i;
00408 for(i=0;i<_Cmi_numnodes;i++){
00409 if(i != _Cmi_mynode){
00410 if(!checkQp(nodes[i].infiData->qp)){
00411 pollSendCq(0);
00412 CmiAssert(0);
00413 }
00414 }
00415 }
00416 }
00417
/* Only with CMK_IBVERBS_FAST_START: called by CmiMachineInit right after the
   protection domain is allocated. Defined elsewhere. */
00418 #if CMK_IBVERBS_FAST_START
00419 static void send_partial_init();
00420 #endif
00421
00422 static void CmiMachineInit(char **argv){
00423 struct ibv_device **devList;
00424 struct ibv_device *dev;
00425 int ibPort;
00426 int i;
00427 int calcMaxSize;
00428 infiPacket *pktPtrs;
00429 struct infiRdmaPacket **rdmaPktPtrs;
00430
00431 MACHSTATE(3,"CmiMachineInit {");
00432 MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
00433 MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
00434
00435
00436
00437 devList = ibv_get_device_list(NULL);
00438 CmiAssert(devList != NULL);
00439
00440 dev = *devList;
00441 CmiAssert(dev != NULL);
00442
00443 ibPort=1;
00444
00445 MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
00446
00447 context = (struct infiContext *)malloc(sizeof(struct infiContext));
00448
00449 MACHSTATE1(3,"context allocated %p",context);
00450
00451
00452 context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
00453
00454 MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
00455
00456 context->ibPort = ibPort;
00457
00458 context->context = ibv_open_device(dev);
00459 CmiAssert(context->context != NULL);
00460
00461 MACHSTATE1(3,"device opened %p",context->context);
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471 context->pd = ibv_alloc_pd(context->context);
00472 CmiAssert(context->pd != NULL);
00473 MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
00474
00475
00476
00477
00478
00479
00480
00481 #if CMK_IBVERBS_FAST_START
00482 send_partial_init();
00483 #endif
00484
00485
00486 context->header.nodeNo = _Cmi_mynode;
00487
00488 mtu_size=1200;
00489 packetSize = mtu_size*4;
00490 dataSize = packetSize-sizeof(struct infiPacketHeader);
00491
00492 calcMaxSize=8000;
00493
00494
00495
00496
00497
00498
00499
00500 maxRecvBuffers=calcMaxSize;
00501 maxTokens = maxRecvBuffers;
00502
00503 context->tokensLeft=maxTokens;
00504
00505 if(_Cmi_numnodes > 1){
00506 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
00507 }
00508
00509
00510
00511 rdma=1;
00512
00513 rdmaThreshold=22000;
00514 firstBinSize = 120;
00515 CmiAssert(rdmaThreshold > firstBinSize);
00516 blockAllocRatio=16;
00517 blockThreshold=8;
00518
00519
00520 int ppn = 0;
00521 CmiGetArgInt(argv,"+ppn",&CmiMyNodeSize());
00522 MACHSTATE1(3,"CmiMyNodeSize %d",CmiMyNodeSize());
00523
00524 #if !THREAD_MULTI_POOL
00525 initInfiCmiChunkPools();
00526 #endif
00527
00528
00529 sendPacketPoolSize = maxTokens/2;
00530 if(sendPacketPoolSize > 2000){
00531 sendPacketPoolSize = 2000;
00532 }
00533
00534 context->infiPacketFreeList=NULL;
00535 pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
00536
00537
00538 #if !THREAD_MULTI_POOL
00539 for(i=0;i<sendPacketPoolSize;i++){
00540 MallocInfiPacket(pktPtrs[i]);
00541 }
00542
00543 for(i=0;i<sendPacketPoolSize;i++){
00544 FreeInfiPacket(pktPtrs[i]);
00545 }
00546 free(pktPtrs);
00547 #endif
00548
00549 context->bufferedBcastList=NULL;
00550 context->bufferedRdmaAcks = NULL;
00551 context->bufferedRdmaRequests = NULL;
00552 context->insideProcessBufferedBcasts=0;
00553
00554
00555 if(rdma){
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573 }
00574
00575
00576 #if CMK_IBVERBS_STATS
00577 regCount =0;
00578 regTime = 0;
00579
00580 pktCount=0;
00581 msgCount=0;
00582
00583 processBufferedCount=0;
00584 processBufferedTime=0;
00585
00586 minTokensLeft = maxTokens;
00587 #endif
00588
00589
00590
00591 MACHSTATE(3,"} CmiMachineInit");
00592 }
00593
/*
 * Communication-thread initialisation hook. It only does work when
 * THREAD_MULTI_POOL is enabled, in which case the chunk pools are set up here
 * (instead of in CmiMachineInit). argv is not inspected.
 */
void CmiCommunicationInit(char **argv)
{
  (void)argv;
#if THREAD_MULTI_POOL
  initInfiCmiChunkPools();
#endif
}
00600
00601
00602
00603
00604 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
00605 int myLid;
00606 int i;
00607
00608
00609
00610 myLid = getLocalLid(context->context,ibPort);
00611
00612 MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
00613
00614 context->sendCqSize = maxTokens+2;
00615 context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
00616 CmiAssert(context->sendCq != NULL);
00617
00618 MACHSTATE1(3,"sendCq created %p",context->sendCq);
00619
00620
00621 context->recvCqSize = maxRecvBuffers;
00622 context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
00623
00624 MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
00625 CmiAssert(context->recvCq != NULL);
00626
00627
00628
00629 context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
00630
00631 if(numNodes > 1)
00632 {
00633 context->srqSize = (maxRecvBuffers+2);
00634 struct ibv_srq_init_attr srqAttr = {
00635 .attr = {
00636 .max_wr = context->srqSize,
00637 .max_sge = 1
00638 }
00639 };
00640 context->srq = ibv_create_srq(context->pd,&srqAttr);
00641 CmiAssert(context->srq != NULL);
00642
00643 struct ibv_qp_init_attr initAttr = {
00644 .qp_type = IBV_QPT_RC,
00645 .send_cq = context->sendCq,
00646 .recv_cq = context->recvCq,
00647 .srq = context->srq,
00648 .sq_sig_all = 0,
00649 .qp_context = NULL,
00650 .cap = {
00651 .max_send_wr = maxTokens,
00652 .max_send_sge = 2,
00653 },
00654 };
00655 struct ibv_qp_attr attr;
00656
00657 attr.qp_state = IBV_QPS_INIT;
00658 attr.pkey_index = 0;
00659 attr.port_num = ibPort;
00660 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
00661
00662
00663
00664
00665
00666 for( i=0;i<numNodes;i++){
00667 if(i == myNode){
00668 }else{
00669 localAddr[i].lid = myLid;
00670 context->qp[i] = ibv_create_qp(context->pd,&initAttr);
00671
00672 MACHSTATE2(3,"qp[%d] created %p",i,context->qp[i]);
00673
00674 CmiAssert(context->qp[i] != NULL);
00675
00676
00677 ibv_modify_qp(context->qp[i], &attr,
00678 IBV_QP_STATE |
00679 IBV_QP_PKEY_INDEX |
00680 IBV_QP_PORT |
00681 IBV_QP_ACCESS_FLAGS);
00682
00683 localAddr[i].qpn = context->qp[i]->qp_num;
00684 localAddr[i].psn = lrand48() & 0xffffff;
00685 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",i,localAddr[i].lid,localAddr[i].qpn,localAddr[i].psn);
00686 }
00687 }
00688 }
00689 MACHSTATE(3,"qps created");
00690 };
00691
00692 void copyInfiAddr(ChInfiAddr *qpList){
00693 int qpListIdx=0;
00694 int i;
00695 MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
00696 for(i=0;i<_Cmi_numnodes;i++){
00697 if(i == _Cmi_mynode){
00698 }else{
00699 qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
00700 qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
00701 qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);
00702 qpListIdx++;
00703 }
00704 }
00705 }
00706
00707
00708 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
00709 struct ibv_port_attr attr;
00710
00711 if (ibv_query_port(dev_context, port, &attr))
00712 return 0;
00713
00714 return attr.lid;
00715 }
00716
00717
00718
/* Allocates numRecvs receive buffers of sizePerBuffer bytes from one
   registered region (defined below). */
00719 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
/* Posts the pool's first numRecvs buffers to the shared receive queue. */
00720 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
00721
00722
00723
00724
00725 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
00726 struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
00727 int err;
00728 ret->state = INFI_HEADER_DATA;
00729 ret->qp = context->qp[node];
00730
00731
00732 ret->nodeNo = node;
00733
00734 #if CMK_IBVERBS_DEBUG
00735 ret->psn = 0;
00736 ret->recvPsn = 0;
00737 #endif
00738
00739 struct ibv_qp_attr attr = {
00740 .qp_state = IBV_QPS_RTR,
00741 .path_mtu = mtu,
00742 .dest_qp_num = addr[1],
00743 .rq_psn = addr[2],
00744 .max_dest_rd_atomic = 1,
00745 .min_rnr_timer = 31,
00746 .ah_attr = {
00747 .is_global = 0,
00748 .dlid = addr[0],
00749 .sl = 0,
00750 .src_path_bits = 0,
00751 .port_num = context->ibPort
00752 }
00753 };
00754
00755 MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
00756 MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
00757
00758 if (err = ibv_modify_qp(ret->qp, &attr,
00759 IBV_QP_STATE |
00760 IBV_QP_AV |
00761 IBV_QP_PATH_MTU |
00762 IBV_QP_DEST_QPN |
00763 IBV_QP_RQ_PSN |
00764 IBV_QP_MAX_DEST_RD_ATOMIC |
00765 IBV_QP_MIN_RNR_TIMER)) {
00766 MACHSTATE1(3,"ERROR %d",err);
00767 CmiAbort("failed to change qp state to RTR");
00768 }
00769
00770 MACHSTATE(3,"qp state changed to RTR");
00771
00772 attr.qp_state = IBV_QPS_RTS;
00773 attr.timeout = 26;
00774 attr.retry_cnt = 20;
00775 attr.rnr_retry = 7;
00776 attr.sq_psn = context->localAddr[node].psn;
00777 attr.max_rd_atomic = 1;
00778
00779
00780 if (ibv_modify_qp(ret->qp, &attr,
00781 IBV_QP_STATE |
00782 IBV_QP_TIMEOUT |
00783 IBV_QP_RETRY_CNT |
00784 IBV_QP_RNR_RETRY |
00785 IBV_QP_SQ_PSN |
00786 IBV_QP_MAX_QP_RD_ATOMIC)) {
00787 fprintf(stderr, "Failed to modify QP to RTS\n");
00788 exit(1);
00789 }
00790 MACHSTATE(3,"qp state changed to RTS");
00791
00792 MACHSTATE(3,"} initInfiOtherNodeData");
00793 return ret;
00794 }
00795
00796
00797 void infiPostInitialRecvs(){
00798
00799 int numPosts;
00800
00801
00802
00803
00804
00805
00806 if(_Cmi_numnodes > 1){
00807 numPosts = maxRecvBuffers;
00808 }else{
00809 numPosts = 0;
00810 }
00811 if(numPosts > 0){
00812 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
00813 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
00814 }
00815
00816
00817 free(context->qp);
00818 context->qp = NULL;
00819 free(context->localAddr);
00820 context->localAddr= NULL;
00821 }
00822
00823 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
00824 int numBuffers;
00825 int i;
00826 int bigSize;
00827 char *bigBuf;
00828 struct infiBufferPool *ret;
00829 struct ibv_mr *bigKey;
00830
00831 MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
00832
00833 page_size = sysconf(_SC_PAGESIZE);
00834 ret = malloc(sizeof(struct infiBufferPool));
00835 ret->next = NULL;
00836 numBuffers=ret->numBuffers = numRecvs;
00837
00838 ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
00839
00840 bigSize = numBuffers*sizePerBuffer;
00841 bigBuf=malloc(bigSize);
00842 bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
00843 CmiAssert(bigKey != NULL);
00844
00845 for(i=0;i<numBuffers;i++){
00846 struct infiBuffer *buffer = &(ret->buffers[i]);
00847 buffer->type = BUFFER_RECV;
00848 buffer->size = sizePerBuffer;
00849
00850
00851
00852 buffer->buf = &bigBuf[i*sizePerBuffer];
00853 buffer->key = bigKey;
00854
00855 if(buffer->key == NULL){
00856 MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
00857 CmiAssert(buffer->key != NULL);
00858 }
00859 }
00860 return ret;
00861 };
00862
00863
00864
00868 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
00869 int j,err;
00870 struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
00871 struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
00872 struct ibv_recv_wr *bad_wr;
00873
00874 int startBufferIdx=0;
00875 MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
00876 for(j=0;j<numRecvs;j++){
00877
00878
00879 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
00880 sgElements[j].length = sizePerBuffer;
00881 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
00882
00883 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
00884 workRequests[j].sg_list = &sgElements[j];
00885 workRequests[j].num_sge = 1;
00886 if(j != numRecvs-1){
00887 workRequests[j].next = &workRequests[j+1];
00888 }
00889
00890 }
00891 workRequests[numRecvs-1].next = NULL;
00892 MACHSTATE(3,"About to call ibv_post_srq_recv");
00893 if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
00894 CmiAssert(0);
00895 }
00896
00897 free(workRequests);
00898 free(sgElements);
00899 }
00900
00901
00902
00903
00904 static inline void CommunicationServer_nolock(int toBuffer);
00905
00906 static void CmiMachineExit()
00907 {
00908 #if CMK_IBVERBS_STATS
00909 printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
00910 #endif
00911
00912
00913 MACHSTATE2(3,"Free msgs %d not owned %d",TESTfrees,TESTneighbor);
00914 }
00915 static void ServiceCharmrun_nolock();
00916
00917 static void CmiNotifyStillIdle(CmiIdleState *s) {
00918 #if CMK_SMP
00919 CmiCommLock();
00920
00921 #endif
00922
00923
00924 CommunicationServer_nolock(0);
00925 #if CMK_SMP
00926 CmiCommUnlock();
00927 #endif
00928 }
00929
00930 static inline void increaseTokens(OtherNode node);
00931
00932 static inline int pollRecvCq(const int toBuffer);
00933 static inline int pollSendCq(const int toBuffer);
00934
00935
00936 static inline void getFreeTokens(struct infiOtherNodeData *infiData){
00937 #if !CMK_IBVERBS_TOKENS_FLOW
00938 return;
00939 #else
00940
00941 if(context->tokensLeft == 0){
00942 MACHSTATE(3,"GET FREE TOKENS {{{");
00943 }else{
00944 return;
00945 }
00946 while(context->tokensLeft == 0){
00947 CommunicationServer_nolock(1);
00948 }
00949 MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
00950 #endif
00951 }
00952
00953
00960 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
00961 int incTokens=0;
00962 int retval;
00963 #if CMK_IBVERBS_DEBUG
00964 packet->header.psn = (++node->infiData->psn);
00965 #endif
00966
00967
00968
00969 packet->elemList[1].addr = (uintptr_t)packet->buf;
00970 packet->elemList[1].length = size;
00971 packet->elemList[1].lkey = dataKey->lkey;
00972
00973
00974 packet->destNode = node;
00975
00976 #if CMK_IBVERBS_STATS
00977 pktCount++;
00978 #endif
00979
00980 getFreeTokens(node->infiData);
00981
00982 #if CMK_IBVERBS_INCTOKENS
00983 if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
00984 packet->header.code |= INFIPACKETCODE_INCTOKENS;
00985 incTokens=1;
00986 }
00987 #endif
00988
00989
00990
00991
00992
00993
00994 struct ibv_send_wr *bad_wr=NULL;
00995 if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
00996 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
00997 CmiAssert(0);
00998 }
00999 #if CMK_IBVERBS_TOKENS_FLOW
01000 context->tokensLeft--;
01001 #if CMK_IBVERBS_STATS
01002 if(context->tokensLeft < minTokensLeft){
01003 minTokensLeft = context->tokensLeft;
01004 }
01005 #endif
01006 #endif
01007
01008
01009
01010
01