arch/net/machine-ibverbs.c

Go to the documentation of this file.
00001 
00021 #include <stdio.h>
00022 #include <stdlib.h>
00023 #include <unistd.h>
00024 #include <string.h>
00025 #include <sys/types.h>
00026 #include <sys/socket.h>
00027 #include <malloc.h>
00028 #include <getopt.h>
00029 #include <time.h>
00030 
00031 #include <infiniband/verbs.h>
00032 
00033 enum ibv_mtu mtu = IBV_MTU_2048;
00034 static int page_size;
00035 static int mtu_size;
00036 static int packetSize;
00037 static int dataSize;
00038 static int rdma;
00039 static int rdmaThreshold;
00040 static int firstBinSize;
00041 static int blockAllocRatio;
00042 static int blockThreshold;
00043 
00044 
00045 static int maxRecvBuffers;
00046 static int maxTokens;
00047 //static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
00048 static int sendPacketPoolSize; /*total number of send buffers created*/
00049 
00050 static double _startTime=0;
00051 static int regCount;
00052 
00053 static int pktCount;
00054 static int msgCount;
00055 static int minTokensLeft;
00056 
00057 
00058 static double regTime;
00059 
00060 static double processBufferedTime;
00061 static int processBufferedCount;
00062 
/* Compile-time switches. They are tested with #if, so they must remain macros. */
#define CMK_IBVERBS_STATS        0
#define CMK_IBVERBS_TOKENS_FLOW  1
#define CMK_IBVERBS_INCTOKENS    0   /* never turn this on */
#define CMK_IBVERBS_DEBUG        0
#define CMI_DIRECT_DEBUG         0
#define WC_LIST_SIZE             32
/*#define WC_BUFFER_SIZE 100*/

/* Parameters of the (disabled) CMK_IBVERBS_INCTOKENS token-increase protocol. */
#define INCTOKENS_FRACTION 0.04
#define INCTOKENS_INCREASE .50
00076 
/* Set to 1 to give every thread its own pool (uses PCQueue from pcqueue.h). */
#define THREAD_MULTI_POOL 0

#if THREAD_MULTI_POOL
#include "pcqueue.h"
PCQueue **queuePool;
#endif
00084 
/* Packet code value; presumably marks barrier packets (name-based, confirm at use sites). */
#define INFIBARRIERPACKET 128

/* Packet body holding a single integer; the name suggests an increase-tokens acknowledgement. */
struct infiIncTokenAckPacket {
	int a;
};
00090 
/* Idle-state handle. This layer keeps no idle state (CmiNotifyGetState returns NULL); the field is a placeholder. */
typedef struct {
	char none;
} CmiIdleState;
00094 
/* TODO: erase these debugging counters (logged by CmiMachineExit). */
static int TESTneighbor;
static int TESTfrees;
00098 
00099 
00100 /********
00101 ** The notify idle methods
00102 ***/
00103 
00104 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
00105 
00106 static void CmiNotifyStillIdle(CmiIdleState *s);
00107 
00108 
00109 static void CmiNotifyBeginIdle(CmiIdleState *s)
00110 {
00111   CmiNotifyStillIdle(s);
00112 }
00113 
00114 void CmiNotifyIdle(void) {
00115   CmiNotifyStillIdle(NULL);
00116 }
00117 
/***************
Data Structures
***********************/

/******
	Codes carried in infiPacketHeader.code. Each one is a distinct bit, so
	several can be OR-ed into one header (INFIPACKETCODE_INCTOKENS is added
	that way when packets are enqueued).
*******/
#define INFIPACKETCODE_DATA         0x01
#define INFIPACKETCODE_INCTOKENS    0x02
#define INFIRDMA_START              0x04
#define INFIRDMA_ACK                0x08
#define INFIDIRECT_REQUEST          0x10
#define INFIPACKETCODE_INCTOKENSACK 0x20
#define INFIDUMMYPACKET             0x40

