00001
00024
00025
00026
00027 #include <infiniband/verbs.h>
00028
/* Maximum number of work completions fetched by one ibv_poll_cq() call. */
#define WC_LIST_SIZE 32

/* Bit values stored in infiPacketHeader.code to identify the packet kind. */
#define INFIPACKETCODE_DATA 1
#define INFIDUMMYPACKET 64
#define INFIBARRIERPACKET 128

/*
 * Yields the metaData member of the infiCmiChunkHeader located directly in
 * front of the address m.  The argument is parenthesized so that a compound
 * expression such as (base + offset) is cast as a whole rather than only its
 * first operand.
 */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)
00036
00037
00038 enum ibv_mtu mtu = IBV_MTU_2048;
00039 static int mtu_size;
00040 static int maxrecvbuffers;
00041 static int maxtokens;
00042 static int firstBinSize;
00043 static int blockThreshold;
00044 static int blockAllocRatio;
00045 static int packetsize;
00046
00047
/*
 * Device discovery state: the device list handed back by the verbs library
 * and the single device selected from it.
 */
struct ibudstruct {
  struct ibv_device **devlist;  /* list obtained from ibv_get_device_list() */
  struct ibv_device *dev;       /* device chosen from devlist */
} ibud;
00054
/*
 * Header placed in front of every packet payload on the wire.
 * Field order must not change: the struct is transmitted as-is.
 */
struct infiPacketHeader {
  char code;    /* packet kind: INFIPACKETCODE_DATA / INFIDUMMYPACKET / INFIBARRIERPACKET */
  int nodeNo;   /* node number of the sender */
};
00059
/* Addressing triple describing one queue pair. */
struct infiAddr {
  int lid;  /* local identifier of the port */
  int qpn;  /* queue pair number */
  int psn;  /* packet sequence number */
};
00065
00066 typedef struct infiPacketStruct {
00067 char *buf;
00068 int size;
00069 char extra[40];
00070 struct infiPacketHeader header;
00071 struct ibv_mr *keyHeader;
00072 struct OtherNodeStruct *destNode;
00073 struct infiPacketStruct *next;
00074 OutgoingMsg ogm;
00075 struct ibv_sge elemList[2];
00076 struct ibv_send_wr wr;
00077 }* infiPacket;
00078
00079 struct infiContext {
00080 struct ibv_context *context;
00081
00082 fd_set asyncFds;
00083 struct timeval tmo;
00084
00085 int ibPort;
00086 struct ibv_pd *pd;
00087 struct ibv_cq *sendCq;
00088 struct ibv_cq *recvCq;
00089 struct ibv_srq *srq;
00090 struct ibv_mr *mr;
00091 struct ibv_ah **ah;
00092
00093 struct ibv_qp *qp;
00094
00095 struct infiAddr localAddr;
00096
00097 struct infiBufferPool *recvBufferPool;
00098
00099 infiPacket infiPacketFreeList;
00100
00101 struct infiPacketHeader header;
00102 int sendCqSize,recvCqSize;
00103
00104 void *buffer;
00105
00106 };
00107
00108 static struct infiContext *context;
00109
00110
00111
00112 struct infiOtherNodeData{
00113 int state;
00114 int totalTokens;
00115 int tokensLeft;
00116 int nodeNo;
00117
00118 int postedRecvs;
00119 int broot;
00120 struct infiAddr qp;
00121 };
00122
/* Per-node message assembly states used by processMessage(). */
enum {
  INFI_HEADER_DATA = 21,  /* next packet starts a new message */
  INFI_DATA = 22          /* next packet continues the current message */
};
00124
00125 typedef struct {
00126 int sleepMs;
00127 int nIdles;
00128 CmiState cs;
00129 } CmiIdleState;
00130
/* Value stored in infiBuffer.type for buffers posted as receives. */
#define BUFFER_RECV 1

/* One slice of a registered memory area. */
struct infiBuffer{
  int type;            /* buffer kind, e.g. BUFFER_RECV */
  char *buf;           /* start of the slice */
  int size;            /* number of bytes in the slice */
  struct ibv_mr *key;  /* memory region that covers buf */
};
00138
00139
00140
00141
/* Bookkeeping record attached to every chunk handed out by the chunk allocator. */
typedef struct infiCmiChunkMetaDataStruct {
  struct ibv_mr *key;                      /* memory region the chunk was registered under */
  int poolIdx;                             /* index of the pool the chunk belongs to */
  void *nextBuf;                           /* next free chunk while this one sits in a pool */
  struct infiCmiChunkHeaderStruct *owner;  /* header at the start of the enclosing allocation */
  int count;                               /* on the owner's record: chunks of the allocation still outstanding */
} infiCmiChunkMetaData;
00149
/* A group of buffers carved from one allocation; pools can be chained. */
struct infiBufferPool{
  int numBuffers;               /* number of entries in buffers */
  struct infiBuffer *buffers;   /* array of numBuffers descriptors */
  struct infiBufferPool *next;  /* next pool in the chain, or NULL */
};
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167 static const char *const __ibv_wc_status_str[] = {
00168 "Success",
00169 "Local Length Error",
00170 "Local QP Operation Error",
00171 "Local EE Context Operation Error",
00172 "Local Protection Error",
00173 "Work Request Flushed Error",
00174 "Memory Management Operation Error",
00175 "Bad Response Error",
00176 "Local Access Error",
00177 "Remote Invalid Request Error",
00178 "Remote Access Error",
00179 "Remote Operation Error",
00180 "Transport Retry Counter Exceeded",
00181 "RNR Retry Counter Exceeded",
00182 "Local RDD Violation Error",
00183 "Remote Invalid RD Request",
00184 "Aborted Error",
00185 "Invalid EE Context Number",
00186 "Invalid EE Context State",
00187 "Fatal Error",
00188 "Response Timeout Error",
00189 "General Error"
00190 };
00191 const char *ibv_wc_status_str(enum ibv_wc_status status) {
00192 if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
00193 status = IBV_WC_GENERAL_ERR;
00194 return (__ibv_wc_status_str[status]);
00195 }
00196
00197
/* One size class of the chunk allocator: a singly linked list of free chunks. */
typedef struct {
  int size;        /* payload size served by this pool */
  void *startBuf;  /* first free chunk (address of its header), or NULL when empty */
  int count;       /* number of chunks currently on the free list */
} infiCmiChunkPool;