/* Header attached to the beginning of every infiniband packet. */
struct infiPacketHeader {
	char code;   /* combination of the codes above */
	int nodeNo;  /* sending node (context->header.nodeNo is _Cmi_mynode) */
#if	CMK_IBVERBS_DEBUG
	int psn;     /* per-destination debug sequence number */
#endif
};
00140 
00141 /*
00142         Types of rdma packets
00143 */
00144 #define INFI_MESG 1 
00145 #define INFI_DIRECT 2
00146 
00147 struct infiRdmaPacket{
00148         int fromNodeNo;
00149         int type;
00150         struct ibv_mr key;
00151         struct ibv_mr *keyPtr;
00152         int remoteSize;
00153         char *remoteBuf;
00154         void *localBuffer;
00155         OutgoingMsg ogm;
00156         struct infiRdmaPacket *next,*prev;
00157 };
00158 
00159 
/* Values for infiBuffer.type. */
#define BUFFER_RECV 1
#define BUFFER_RDMA 2

/* One registered buffer: its memory, its length and the memory region covering it. */
struct infiBuffer {
	int type;            /* BUFFER_RECV or BUFFER_RDMA */
	char *buf;
	int size;            /* bytes available at buf */
	struct ibv_mr *key;  /* registration whose lkey is used when posting buf */
};
00170 
00171 
00172 
/* An array of numBuffers infiBuffers; next presumably chains further pools (set to NULL on allocation). */
struct infiBufferPool {
	int numBuffers;
	struct infiBuffer *buffers;
	struct infiBufferPool *next;
};
00181 
00182 /*****
00183         It is the structure for the send buffers that are used
00184         to send messages to other nodes
00185 ********/
00186 
00187 
00188 typedef struct infiPacketStruct {       
00189         char *buf;
00190         int size;
00191         struct infiPacketHeader header;
00192         struct ibv_mr *keyHeader;
00193         struct OtherNodeStruct *destNode;
00194         struct infiPacketStruct *next;
00195         OutgoingMsg ogm;
00196         struct ibv_sge elemList[2];
00197         struct ibv_send_wr wr;
00198 }* infiPacket;
00199 
/*
typedef struct infiBufferedWCStruct{
	struct ibv_wc wcList[WC_BUFFER_SIZE];
	int count;
	struct infiBufferedWCStruct *next,*prev;
} * infiBufferedWC;
*/

/* Capacity of one infiBufferedBcastPoolStruct chunk. */
#define BCASTLIST_SIZE 50

/* One buffered broadcast: message, size, broadcast root and rank, and a validity flag. */
struct infiBufferedBcastStruct {
	char *msg;
	int size;
	int broot;
	int asm_rank;
	int valid;
};

/* A chunk of BCASTLIST_SIZE buffered broadcasts; chunks link through next/prev. */
typedef struct infiBufferedBcastPoolStruct {
	struct infiBufferedBcastPoolStruct *next;
	struct infiBufferedBcastPoolStruct *prev;
	struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
	int count;
} *infiBufferedBcastPool;
00224 
00225 
00226 
00227 
00228 /***
00229         This structure represents the data needed by the infiniband
00230         communication routines of a node
00231         TODO: add locking for the smp version
00232 */
00233 struct infiContext {
00234         struct ibv_context      *context;
00235         
00236         fd_set  asyncFds;
00237         struct timeval tmo;
00238         
00239         int ibPort;
00240 //      struct ibv_comp_channel *channel;
00241         struct ibv_pd           *pd;
00242         struct ibv_cq           *sendCq;
00243         struct ibv_cq   *recvCq;
00244         struct ibv_srq  *srq;
00245         
00246         struct ibv_qp           **qp; //Array of qps (numNodes long) to temporarily store the queue pairs
00247                                                                                                 //It is used between CmiMachineInit and the call to node_addresses_store
00248                                                                                                 //when the qps are stored in the corresponding OtherNodes
00249 
00250         struct infiAddr *localAddr; //store the lid,qpn,msn address of ur qpair until they are sent
00251 
00252         infiPacket infiPacketFreeList; 
00253         
00254         struct infiBufferPool *recvBufferPool;
00255 
00256         struct infiPacketHeader header;
00257 
00258         int srqSize;
00259         int sendCqSize,recvCqSize;
00260         int tokensLeft;
00261 
00262         infiBufferedBcastPool bufferedBcastList;
00263         
00264         struct infiRdmaPacket *bufferedRdmaAcks;
00265         
00266         struct infiRdmaPacket *bufferedRdmaRequests;
00267 /*      infiBufferedWC infiBufferedRecvList;*/
00268         
00269         int insideProcessBufferedBcasts;
00270 };
00271 
00272 static struct infiContext *context;
00273 
00274 
00275 
00276 
/* Address of one local queue pair as exchanged with a peer. */
struct infiAddr {
	int lid;   /* local port LID */
	int qpn;   /* queue pair number */
	int psn;   /* starting packet sequence number */
};
00283 
/* Values for infiOtherNodeData.state. */
enum { INFI_HEADER_DATA = 21, INFI_DATA };

/* InfiniBand-specific data kept for each remote node. */
struct infiOtherNodeData {
	struct ibv_qp *qp;
	int state;        /* does it expect a packet with a header (first packet) or one without */
	int totalTokens;
	int tokensLeft;
	int nodeNo;

	int postedRecvs;
	int broot;        /* root of a multi-packet broadcast sent along a spanning tree or hypercube */
#if	CMK_IBVERBS_DEBUG
	int psn;
	int recvPsn;
#endif
};
00304 
00305 
/********************************
Memory management structures and types
*****************/

struct infiCmiChunkHeaderStruct;

/* Bookkeeping attached to every buffer handed out by the infi chunk pools. */
typedef struct infiCmiChunkMetaDataStruct {
	struct ibv_mr *key;     /* registration covering the buffer (newPacket reads its lkey) */
	int poolIdx;
	void *nextBuf;
	struct infiCmiChunkHeaderStruct *owner;
	int count;

#if THREAD_MULTI_POOL
	int parentPe;           /* the PE that allocated the buffer and must release it */
#endif
} infiCmiChunkMetaData;

/*
 * Metadata of a pool buffer m: the chunk header sits immediately before the
 * user pointer. The argument is parenthesised so that an expression such as
 * METADATAFIELD(buf + off) is converted as a whole before [-1] is applied;
 * without the parentheses the cast would bind to `buf` alone.
 */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)

typedef struct {
	int size;               /* without infiCmiChunkHeader */
	void *startBuf;
	int count;
} infiCmiChunkPool;

#define INFINUMPOOLS 20
#define INFIMAXPERPOOL 100
#define INFIMULTIPOOL (-5)

#if THREAD_MULTI_POOL
infiCmiChunkPool **infiCmiChunkPools;
//TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
#else
infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
#endif

static void initInfiCmiChunkPools();
00347 
00348 
00349 static inline infiPacket newPacket(){
00350         infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
00351         pkt->size = -1;
00352         pkt->header = context->header;
00353         pkt->next = NULL;
00354         pkt->destNode = NULL;
00355         pkt->keyHeader = METADATAFIELD(pkt)->key;
00356         pkt->ogm=NULL;
00357         CmiAssert(pkt->keyHeader!=NULL);
00358         
00359         pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
00360         pkt->elemList[0].length = sizeof(struct infiPacketHeader);
00361         pkt->elemList[0].lkey = pkt->keyHeader->lkey;
00362         
00363         pkt->wr.wr_id = (uint64_t)pkt;
00364         pkt->wr.sg_list = &(pkt->elemList[0]);
00365         pkt->wr.num_sge = 2;
00366         pkt->wr.opcode = IBV_WR_SEND;
00367         pkt->wr.send_flags = IBV_SEND_SIGNALED;
00368         pkt->wr.next = NULL;
00369         
00370         return pkt;
00371 };
00372 
/*
 * Return send packet `pkt` to the context's free list, marking it unused.
 * The argument is evaluated exactly once. The do { } while (0) wrapper makes
 * the macro a single statement, so it is safe in an unbraced if/else.
 */
#define FreeInfiPacket(pkt) do { \
	infiPacket freeInfiPacketTmp_ = (pkt); \
	freeInfiPacketTmp_->size = -1; \
	freeInfiPacketTmp_->ogm = NULL; \
	freeInfiPacketTmp_->next = context->infiPacketFreeList; \
	context->infiPacketFreeList = freeInfiPacketTmp_; \
} while (0)

/*
 * Take a send packet from the context's free list into the lvalue `pkt`,
 * creating one with newPacket() if the list is empty. The temporary has a
 * unique name: with the old local `p`, MallocInfiPacket(p) assigned the local
 * to itself and left the caller's variable untouched.
 */