/*
 * Special poolIdx value tested by the free paths.  Parenthesized so the
 * negative constant stays a single operand wherever the macro is expanded.
 */
#define INFIMULTIPOOL (-5)
/* Number of size classes. */
#define INFINUMPOOLS 20
/* Free-list length limit consulted when a chunk is returned to its pool. */
#define INFIMAXPERPOOL 100

infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
00210
/*
 * Return a packet descriptor to context->infiPacketFreeList.
 * The argument is evaluated exactly once.  The do/while(0) wrapper makes the
 * macro behave as a single statement, so it is safe inside if/else; callers
 * must terminate the invocation with a semicolon.
 */
#define FreeInfiPacket(pkt) do { \
  infiPacket infi_freed_ = (pkt); \
  infi_freed_->size = -1; \
  infi_freed_->ogm = NULL; \
  infi_freed_->next = context->infiPacketFreeList; \
  context->infiPacketFreeList = infi_freed_; \
} while (0)

/*
 * Obtain a packet descriptor: pop the head of context->infiPacketFreeList,
 * or create a fresh one with newPacket() when the list is empty.  The result
 * is stored into the lvalue pkt.
 */
#define MallocInfiPacket(pkt) do { \
  infiPacket infi_fresh_ = context->infiPacketFreeList; \
  if (infi_fresh_ == NULL) { \
    infi_fresh_ = newPacket(); \
  } else { \
    context->infiPacketFreeList = infi_fresh_->next; \
  } \
  (pkt) = infi_fresh_; \
} while (0)
00224
00225 void infi_unregAndFreeMeta(void *md) {
00226 if(md!=NULL && (((infiCmiChunkMetaData *)md)->poolIdx == INFIMULTIPOOL)) {
00227 ibv_dereg_mr(((infiCmiChunkMetaData*)md)->key);
00228 free(((infiCmiChunkMetaData *)md));
00229 }
00230 }
00231
00232 static inline void *getInfiCmiChunk(int dataSize){
00233
00234
00235 int ratio = dataSize/firstBinSize;
00236 int poolIdx=0;
00237 void *res;
00238
00239 while(ratio > 0){
00240 ratio = ratio >> 1;
00241 poolIdx++;
00242 }
00243 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
00244 if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
00245 infiCmiChunkMetaData *metaData;
00246 infiCmiChunkHeader *hdr;
00247 int allocSize;
00248 int count=1;
00249 int i;
00250 struct ibv_mr *key;
00251 void *origres;
00252
00253
00254 if(poolIdx < INFINUMPOOLS ){
00255 allocSize = infiCmiChunkPools[poolIdx].size;
00256
00257 }else{
00258 allocSize = dataSize;
00259 }
00260
00261 if(poolIdx < blockThreshold){
00262 count = blockAllocRatio;
00263 }
00264 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
00265 hdr = res;
00266
00267 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_LOCAL_WRITE);
00268 CmiAssert(key != NULL);
00269
00270 origres = (res += sizeof(infiCmiChunkHeader));
00271
00272 for(i=0;i<count;i++){
00273 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
00274 metaData->key = key;
00275 metaData->owner = hdr;
00276 metaData->poolIdx = poolIdx;
00277
00278 if(i == 0){
00279 metaData->owner->metaData->count = count;
00280 metaData->nextBuf = NULL;
00281 }else{
00282 void *startBuf = res - sizeof(infiCmiChunkHeader);
00283 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
00284 infiCmiChunkPools[poolIdx].startBuf = startBuf;
00285 infiCmiChunkPools[poolIdx].count++;
00286
00287 }
00288 if(i != count-1){
00289 res += (allocSize+sizeof(infiCmiChunkHeader));
00290 }
00291 }
00292 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
00293
00294 return origres;
00295 }
00296 if(poolIdx < INFINUMPOOLS){
00297 infiCmiChunkMetaData *metaData;
00298
00299 res = infiCmiChunkPools[poolIdx].startBuf;
00300 res += sizeof(infiCmiChunkHeader);
00301
00302 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
00303 metaData = METADATAFIELD(res);
00304
00305 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
00306 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
00307
00308 metaData->nextBuf = NULL;
00309
00310
00311 infiCmiChunkPools[poolIdx].count--;
00312 return res;
00313 }
00314
00315 CmiAssert(0);
00316 }
00317
00318 void * infi_CmiAlloc(int size){
00319 void *res;
00320
00321 #if CMK_SMP
00322 CmiMemLock();
00323 #endif
00324 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
00325
00326 res = getInfiCmiChunk(size-sizeof(CmiChunkHeader));
00327 res -= sizeof(CmiChunkHeader);
00328 #if CMK_SMP
00329 CmiMemUnlock();
00330 #endif
00331 return res;
00332 }
00333
00334
00335 void infi_CmiFree(void *ptr){
00336 int size;
00337 void *freePtr = ptr;
00338 infiCmiChunkMetaData *metaData;
00339 int poolIdx;
00340
00341 #if CMK_SMP
00342 CmiMemLock();
00343 #endif
00344 ptr += sizeof(CmiChunkHeader);
00345 size = SIZEFIELD (ptr);
00346
00347 freePtr = ptr - sizeof(infiCmiChunkHeader);
00348 metaData = METADATAFIELD(ptr);
00349 poolIdx = metaData->poolIdx;
00350 if(poolIdx == INFIMULTIPOOL){
00353 return;
00354 }
00355 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
00356 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL){
00357 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
00358 infiCmiChunkPools[poolIdx].startBuf = freePtr;
00359 infiCmiChunkPools[poolIdx].count++;
00360 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
00361 }else{
00362 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
00363 metaData->owner->metaData->count--;
00364 if(metaData->owner->metaData == metaData){
00365
00366 if(metaData->owner->metaData->count == 0){
00367
00368 ibv_dereg_mr(metaData->key);
00369 free(freePtr);
00370 free(metaData);
00371 }
00372
00373
00374 }else {
00375 if(metaData->owner->metaData->count == 0){
00376
00377 freePtr = metaData->owner;
00378 ibv_dereg_mr(metaData->key);
00379 free(metaData->owner->metaData);
00380 free(freePtr);
00381 }
00382 free(metaData);
00383 }
00384 }
00385 #if CMK_SMP
00386 CmiMemUnlock();
00387 #endif
00388 }
00389
00390
00391 static void initInfiCmiChunkPools(){
00392 int i,j;
00393 int size = firstBinSize;
00394 int nodeSize;
00395
00396 size = firstBinSize;
00397 for(i=0;i<INFINUMPOOLS;i++){
00398 infiCmiChunkPools[i].size = size;
00399 infiCmiChunkPools[i].startBuf = NULL;
00400 infiCmiChunkPools[i].count = 0;
00401 size *= 2;
00402 }
00403
00404 }
00405
00406
00407
00408
00409
00410
00411
00412
00413 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
00414 int j,err;
00415 struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
00416 struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
00417 struct ibv_recv_wr *bad_wr;
00418
00419 int startBufferIdx=0;
00420 MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
00421 for(j=0;j<numRecvs;j++){
00422 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
00423 sgElements[j].length = sizePerBuffer + 40;
00424 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
00425 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
00426 workRequests[j].sg_list = &sgElements[j];
00427 workRequests[j].num_sge = 1;
00428 if(j != numRecvs-1){
00429 workRequests[j].next = &workRequests[j+1];
00430 }
00431 }
00432 workRequests[numRecvs-1].next = NULL;
00433 MACHSTATE(3,"About to call ibv_post_recv");
00434 CmiAssert(ibv_post_recv(context->qp,workRequests,&bad_wr)==0);
00435
00436 free(workRequests);
00437 free(sgElements);
00438
00439 }
00440
00441 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
00442 int numBuffers;
00443 int i;
00444 int bigSize;
00445 char *bigBuf;
00446 struct infiBufferPool *ret;
00447 struct ibv_mr *bigKey;
00448 int page_size;
00449
00450 MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
00451
00452 page_size = sysconf(_SC_PAGESIZE);
00453 ret = malloc(sizeof(struct infiBufferPool));
00454 ret->next = NULL;
00455 numBuffers=ret->numBuffers = numRecvs;
00456 ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
00457 bigSize = numBuffers*sizePerBuffer;
00458 bigBuf = memalign(page_size,bigSize);
00459 bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
00460 CmiAssert(bigKey != NULL);
00461
00462 for(i=0;i<numBuffers;i++){
00463 struct infiBuffer *buffer = &(ret->buffers[i]);
00464 buffer->type = BUFFER_RECV;
00465 buffer->size = sizePerBuffer;
00466 buffer->buf = &bigBuf[i*sizePerBuffer];
00467 buffer->key = bigKey;
00468
00469 if(buffer->key == NULL){
00470 MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
00471 CmiAssert(buffer->key != NULL);
00472 }
00473 }
00474 return ret;
00475 };
00476
00477
00478
00479 void infiPostInitialRecvs(){
00480
00481 int numPosts;
00482
00483
00484 context->recvBufferPool = allocateInfiBufferPool(maxrecvbuffers, packetsize + 40);
00485 postInitialRecvs(context->recvBufferPool,maxrecvbuffers,packetsize);
00486
00487 }
00488
00489
00490
00491
00492
00493
00494
00495 static CmiIdleState *CmiNotifyGetState(void) {
00496 return NULL;
00497 }
00498
00499 static void CmiNotifyStillIdle(CmiIdleState *s);
00500 static void CmiNotifyBeginIdle(CmiIdleState *s) {
00501 CmiNotifyStillIdle(s);
00502 }
00503
00504
00505 static inline void CommunicationServer_lock(int toBuffer);
00506 static inline void CommunicationServer_nolock(int toBuffer);
00507
00508 static void CmiNotifyStillIdle(CmiIdleState *s) {
00509 #if CMK_SMP
00510 CommunicationServer_lock(0);
00511 #else
00512 CommunicationServer_nolock(0);
00513 #endif
00514 }
00515
00516
00517
00518
00519
00520
00521
00522 void CmiNotifyIdle(void) {
00523 CmiNotifyStillIdle(NULL);
00524 }
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537 int CheckSocketsReady(int withDelayMs)
00538 {
00539 int nreadable,dataWrite=writeableDgrams || writeableAcks;
00540 CMK_PIPE_DECL(withDelayMs);
00541
00542
00543 #if CMK_USE_KQUEUE && 0
00544
00545
00546
00547
00548
00549 static int first = 1;
00550 if(first){
00551 first = 0;
00552 CmiStdoutAdd(CMK_PIPE_SUB);
00553 if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
00554 else return 0;
00555 if (dataskt!=-1) {
00556 CMK_PIPE_ADDREAD(dataskt);
00557 CMK_PIPE_ADDWRITE(dataskt);
00558 }
00559 }
00560
00561 #else
00562 CmiStdoutAdd(CMK_PIPE_SUB);
00563 if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
00564 else return 0;
00565 if (dataskt!=-1) {
00566 { CMK_PIPE_ADDREAD(dataskt); }
00567 if (dataWrite)
00568 CMK_PIPE_ADDWRITE(dataskt);
00569 }
00570 #endif
00571
00572 nreadable=CMK_PIPE_CALL();
00573 ctrlskt_ready_read = 0;
00574 dataskt_ready_read = 0;
00575 dataskt_ready_write = 0;
00576
00577 if (nreadable == 0) {
00578 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
00579 return nreadable;
00580 }
00581 if (nreadable==-1) {
00582 CMK_PIPE_CHECKERR();
00583 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
00584 return CheckSocketsReady(0);
00585 }
00586
00587 CmiStdoutCheck(CMK_PIPE_SUB);
00588 if (Cmi_charmrun_fd!=-1)
00589 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00590 if (dataskt!=-1) {
00591 dataskt_ready_read = CMK_PIPE_CHECKREAD(dataskt);
00592 if (dataWrite)
00593 dataskt_ready_write = CMK_PIPE_CHECKWRITE(dataskt);
00594 }
00595 return nreadable;
00596 }
00597
00598
00599
00600 static inline infiPacket newPacket(){
00601 infiPacket pkt=(infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
00602
00603 pkt->size = -1;
00604 pkt->header = context->header;
00605 pkt->next = NULL;
00606 pkt->destNode = NULL;
00607 pkt->keyHeader = METADATAFIELD(pkt)->key;
00608 pkt->ogm=NULL;
00609 CmiAssert(pkt->keyHeader!=NULL);
00610
00611 pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
00612 pkt->elemList[0].length = sizeof(struct infiPacketHeader);
00613 pkt->elemList[0].lkey = pkt->keyHeader->lkey;
00614
00615 pkt->wr.wr_id = (uint64_t)pkt;
00616 pkt->wr.sg_list = &(pkt->elemList[0]);
00617 pkt->wr.num_sge = 2;
00618 pkt->wr.opcode = IBV_WR_SEND;
00619 pkt->wr.send_flags = IBV_SEND_SIGNALED;
00620 pkt->wr.next = NULL;
00621
00622 return pkt;
00623 };
00624
00625 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664 int retval;
00665 struct ibv_send_wr *bad_wr=NULL;
00666 MACHSTATE(2," here");
00667
00668
00669 packet->elemList[1].addr = (uintptr_t)packet->buf;
00670 packet->elemList[1].length = size;
00671 packet->elemList[1].lkey = dataKey->lkey;
00672 MACHSTATE(2," here");
00673
00674 packet->destNode = node;
00675
00676 MACHSTATE(2," here1");
00677 MACHSTATE1(2," here qp=%i",context->qp);
00678 MACHSTATE1(2," here wr=%i",&(packet->wr));
00679 MACHSTATE1(2," here wr=%i",&bad_wr);
00680
00681
00682 if(ibv_post_send(context->qp,&(packet->wr),&bad_wr)){
00683 MACHSTATE(2," problem sending");
00684 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
00685 CmiAssert(0);
00686 }
00687 MACHSTATE(2," here");
00688 MACHSTATE2(3,"Packet send size %d node %d ",size,packet->destNode->infiData->nodeNo);
00689 MACHSTATE2(2," addr %p lkey %p ",(uintptr_t)packet->buf,dataKey->lkey);
00690
00691 }
00692
00693 static void inline EnqueueDataPacket(OutgoingMsg ogm, char *data, int size, OtherNode node, int rank, int broot) {
00694 struct ibv_mr *key;
00695 infiPacket packet;
00696 MallocInfiPacket(packet);
00697 packet->size = size;
00698 packet->buf=data;
00699
00700
00701 packet->header.code = INFIPACKETCODE_DATA;
00702
00703 ogm->refcount++;
00704 packet->ogm = ogm;
00705
00706 key = METADATAFIELD(ogm->data)->key;
00707 CmiAssert(key != NULL);
00708
00709 EnqueuePacket(node,packet,size,key);
00710 }
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy) {
00723 int size; char *data;
00724 MACHSTATE(3,"DeliverViaNetwork");
00725 size=ogm->size;
00726 data=ogm->data;
00727 DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
00728 CmiMsgHeaderSetLength(data,size);
00729 while(size>Cmi_dgram_max_data) {
00730 EnqueueDataPacket(ogm, data, Cmi_dgram_max_data, node, rank, broot);
00731 size -= Cmi_dgram_max_data;
00732 data += Cmi_dgram_max_data;
00733
00734 }
00735 if(size>0)
00736 EnqueueDataPacket(ogm, data, size, node, rank, broot);
00737
00738 }
00739
00740
00741 static void ServiceCharmrun_nolock() {
00742 int again = 1;
00743 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
00744 while (again) {
00745 again = 0;
00746 CheckSocketsReady(0);
00747 if (ctrlskt_ready_read) {
00748 ctrl_getone();
00749 again=1;
00750 }
00751 if (CmiStdoutNeedsService())
00752 CmiStdoutService();
00753 }
00754 MACHSTATE(2,"} ServiceCharmrun_nolock end")
00755 }
00756
00757
00758 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
00759 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
00760 if (rank == DGRAM_BROADCAST
00761 #if CMK_NODE_QUEUE_AVAILABLE
00762 || rank == DGRAM_NODEBROADCAST
00763 #endif
00764 ){
00765 if(toBuffer){
00766 insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
00767 }else{
00768 #if CMK_BROADCAST_SPANNING_TREE
00769 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
00770 #else
00771 SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
00772 #endif
00773 }
00774 }
00775 #endif
00776
00777 switch (rank) {
00778 case DGRAM_BROADCAST: {
00779 int i;
00780 for (i=1; i<_Cmi_mynodesize; i++){
00781 CmiPushPE(i, CopyMsg(newmsg, total_size));
00782 }
00783 CmiPushPE(0, newmsg);
00784 break;
00785 }
00786 #if CMK_NODE_QUEUE_AVAILABLE
00787 case DGRAM_NODEBROADCAST:
00788 case DGRAM_NODEMESSAGE: {
00789 CmiPushNode(newmsg);
00790 break;
00791 }
00792 #endif
00793 default: {
00794 CmiPushPE(rank, newmsg);
00795 }
00796 }
00797
00798
00799
00800 }
00801
00802
00803
00804 static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
00805 char *newmsg;
00806 OtherNode node = &nodes[nodeNo];
00807 newmsg = node->asm_msg;
00808
00809 MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
00810
00811 switch(node->infiData->state){
00812 case INFI_HEADER_DATA: {
00813 int size;
00814 int rank, srcpe, seqno, magic, i;
00815 unsigned int broot;
00816 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
00817 size = CmiMsgHeaderGetLength(msg);
00818 MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
00819 newmsg = (char *)CmiAlloc(size);
00820 _MEMCHECK(newmsg);
00821 memcpy(newmsg, msg, len);
00822 node->asm_rank = rank;
00823 node->asm_total = size;
00824 node->asm_fill = len;
00825 node->asm_msg = newmsg;
00826 node->infiData->broot = broot;
00827 if(len>size) {
00828
00829 node->infiData->state = INFI_DATA;
00830 } else if(len == size){
00831
00832 node->infiData->state = INFI_HEADER_DATA;
00833 } else {
00834 CmiPrintf("size: %d, len:%d.\n", size, len);
00835 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00836 }
00837
00838 break;
00839 }
00840 case INFI_DATA: {
00841 if(node->asm_fill+len<node->asm_total&&len!=Cmi_dgram_max_data){
00842 CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
00843 CmiAbort("packet in the middle does not have expected length");
00844 }
00845 if(node->asm_fill+len > node->asm_total){
00846 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
00847 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
00848 }
00849
00850 memcpy(newmsg + node->asm_fill,msg,len);
00851 node->asm_fill += len;
00852 if(node->asm_fill == node->asm_total){
00853 node->infiData->state = INFI_HEADER_DATA;
00854 }else{
00855 node->infiData->state = INFI_DATA;
00856 }
00857
00858 break;
00859 }
00860 }
00861 if(node->infiData->state == INFI_HEADER_DATA){
00862 int total_size = node->asm_total;
00863 node->asm_msg = NULL;
00864
00865 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
00866 }
00867 }
00868
00869
00870 void processSendWC(struct ibv_wc *sendWC) {
00871 MACHSTATE(3,"processSendWC {");
00872 infiPacket packet = (infiPacket )sendWC->wr_id;
00873 FreeInfiPacket(packet);
00874 MACHSTATE(3,"} processSendWC ");
00875 }
00876
00877 void processRecvWC(struct ibv_wc *recvWC,const int toBuffer) {
00878
00879 struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;
00880 struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
00881 int nodeNo = header->nodeNo;
00882
00883 int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
00884 MACHSTATE(3,"processRecvWC {");
00885 MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);
00886
00887 if(header->code & INFIPACKETCODE_DATA){
00888 processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
00889 }
00890 else if(header->code & INFIDUMMYPACKET){
00891 MACHSTATE(3,"Dummy packet");
00892 }
00893 else if(header->code & INFIBARRIERPACKET){
00894 MACHSTATE(3,"Barrier packet");
00895 CmiAbort("Should not receive Barrier packet in normal polling loop. Your Barrier is broken");
00896 }
00897
00898 {
00899 struct ibv_sge list = {
00900 .addr = (uintptr_t) buffer->buf,
00901 .length = buffer->size,
00902 .lkey = buffer->key->lkey,
00903 };
00904
00905 struct ibv_recv_wr wr = {
00906 .wr_id = (uint64_t)buffer,
00907 .sg_list = &list,
00908 .num_sge = 1,
00909 .next = NULL
00910 };
00911 struct ibv_recv_wr *bad_wr;
00912
00913 CmiAssert(ibv_post_recv(context->qp,&wr,&bad_wr)==0);
00914 }
00915 MACHSTATE(3,"} processRecvWC ");
00916 }
00917
00918
00919 static inline int pollCq(const int toBuffer,struct ibv_cq *cq) {
00920
00921 int i;
00922 int ne;
00923 struct ibv_wc wc[WC_LIST_SIZE];
00924
00925 MACHSTATE1(2,"pollCq %d (((",toBuffer);
00926 ne = ibv_poll_cq(cq,WC_LIST_SIZE,&wc[0]);
00927
00928 if(ne != 0){
00929 MACHSTATE1(3,"pollCq ne %d",ne);
00930 if(ne<0)
00931 CmiAbort("ibv_poll_cq error");
00932 }
00933
00934 for(i=0;i<ne;i++){
00935
00936 if(wc[i].status!=IBV_WC_SUCCESS) {
00937 MACHSTATE3(3,"wc[%i].status=%i (%s)",i,wc[i].status,ibv_wc_status_str(wc[i].status));
00938 MACHSTATE3(3," wr_id=%i qp_num=%i vendor_err=%i",wc[i].wr_id,wc[i].qp_num,wc[i].vendor_err);
00939 MACHSTATE1(3," key=%p ",
00940 ((struct infiBuffer *)((&wc[i])->wr_id))->key);
00941
00942
00943
00944
00945
00946
00947 CmiAssert(wc[i].status==IBV_WC_SUCCESS);
00948 }
00949
00950
00951 switch(wc[i].opcode){
00952 case IBV_WC_SEND:
00953 processSendWC(&wc[i]);
00954 break;
00955 case IBV_WC_RECV:
00956 processRecvWC(&wc[i],toBuffer);
00957 break;
00958 default:
00959 CmiAbort("Wrong type of work completion object in cq");
00960 break;
00961 }
00962
00963 }
00964 MACHSTATE1(2,"))) pollCq %d",toBuffer);
00965 return ne;
00966
00967 }
00968
/*
 * Poll the network while holding the communication lock.
 * toBuffer is forwarded unchanged to CommunicationServer_nolock(); every
 * caller in this file passes 0.
 */
static inline void CommunicationServer_lock(int toBuffer) {
  CmiCommLock();
  CommunicationServer_nolock(toBuffer);
  CmiCommUnlock();
}
00974
00975 static inline void CommunicationServer_nolock(int toBuffer) {
00976
00977
00978
00979
00980
00981
00982 MACHSTATE(2,"CommServer_nolock{");
00983
00984
00985
00986 pollCq(toBuffer,context->sendCq);
00987 pollCq(toBuffer,context->recvCq);
00988
00989
00990
00991
00992 MACHSTATE(2,"} CommServer_nolock ne");
00993 }
00994
00995
00996
00997
00998 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
00999 struct ibv_port_attr attr;
01000 if (ibv_query_port(dev_context, port, &attr))
01001 return 0;
01002
01003 return attr.lid;
01004 }
01005
01006
01007 struct infiAddr* initinfiAddr(int node,int lid,int qpn,int psn) {
01008 struct infiAddr *addr=malloc(sizeof(struct infiAddr));
01009
01010 addr->lid=lid;
01011 addr->qpn=qpn;
01012 addr->psn=psn;
01013
01014 return addr;
01015 }
01016
01017 struct infiOtherNodeData *initinfiData(int node,int lid,int qpn,int psn) {
01018
01019 struct infiOtherNodeData *ret=malloc(sizeof(struct infiOtherNodeData));
01020
01021 ret->qp.lid=lid;
01022 ret->qp.qpn=qpn;
01023 ret->qp.psn=psn;
01024
01025 ret->nodeNo = node;
01026 ret->state = INFI_HEADER_DATA;
01027
01028 MACHSTATE4(3,"Storing node[%i] (lid=%i qpn=%i psn=%i)",node,lid,qpn,psn);
01029
01030
01031
01032
01033
01034 return ret;
01035 }
01036
01037
01038
01039
01040
01041
01042
01043
01044
01045
01046
01047
01048
01049 void CmiHandleImmediate();
01050 static void CommunicationServer(int sleepTime, int where) {
01051
01052
01053
01054
01055
01056 if(where==COMM_SERVER_FROM_INTERRUPT)
01057 return;
01058 #if CMK_SMP
01059 if(where == COMM_SERVER_FROM_WORKER)
01060 return;
01061 if(where == COMM_SERVER_FROM_SMP) {
01062 ServiceCharmrun_nolock();
01063 }
01064 CommunicationServer_lock(0);
01065 #else
01066 ServiceCharmrun_nolock();
01067 CommunicationServer_nolock(0);
01068 #endif
01069 }
01070
01071
01072
01073
01074
01075
01076
01077 static void sendBarrierMessage(int pe) {
01078
01079 int size=32;
01080 OtherNode node=nodes+pe;
01081 infiPacket packet;
01082 MallocInfiPacket(packet);
01083 packet->size = size;
01084 packet->buf = CmiAlloc(size);
01085 packet->header.code=INFIBARRIERPACKET;
01086 packet->wr.wr.ud.ah=context->ah[pe];
01087 packet->wr.wr.ud.remote_qpn=nodes[pe].infiData->qp.qpn;
01088 packet->wr.wr.ud.remote_qkey = 0x11111111;
01089
01090 MACHSTATE1(3,"HERE -> %d",packet->header.code);
01091 MACHSTATE2(3,"sending to qpn=%i pe=%i",nodes[pe].infiData->qp.qpn,pe);
01092 struct ibv_mr *key=METADATAFIELD(packet->buf)->key;
01093 MACHSTATE3(3,"Barrier packet to %d size %d wr_id %d",node->infiData->nodeNo,size,packet->wr.wr_id);
01094 EnqueuePacket(node,packet,size,key);
01095 }
01096
01097
01098 static void recvBarrierMessage() {
01099 int i;
01100 int ne;
01101
01102 struct ibv_wc wc[1];
01103 struct ibv_wc *recvWC;
01104
01105 int toBuffer=1;
01106 int barrierReached=0;
01107 struct infiBuffer *buffer = NULL;
01108 struct infiPacketHeader *header = NULL;
01109 int nodeNo=-1;
01110 int len=-1;
01111 int count=0;
01112 MACHSTATE(3,"recvBarrierMessage 0");
01113 while(!barrierReached) {
01114
01115 pollCq(toBuffer,context->sendCq);
01116 ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
01117 if(ne!=0){
01118 MACHSTATE1(3,"recvBarrier ne %d",ne);
01119 CmiAssert(ne>0);
01120 }
01121 for(i=0;i<ne;i++){
01122 if(wc[i].status != IBV_WC_SUCCESS){
01123 MACHSTATE3(3,"wc[%i].status=%i (%s)",i,wc[i].status,ibv_wc_status_str(wc[i].status));
01124 MACHSTATE3(3," wr_id=%i qp_num=%i vendor_err=%i",wc[i].wr_id,wc[i].qp_num,wc[i].vendor_err);
01125
01126
01127
01128
01129
01130
01131
01132
01133
01134
01135 CmiAbort("wc.status !=IBV_WC_SUCCESS");
01136 }
01137 switch(wc[i].opcode){
01138 case IBV_WC_RECV:
01139 MACHSTATE(3," IN HERE !!!!!!!!!!");
01140 recvWC=&wc[i];
01141
01142 buffer = (struct infiBuffer *) recvWC->wr_id;
01143 header = (struct infiPacketHeader *)(buffer->buf + 40);
01144
01145 nodeNo = header->nodeNo;
01146 len = recvWC->byte_len-sizeof(struct infiPacketHeader);
01147 if(header->code & INFIPACKETCODE_DATA){
01148 processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
01149 } else if(header->code & INFIDUMMYPACKET){
01150 MACHSTATE(3,"Dummy packet");
01151 } else if(header->code & INFIBARRIERPACKET){
01152 MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);
01153 barrierReached=1;
01154 }else
01155 MACHSTATE2(3,"Ups... %d %d",header->code,nodeNo);
01156 {
01157 struct ibv_sge list = {
01158 .addr = (uintptr_t) buffer->buf,
01159 .length = buffer->size,
01160 .lkey = buffer->key->lkey
01161 };
01162
01163 struct ibv_recv_wr wr = {
01164 .wr_id = (uint64_t)buffer,
01165 .sg_list = &list,
01166 .num_sge = 1,
01167 .next = NULL
01168 };
01169 struct ibv_recv_wr *bad_wr;
01170
01171 CmiAssert(ibv_post_recv(context->qp,&wr,&bad_wr)==0);
01172
01173 }
01174 break;
01175 default:
01176 CmiAbort("Wrong type of work completion object in recvq");
01177 break;
01178 }
01179 }
01180 }
01181
01182
01183 }
01184
01185
01186
01187
01188
01189 int CmiBarrier() {
01190 int len, size, i;
01191 int status;
01192 int count = 0;
01193 OtherNode node;
01194 int numnodes = CmiNumNodes();
01195 MACHSTATE1(3,"Barrier 1 rank=%i",CmiMyRank());
01196 if (CmiMyRank() == 0) {
01197 if (CmiMyNode() != 0) {
01198
01199 MACHSTATE(3,"Barrier sendmsg");
01200 sendBarrierMessage(0);
01201 recvBarrierMessage();
01202 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
01203 int p = CmiMyNode();
01204 p = BROADCAST_SPANNING_FACTOR*p + i;
01205 if (p > numnodes - 1) break;
01206 p = p%numnodes;
01207
01208 sendBarrierMessage(p);
01209 }
01210 } else {
01211 MACHSTATE(3,"Barrier else");
01212 for (count = 1; count < numnodes; count ++) {
01213 recvBarrierMessage();
01214 }
01215
01216 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
01217 int p = i;
01218 if (p > numnodes - 1) break;
01219
01220 sendBarrierMessage(p);
01221 }
01222 }
01223 MACHSTATE(3,"Barrier 3");
01224 }
01225 MACHSTATE(3,"Barrier 4");
01226 CmiNodeAllBarrier();
01227
01228
01229 MACHSTATE(3,"Barrier e");
01230 }
01231
01232
01233
/*
 * One-way barrier towards node 0: rank 0 of every other node sends a
 * barrier packet to node 0, and rank 0 of node 0 waits until it has
 * received one packet per other node.  All ranks of a node then meet in
 * CmiNodeAllBarrier().
 *
 * Always returns 0.
 */