#define MallocInfiPacket(pkt) do { \
	infiPacket mallocInfiPacketTmp_ = context->infiPacketFreeList; \
	if (mallocInfiPacketTmp_ == NULL) { \
		mallocInfiPacketTmp_ = newPacket(); \
	} else { \
		context->infiPacketFreeList = mallocInfiPacketTmp_->next; \
	} \
	(pkt) = mallocInfiPacketTmp_; \
} while (0)
00386 
00387 
00388 
00389 
00390 /******************CmiMachineInit and its helper functions*/
00391 static inline int pollSendCq(const int toBuffer);
00392 
00393 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
00394 static uint16_t getLocalLid(struct ibv_context *context, int port);
00395 static int  checkQp(struct ibv_qp *qp){
00396         struct ibv_qp_attr attr;
00397         struct ibv_qp_init_attr init_attr;
00398                  
00399         ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP  ,&init_attr);
00400         if(attr.cur_qp_state != IBV_QPS_RTS){
00401                 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
00402                 return 0;
00403         }
00404         return 1;
00405 }
00406 static void checkAllQps(){
00407         int i;
00408         for(i=0;i<_Cmi_numnodes;i++){
00409                 if(i != _Cmi_mynode){
00410                         if(!checkQp(nodes[i].infiData->qp)){
00411                                 pollSendCq(0);
00412                                 CmiAssert(0);
00413                         }
00414                 }
00415         }
00416 }
00417 
00418 #if CMK_IBVERBS_FAST_START
00419 static void send_partial_init();
00420 #endif
00421 
00422 static void CmiMachineInit(char **argv){
00423         struct ibv_device **devList;
00424         struct ibv_device *dev;
00425         int ibPort;
00426         int i;
00427         int calcMaxSize;
00428         infiPacket *pktPtrs;
00429         struct infiRdmaPacket **rdmaPktPtrs;
00430 
00431         MACHSTATE(3,"CmiMachineInit {");
00432         MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
00433         MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
00434         
00435         //TODO: make the device and ibport configureable by commandline parameter
00436         //Check example for how to do that
00437         devList =  ibv_get_device_list(NULL);
00438         CmiAssert(devList != NULL);
00439 
00440         dev = *devList;
00441         CmiAssert(dev != NULL);
00442 
00443         ibPort=1;
00444 
00445         MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
00446 
00447         context = (struct infiContext *)malloc(sizeof(struct infiContext));
00448         
00449         MACHSTATE1(3,"context allocated %p",context);
00450         
00451         //localAddr will store the local addresses of all the qps
00452         context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
00453         
00454         MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
00455         
00456         context->ibPort = ibPort;
00457         //the context for this infiniband device 
00458         context->context = ibv_open_device(dev);
00459         CmiAssert(context->context != NULL);
00460         
00461         MACHSTATE1(3,"device opened %p",context->context);
00462 
00463 /*      FD_ZERO(&context->asyncFds);
00464         FD_SET(context->context->async_fd,&context->asyncFds);
00465         context->tmo.tv_sec=0;
00466         context->tmo.tv_usec=0;
00467         
00468         MACHSTATE(3,"asyncFds zeroed and set");*/
00469 
00470         //protection domain
00471         context->pd = ibv_alloc_pd(context->context);
00472         CmiAssert(context->pd != NULL);
00473         MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
00474 
00475   /******** At this point we know that this node is more or less serviceable
00476         So, this is a good point for sending the partial init message for the fast
00477         start case
00478         Moreover, no work dependent on the number of nodes has started yet.
00479         ************/
00480 
00481 #if CMK_IBVERBS_FAST_START
00482   send_partial_init();
00483 #endif
00484 
00485 
00486         context->header.nodeNo = _Cmi_mynode;
00487 
00488         mtu_size=1200;
00489         packetSize = mtu_size*4;
00490         dataSize = packetSize-sizeof(struct infiPacketHeader);
00491         
00492         calcMaxSize=8000;
00493 /*      if(_Cmi_numnodes*50 > calcMaxSize){
00494                 calcMaxSize = _Cmi_numnodes*50;
00495                 if(calcMaxSize > 10000){
00496                         calcMaxSize = 10000;
00497                 }
00498         }*/
00499 //      maxRecvBuffers=80;
00500         maxRecvBuffers=calcMaxSize;
00501         maxTokens = maxRecvBuffers;
00502 //      maxTokens = 80;
00503         context->tokensLeft=maxTokens;
00504         //tokensPerProcessor=4;
00505         if(_Cmi_numnodes > 1){
00506                 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
00507         }
00508         
00509         
00510         //TURN ON RDMA
00511         rdma=1;
00512 //      rdmaThreshold=32768;
00513         rdmaThreshold=22000;
00514         firstBinSize = 120;
00515         CmiAssert(rdmaThreshold > firstBinSize);
00516         blockAllocRatio=16;
00517         blockThreshold=8;
00518 
00519         //TODO:erase this
00520         int ppn = 0;
00521         CmiGetArgInt(argv,"+ppn",&CmiMyNodeSize());
00522         MACHSTATE1(3,"CmiMyNodeSize %d",CmiMyNodeSize());
00523         
00524 #if !THREAD_MULTI_POOL
00525         initInfiCmiChunkPools();
00526 #endif
00527 
00528         /*create the pool of send packets*/
00529         sendPacketPoolSize = maxTokens/2;       
00530         if(sendPacketPoolSize > 2000){
00531                 sendPacketPoolSize = 2000;
00532         }
00533         
00534         context->infiPacketFreeList=NULL;
00535         pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
00536 
00537         //Silly way of allocating the memory buffers (slow as well) but simplifies the code
00538 #if !THREAD_MULTI_POOL
00539         for(i=0;i<sendPacketPoolSize;i++){
00540                 MallocInfiPacket(pktPtrs[i]);   
00541         }
00542 
00543         for(i=0;i<sendPacketPoolSize;i++){
00544                 FreeInfiPacket(pktPtrs[i]);     
00545         }
00546         free(pktPtrs);
00547 #endif
00548         
00549         context->bufferedBcastList=NULL;
00550         context->bufferedRdmaAcks = NULL;
00551         context->bufferedRdmaRequests = NULL;
00552         context->insideProcessBufferedBcasts=0;
00553         
00554         
00555         if(rdma){
00556 /*              int numPkts;
00557                 int k;
00558                 if( _Cmi_numnodes*4 < maxRecvBuffers/4){
00559                         numPkts = _Cmi_numnodes*4;
00560                 }else{
00561                         numPkts = maxRecvBuffers/4;
00562                 }
00563                 
00564                 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
00565                 for(k=0;k<numPkts;k++){
00566                         rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
00567                 }
00568                 
00569                 for(k=0;k<numPkts;k++){
00570                         CmiFree(rdmaPktPtrs[k]);
00571                 }
00572                 free(rdmaPktPtrs);*/
00573         }
00574         
00575 /*      context->infiBufferedRecvList = NULL;*/
00576 #if CMK_IBVERBS_STATS   
00577         regCount =0;
00578         regTime  = 0;
00579 
00580         pktCount=0;
00581         msgCount=0;
00582 
00583         processBufferedCount=0;
00584         processBufferedTime=0;
00585 
00586         minTokensLeft = maxTokens;
00587 #endif  
00588 
00589         
00590 
00591         MACHSTATE(3,"} CmiMachineInit");
00592 }
00593 
/*
 * Communication-thread initialisation hook. It only has work to do when every
 * thread owns its own memory pools (THREAD_MULTI_POOL). Otherwise the pools
 * were already created in CmiMachineInit. argv is unused.
 */