int CmiBarrierZero() {
  int i;

  if (CmiMyRank() == 0) {
    if (CmiMyNode()) {
      sendBarrierMessage(0);
    } else {
      for (i=0; i<CmiNumNodes()-1; i++) {
        recvBarrierMessage();
      }
    }
  }
  CmiNodeAllBarrier();
  return 0;
}
01249
01250
01251 void createqp(struct ibv_device *dev){
01252
01253 context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
01254 CmiAssert(context->sendCq != NULL);
01255 MACHSTATE1(3,"sendCq created %p",context->sendCq);
01256
01257 context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
01258 CmiAssert(context->recvCq != NULL);
01259 MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
01260
01261 {
01262 struct ibv_qp_init_attr attr = {
01263 .qp_context = context->context,
01264 .qp_type = IBV_QPT_UD,
01265 .send_cq = context->sendCq,
01266 .recv_cq = context->recvCq,
01267 .srq = NULL,
01268 .sq_sig_all=0,
01269 .cap = {
01270 .max_send_wr = context->sendCqSize,
01271 .max_recv_wr = context->recvCqSize,
01272 .max_send_sge = 1,
01273 .max_recv_sge = 1,
01274 },
01275 };
01276 context->qp = ibv_create_qp(context->pd,&attr);
01277 CmiAssert(context->qp != NULL);
01278 MACHSTATE1(3,"qp created %p",context->qp);
01279 }
01280 {
01281 struct ibv_qp_attr attr;
01282 attr.qp_state = IBV_QPS_INIT;
01283 attr.pkey_index = 0;
01284 attr.port_num = context->ibPort;
01285 attr.qkey = 0x11111111;
01286 if(ibv_modify_qp(context->qp, &attr,
01287 IBV_QP_STATE |
01288 IBV_QP_PKEY_INDEX |
01289 IBV_QP_PORT |
01290 IBV_QP_QKEY))
01291 CmiAbort("Could not modify QP to INIT");
01292 }
01293 {
01294 struct ibv_qp_attr attr;
01295 attr.qp_state = IBV_QPS_RTR;
01296 if(ibv_modify_qp(context->qp, &attr, IBV_QP_STATE))
01297 CmiAbort("Could not modify QP to RTR");
01298 }
01299 {
01300 struct ibv_qp_attr attr;
01301 attr.qp_state = IBV_QPS_RTS;
01302 attr.sq_psn=context->localAddr.psn;
01303 if(ibv_modify_qp(context->qp, &attr, IBV_QP_STATE|IBV_QP_SQ_PSN))
01304 CmiAbort("Could not modify QP to RTS");
01305 }
01306
01307 context->localAddr.lid=getLocalLid(context->context,context->ibPort);
01308 context->localAddr.qpn = context->qp->qp_num;
01309 context->localAddr.psn = lrand48() & 0xffffff;
01310
01311 MACHSTATE3(4,"qp information (lid=%i qpn=%i psn=%i)",context->localAddr.lid,context->localAddr.qpn,context->localAddr.psn);
01312 }
01313
01314 void createah() {
01315 int i,numnodes;
01316
01317 numnodes=_Cmi_numnodes;
01318 context->ah=(struct ibv_ah **)malloc(sizeof(struct ibv_ah *)*numnodes);
01319
01320 for(i=0;i<numnodes;i++) {
01321
01322 {
01323 struct ibv_ah_attr ah_attr = {
01324 .is_global = 0,
01325 .dlid = nodes[i].infiData->qp.lid,
01326 .sl = 0,
01327 .src_path_bits = 0,
01328 .port_num = context->ibPort,
01329 };
01330 context->ah[i]=ibv_create_ah(context->pd,&ah_attr);
01331 CmiAssert(context->ah[i]!=0);
01332 MACHSTATE2(4,"ah for node %i lid=%i ",i,ah_attr.dlid);
01333 }
01334
01335 }
01336 }
01337
01338
01339 void CmiMachineInit(char **argv)
01340 {
01341 int i;
01342 int calcmaxsize;
01343 int lid;
01344
01345 MACHSTATE(3,"CmiMachineInit {");
01346 MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
01347 MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
01348
01349
01350 firstBinSize = 120;
01351 blockThreshold=8;
01352 blockAllocRatio=16;
01353
01354 mtu_size=1200;
01355 packetsize = mtu_size*4;
01356 Cmi_dgram_max_data=packetsize-sizeof(struct infiPacketHeader);
01357 CmiAssert(Cmi_dgram_max_data>1);
01358
01359 calcmaxsize=8000;
01360
01361 maxrecvbuffers=calcmaxsize;
01362 maxtokens = calcmaxsize;
01363
01364 initInfiCmiChunkPools();
01365
01366 ibud.devlist = ibv_get_device_list(NULL);
01367 CmiAssert(ibud.devlist != NULL);
01368
01369 ibud.dev = *(ibud.devlist);
01370 CmiAssert(ibud.dev != NULL);
01371
01372 MACHSTATE1(3,"device name %s",ibv_get_device_name(ibud.dev));
01373
01374 context = (struct infiContext *)malloc(sizeof(struct infiContext));
01375
01376 MACHSTATE1(3,"context allocated %p",context);
01377
01378 context->sendCqSize = 2;
01379 context->recvCqSize = calcmaxsize+1;
01380 context->ibPort = 1;
01381 context->context = ibv_open_device(ibud.dev);
01382 CmiAssert(context->context != NULL);
01383
01384 MACHSTATE1(3,"device opened %p",context->context);
01385
01386 context->pd = ibv_alloc_pd(context->context);
01387 CmiAssert(context->pd != NULL);
01388
01389 context->header.nodeNo = _Cmi_mynode;
01390
01391 if(_Cmi_numnodes>1) {
01392 createqp(ibud.dev);
01393
01394 }
01395
01396 MACHSTATE(3,"} CmiMachineInit");
01397 }
01398
01399 void CmiCommunicationInit(char **argv) {
01400 MACHSTATE(3,"CmiCommunicationInit {");
01401 if(_Cmi_numnodes>1) {
01402 infiPostInitialRecvs();
01403 createah();
01404 }
01405 MACHSTATE(3,"} CmiCommunicationInit");
01406 }
01407
01408 void CmiMachineExit()
01409 {
01410 ibv_destroy_qp(context->qp);
01411 ibv_dealloc_pd(context->pd);
01412 ibv_close_device(context->context);
01413 ibv_free_device_list(ibud.devlist);
01414 }
01415