void CmiCommunicationInit(char **argv)
{
	(void)argv;
#if THREAD_MULTI_POOL
	initInfiCmiChunkPools();
#endif
}
00600 
00601 /*********
00602         Open a qp for every processor
00603 *****/
00604 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
00605         int myLid;
00606         int i;
00607         
00608         
00609         //find my lid
00610         myLid = getLocalLid(context->context,ibPort);
00611         
00612         MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
00613 
00614         context->sendCqSize = maxTokens+2;
00615         context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
00616         CmiAssert(context->sendCq != NULL);
00617         
00618         MACHSTATE1(3,"sendCq created %p",context->sendCq);
00619         
00620         
00621         context->recvCqSize = maxRecvBuffers;
00622         context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
00623         
00624         MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
00625         CmiAssert(context->recvCq != NULL);
00626         
00627         //array of queue pairs
00628 
00629         context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
00630 
00631         if(numNodes > 1)
00632         {
00633                 context->srqSize = (maxRecvBuffers+2);
00634                 struct ibv_srq_init_attr srqAttr = {
00635                         .attr = {
00636                         .max_wr  = context->srqSize,
00637                         .max_sge = 1
00638                         }
00639                 };
00640                 context->srq = ibv_create_srq(context->pd,&srqAttr);
00641                 CmiAssert(context->srq != NULL);
00642         
00643                 struct ibv_qp_init_attr initAttr = {
00644                         .qp_type = IBV_QPT_RC,
00645                         .send_cq = context->sendCq,
00646                         .recv_cq = context->recvCq,
00647                         .srq             = context->srq,
00648                         .sq_sig_all = 0,
00649                         .qp_context = NULL,
00650                         .cap     = {
00651                                 .max_send_wr  = maxTokens,
00652                                 .max_send_sge = 2,
00653                         },
00654                 };
00655                 struct ibv_qp_attr attr;
00656 
00657                 attr.qp_state        = IBV_QPS_INIT;
00658                 attr.pkey_index      = 0;
00659                 attr.port_num        = ibPort;
00660                 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
00661 
00662 /*              MACHSTATE1(3,"context->pd %p",context->pd);
00663                 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
00664                 MACHSTATE1(3,"TEST QP %p",qp);*/
00665 
00666                 for( i=0;i<numNodes;i++){
00667                         if(i == myNode){
00668                         }else{
00669                                 localAddr[i].lid = myLid;
00670                                 context->qp[i] = ibv_create_qp(context->pd,&initAttr);
00671                         
00672                                 MACHSTATE2(3,"qp[%d] created %p",i,context->qp[i]);
00673                         
00674                                 CmiAssert(context->qp[i] != NULL);
00675                         
00676                         
00677                                 ibv_modify_qp(context->qp[i], &attr,
00678                                           IBV_QP_STATE              |
00679                                           IBV_QP_PKEY_INDEX         |
00680                                         IBV_QP_PORT               |
00681                                         IBV_QP_ACCESS_FLAGS);           
00682 
00683                                 localAddr[i].qpn = context->qp[i]->qp_num;
00684                                 localAddr[i].psn = lrand48() & 0xffffff;
00685                                 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",i,localAddr[i].lid,localAddr[i].qpn,localAddr[i].psn);
00686                         }
00687                 }
00688         }
00689         MACHSTATE(3,"qps created");
00690 };
00691 
00692 void copyInfiAddr(ChInfiAddr *qpList){
00693         int qpListIdx=0;
00694         int i;
00695         MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
00696         for(i=0;i<_Cmi_numnodes;i++){
00697                 if(i == _Cmi_mynode){
00698                 }else{
00699                         qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
00700                         qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
00701                         qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);                    
00702                         qpListIdx++;
00703                 }
00704         }
00705 }
00706 
00707 
00708 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
00709         struct ibv_port_attr attr;
00710 
00711         if (ibv_query_port(dev_context, port, &attr))
00712                 return 0;
00713 
00714         return attr.lid;
00715 }
00716 
00717 /**************** END OF CmiMachineInit and its helper functions*/
00718 
00719 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
00720 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
00721 
00722 /* Initial the infiniband specific data for a remote node
00723         1. connect the qp and store it in and return it
00724 **/
00725 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
00726         struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
00727         int err;
00728         ret->state = INFI_HEADER_DATA;
00729         ret->qp = context->qp[node];
00730 //      ret->totalTokens = tokensPerProcessor;
00731 //      ret->tokensLeft = tokensPerProcessor;
00732         ret->nodeNo = node;
00733 //      ret->postedRecvs = tokensPerProcessor;
00734 #if     CMK_IBVERBS_DEBUG       
00735         ret->psn = 0;
00736         ret->recvPsn = 0;
00737 #endif
00738         
00739         struct ibv_qp_attr attr = {
00740                 .qp_state               = IBV_QPS_RTR,
00741                 .path_mtu               = mtu,
00742                 .dest_qp_num            = addr[1],
00743                 .rq_psn                 = addr[2],
00744                 .max_dest_rd_atomic     = 1,
00745                 .min_rnr_timer          = 31,
00746                 .ah_attr                = {
00747                         .is_global      = 0,
00748                         .dlid           = addr[0],
00749                         .sl             = 0,
00750                         .src_path_bits  = 0,
00751                         .port_num       = context->ibPort
00752                 }
00753         };
00754         
00755         MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
00756         MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
00757         
00758         if (err = ibv_modify_qp(ret->qp, &attr,
00759           IBV_QP_STATE              |
00760           IBV_QP_AV                 |
00761           IBV_QP_PATH_MTU           |
00762           IBV_QP_DEST_QPN           |
00763           IBV_QP_RQ_PSN             |
00764           IBV_QP_MAX_DEST_RD_ATOMIC |
00765           IBV_QP_MIN_RNR_TIMER)) {
00766                         MACHSTATE1(3,"ERROR %d",err);
00767                         CmiAbort("failed to change qp state to RTR");
00768         }
00769 
00770         MACHSTATE(3,"qp state changed to RTR");
00771         
00772         attr.qp_state       = IBV_QPS_RTS;
00773         attr.timeout        = 26;
00774         attr.retry_cnt      = 20;
00775         attr.rnr_retry      = 7;
00776         attr.sq_psn         = context->localAddr[node].psn;
00777         attr.max_rd_atomic  = 1;
00778 
00779         
00780         if (ibv_modify_qp(ret->qp, &attr,
00781           IBV_QP_STATE              |
00782           IBV_QP_TIMEOUT            |
00783           IBV_QP_RETRY_CNT          |
00784           IBV_QP_RNR_RETRY          |
00785           IBV_QP_SQ_PSN             |
00786           IBV_QP_MAX_QP_RD_ATOMIC)) {
00787                         fprintf(stderr, "Failed to modify QP to RTS\n");
00788                         exit(1);
00789         }
00790         MACHSTATE(3,"qp state changed to RTS");
00791 
00792         MACHSTATE(3,"} initInfiOtherNodeData");
00793         return ret;
00794 }
00795 
00796 
00797 void    infiPostInitialRecvs(){
00798         //create the pool and post the receives
00799         int numPosts;
00800 /*      if(tokensPerProcessor*(_Cmi_numnodes-1) <= maxRecvBuffers){
00801                 numPosts = tokensPerProcessor*(_Cmi_numnodes-1);
00802         }else{
00803                 numPosts = maxRecvBuffers;
00804         }*/
00805 
00806         if(_Cmi_numnodes > 1){
00807                 numPosts = maxRecvBuffers;
00808         }else{
00809                 numPosts = 0;
00810         }
00811         if(numPosts > 0){
00812                 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
00813                 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
00814         }
00815 
00816 
00817         free(context->qp);
00818         context->qp = NULL;
00819         free(context->localAddr);
00820         context->localAddr= NULL;
00821 }
00822 
00823 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
00824         int numBuffers;
00825         int i;
00826         int bigSize;
00827         char *bigBuf;
00828         struct infiBufferPool *ret;
00829         struct ibv_mr *bigKey;
00830 
00831         MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
00832 
00833         page_size = sysconf(_SC_PAGESIZE);
00834         ret = malloc(sizeof(struct infiBufferPool));
00835         ret->next = NULL;
00836         numBuffers=ret->numBuffers = numRecvs;
00837         
00838         ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
00839         
00840         bigSize = numBuffers*sizePerBuffer;
00841         bigBuf=malloc(bigSize);
00842         bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
00843         CmiAssert(bigKey != NULL);
00844         
00845         for(i=0;i<numBuffers;i++){
00846                 struct infiBuffer *buffer =  &(ret->buffers[i]);
00847                 buffer->type = BUFFER_RECV;
00848                 buffer->size = sizePerBuffer;
00849                 
00850                 /*buffer->buf = malloc(sizePerBuffer);
00851                 buffer->key = ibv_reg_mr(context->pd,buffer->buf,buffer->size,IBV_ACCESS_LOCAL_WRITE);*/
00852                 buffer->buf = &bigBuf[i*sizePerBuffer];
00853                 buffer->key = bigKey;
00854 
00855                 if(buffer->key == NULL){
00856                         MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
00857                         CmiAssert(buffer->key != NULL);
00858                 }
00859         }
00860         return ret;
00861 };
00862 
00863 
00864 
00868 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
00869         int j,err;
00870         struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
00871         struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
00872         struct ibv_recv_wr *bad_wr;
00873         
00874         int startBufferIdx=0;
00875         MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
00876         for(j=0;j<numRecvs;j++){
00877                 
00878                 
00879                 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
00880                 sgElements[j].length = sizePerBuffer;
00881                 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
00882                 
00883                 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
00884                 workRequests[j].sg_list = &sgElements[j];
00885                 workRequests[j].num_sge = 1;
00886                 if(j != numRecvs-1){
00887                         workRequests[j].next = &workRequests[j+1];
00888                 }
00889                 
00890         }
00891         workRequests[numRecvs-1].next = NULL;
00892         MACHSTATE(3,"About to call ibv_post_srq_recv");
00893         if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
00894                 CmiAssert(0);
00895         }
00896 
00897         free(workRequests);
00898         free(sgElements);
00899 }
00900 
00901 
00902 
00903 
00904 static inline void CommunicationServer_nolock(int toBuffer); //if buffer ==1 recvd messages are buffered but not processed
00905 
00906 static void CmiMachineExit()
00907 {
00908 #if CMK_IBVERBS_STATS   
00909         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
00910 #endif
00911 
00912         //TODO: ERASE this
00913         MACHSTATE2(3,"Free msgs %d not owned %d",TESTfrees,TESTneighbor);
00914 }
00915 static void ServiceCharmrun_nolock();
00916 
00917 static void CmiNotifyStillIdle(CmiIdleState *s) {
00918 #if CMK_SMP
00919         CmiCommLock();
00920 /*      if(where == COMM_SERVER_FROM_SMP)*/
00921 #endif
00922 /*              ServiceCharmrun_nolock();*/
00923 
00924         CommunicationServer_nolock(0);
00925 #if CMK_SMP
00926         CmiCommUnlock();
00927 #endif
00928 }
00929 
00930 static inline void increaseTokens(OtherNode node);
00931 
00932 static inline int pollRecvCq(const int toBuffer);
00933 static inline int pollSendCq(const int toBuffer);
00934 
00935 
00936 static inline void getFreeTokens(struct infiOtherNodeData *infiData){
00937 #if !CMK_IBVERBS_TOKENS_FLOW
00938         return;
00939 #else
00940         //if(infiData->tokensLeft == 0){
00941         if(context->tokensLeft == 0){
00942                 MACHSTATE(3,"GET FREE TOKENS {{{");
00943         }else{
00944                 return;
00945         }
00946         while(context->tokensLeft == 0){
00947                 CommunicationServer_nolock(1); 
00948         }
00949         MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
00950 #endif
00951 }
00952 
00953 
00960 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
00961         int incTokens=0;
00962         int retval;
00963 #if     CMK_IBVERBS_DEBUG
00964         packet->header.psn = (++node->infiData->psn);
00965 #endif  
00966 
00967 
00968 
00969         packet->elemList[1].addr = (uintptr_t)packet->buf;
00970         packet->elemList[1].length = size;
00971         packet->elemList[1].lkey = dataKey->lkey;
00972         
00973         
00974         packet->destNode = node;
00975         
00976 #if CMK_IBVERBS_STATS   
00977         pktCount++;
00978 #endif  
00979         
00980         getFreeTokens(node->infiData);
00981 
00982 #if CMK_IBVERBS_INCTOKENS       
00983         if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
00984                 packet->header.code |= INFIPACKETCODE_INCTOKENS;
00985                 incTokens=1;
00986         }
00987 #endif
00988 /*
00989         if(!checkQp(node->infiData->qp)){
00990                 pollSendCq(1);
00991                 CmiAssert(0);
00992         }*/
00993 
00994         struct ibv_send_wr *bad_wr=NULL;
00995         if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
00996                 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
00997                 CmiAssert(0);
00998         }
00999 #if     CMK_IBVERBS_TOKENS_FLOW
01000         context->tokensLeft--;
01001 #if     CMK_IBVERBS_STATS
01002         if(context->tokensLeft < minTokensLeft){
01003                 minTokensLeft = context->tokensLeft;
01004         }
01005 #endif
01006 #endif
01007 
01008 /*      if(!checkQp(node->infiData->qp)){
01009                 pollSendCq(1);
01010                 CmiAssert(0);
01