00001
/*
 * ALIGN8(x): round x up to the next multiple of 8 and yield the result as an
 * int.  The whole expansion is parenthesized so the macro behaves as a single
 * operand in any expression context (e.g. `sizeof ALIGN8(n)`).
 */
#ifndef ALIGN8
#define ALIGN8(x) ((int)((~7)&((x)+7)))
#endif

/* Pinned-memory limit; presumably bytes, since contextBlock->pinnedMemory
   accumulates byte counts -- TODO confirm against its users. */
#define MAX_PINNED_MEMORY 100000000

/* Number of busy-wait iterations CC_POST_CHECK spins between post retries. */
#define WASTE_TIME 600
00034
/*
 * CC_POST_CHECK(routine, args, nodeTo)
 *
 * Evaluate `routine args` (a post call that reports the number of posted work
 * requests through the caller's local `nWR`) and hand the result to
 * ammasso_check_post_err().  While that helper returns 1 (queue full, retry
 * requested) the macro busy-waits WASTE_TIME+1 iterations and tries again,
 * decrementing the retry budget by one per attempt.
 *
 * Requires an lvalue named `nWR` in the caller's scope.  nWR is reset to 0
 * before every attempt so that a failed post which does not write it is never
 * judged on an uninitialized value.  The spin counter is volatile so the delay
 * is not optimized away.  Wrapped in do/while(0) so `CC_POST_CHECK(...);` is
 * a single statement (safe inside if/else).
 */
#define CC_POST_CHECK(routine,args,nodeTo) do { \
    int ccPostRetry_ = 10000000; \
    for (;;) { \
      nWR = 0; \
      if (ammasso_check_post_err(routine args, #routine, __LINE__, &nWR, nodeTo, ccPostRetry_) != 1) break; \
      { \
        volatile int ccPostSpin_; \
        for (ccPostSpin_ = 0; ccPostSpin_ <= WASTE_TIME; ++ccPostSpin_) { } \
      } \
      ccPostRetry_--; \
    } \
  } while (0)

/*
 * CC_CHECK(routine, args): evaluate `routine args` and abort via
 * ammasso_check_err() unless it returns CC_OK.  The expansion (including its
 * trailing semicolon) is kept unchanged for existing callers.
 */
#define CC_CHECK(routine,args) \
  ammasso_check_err(routine args, #routine, __LINE__);
00046
00047 static void ammasso_check_err(cc_status_t returnCode,const char *routine,int line) {
00048 if (returnCode!=CC_OK) {
00049 char buf[128];
00050 char *errMsg = cc_status_to_string(returnCode);
00051
00052
00053 cc_rnic_close(contextBlock->rnic);
00054
00055
00056 MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine, __FILE__, line);
00057 MACHSTATE2(5," Description: %d, %s\n",returnCode,errMsg);
00058 sprintf(buf,"Fatal CC error while executing %s at %s:%d\n"
00059 " Description: %d, %s\n", routine, __FILE__, line,returnCode,errMsg);
00060 CmiAbort(buf);
00061 }
00062 }
00063
00064
00065
00066 static int ammasso_check_post_err(cc_status_t returnCode,const char *routine,int line, int *nWR, int nodeTo, int retry) {
00067 if (returnCode == CCERR_TOO_MANY_WRS_POSTED && *nWR != 1 && retry>0) {
00068 cc_wc_t wc;
00069
00070 while (cc_cq_poll(contextBlock->rnic, nodes[nodeTo].send_cq, &wc) == CC_OK) {
00071 MACHSTATE1(5, "Error posting send request - INFO: Send completed with node %d... now waiting for acknowledge...", nodeTo);
00072 }
00073 MACHSTATE(5, "Error posting send request - Retrying...");
00074 return 1;
00075 }
00076
00077 if (returnCode != CC_OK || *nWR != 1) {
00078 char buf[128];
00079 char *errMsg = cc_status_to_string(returnCode);
00080
00081
00082 cc_rnic_close(contextBlock->rnic);
00083
00084
00085 MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine, __FILE__, line);
00086 MACHSTATE3(5," Description: %d, %s (nWR = %d)\n",returnCode,errMsg,nWR);
00087 sprintf(buf,"Fatal CC error while executing %s at %s:%d\n"
00088 " Description: %d, %s (nWR = %d)\n", routine, __FILE__, line,returnCode,errMsg,*nWR);
00089 CmiAbort(buf);
00090 }
00091 return 0;
00092 }
00093
00094
00096
00097
00098 void CmiMachineInit(char **argv);
00099
00100 void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port);
00101
00102 void AmmassoDoIdle();
00103 void CmiNotifyIdle(void);
00104 static CmiIdleState* CmiNotifyGetState(void);
00105 static void CmiNotifyBeginIdle(CmiIdleState *s);
00106 static void CmiNotifyStillIdle(CmiIdleState *s);
00107
00108 void sendAck(OtherNode node);
00109 AmmassoToken *getQPSendToken(OtherNode node);
00110 int sendDataOnQP(char* data, int len, OtherNode node, char flags);
00111 void DeliverViaNetwork(OutgoingMsg msg, OtherNode otherNode, int rank, unsigned int broot, int copy);
00112 static void CommunicationServer(int withDelayMs, int where);
00113 void CmiMachineExit();
00114
00115 void AsynchronousEventHandler(cc_rnic_handle_t rnic, cc_event_record_t *eventRecord, void *cb);
00116 void CheckRecvBufForMessage(OtherNode node);
00117
00118
00119
00120 void CmiAmmassoOpenQueuePairs();
00121
00122 void processAmmassoControlMessage(char* msg, int len, Tailer *tail, OtherNode from);
00123 int ProcessMessage(char* msg, int len, Tailer *tail, OtherNode from);
00124
00125 OtherNode getNodeFromQPId(cc_qp_id_t qp_id);
00126 OtherNode getNodeFromQPHandle(cc_qp_handle_t qp);
00127
00128 void establishQPConnection(OtherNode node, int reuseQPFlag);
00129 void reestablishQPConnection(OtherNode node);
00130 void closeQPConnection(OtherNode node, int destroyQPFlag);
00131
00132 void BufferAlloc(int n);
00133 void TokenAlloc(int n);
00134 void RequestTokens(OtherNode node, int n);
00135 void GrantTokens(OtherNode node, int n);
00136 void RequestReleaseTokens(OtherNode node, int n);
00137 void ReleaseTokens(OtherNode node, int n);
00138
00140
00141
00142
00143 void AllocatorCheck () {
00144 int i, limit;
00145 char buf[24];
00146 for (i=0; i<contextBlock->numNodes; ++i) {
00147 if (i==contextBlock->myNode) continue;
00148 limit = nodes[i].num_sendTokens - nodes[i].max_used_tokens - 10;
00149 CmiPrintf("[%d] AllocatorCheck called: node %d, limit %d\n",CmiMyPe(),i,limit);
00150 if (limit > 0) {
00151 ReleaseTokens(&nodes[i], limit);
00152 CmiPrintf("[%d] Releasing %d tokens to %d\n", CmiMyPe(), limit, i);
00153 nodes[i].max_used_tokens = 0;
00154 }
00155 }
00156 }
00157
00158
00159 void BufferAlloc(int n) {
00160 int i;
00161 char buf[128];
00162 cc_stag_index_t newStagIndex;
00163 AmmassoBuffer *newBuffers;
00164
00165 MACHSTATE1(3, "Allocating %d new Receive Buffers",n);
00166
00167
00168 newBuffers = (AmmassoBuffer*) CmiAlloc(n*sizeof(AmmassoBuffer));
00169
00170 if (newBuffers == NULL) {
00171
00172
00173 cc_rnic_close(contextBlock->rnic);
00174
00175
00176 MACHSTATE(5, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
00177 sprintf(buf, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
00178 CmiAbort(buf);
00179 }
00180
00181 contextBlock->pinnedMemory += n*sizeof(AmmassoBuffer);
00182 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
00183 CC_ADDR_TYPE_VA_BASED,
00184 (cc_byte_t*)newBuffers,
00185 n*sizeof(AmmassoBuffer),
00186 contextBlock->pd_id,
00187 0, 0,
00188 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE | CC_ACF_REMOTE_WRITE,
00189 &newStagIndex)
00190 );
00191
00192 for (i=0; i<n; ++i) {
00193 newBuffers[i].tail.length = 0;
00194 newBuffers[i].next = &(newBuffers[i+1]);
00195 newBuffers[i].stag = newStagIndex;
00196 }
00197 newBuffers[n-1].next = NULL;
00198 if (contextBlock->freeRecvBuffers == NULL) {
00199 contextBlock->freeRecvBuffers = newBuffers;
00200 } else {
00201 contextBlock->last_freeRecvBuffers->next = newBuffers;
00202 }
00203 contextBlock->last_freeRecvBuffers = &newBuffers[n-1];
00204 contextBlock->num_freeRecvBuffers += n;
00205 }
00206
00207 void TokenAlloc(int n) {
00208 int i;
00209 char buf[128];
00210 cc_stag_index_t newStagIndex;
00211 AmmassoToken *sendToken, *tokenScanner;
00212 cc_data_addr_t *sendSgl;
00213 AmmassoBuffer *sendBuffer;
00214
00215 MACHSTATE1(3, "Allocating %d new Tokens",n);
00216
00217
00218 sendBuffer = (AmmassoBuffer*) CmiAlloc(n*sizeof(AmmassoBuffer));
00219
00220 if (sendBuffer == NULL) {
00221
00222
00223 cc_rnic_close(contextBlock->rnic);
00224
00225
00226 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
00227 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
00228 CmiAbort(buf);
00229 }
00230
00231 contextBlock->pinnedMemory += n*sizeof(AmmassoBuffer);
00232 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
00233 CC_ADDR_TYPE_VA_BASED,
00234 (cc_byte_t*)sendBuffer,
00235 n*sizeof(AmmassoBuffer),
00236 contextBlock->pd_id,
00237 0, 0,
00238 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
00239 &newStagIndex)
00240 );
00241
00242
00243 sendToken = (AmmassoToken*) CmiAlloc(n*ALIGN8(sizeof(AmmassoToken)));
00244
00245 if (sendToken == NULL) {
00246
00247
00248 cc_rnic_close(contextBlock->rnic);
00249
00250
00251 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
00252 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
00253 CmiAbort(buf);
00254 }
00255
00256 sendSgl = (cc_data_addr_t*) CmiAlloc(n*ALIGN8(sizeof(cc_data_addr_t)));
00257
00258 if (sendSgl == NULL) {
00259
00260
00261 cc_rnic_close(contextBlock->rnic);
00262
00263
00264 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
00265 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
00266 CmiAbort(buf);
00267 }
00268
00269 tokenScanner = sendToken;
00270 for (i=0; i<n; ++i) {
00271 sendSgl->stag = newStagIndex;
00272 sendSgl->length = AMMASSO_BUFSIZE + sizeof(Tailer);
00273 sendSgl->to = (unsigned long)&(sendBuffer[i]);
00274 tokenScanner->wr.wr_id = (unsigned long)tokenScanner;
00275 tokenScanner->wr.wr_type = CC_WR_TYPE_RDMA_WRITE;
00276 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_count = 1;
00277 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_list = sendSgl;
00278 tokenScanner->wr.signaled = 1;
00279 tokenScanner->localBuf = (AmmassoBuffer*)&(sendBuffer[i]);
00280 LIST_ENQUEUE(contextBlock->,freeTokens,tokenScanner);
00281 sendSgl = (cc_data_addr_t*)(((char*)sendSgl)+ALIGN8(sizeof(cc_data_addr_t)));
00282 tokenScanner = (AmmassoToken*)(((char*)tokenScanner)+ALIGN8(sizeof(AmmassoToken)));
00283 }
00284 }
00285
00286 void RequestTokens(OtherNode node, int n) {
00287 char buf[24];
00288 *((int*)buf) = n;
00289 sendDataOnQP(buf, sizeof(int), node, AMMASSO_MOREBUFFERS);
00290 }
00291
00292 void GrantTokens(OtherNode node, int n) {
00293 int i;
00294 char *buf;
00295 AmmassoBuffer *buffer;
00296 AmmassoBuffer *prebuffer;
00297 AmmassoTokenDescription *tokenDesc;
00298 if (node->pending != NULL) return;
00299 if (n*sizeof(AmmassoTokenDescription) + sizeof(int) > AMMASSO_BUFSIZE) {
00300 n = (AMMASSO_BUFSIZE-sizeof(int)) / sizeof(AmmassoTokenDescription);
00301 }
00302 if (contextBlock->num_freeRecvBuffers < n) {
00303 int quantity = (n - contextBlock->num_freeRecvBuffers + 1023) & (~1023);
00304 BufferAlloc(quantity);
00305 }
00306 buf = (char*) CmiAlloc(n*sizeof(AmmassoTokenDescription) + sizeof(int));
00307 *((int*)buf) = n;
00308 tokenDesc = (AmmassoTokenDescription*)(buf+sizeof(int));
00309 buffer = contextBlock->freeRecvBuffers;
00310 for (i=0; i<n; ++i) {
00311 tokenDesc[i].stag = buffer->stag;
00312 tokenDesc[i].to = (unsigned long)buffer;
00313 prebuffer = buffer;
00314 buffer = buffer->next;
00315 }
00316 node->pending = contextBlock->freeRecvBuffers;
00317 node->last_pending = prebuffer;
00318 node->num_pending = n;
00319 prebuffer->next = NULL;
00320 contextBlock->num_freeRecvBuffers -= n;
00321 contextBlock->freeRecvBuffers = buffer;
00322 sendDataOnQP(buf, n*sizeof(AmmassoTokenDescription) + sizeof(int), node, AMMASSO_ALLOCATE);
00323 CmiFree(buf);
00324 }
00325
00326 void RequestReleaseTokens(OtherNode node, int n) {
00327 char buf[24];
00328 *((int*)buf) = n;
00329 sendDataOnQP(buf, sizeof(int), node, AMMASSO_RELEASE);
00330 }
00331
00332 void ReleaseTokens(OtherNode node, int n) {
00333 int i, nWR;
00334 AmmassoToken *token;
00335 AmmassoBuffer *tokenBuf;
00336 cc_data_addr_t *tokenSgl;
00337
00338 if (node->num_sendTokens < n) n = node->num_sendTokens - 1;
00339 if (n <= 0) return;
00340 token = node->sendTokens;
00341 tokenBuf = token->localBuf;
00342
00343 tokenBuf->tail.length = 1;
00344 tokenBuf->tail.ack = 0;
00345 tokenBuf->tail.flags = AMMASSO_RELEASED;
00346
00347
00348 tokenSgl = token->wr.wr_u.rdma_write.local_sgl.sge_list;
00349 tokenSgl->length = sizeof(Tailer);
00350 tokenSgl->to = (unsigned long)&tokenBuf->tail;
00351 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
00352
00353 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
00354
00355 if (contextBlock->freeTokens == NULL) {
00356 contextBlock->freeTokens = node->sendTokens;
00357 } else {
00358 contextBlock->last_freeTokens->next = node->sendTokens;
00359 }
00360 for (i=1; i<n; ++i) token = token->next;
00361 contextBlock->last_freeTokens = token;
00362 node->sendTokens = token->next;
00363 token->next = NULL;
00364 contextBlock->num_freeTokens += n;
00365 node->num_sendTokens -= n;
00366 }
00367
00368
00369
00370
00371 void CmiMachineInit(char **argv) {
00372
00373 char buf[128];
00374 cc_status_t rtn;
00375
00376 AMMASSO_STATS_INIT
00377
00378 AMMASSO_STATS_START(MachineInit)
00379
00380 MACHSTATE(2, "CmiMachineInit() - INFO: (***** Ammasso Specific*****) - Called... Initializing RNIC...");
00381 MACHSTATE1(1, "CmiMachineInit() - INFO: Cmi_charmrun_pid = %d", Cmi_charmrun_pid);
00382
00383
00384
00385
00386
00387
00388 if (contextBlock != NULL) {
00389 MACHSTATE(5, "CmiMachineInit() - ERROR: contextBlock != NULL");
00390 sprintf(buf, "CmiMachineInit() - ERROR: contextBlock != NULL");
00391 CmiAbort(buf);
00392 }
00393 contextBlock = (mycb_t*)malloc(sizeof(mycb_t));
00394 if (contextBlock == NULL) {
00395 MACHSTATE(5, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
00396 sprintf(buf, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
00397 CmiAbort(buf);
00398 }
00399
00400
00401 memset(contextBlock, 0, sizeof(mycb_t));
00402 contextBlock->rnic = -1;
00403
00404 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-OPEN_RNIC)");
00405
00406
00407 if (Cmi_charmrun_pid != 0) {
00408
00409
00410
00411
00412 rtn = cc_rnic_open(0, CC_PBL_PAGE_MODE, contextBlock, &(contextBlock->rnic));
00413 if (rtn != CC_OK) {
00414 MACHSTATE2(5, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
00415 sprintf(buf, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
00416 CmiAbort(buf);
00417 }
00418
00419 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-SET-ASYNC-HANDLER)");
00420
00421
00422 CC_CHECK(cc_eh_set_async_handler,(contextBlock->rnic, AsynchronousEventHandler, contextBlock));
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-PD-ALLOC)");
00433
00434
00435 CC_CHECK(cc_pd_alloc,(contextBlock->rnic, &(contextBlock->pd_id)));
00436
00437 MACHSTATE(1, "CmiMachineInit() - INFO: RNIC Open For Business!!!");
00438
00439 } else {
00440
00441
00442 contextBlock->rnic = -1;
00443 }
00444
00445 MACHSTATE(2, "CmiMachineInit() - INFO: Completed Successfully !!!");
00446
00447 AMMASSO_STATS_END(MachineInit)
00448 }
00449
/*
 * Communication-initialization hook.  This machine layer performs no work
 * here; argv is accepted only to satisfy the common interface.
 */
void CmiCommunicationInit(char **argv)
{
  (void)argv;
}
00453
/*
 * Log the address reported for processor pe.  Nothing is stored; the
 * function only emits MACHSTATE trace output.
 *
 * sin_addr.s_addr holds the IPv4 address in network byte order, so reading
 * its bytes in memory order yields the dotted-quad octets on every host.
 * The previous shift-and-mask extraction was correct only on little-endian
 * machines.
 */
void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port) {
  const unsigned char *octet = (const unsigned char *)&addr->sin_addr.s_addr;
  (void)octet;  /* referenced only by trace macros, which may compile away */

  MACHSTATE1(2, "CmiNodeAddressesStoreHandler() - INFO: pe = %d", pe);
  MACHSTATE1(1, " addr = { sin_family = %d,", addr->sin_family);
  MACHSTATE1(1, " sin_port = %d,", addr->sin_port);
  MACHSTATE4(1, " sin_addr.s_addr = %d.%d.%d.%d }", octet[0], octet[1], octet[2], octet[3]);
  MACHSTATE1(1, " port = %d", port);
}
00467
00468
00469 void AmmassoDoIdle() {
00470
00471 int i;
00472 cc_wc_t wc;
00473
00474 AMMASSO_STATS_START(AmmassoDoIdle)
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484 for (i = 0; i < contextBlock->numNodes; i++) {
00485 if (i == contextBlock->myNode) continue;
00486 CheckRecvBufForMessage(&(nodes[i]));
00487 while (cc_cq_poll(contextBlock->rnic, nodes[i].send_cq, &wc) == CC_OK) {
00488 MACHSTATE1(3, "AmmassoDoIdle() - INFO: Send completed with node %d... now waiting for acknowledge...", i);
00489 }
00490 }
00491
00492 AMMASSO_STATS_END(AmmassoDoIdle)
00493 }
00494
00495 void CmiNotifyIdle(void) {
00496 AmmassoDoIdle();
00497 }
00498
00499 static CmiIdleState* CmiNotifyGetState(void) {
00500 return NULL;
00501 }
00502
00503 static void CmiNotifyBeginIdle(CmiIdleState *s) {
00504 AmmassoDoIdle();
00505 }
00506
00507 static void CmiNotifyStillIdle(CmiIdleState *s) {
00508 AmmassoDoIdle();
00509 }
00510
00511
00512
00513 void sendAck(OtherNode node) {
00514
00515 int nWR;
00516
00517 AMMASSO_STATS_START(sendAck)
00518
00519 MACHSTATE2(3, "sendAck() - Ammasso - INFO: Called... sending ACK %d to node %d", *node->remoteAck, node->myNode);
00520
00521 if (*node->remoteAck < ACK_MASK) {
00522
00523
00524 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, node->ack_sq_wr, 1, &nWR),node->myNode);
00525
00526 } else {
00527
00528
00529
00530 AmmassoToken *token;
00531 AmmassoBuffer *tokenBuf;
00532 int tmp_ack = *node->remoteAck;
00533 *node->remoteAck = 0;
00534 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, node->ack_sq_wr, 1, &nWR),node->myNode);
00535
00536 sleep(1);
00537
00538 token = getQPSendToken(node);
00539 tokenBuf = token->localBuf;
00540 tokenBuf->tail.ack = tmp_ack;
00541 tokenBuf->tail.flags = ACK_WRAPPING;
00542 tokenBuf->tail.length = 1;
00543 token->wr.wr_u.rdma_write.local_sgl.sge_list->length = sizeof(Tailer);
00544 token->wr.wr_u.rdma_write.local_sgl.sge_list->to = (unsigned long)&tokenBuf->tail;
00545 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
00546 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
00547 LIST_ENQUEUE(node->,usedTokens,token);
00548 node->max_used_tokens = (node->num_usedTokens>node->max_used_tokens)?node->num_usedTokens:node->max_used_tokens;
00549 *node->remoteAck = tmp_ack & ACK_MASK;
00550 }
00551
00552 node->messagesNotYetAcknowledged = 0;
00553
00554 AMMASSO_STATS_END(sendAck)
00555 }
00556
00557
00558
00559
00560
00561 AmmassoToken *getQPSendToken(OtherNode node) {
00562 AmmassoToken *token;
00563 int i;
00564 cc_wc_t wc;
00565 ammasso_ack_t newAck;
00566 while (node->connectionState != QP_CONN_STATE_CONNECTED ||
00567 node->sendTokens == NULL) {
00568
00569
00570
00571 MACHSTATE(3, "getQPSendBuffer() - INFO: No tokens available");
00572 if (*node->directAck > node->localAck) {
00573 newAck = *node->directAck;
00574 for (i=node->localAck; i<newAck; ++i) {
00575 LIST_DEQUEUE(node->,usedTokens,token);
00576 LIST_ENQUEUE(node->,sendTokens,token);
00577 }
00578 node->localAck = newAck;
00579 }
00580
00581 CheckRecvBufForMessage(node);
00582
00583 while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
00584 MACHSTATE1(3, "getQPSendBuffer() - INFO: Send completed with node %d... now waiting for acknowledge...", node->myNode);
00585 }
00586 }
00587 LIST_DEQUEUE(node->,sendTokens,token);
00588 return token;
00589 }
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682 int sendDataOnQP(char* data, int len, OtherNode node, char flags) {
00683
00684 AmmassoToken *sendBufToken;
00685 AmmassoBuffer *tokenBuf;
00686 cc_data_addr_t *tokenSgl;
00687 int toSendLength;
00688 cc_wc_t wc;
00689 cc_uint32_t nWR;
00690 char isFirst = 1;
00691 char *origMsgStart = data;
00692 char *sendBufBegin;
00693
00694 int origSize = len;
00695
00696 if (origSize <= 1024) {
00697 AMMASSO_STATS_START(sendDataOnQP_1024)
00698 } else if (origSize <= 2048) {
00699 AMMASSO_STATS_START(sendDataOnQP_2048)
00700 } else if (origSize <= 4096) {
00701 AMMASSO_STATS_START(sendDataOnQP_4096)
00702 } else if (origSize <= 16384) {
00703 AMMASSO_STATS_START(sendDataOnQP_16384)
00704 } else {
00705 AMMASSO_STATS_START(sendDataOnQP_over)
00706 }
00707
00708 AMMASSO_STATS_START(sendDataOnQP)
00709
00710
00711
00712 while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
00713 MACHSTATE1(3, "sendDataOnQP() - INFO: Send completed with node %d... now waiting for acknowledge...", node->myNode);
00714 }
00715
00716
00717 MACHSTATE2(2, "sendDataOnQP() - Ammasso - INFO: Called (send to node %d, len = %d)...", node->myNode, len);
00718
00719
00720 CmiAssert(flags==0 || len<=AMMASSO_BUFSIZE);
00721
00722
00723
00724
00725
00726 while (len > 0) {
00727
00728 AMMASSO_STATS_START(sendDataOnQP_pre_send)
00729
00730
00731 sendBufToken = getQPSendToken(node);
00732 tokenBuf = sendBufToken->localBuf;
00733
00734
00735 LIST_ENQUEUE(node->,usedTokens,sendBufToken);
00736 node->max_used_tokens = (node->num_usedTokens>node->max_used_tokens)?node->num_usedTokens:node->max_used_tokens;
00737
00738
00739
00740
00741
00742
00743
00744
00745 if (isFirst) {
00746
00747 toSendLength = len > AMMASSO_BUFSIZE ? AMMASSO_BUFSIZE : len;
00748 sendBufBegin = tokenBuf->buf + AMMASSO_BUFSIZE - ALIGN8(toSendLength);
00749
00750 memcpy(sendBufBegin, data, toSendLength);
00751
00752 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending 1st Fragment - toSendLength = %d...", toSendLength);
00753
00754 } else {
00755
00756 toSendLength = len > (AMMASSO_BUFSIZE - DGRAM_HEADER_SIZE) ? AMMASSO_BUFSIZE : (len+DGRAM_HEADER_SIZE);
00757 sendBufBegin = tokenBuf->buf + AMMASSO_BUFSIZE - ALIGN8(toSendLength);
00758
00759 memcpy(sendBufBegin+DGRAM_HEADER_SIZE, data, toSendLength-DGRAM_HEADER_SIZE);
00760
00761
00762
00763
00764
00765
00766
00767
00768 memcpy(sendBufBegin, origMsgStart, DGRAM_HEADER_SIZE);
00769
00770 ((DgramHeader*)sendBufBegin)->seqno = node->send_next;
00771 node->send_next = ((node->send_next+1) & DGRAM_SEQNO_MASK);
00772
00773 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending Continuation Fragment - toSendLength = %d...", toSendLength);
00774 }
00775
00776
00777
00778 tokenBuf->tail.length = toSendLength;
00779 node->messagesNotYetAcknowledged = 0;
00780 tokenBuf->tail.ack = *node->remoteAck;
00781 if (*node->remoteAck > ACK_MASK) {
00782
00783 tokenBuf->tail.ack = 0;
00784
00785
00786
00787
00788
00789
00790 sendAck(node);
00791 }
00792 tokenBuf->tail.flags = flags;
00793
00794
00795 tokenSgl = sendBufToken->wr.wr_u.rdma_write.local_sgl.sge_list;
00796 tokenSgl->length = ALIGN8(toSendLength) + sizeof(Tailer);
00797 tokenSgl->to = (unsigned long)sendBufBegin;
00798 sendBufToken->wr.wr_u.rdma_write.remote_to = (unsigned long)(((char*)sendBufToken->remoteBuf)+AMMASSO_BUFSIZE-ALIGN8(toSendLength));
00799
00800
00801
00802 AMMASSO_STATS_END(sendDataOnQP_pre_send)
00803 AMMASSO_STATS_START(sendDataOnQP_send)
00804
00805 MACHSTATE(3, "sendDataOnQP() - Ammasso - INFO: Enqueuing RDMA Write WR...");
00806
00807 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tokenSgl->to = %p", tokenSgl->to);
00808 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: sendBufToken->wr.wr_u.rdma_write.remote_to = %p", sendBufToken->wr.wr_u.rdma_write.remote_to);
00809 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.ack = %d", tokenBuf->tail.ack);
00810 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.flags = %d", tokenBuf->tail.flags);
00811
00812 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &sendBufToken->wr, 1, &nWR),node->myNode);
00813
00814 MACHSTATE(1, "sendDataOnQP() - Ammasso - INFO: RDMA Write WR Enqueue Completed");
00815
00816 AMMASSO_STATS_END(sendDataOnQP_send)
00817 AMMASSO_STATS_START(sendDataOnQP_post_send)
00818
00819
00820 data += toSendLength;
00821 len -= toSendLength;
00822 if (isFirst == 0) {
00823 data -= DGRAM_HEADER_SIZE;
00824 len += DGRAM_HEADER_SIZE;
00825 }
00826 isFirst = 0;
00827
00828 AMMASSO_STATS_END(sendDataOnQP_post_send)
00829 }
00830
00831 AMMASSO_STATS_END(sendDataOnQP)
00832
00833 if (origSize <= 1024) {
00834 AMMASSO_STATS_END(sendDataOnQP_1024)
00835 } else if (origSize <= 2048) {
00836 AMMASSO_STATS_END(sendDataOnQP_2048)
00837 } else if (origSize <= 4096) {
00838 AMMASSO_STATS_END(sendDataOnQP_4096)
00839 } else if (origSize <= 16384) {
00840 AMMASSO_STATS_END(sendDataOnQP_16384)
00841 } else {
00842 AMMASSO_STATS_END(sendDataOnQP_over)
00843 }
00844
00845 }
00846
00847
00848
00849
00850
00851 void DeliverViaNetwork(OutgoingMsg msg, OtherNode otherNode, int rank, unsigned int broot, int copy) {
00852
00853 cc_status_t rtn;
00854 cc_stag_index_t stag;
00855 cc_data_addr_t sgl;
00856 cc_sq_wr_t wr;
00857 cc_uint32_t WRsPosted;
00858
00859 AMMASSO_STATS_START(DeliverViaNetwork)
00860
00861 MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Called...");
00862
00863
00864
00865
00866
00867
00868
00869 AMMASSO_STATS_START(DeliverViaNetwork_post_lock)
00870
00871 DgramHeaderMake(msg->data, rank, msg->src, Cmi_charmrun_pid, otherNode->send_next, broot);
00872 otherNode->send_next = ((otherNode->send_next+1) & DGRAM_SEQNO_MASK);
00873
00874 MACHSTATE1(1, "DeliverViaNetwork() - INFO: Sending message to node %d", otherNode->myNode);
00875 MACHSTATE1(1, "DeliverViaNetwork() - INFO: rank %d", rank);
00876 MACHSTATE1(1, "DeliverViaNetwork() - INFO: broot %d", broot);
00877
00878 AMMASSO_STATS_START(DeliverViaNetwork_send)
00879
00880 sendDataOnQP(msg->data, msg->size, otherNode, 0);
00881
00882 AMMASSO_STATS_END(DeliverViaNetwork_send)
00883
00884
00885 MACHSTATE(1, "DeliverViaNetwork() - INFO: Post-send_next_lock");
00886
00887
00888
00889
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907
00908
00909
00910
00911
00912
00913
00914
00915
00916
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928
00929
00930
00931
00932
00933
00934
00935
00936
00937 MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Completed.");
00938
00939 AMMASSO_STATS_END(DeliverViaNetwork_post_lock)
00940 AMMASSO_STATS_END(DeliverViaNetwork)
00941 }
00942
00943
00944
00945
00946
00947
00948
00949
00950
00951
00952
00953
00954
00955 int CheckSocketsReady(int withDelayMs)
00956 {
00957 int nreadable;
00958 CMK_PIPE_DECL(withDelayMs);
00959
00960 CmiStdoutAdd(CMK_PIPE_SUB);
00961 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
00962
00963 nreadable=CMK_PIPE_CALL();
00964 ctrlskt_ready_read = 0;
00965 dataskt_ready_read = 0;
00966 dataskt_ready_write = 0;
00967
00968 if (nreadable == 0) {
00969 MACHSTATE(2, "} CheckSocketsReady (nothing readable)");
00970 return nreadable;
00971 }
00972 if (nreadable==-1) {
00973 CMK_PIPE_CHECKERR();
00974 MACHSTATE(3, "} CheckSocketsReady (INTERRUPTED!)");
00975 return CheckSocketsReady(0);
00976 }
00977 CmiStdoutCheck(CMK_PIPE_SUB);
00978 if (Cmi_charmrun_fd!=-1)
00979 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00980 MACHSTATE(3, "} CheckSocketsReady");
00981 return nreadable;
00982 }
00983
00984
00985
00986
00987
00988
00989
00990
00991
00992
00993
00994
00995 static void ServiceCharmrun_nolock() {
00996
00997 int again = 1;
00998
00999 MACHSTATE(2, "ServiceCharmrun_nolock begin {");
01000
01001 while (again) {
01002 again = 0;
01003
01004 CheckSocketsReady(0);
01005 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
01006 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
01007 }
01008
01009 MACHSTATE(2, "} ServiceCharmrun_nolock end");
01010 }
01011
01012 void processAmmassoControlMessage(char* msg, int len, Tailer *tail, OtherNode from) {
01013
01014 int nodeIndex, ctrlType, i, n, nWR;
01015 AmmassoToken *token, *pretoken;
01016 AmmassoBuffer *tokenBuf;
01017 cc_data_addr_t *tokenSgl;
01018 OtherNode node;
01019 AmmassoTokenDescription *tokenDesc;
01020
01021 AMMASSO_STATS_START(processAmmassoControlMessage)
01022
01023
01024
01025
01026 switch (tail->flags) {
01027
01028 case AMMASSO_READY:
01029
01030
01031 contextBlock->nodeReadyCount--;
01032 MACHSTATE1(3, "processAmmassoControlMessage() - Ammasso - INFO: Received READY packet... still waiting for %d more...", contextBlock->nodeReadyCount);
01033
01034 break;
01035
01036 case AMMASSO_ALLOCATE:
01037
01038 token = getQPSendToken(from);
01039 tokenBuf = token->localBuf;
01040
01041 tokenBuf->tail.length = 1;
01042 tokenBuf->tail.ack = 0;
01043 tokenBuf->tail.flags = AMMASSO_ALLOCATED;
01044
01045
01046 tokenSgl = token->wr.wr_u.rdma_write.local_sgl.sge_list;
01047 tokenSgl->length = sizeof(Tailer);
01048 tokenSgl->to = (unsigned long)&tokenBuf->tail;
01049 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
01050
01051 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
01052
01053 LIST_ENQUEUE(from->,usedTokens,token);
01054 from->max_used_tokens = (from->num_usedTokens>from->max_used_tokens)?from->num_usedTokens:from->max_used_tokens;
01055
01056
01057 n = *((int*)msg);
01058 if (contextBlock->num_freeTokens < n) {
01059 int quantity = (n - contextBlock->num_freeTokens + 1023) & (~1023);
01060 BufferAlloc(quantity);
01061 }
01062 token = contextBlock->freeTokens;
01063 tokenDesc = (AmmassoTokenDescription*)(msg+sizeof(int));
01064 for (i=0; i<n; ++i) {
01065 token->remoteBuf = (AmmassoBuffer*)tokenDesc[i].to;
01066 token->wr.wr_u.rdma_write.remote_stag = tokenDesc[i].stag;
01067 pretoken = token;
01068 token = token->next;
01069 }
01070 from->last_usedTokens->next = contextBlock->freeTokens;
01071 from->last_usedTokens = pretoken;
01072 pretoken->next = NULL;
01073 contextBlock->freeTokens = token;
01074 from->num_usedTokens += n;
01075 contextBlock->num_freeTokens -= n;
01076
01077 break;
01078
01079 case AMMASSO_ALLOCATED:
01080
01081
01082
01083 from->last_recv_buf->next = from->pending;
01084 from->last_recv_buf = from->last_pending;
01085 from->last_pending = NULL;
01086 from->num_recv_buf += from->num_pending;
01087 *from->remoteAck += from->num_pending;
01088 from->num_pending = 0;
01089
01090 break;
01091
01092 case AMMASSO_MOREBUFFERS:
01093
01094
01095 n = *((int*)msg);
01096 GrantTokens(from, n);
01097
01098
01099 break;
01100
01101 case AMMASSO_RELEASE:
01102
01103
01104
01105
01106 break;
01107
01108 case AMMASSO_RELEASED:
01109
01110
01111 from->secondLastRecvBuf->next = NULL;
01112 tokenBuf = from->last_recv_buf;
01113 from->last_recv_buf = from->secondLastRecvBuf;
01114 from->num_recv_buf--;
01115 LIST_ENQUEUE(contextBlock->,freeRecvBuffers,tokenBuf);
01116 n = *((int*)msg);
01117 for (i=1; i<n; ++i) {
01118 LIST_DEQUEUE(from->,recv_buf,tokenBuf);
01119 LIST_ENQUEUE(contextBlock->,freeRecvBuffers,tokenBuf);
01120 }
01121
01122 break;
01123
01124 case ACK_WRAPPING:
01125
01126
01127 from->localAck &= ACK_MASK;
01128
01129 break;
01130
01131 default:
01132 MACHSTATE1(5, "processAmmassoControlMessage() - Ammasso -INFO: Received control message with invalid flags: %d", tail->flags);
01133 CmiAbort("Invalid control message received");
01134 }
01135
01136 AMMASSO_STATS_END(processAmmassoControlMessage)
01137 }
01138
01139 int ProcessMessage(char* msg, int len, Tailer *tail, OtherNode from) {
01140
01141 int rank, srcPE, seqNum, magicCookie, size, i;
01142 unsigned int broot;
01143 unsigned char checksum;
01144 OtherNode fromNode;
01145 char *newMsg;
01146 int needAck;
01147
01148 AMMASSO_STATS_START(ProcessMessage)
01149
01150 MACHSTATE(2, "ProcessMessage() - INFO: Called...");
01151 MACHSTATE2(1, "ProcessMessage() - INFO: tail - ack=%d, flags=%d", tail->ack, tail->flags);
01152
01153
01154
01155 if (2*from->messagesNotYetAcknowledged > from->num_recv_buf) {
01156 needAck = 1;
01157 } else {
01158 needAck = 0;
01159 }
01160
01161
01162
01163
01164 if (tail->ack > from->localAck) {
01165 AmmassoToken *token;
01166 for (i=from->localAck; i<tail->ack; ++i) {
01167 LIST_DEQUEUE(from->,usedTokens,token);
01168 LIST_ENQUEUE(from->,sendTokens,token);
01169 }
01170 from->localAck = tail->ack;
01171 }
01172
01173 {
01174 MACHSTATE1(1, "ProcessMessage() - INFO: msg = %p", msg);
01175 int j;
01176 for (j = 0; j < DGRAM_HEADER_SIZE + 24; j++) {
01177 MACHSTATE2(1, "ProcessMessage() - INFO: msg[%d] = %02x", j, msg[j]);
01178 }
01179 }
01180
01181 if (tail->flags != 0) {
01182 processAmmassoControlMessage(msg, len, tail, from);
01183 return needAck;
01184 }
01185
01186
01187 DgramHeaderBreak(msg, rank, srcPE, magicCookie, seqNum, broot);
01188
01189 MACHSTATE(1, "ProcessMessage() - INFO: Message Contents:");
01190 MACHSTATE1(1, " rank = %d", rank);
01191 MACHSTATE1(1, " srcPE = %d", srcPE);
01192 MACHSTATE1(1, " magicCookie = %d", magicCookie);
01193 MACHSTATE1(1, " seqNum = %d", seqNum);
01194 MACHSTATE1(1, " broot = %d", broot);
01195
01196 #ifdef CMK_USE_CHECKSUM
01197
01198
01199 checksum = computeCheckSum(msg, len);
01200 if (checksum != 0) {
01201 MACHSTATE1(5, "ProcessMessage() - Ammasso - ERROR: Received message with bad checksum (%d)... ignoring...", checksum);
01202 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received message with bad checksum (%d)... ignoring...\n", CmiMyPe(), checksum);
01203 return needAck;
01204 }
01205
01206 #else
01207
01208
01209 if (magicCookie != (Cmi_charmrun_pid & DGRAM_MAGIC_MASK)) {
01210 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Received message with a bad magic cookie... ignoring...");
01211 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received message with a bad magic cookie... ignoring...\n", CmiMyPe());
01212
01213 {
01214 CmiPrintf("ProcessMessage() - INFO: Cmi_charmrun_pid = %d\n", Cmi_charmrun_pid);
01215 CmiPrintf("ProcessMessage() - INFO: node->recv_UseIndex = %d\n", nodes_by_pe[srcPE]->recv_UseIndex);
01216 CmiPrintf("ProcessMessage() - INFO: msg = %p\n", msg);
01217 int j;
01218 for (j = 0; j < DGRAM_HEADER_SIZE + 24; j++) {
01219 CmiPrintf("ProcessMessage() - INFO: msg[%d] = %02x\n", j, msg[j]);
01220 }
01221 }
01222 return needAck;
01223 }
01224
01225 #endif
01226
01227
01228 fromNode = nodes_by_pe[srcPE];
01229
01230 CmiAssert(fromNode == from);
01231
01232 MACHSTATE1(1, "ProcessMessage() - INFO: Message from node %d...", fromNode->myNode);
01233
01234
01235
01236
01237
01238
01239
01240
01241 if (seqNum == (fromNode->recv_expect)) {
01242
01243 fromNode->recv_expect = ((seqNum+1) & DGRAM_SEQNO_MASK);
01244 } else {
01245 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Received a message with a bad sequence number... ignoring...");
01246 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Received a message witha bad sequence number... ignoring...\n", CmiMyPe());
01247 return needAck;
01248 }
01249
01250
01251
01252
01253 newMsg = fromNode->asm_msg;
01254
01255
01256
01257
01258
01259 if (newMsg == NULL) {
01260
01261
01262 size = CmiMsgHeaderGetLength(msg);
01263 newMsg = (char*)CmiAlloc(size);
01264 _MEMCHECK(newMsg);
01265
01266
01267 if (len > size) {
01268 MACHSTATE2(5, "ProcessMessage() - Ammasso - ERROR: Message size mismatch (size: %d != len: %d)", size, len);
01269 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Message size mismatch (size: %d != len: %d)\n", CmiMyPe(), size, len);
01270 CmiAbort("Message Size Mismatch");
01271 }
01272
01273
01274
01275 memcpy(newMsg, msg, len);
01276 fromNode->asm_rank = rank;
01277 fromNode->asm_total = size;
01278 fromNode->asm_fill = len;
01279 fromNode->asm_msg = newMsg;
01280
01281
01282 } else {
01283
01284 size = len - DGRAM_HEADER_SIZE;
01285
01286
01287
01288 if (fromNode->asm_fill + size > fromNode->asm_total) {
01289 MACHSTATE(5, "ProcessMessage() - Ammasso - ERROR: Message size mismatch");
01290 CmiPrintf("[%d] ProcessMessage() - Ammasso - ERROR: Message size mismatch", CmiMyPe());
01291 CmiAbort("Message Size Mismatch");
01292 }
01293
01294
01295 memcpy(newMsg + fromNode->asm_fill, msg + DGRAM_HEADER_SIZE, size);
01296 fromNode->asm_fill += size;
01297 }
01298
01299 MACHSTATE2(1, "ProcessMessage() - Ammasso - INFO: Message copied into asm_buf (asm_fill = %d, asm_total = %d)...", fromNode->asm_fill, fromNode->asm_total);
01300
01301
01302 if (fromNode->asm_fill == fromNode->asm_total) {
01303
01304 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: Pushing message...");
01305
01306
01307 switch (rank) {
01308
01309 case DGRAM_BROADCAST:
01310
01311 MACHSTATE1(3, "ProcessMessage() - Ammasso - INFO: Broadcast - _Cmi_mynodesize = %d", _Cmi_mynodesize);
01312
01313
01314 for (i = 1; i < _Cmi_mynodesize; i++)
01315 CmiPushPE(i, CopyMsg(newMsg, fromNode->asm_total));
01316 CmiPushPE(0, newMsg);
01317 break;
01318
01319 #if CMK_NODE_QUEUE_AVAILABLE
01320 case DGRAM_NODEBROADCAST:
01321 case DGRAM_NODEMESSAGE:
01322 CmiPushNode(newMsg);
01323 break;
01324 #endif
01325
01326 default:
01327 CmiPushPE(rank, newMsg);
01328 }
01329
01330 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: NULLing asm_msg...");
01331
01332
01333 fromNode->asm_msg = NULL;
01334 }
01335
01336 MACHSTATE(1, "ProcessMessage() - Ammasso - INFO: Checking for re-broadcast");
01337
01338
01339 #if CMK_BROADCAST_SPANNING_TREE
01340
01341 if (rank == DGRAM_BROADCAST
01342 #if CMK_NODE_QUEUE_AVAILABLE
01343 || rank == DGRAM_NODEBROADCAST
01344 #endif
01345 ) SendSpanningChildren(NULL, 0, len, msg, broot, rank);
01346
01347 #elif CMK_BROADCAST_HYPERCUBE
01348
01349 if (rank == DGRAM_BROADCAST
01350 #if CMK_NODE_QUEUE_AVAILABLE
01351 || rank == DGRAM_NODEBROADCAST
01352 #endif
01353 ) {
01354 MACHSTATE(3, "ProcessMessage() - INFO: Calling SendHypercube()...");
01355 SendHypercube(NULL, 0, len, msg, broot, rank);
01356 }
01357 #endif
01358
01359 AMMASSO_STATS_END(ProcessMessage)
01360 return needAck;
01361 }
01362
01363
01364
01365
01366
01367
01368
01369
01370
01371
01372
01373
01374
01375
01376
01377
01378
01379
01380
01381
01382
01383
01384
01385
01386
01387
01388
01389
01390
01391
01392
01393
01394
01395
01396
01397
01398
01399
01400
01401
01402
01403
01404
01405
01406
01407
01408
01409
01410
01411
01412
01413
01414
01415
01416
01417
01418
01419
01420
01421
01422
01424
01425
01426
01427
01428
01429
01430
01431
01432
01433
01434
01435
01436
01437
01438
01439
01440
01441
01442
01443
01444
01445
01446
01447
01448
01449
01450
01451
01452
01453
01454
01455
01456
01457
01458
01459
01460
01461
01462
01463
01464
01465
01466
01467
01468
01469
01470
01471
01472
01473 static void CommunicationServer_nolock(int withDelayMs) {
01474
01475 int i;
01476
01477 MACHSTATE(2, "CommunicationServer_nolock start {");
01478
01479
01480
01481
01482
01483 for (i = 0; i < contextBlock->numNodes; i++) {
01484 if (i == contextBlock->myNode) continue;
01485
01486
01487 CheckRecvBufForMessage(&(nodes[i]));
01488 }
01489
01490 MACHSTATE(2, "}CommunicationServer_nolock end");
01491 }
01492
01493
01494
01495
01496
01497
01498
01499
01500 static void CommunicationServer(int withDelayMs, int where) {
01501
01503
01504
01505
01506 AMMASSO_STATS_START(CommunicationServer)
01507
01508 MACHSTATE2(2, "CommunicationServer(%d) from %d {",withDelayMs, where);
01509
01510
01511 if (where == 1) {
01512
01513
01514 if (!machine_initiated_shutdown)
01515 ServiceCharmrun_nolock();
01516
01517
01518 return;
01519 }
01520
01521 CmiCommLock();
01522 CommunicationServer_nolock(withDelayMs);
01523 CmiCommUnlock();
01524
01525 #if CMK_IMMEDIATE_MSG
01526 if (where == 0)
01527 CmiHandleImmediate();
01528 #endif
01529
01530 MACHSTATE(2,"} CommunicationServer");
01531
01532 AMMASSO_STATS_END(CommunicationServer)
01533 }
01534
01535
01536
01537
01538
01539
01540 void CmiMachineExit(void) {
01541
01542 char buf[128];
01543 cc_status_t rtn;
01544 int i;
01545
01546 MACHSTATE(2, "CmiMachineExit() - INFO: Called...");
01547
01548
01549 if (contextBlock->myNode)
01550 usleep(10000*contextBlock->myNode);
01551
01552 AMMASSO_STATS_DISPLAY(MachineInit)
01553
01554 AMMASSO_STATS_DISPLAY(AmmassoDoIdle)
01555
01556 AMMASSO_STATS_DISPLAY(DeliverViaNetwork)
01557 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_pre_lock)
01558 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_lock)
01559 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_post_lock)
01560 AMMASSO_STATS_DISPLAY(DeliverViaNetwork_send)
01561
01562 AMMASSO_STATS_DISPLAY(getQPSendBuffer)
01563 AMMASSO_STATS_DISPLAY(getQPSendBuffer_lock)
01564 AMMASSO_STATS_DISPLAY(getQPSendBuffer_CEH)
01565 AMMASSO_STATS_DISPLAY(getQPSendBuffer_loop)
01566
01567 AMMASSO_STATS_DISPLAY(sendDataOnQP)
01568 AMMASSO_STATS_DISPLAY(sendDataOnQP_pre_send)
01569 AMMASSO_STATS_DISPLAY(sendDataOnQP_send)
01570 AMMASSO_STATS_DISPLAY(sendDataOnQP_post_send)
01571
01572 AMMASSO_STATS_DISPLAY(sendDataOnQP_1024)
01573 AMMASSO_STATS_DISPLAY(sendDataOnQP_2048)
01574 AMMASSO_STATS_DISPLAY(sendDataOnQP_4096)
01575 AMMASSO_STATS_DISPLAY(sendDataOnQP_16384)
01576 AMMASSO_STATS_DISPLAY(sendDataOnQP_over)
01577
01578 AMMASSO_STATS_DISPLAY(AsynchronousEventHandler)
01579 AMMASSO_STATS_DISPLAY(CompletionEventHandler)
01580 AMMASSO_STATS_DISPLAY(ProcessMessage)
01581 AMMASSO_STATS_DISPLAY(processAmmassoControlMessage)
01582
01583 AMMASSO_STATS_DISPLAY(sendAck)
01584
01585
01586
01587
01588
01589
01590
01591 if (Cmi_charmrun_pid != 0 && contextBlock->rnic != -1) {
01592
01593
01594 for (i = 0; i < contextBlock->numNodes; i++) {
01595
01596
01597 closeQPConnection(nodes + i, 1);
01598
01599
01600 CmiDestroyLock(nodes[i].sendBufLock);
01601 CmiDestroyLock(nodes[i].send_next_lock);
01602 CmiDestroyLock(nodes[i].recv_expect_lock);
01603 }
01604
01605
01606
01608 rtn = cc_rnic_close(contextBlock->rnic);
01609 if (rtn != CC_OK) {
01610 MACHSTATE2(5, "CmiMachineExit() - ERROR: Unable to close the RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
01611 sprintf(buf, "CmiMachineExit() - ERROR: Unable to close the RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
01612 CmiAbort(buf);
01613 }
01614
01615 MACHSTATE(2, "CmiMachineExit() - INFO: RNIC Closed.");
01616 }
01617 }
01618
01619
01620
01621 OtherNode getNodeFromQPId(cc_qp_id_t qp_id) {
01622
01623 int i;
01624
01625
01626
01627 for (i = 0; i < contextBlock->numNodes; i++)
01628 if (nodes[i].qp_id == qp_id)
01629 return (nodes + i);
01630
01631 return NULL;
01632 }
01633
01634 OtherNode getNodeFromQPHandle(cc_qp_handle_t qp) {
01635
01636 int i;
01637
01638
01639
01640 for (i = 0; i < contextBlock->numNodes; i++)
01641 if (nodes[i].qp == qp)
01642 return (nodes + i);
01643
01644 return NULL;
01645 }
01646
01647
01648 void AsynchronousEventHandler(cc_rnic_handle_t rnic, cc_event_record_t *er, void *cb) {
01649
01650 int nodeNumber, i;
01651 OtherNode node;
01652 cc_ep_handle_t connReqEP;
01653 cc_status_t rtn;
01654 char buf[64];
01655 cc_qp_modify_attrs_t modAttrs;
01656 AmmassoPrivateData *priv;
01657 AmmassoToken *token;
01658
01659 AMMASSO_STATS_START(AsynchronousEventHandler)
01660
01661 MACHSTATE2(2, "AsynchronousEventHandler() - INFO: Called... event_id = %d, \"%s\"", er->event_id, cc_event_id_to_string(er->event_id));
01662
01663
01664 if (er->rnic_handle != contextBlock->rnic) {
01665 MACHSTATE(5, "AsynchronousEventHandler() - WARNING: er->rnic_handle != contextBlock->rnic");
01666 }
01667 if (er->rnic_user_context != contextBlock) {
01668 MACHSTATE(5, "AsynchronousEventHandler() - WARNING: er->rnic_user_context != contextBlock");
01669 }
01670
01671
01672 switch (er->event_id) {
01673
01674
01675 case CCAE_LLP_CLOSE_COMPLETE:
01676 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Connection Closed.");
01677
01678
01679 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: er->resource_indicator = %d (CC_RES_IND_QP: %d)", er->resource_indicator, CC_RES_IND_QP);
01680 node = getNodeFromQPId(er->resource_id.qp_id);
01681 if (node == NULL) {
01682 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to create/recover connection");
01683 break;
01684 }
01685
01686 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
01687 #if CMK_SHARED_VARS_UNAVAILABLE
01688 while (node->sendBufLock != 0) { usleep(1); }
01689 #endif
01690 CmiLock(node->sendBufLock);
01691
01692
01693 node->connectionState = QP_CONN_STATE_CONNECTION_CLOSED;
01694
01695 CmiUnlock(node->sendBufLock);
01696 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
01697
01698 break;
01699
01700
01701 case CCAE_CONNECTION_REQUEST:
01702
01703 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: Incomming Connection Request -> %s:%d",
01704 inet_ntoa(*(struct in_addr*) &(er->event_data.connection_request.laddr)),
01705 ntohs(er->event_data.connection_request.lport)
01706 );
01707
01708 connReqEP = er->event_data.connection_request.cr_handle;
01709
01710 priv = (AmmassoPrivateData*)er->event_data.connection_request.private_data;
01711
01712 nodeNumber = priv->node;
01713
01714 MACHSTATE1(3, "AsynchronousEventHandler() - INFO: Connection Request from node %d", nodeNumber);
01715
01716 if (nodeNumber < 0 || nodeNumber >= contextBlock->numNodes) {
01717
01718
01719 MACHSTATE1(1, "AsynchronousEventHandler() - WARNING: Unknown entity attempting to connect (node %d)... rejecting connection.", nodeNumber);
01720 cc_cr_reject(contextBlock->rnic, connReqEP);
01721
01722 } else {
01723
01724
01725 { int j;
01726 for (j = 0; j < 16; j++) {
01727 MACHSTATE2(3, " private_data[%d] = %02X", j, ((char*)er->event_data.connection_request.private_data)[j]);
01728 }
01729 }
01730
01731
01732
01733 for (i=0; i<(AMMASSO_INITIAL_BUFFERS/(contextBlock->numNodes-1)); ++i) {
01734 LIST_DEQUEUE(contextBlock->,freeTokens,token);
01735 token->wr.wr_u.rdma_write.remote_stag = priv->stag;
01736 token->wr.wr_u.rdma_write.remote_to = priv->to + (i * sizeof(AmmassoBuffer));
01737 token->remoteBuf = (AmmassoBuffer*)(priv->to + (i * sizeof(AmmassoBuffer)));
01738 LIST_ENQUEUE(nodes[nodeNumber].,sendTokens,token);
01739 }
01740
01741 nodes[nodeNumber].ack_sq_wr->wr_u.rdma_write.remote_to = priv->ack_to;
01742 nodes[nodeNumber].ack_sq_wr->wr_u.rdma_write.remote_stag = priv->stag;
01743
01744 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: tokens starting from %p, stag = %d",priv->to,priv->stag);
01745
01746
01747 nodes[nodeNumber].cr = connReqEP;
01748
01749
01750 priv = (AmmassoPrivateData*)buf;
01751 priv->node = contextBlock->myNode;
01752 priv->stag = nodes[nodeNumber].recv_buf->stag;
01753 priv->to = (cc_uint64_t)nodes[nodeNumber].recv_buf;
01754 priv->ack_to = (cc_uint64_t)nodes[nodeNumber].directAck;
01755
01756 MACHSTATE1(1, " node = %d", priv->node);
01757 MACHSTATE1(1, " stag = %d", priv->stag);
01758 MACHSTATE1(1, " to = %p", priv->to);
01759
01760 { int j;
01761 MACHSTATE2(3, " buf = %p, priv = %p", buf, priv);
01762 for (j = 0; j < 16; j++) {
01763 MACHSTATE2(3, " ((char*)priv)[%d] = %02X", j, ((char*)priv)[j]);
01764 }
01765 }
01766
01767 MACHSTATE(1, "AsynchronousEventHandler() - Ammasso - INFO: Accepting Connection...");
01768
01769 rtn = cc_cr_accept(contextBlock->rnic, connReqEP, nodes[nodeNumber].qp, sizeof(AmmassoPrivateData), (cc_uint8_t*)priv);
01770 if (rtn != CC_OK) {
01771
01772
01773 MACHSTATE1(3, "AsynchronousEventHandler() - Ammasso - WARNING: Unable to accept connection from node %d", nodeNumber);
01774
01775 } else {
01776
01777 MACHSTATE1(3, "AsynchronousEventHandler() - Ammasso - INFO: Accepted Connection from node %d", nodeNumber);
01778
01779 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
01780 #if CMK_SHARED_VARS_UNAVAILABLE
01781 while (nodes[nodeNumber].sendBufLock != 0) { usleep(1); }
01782 #endif
01783 CmiLock(nodes[nodeNumber].sendBufLock);
01784
01785
01786 if (nodes[nodeNumber].connectionState == QP_CONN_STATE_PRE_CONNECT)
01787 (contextBlock->outstandingConnectionCount)--;
01788 nodes[nodeNumber].connectionState = QP_CONN_STATE_CONNECTED;
01789
01790 CmiUnlock(nodes[nodeNumber].sendBufLock);
01791 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
01792
01793 MACHSTATE1(1, "AsynchronousEventHandler() - Connected to node %d", nodes[nodeNumber].myNode);
01794 }
01795 }
01796
01797 break;
01798
01799
01800 case CCAE_ACTIVE_CONNECT_RESULTS:
01801 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Connection Results");
01802
01803
01804 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: er->resource_indicator = %d (CC_RES_IND_QP: %d)", er->resource_indicator, CC_RES_IND_QP);
01805 node = getNodeFromQPId(er->resource_id.qp_id);
01806 if (node == NULL) {
01807 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to create/recover connection");
01808 break;
01809 }
01810
01811
01812 if (er->event_data.active_connect_results.status != CC_CONN_STATUS_SUCCESS) {
01813
01814 MACHSTATE(5, " Connection Failed.");
01815 MACHSTATE1(5, " - status: \"%s\"", cc_connect_status_to_string(er->event_data.active_connect_results.status));
01816 MACHSTATE1(5, " - private_data_length = %d", er->event_data.active_connect_results.private_data_length);
01817 displayQueueQuery(node->qp, &(node->qp_attrs));
01818
01819
01820 reestablishQPConnection(node);
01821
01822 } else {
01823
01824 MACHSTATE(3, " Connection Success...");
01825 MACHSTATE2(1, " l -> %s:%d", inet_ntoa(*(struct in_addr*) &(er->event_data.active_connect_results.laddr)), ntohs(er->event_data.active_connect_results.lport));
01826 MACHSTATE2(1, " r -> %s:%d", inet_ntoa(*(struct in_addr*) &(er->event_data.active_connect_results.raddr)), ntohs(er->event_data.active_connect_results.rport));
01827 MACHSTATE4(1, " private_data_length = %d (%d, %d, %d)", er->event_data.active_connect_results.private_data_length, sizeof(int), sizeof(cc_stag_t), sizeof(cc_uint64_t));
01828
01829 priv = (AmmassoPrivateData*)((char*)er->event_data.active_connect_results.private_data);
01830
01831 { int j;
01832 MACHSTATE2(1, " private_data = %p, priv = %p", er->event_data.active_connect_results.private_data, priv);
01833 for (j = 0; j < 16; j++) {
01834 MACHSTATE3(1, " private_data[%d] = %02X (priv:%02X)", j, ((char*)(er->event_data.active_connect_results.private_data))[j], ((char*)priv)[j]);
01835 }
01836 }
01837
01838
01839
01840 for (i=0; i<(AMMASSO_INITIAL_BUFFERS/(contextBlock->numNodes-1)); ++i) {
01841 LIST_DEQUEUE(contextBlock->,freeTokens,token);
01842 token->wr.wr_u.rdma_write.remote_stag = priv->stag;
01843 token->wr.wr_u.rdma_write.remote_to = priv->to + (i * sizeof(AmmassoBuffer));
01844 token->remoteBuf = (AmmassoBuffer*)(priv->to + (i * sizeof(AmmassoBuffer)));
01845 LIST_ENQUEUE(node->,sendTokens,token);
01846 }
01847
01848 node->ack_sq_wr->wr_u.rdma_write.remote_to = priv->ack_to;
01849 node->ack_sq_wr->wr_u.rdma_write.remote_stag = priv->stag;
01850
01851 MACHSTATE1(3, " node = %d", priv->node);
01852 MACHSTATE1(3, " stag = %d", priv->stag);
01853 MACHSTATE1(3, " to = %p", priv->to);
01854
01855 MACHSTATE2(1, "AsynchronousEventHandler() - INFO: tokens from %p, stag = %d",priv->to,priv->stag);
01856
01857 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
01858 #if CMK_SHARED_VARS_UNAVAILABLE
01859 while (node->sendBufLock != 0) { usleep(1); }
01860 #endif
01861 CmiLock(node->sendBufLock);
01862
01863
01864 if (node->connectionState == QP_CONN_STATE_PRE_CONNECT)
01865 (contextBlock->outstandingConnectionCount)--;
01866 node->connectionState = QP_CONN_STATE_CONNECTED;
01867
01868 CmiUnlock(node->sendBufLock);
01869 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
01870
01871 MACHSTATE1(1, "AsynchronousEventHandler() - Connected to node %d", node->myNode);
01872 }
01873
01874 break;
01875
01876
01877
01878 case CCAE_TERMINATE_MESSAGE_RECEIVED:
01879
01880 case CCAE_LLP_SEGMENT_SIZE_INVALID:
01881 case CCAE_LLP_INVALID_CRC:
01882 case CCAE_LLP_BAD_FPDU:
01883
01884 case CCAE_INVALID_DDP_VERSION:
01885 case CCAE_INVALID_RDMA_VERSION:
01886 case CCAE_UNEXPECTED_OPCODE:
01887 case CCAE_INVALID_DDP_QUEUE_NUMBER:
01888 case CCAE_RDMA_READ_NOT_ENABLED:
01889 case CCAE_NO_L_BIT:
01890
01891 case CCAE_TAGGED_INVALID_STAG:
01892 case CCAE_TAGGED_BASE_BOUNDS_VIOLATION:
01893 case CCAE_TAGGED_ACCESS_RIGHTS_VIOLATION:
01894 case CCAE_TAGGED_INVALID_PD:
01895 case CCAE_WRAP_ERROR:
01896
01897 case CCAE_BAD_LLP_CLOSE:
01898
01899 case CCAE_INVALID_MSN_RANGE:
01900 case CCAE_INVALID_MSN_GAP:
01901
01902 case CCAE_IRRQ_INVALID_STAG:
01903 case CCAE_IRRQ_BASE_BOUNDS_VIOLATION:
01904 case CCAE_IRRQ_ACCESS_RIGHTS_VIOLATION:
01905 case CCAE_IRRQ_INVALID_PD:
01906 case CCAE_IRRQ_WRAP_ERROR:
01907 case CCAE_IRRQ_OVERFLOW:
01908 case CCAE_IRRQ_MSN_GAP:
01909 case CCAE_IRRQ_MSN_RANGE:
01910
01911 case CCAE_CQ_SQ_COMPLETION_OVERFLOW:
01912 case CCAE_CQ_RQ_COMPLETION_ERROR:
01913 case CCAE_QP_SRQ_WQE_ERROR:
01914
01915
01916
01917 case CCAE_LLP_CONNECTION_LOST:
01918 case CCAE_LLP_CONNECTION_RESET:
01919
01920 case CCAE_BAD_CLOSE:
01921
01922 case CCAE_QP_LOCAL_CATASTROPHIC_ERROR:
01923 MACHSTATE3(5, "AsynchronousEventHandler() - WARNING: Connection Error \"%s\" - er->resource_indicator = %d (CC_RES_IND_QP: %d)", cc_event_id_to_string(er->event_id), er->resource_indicator, CC_RES_IND_QP);
01924 CmiPrintf("AsynchronousEventHandler() - WARNING: Connection Error \"%s\" - er->resource_indicator = %d (CC_RES_IND_QP: %d)\n", cc_event_id_to_string(er->event_id), er->resource_indicator, CC_RES_IND_QP);
01925
01926
01927 node = getNodeFromQPId(er->resource_id.qp_id);
01928 if (node == NULL) {
01929 MACHSTATE(5, "AsynchronousEventHandler() - ERROR: Unable to find QP from QP ID... Unable to recover connection");
01930 break;
01931 }
01932
01933 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Pre-sendBufLock");
01934 #if CMK_SHARED_VARS_UNAVAILABLE
01935 while (node->sendBufLock != 0) { usleep(1); }
01936 #endif
01937 CmiLock(node->sendBufLock);
01938
01939
01940 node->connectionState = QP_CONN_STATE_CONNECTION_LOST;
01941
01942 CmiUnlock(node->sendBufLock);
01943 MACHSTATE(1, "AsynchronousEventHandler() - INFO: Post-sendBufLock");
01944
01945 MACHSTATE1(1, "AsynchronousEventHandler() - Connection ERROR Occured - node %d", node->myNode);
01946 displayQueueQuery(node->qp, &(node->qp_attrs));
01947
01948
01949 reestablishQPConnection(node);
01950
01951 break;
01952
01953
01954 default:
01955 MACHSTATE1(5, "AsynchronousEventHandler() - WARNING - Unknown/Unexpected Asynchronous Event: er->event_id = %d", er->event_id);
01956 break;
01957
01958 }
01959
01960 AMMASSO_STATS_END(AsynchronousEventHandler)
01961 }
01962
01963 void CheckRecvBufForMessage(OtherNode node) {
01964
01965 int needAck;
01966 unsigned int len;
01967 AmmassoBuffer *current;
01968
01969 MACHSTATE1(2, "CheckRecvBufForMessage() - INFO: Called... (node->recv_buf = %p)...", node->recv_buf);
01970
01971
01972 while ((len = node->recv_buf->tail.length) != 0) {
01973
01974 MACHSTATE1(2, " (len = %d)...", len);
01975
01976
01977 node->recv_buf->tail.length = 0;
01978 (*node->remoteAck) ++;
01979 node->messagesNotYetAcknowledged ++;
01980
01981 current = node->recv_buf;
01982
01983
01984 node->last_recv_buf->next = node->recv_buf;
01985 node->last_recv_buf = node->recv_buf;
01986 node->recv_buf = node->recv_buf->next;
01987 node->last_recv_buf->next = NULL;
01988
01989
01990 needAck = ProcessMessage(&(current->buf[AMMASSO_BUFSIZE - ALIGN8(len)]), len, ¤t->tail, node);
01991
01992
01993 if (needAck) sendAck(node);
01994 }
01995
01996 }
01997
01998
01999
02000
02001
02002
02003
02004
02005
02006
02007
02008
02009
02010
02011
02012
02013
02014
02015
02016
02018
02019
02020
02021
02022
02023
02024
02025
02026
02027
02028
02029
02030
02031
02032
02033
02034
02035
02036
02037
02038
02039
02040
02041
02042
02043
02044
02045
02046
02047
02048
02049
02050
02051
02052
02053
02054
02055
02056
02057
02058
02059
02060
02061
02062
02063
02064
02065
02066
02067
02068
02069
02070
02071
02072
02073
02074
02075
02076
02077
02078
02079
02080
02081
02082
02083
02084
02085
02086
02087
02088
02089
02090
02091
02092
02093
02094
02095
02096
02097
02098
02099
02100
02101
02102
02103
02104
02105
02106
02107
02108
02109
02110
02111
02112
02113
02114
02115
02116
02117
02118
02119
02120
02121
02122
02123
02124
02125
02126
02127
02128
02129
02130
02131
02132
02133
02134
02135
02136
02137
02138
02139
02140
02141
02142
02143
02144
02145
02146
02147
02148
02149 void CmiAmmassoOpenQueuePairs() {
02150
02151 char buf[128];
02152 int i, myNode, numNodes, keepWaiting;
02153 int buffersPerNode;
02154 cc_qp_create_attrs_t qpCreateAttrs;
02155 cc_status_t rtn;
02156 cc_inet_addr_t address;
02157 cc_inet_port_t port;
02158 AmmassoBuffer *sendBuffer;
02159 AmmassoBuffer *bufferScanner;
02160 AmmassoToken *newTokens, *tokenScanner;
02161 cc_data_addr_t *newSgls;
02162 cc_stag_index_t newStagIndex;
02163 ammasso_ack_t *ack_location;
02164
02165
02166 MACHSTATE1(2, "CmiAmmassoOpenQueuePairs() - INFO: Called... (Cmi_charmrun_pid = %d)", Cmi_charmrun_pid);
02167
02168
02169 if (Cmi_charmrun_pid == 0) return;
02170
02171 if (nodes == NULL) {
02172 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - WARNING: nodes = NULL");
02173 return;
02174 }
02175 MACHSTATE1(1, "CmiAmmassoOpenQueuePairs() - INFO: nodes = %p (remove this line)", nodes);
02176
02177
02178
02179
02180
02181
02182 contextBlock->myNode = myNode = _Cmi_mynode;
02183 contextBlock->numNodes = numNodes = _Cmi_numnodes;
02184 contextBlock->outstandingConnectionCount = contextBlock->numNodes - 1;
02185 contextBlock->nodeReadyCount = contextBlock->numNodes - 1;
02186 contextBlock->conditionRegistered = 0;
02187
02188 MACHSTATE2(1, "CmiAmmassoOpenQueuePairs() - INFO: myNode = %d, numNodes = %d", myNode, numNodes);
02189
02190 CmiAssert(sizeof(AmmassoBuffer) == (sizeof(AmmassoBuffer)&(~63)));
02191
02192
02193 contextBlock->freeRecvBuffers = (AmmassoBuffer*) CmiAlloc(AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t));
02194
02195 ack_location = (ammasso_ack_t*)&(contextBlock->freeRecvBuffers[AMMASSO_INITIAL_BUFFERS]);
02196 if (contextBlock->freeRecvBuffers == NULL) {
02197
02198
02199 cc_rnic_close(contextBlock->rnic);
02200
02201
02202 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for RECV buffers");
02203 sprintf(buf, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for RECV buffers");
02204 CmiAbort(buf);
02205 }
02206
02207 contextBlock->pinnedMemory = AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t);
02208 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
02209 CC_ADDR_TYPE_VA_BASED,
02210 (cc_byte_t*)contextBlock->freeRecvBuffers,
02211 AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t),
02212 contextBlock->pd_id,
02213 0, 0,
02214 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE | CC_ACF_REMOTE_WRITE,
02215 &newStagIndex)
02216 );
02217
02218 for (i=0; i<AMMASSO_INITIAL_BUFFERS; ++i) {
02219 contextBlock->freeRecvBuffers[i].tail.length = 0;
02220 contextBlock->freeRecvBuffers[i].next = &(contextBlock->freeRecvBuffers[i+1]);
02221 contextBlock->freeRecvBuffers[i].stag = newStagIndex;
02222 }
02223 contextBlock->freeRecvBuffers[AMMASSO_INITIAL_BUFFERS-1].next = NULL;
02224 contextBlock->last_freeRecvBuffers = &contextBlock->freeRecvBuffers[AMMASSO_INITIAL_BUFFERS-1];
02225
02226 buffersPerNode = AMMASSO_INITIAL_BUFFERS / (contextBlock->numNodes-1);
02227
02228
02229
02230 bufferScanner = contextBlock->freeRecvBuffers;
02231 contextBlock->freeRecvBuffers = contextBlock->freeRecvBuffers[(contextBlock->numNodes-1)*buffersPerNode-1].next;
02232 contextBlock->num_freeRecvBuffers = AMMASSO_INITIAL_BUFFERS - (contextBlock->numNodes-1)*buffersPerNode;
02233 for (i=0; i<contextBlock->numNodes; ++i) {
02234 if (i == contextBlock->myNode) continue;
02235 nodes[i].num_recv_buf = buffersPerNode;
02236 nodes[i].recv_buf = bufferScanner;
02237 bufferScanner[buffersPerNode-1].next = NULL;
02238 nodes[i].last_recv_buf = &(bufferScanner[buffersPerNode-1]);
02239 nodes[i].pending = NULL;
02240 bufferScanner += buffersPerNode;
02241 nodes[i].directAck = ack_location;
02242 ack_location++;
02243 }
02244
02245
02246
02247 sendBuffer = (AmmassoBuffer*) CmiAlloc(AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t));
02248
02249 ack_location = (ammasso_ack_t*)&(sendBuffer[AMMASSO_INITIAL_BUFFERS]);
02250 if (sendBuffer == NULL) {
02251
02252
02253 cc_rnic_close(contextBlock->rnic);
02254
02255
02256 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
02257 sprintf(buf, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
02258 CmiAbort(buf);
02259 }
02260
02261 contextBlock->pinnedMemory += AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t);
02262 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
02263 CC_ADDR_TYPE_VA_BASED,
02264 (cc_byte_t*)sendBuffer,
02265 AMMASSO_INITIAL_BUFFERS*sizeof(AmmassoBuffer) + (contextBlock->numNodes-1)*sizeof(ammasso_ack_t),
02266 contextBlock->pd_id,
02267 0, 0,
02268 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
02269 &newStagIndex)
02270 );
02271
02272 contextBlock->freeTokens = NULL;
02273 contextBlock->num_freeTokens = 0;
02274
02275
02276 newTokens = (AmmassoToken*) CmiAlloc((AMMASSO_INITIAL_BUFFERS+contextBlock->numNodes-1)*ALIGN8(sizeof(AmmassoToken)));
02277
02278 if (newTokens == NULL) {
02279
02280
02281 cc_rnic_close(contextBlock->rnic);
02282
02283
02284 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
02285 sprintf(buf, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
02286 CmiAbort(buf);
02287 }
02288
02289 newSgls = (cc_data_addr_t*) CmiAlloc((AMMASSO_INITIAL_BUFFERS+contextBlock->numNodes-1)*ALIGN8(sizeof(cc_data_addr_t)));
02290
02291 if (newSgls == NULL) {
02292
02293
02294 cc_rnic_close(contextBlock->rnic);
02295
02296
02297 MACHSTATE(5, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
02298 sprintf(buf, "CmiAmmassoOpenQueuePairs() - ERROR: Unable to allocate memory for SEND buffers");
02299 CmiAbort(buf);
02300 }
02301
02302 contextBlock->num_freeTokens = 0;
02303 contextBlock->last_freeTokens = NULL;
02304 contextBlock->freeTokens = NULL;
02305 tokenScanner = newTokens;
02306 for (i=0; i<AMMASSO_INITIAL_BUFFERS; ++i) {
02307 newSgls->stag = newStagIndex;
02308 newSgls->length = AMMASSO_BUFSIZE + sizeof(Tailer);
02309 newSgls->to = (unsigned long)&(sendBuffer[i]);
02310 tokenScanner->wr.wr_id = (unsigned long)tokenScanner;
02311 tokenScanner->wr.wr_type = CC_WR_TYPE_RDMA_WRITE;
02312 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_count = 1;
02313 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_list = newSgls;
02314 tokenScanner->wr.signaled = 1;
02315 tokenScanner->localBuf = (AmmassoBuffer*)&(sendBuffer[i]);
02316 LIST_ENQUEUE(contextBlock->,freeTokens,tokenScanner);
02317 newSgls = (cc_data_addr_t*)(((char*)newSgls)+ALIGN8(sizeof(cc_data_addr_t)));
02318 tokenScanner = (AmmassoToken*)(((char*)tokenScanner)+ALIGN8(sizeof(AmmassoToken)));
02319 }
02320
02321
02322
02323
02324
02325 for (i=0; i<contextBlock->numNodes; ++i) {
02326 if (i == contextBlock->myNode) continue;
02327 newSgls->stag = newStagIndex;
02328 newSgls->length = sizeof(ammasso_ack_t);
02329 newSgls->to = (unsigned long)ack_location;
02330 nodes[i].remoteAck = ack_location;
02331 tokenScanner->wr.wr_id = (unsigned long)tokenScanner;
02332 tokenScanner->wr.wr_type = CC_WR_TYPE_RDMA_WRITE;
02333 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_count = 1;
02334 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_list = newSgls;
02335 tokenScanner->wr.signaled = 1;
02336 nodes[i].ack_sq_wr = &tokenScanner->wr;
02337 newSgls = (cc_data_addr_t*)(((char*)newSgls)+ALIGN8(sizeof(cc_data_addr_t)));
02338 tokenScanner = (AmmassoToken*)(((char*)tokenScanner)+ALIGN8(sizeof(AmmassoToken)));
02339 ack_location++;
02340 }
02341
02342
02343
02344
02345
02346
02347
02348 for (i = 0; i < numNodes; i++) {
02349
02350
02351 nodes[i].myNode = i;
02352 nodes[i].connectionState = QP_CONN_STATE_PRE_CONNECT;
02353 nodes[i].messagesNotYetAcknowledged = 0;
02354 nodes[i].usedTokens = NULL;
02355 nodes[i].last_usedTokens = NULL;
02356 nodes[i].num_usedTokens = 0;
02357 nodes[i].localAck = 0;
02358 nodes[i].sendBufLock = CmiCreateLock();
02359 nodes[i].send_next_lock = CmiCreateLock();
02360 nodes[i].recv_expect_lock = CmiCreateLock();
02361 nodes[i].max_used_tokens = 0;
02362
02363
02364
02365
02366
02367 if (i == myNode) continue;
02368
02369 *nodes[i].remoteAck = 0;
02370
02371
02372 establishQPConnection(nodes + i, 0);
02373
02374 }
02375
02376
02377
02378 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Waiting for all connections to be established...");
02379 while (contextBlock->outstandingConnectionCount > 0) {
02380
02381 usleep(1000);
02382
02383 for (i = 0; i < contextBlock->numNodes; i++) {
02384 if (i == contextBlock->myNode) continue;
02385
02386
02387 CheckRecvBufForMessage(&(nodes[i]));
02388 }
02389 }
02390 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All Connections have been established... Continuing");
02391
02392
02393
02394 sleep(1);
02395
02396 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: Sending ready to all neighboors...");
02397
02398
02399 for (i = 0; i < numNodes; i++) {
02400 int tmp;
02401 char buf[24];
02402
02403 if (i == myNode) continue;
02404
02405 MACHSTATE1(1, "CmiAmmassoOpenQueuePairs() - INFO: Sending READY to node %d", i);
02406
02407
02408 sendDataOnQP(buf, 1, &(nodes[i]), AMMASSO_READY);
02409
02410
02411
02412
02413
02414
02415
02416
02417
02418
02419
02420
02421
02422
02423
02424
02425
02426
02427
02428
02429
02430
02431 }
02432
02433
02435
02436
02437
02438
02439
02440
02441
02442
02443
02444
02445
02446
02447
02448
02449
02450 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All ready packets sent to neighboors...");
02451
02452
02453
02454
02455
02456
02457 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Waiting for all neighboors to be ready...");
02458 while (contextBlock->nodeReadyCount > 0) {
02459 usleep(10000);
02460
02461 for (i = 0; i < contextBlock->numNodes; i++) {
02462 if (i == contextBlock->myNode) continue;
02463
02464
02465 CheckRecvBufForMessage(&(nodes[i]));
02466 }
02467 }
02468 MACHSTATE(1, "CmiAmmassoOpenQueuePairs() - INFO: All neighboors ready...");
02469
02470 MACHSTATE(2, "CmiAmmassoOpenQueuePairs() - INFO: Finished.");
02471 }
02472
02473
02474
02475
02476
02477
02478
02479 void establishQPConnection(OtherNode node, int reuseQPFlag) {
02480
02481 cc_qp_create_attrs_t qpCreateAttrs;
02482 cc_status_t rtn;
02483 int i;
02484 cc_uint32_t numWRsPosted;
02485
02486 MACHSTATE1(2, "establishQPConnection() - INFO: Called for node %d...", node->myNode);
02487
02489
02490 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-RECV-CQ-CREATE)");
02491
02492
02493 node->recv_cq_depth = 1;
02494 CC_CHECK(cc_cq_create,(contextBlock->rnic, &(node->recv_cq_depth), contextBlock->eh_id, node, &(node->recv_cq)));
02495
02496 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-SEND-CQ-CREATE)");
02497
02498
02499
02500 node->send_cq_depth = AMMASSO_BUFFERS_INFLY;
02501 CC_CHECK(cc_cq_create,(contextBlock->rnic, &(node->send_cq_depth), contextBlock->eh_id, node, &(node->send_cq)));
02502
02503 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-QP-CREATE)");
02504
02505
02506
02507 qpCreateAttrs.sq_cq = node->send_cq;
02508 qpCreateAttrs.rq_cq = node->recv_cq;
02509 qpCreateAttrs.sq_depth = node->send_cq_depth;
02510 qpCreateAttrs.rq_depth = node->recv_cq_depth;
02511 qpCreateAttrs.srq = 0;
02512 qpCreateAttrs.rdma_read_enabled = 1;
02513 qpCreateAttrs.rdma_write_enabled = 1;
02514 qpCreateAttrs.rdma_read_response_enabled = 1;
02515 qpCreateAttrs.mw_bind_enabled = 0;
02516 qpCreateAttrs.zero_stag_enabled = 0;
02517 qpCreateAttrs.send_sgl_depth = 1;
02518 qpCreateAttrs.recv_sgl_depth = 1;
02519 qpCreateAttrs.rdma_write_sgl_depth = 1;
02520 qpCreateAttrs.ord = 1;
02521 qpCreateAttrs.ird = 1;
02522 qpCreateAttrs.pdid = contextBlock->pd_id;
02523 qpCreateAttrs.user_context = node;
02524
02525 CC_CHECK(cc_qp_create,(contextBlock->rnic, &qpCreateAttrs, &(node->qp), &(node->qp_id)));
02526
02527
02528
02529
02530 node->sendBufLock = 0;
02531 node->send_next = 0;
02532 node->send_next_lock = CmiCreateLock();
02533 node->recv_expect = 0;
02534 node->recv_expect_lock = CmiCreateLock();
02535 node->recv_UseIndex = 0;
02536
02537 if (!reuseQPFlag) {
02538
02539 MACHSTATE(1, "establishQPConnection() - INFO: (PRE-NSMR-REGISTER-VIRT QP-QUERY-ATTRS)");
02540
02541
02542 contextBlock->pinnedMemory += sizeof(cc_qp_query_attrs_t);
02543 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
02544 CC_ADDR_TYPE_VA_BASED,
02545 (cc_byte_t*)(&(node->qp_attrs)),
02546 sizeof(cc_qp_query_attrs_t),
02547 contextBlock->pd_id,
02548 0, 0,
02549 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
02550 &(node->qp_attrs_stag_index))
02551 );
02552 }
02553
02555 if (node->myNode < contextBlock->myNode) {
02556
02557 int count = 64;
02558 char value[64];
02559 int j;
02560
02561 MACHSTATE(1, "establishQPConnection() - INFO: Starting \"Server\" Code...");
02562
02563
02564 CC_CHECK(cc_rnic_getconfig,(contextBlock->rnic, CC_GETCONFIG_ADDRS, &count, &value));
02565
02566
02567
02568
02569
02570
02571
02572
02573 *(((char*)&(node->address)) + 0) = value[0];
02574 *(((char*)&(node->address)) + 1) = value[1];
02575 *(((char*)&(node->address)) + 2) = value[2];
02576 *(((char*)&(node->address)) + 3) = value[3];
02577
02578
02579 node->port = htons(AMMASSO_PORT + node->myNode);
02580
02581 MACHSTATE4(1, "establishQPConnection() - Using Address (Hex) 0x%02X 0x%02X 0x%02X 0x%02X", ((node->address >> 24) & 0xFF), ((node->address >> 16) & 0xFF), ((node->address >> 8) & 0xFF), (node->address & 0xFF));
02582 MACHSTATE4(1, " (Dec) %4d %4d %4d %4d", ((node->address >> 24) & 0xFF), ((node->address >> 16) & 0xFF), ((node->address >> 8) & 0xFF), (node->address & 0xFF));
02583 MACHSTATE2(1, " Port (Hex) 0x%02X 0x%02X", ((node->port >> 8) & 0xFF), (node->port & 0xFF));
02584
02585
02586
02587
02588 CC_CHECK(cc_ep_listen_create,(contextBlock->rnic, node->address, &(node->port), 3, contextBlock, &(node->ep)));
02589
02590 MACHSTATE(1, "establishQPConnection() - Listening...");
02591 }
02592
02593
02595 if (node->myNode > contextBlock->myNode) {
02596 int first;
02597 AmmassoPrivateData priv;
02598
02599
02600 if (node->myNode == contextBlock->myNode + 1 || reuseQPFlag)
02601 usleep(400000);
02602
02603 MACHSTATE(1, "establishQPConnection() - INFO: Starting \"Client\" Code...");
02604
02605
02606
02607 *(((char*)&(node->address)) + 0) = *(((char*)&(node->addr.sin_addr.s_addr)) + 0);
02608 *(((char*)&(node->address)) + 1) = *(((char*)&(node->addr.sin_addr.s_addr)) + 1);
02609 *(((char*)&(node->address)) + 2) = *(((char*)&(node->addr.sin_addr.s_addr)) + 2);
02610 *(((char*)&(node->address)) + 3) = (*(((char*)&(node->addr.sin_addr.s_addr)) + 3)) - 1;
02611
02612
02613 node->port = htons(AMMASSO_PORT + contextBlock->myNode);
02614
02615 MACHSTATE4(1, "establishQPConnection() - Using Address (Hex) 0x%02X 0x%02X 0x%02X 0x%02X", ((node->address >> 24) & 0xFF), ((node->address >> 16) & 0xFF), ((node->address >> 8) & 0xFF), (node->address & 0xFF));
02616 MACHSTATE4(1, " (Dec) %4d %4d %4d %4d", ((node->address >> 24) & 0xFF), ((node->address >> 16) & 0xFF), ((node->address >> 8) & 0xFF), (node->address & 0xFF));
02617 MACHSTATE2(1, " Port (Hex) 0x%02X 0x%02X", ((node->port >> 8) & 0xFF), (node->port & 0xFF));
02618
02619
02620
02621
02622
02623
02624 priv.node = contextBlock->myNode;
02625 priv.stag = node->recv_buf->stag;
02626 priv.to = (cc_uint64_t)node->recv_buf;
02627 priv.ack_to = (cc_uint64_t)node->directAck;
02628
02629 CC_CHECK(cc_qp_connect,(contextBlock->rnic, node->qp, node->address, node->port, sizeof(AmmassoPrivateData), (cc_uint8_t*)&priv));
02630
02631 }
02632 }
02633
02634
02635
02636 void reestablishQPConnection(OtherNode node) {
02637
02638 cc_status_t rtn;
02639 char buf[16];
02640 cc_qp_modify_attrs_t modAttrs;
02641 cc_wc_t wc;
02642
02643 MACHSTATE1(2, "reestablishQPConnection() - INFO: For node %d: Clearing Outstanding WRs...", node->myNode);
02644
02645
02646 while (1) {
02647
02648 rtn = cc_cq_poll(contextBlock->rnic, node->recv_cq, &wc);
02649 if (rtn == CCERR_CQ_EMPTY) break;
02650
02651
02652 }
02653
02654
02655 while (1) {
02656
02657 rtn = cc_cq_poll(contextBlock->rnic, node->send_cq, &wc);
02658 if (rtn == CCERR_CQ_EMPTY) break;
02659
02660
02661 }
02662
02663 MACHSTATE1(1, "reestablishQPConnection() - INFO: For node %d: Waiting for QP to enter ERROR state...", node->myNode);
02664
02665 do {
02666
02667
02668 rtn = cc_qp_query(contextBlock->rnic, node->qp, &(node->qp_attrs));
02669 if (rtn != CC_OK) {
02670 MACHSTATE2(5, "AsynchronousEventHandler() - ERROR: Unable to Query Queue Pair (l): %d, \"%s\"", rtn, cc_status_to_string(rtn));
02671 break;
02672 }
02673
02674
02675 if (node->qp_attrs.qp_state == CC_QP_STATE_ERROR)
02676 break;
02677 else
02678 usleep(1000);
02679
02680 } while (1);
02681
02682 MACHSTATE2(1, "reestablishQPConnection() - INFO: Finished waiting node %d: QP state = \"%s\"...", node->myNode, cc_qp_state_to_string(node->qp_attrs.qp_state));
02683 MACHSTATE1(1, "reestablishQPConnection() - INFO: Attempting to transition QP into IDLE state for node %d", node->myNode);
02684
02685
02686 modAttrs.llp_ep = node->ep;
02687 modAttrs.next_qp_state = CC_QP_STATE_IDLE;
02688 modAttrs.ord = CC_QP_NO_ATTR_CHANGE;
02689 modAttrs.ird = CC_QP_NO_ATTR_CHANGE;
02690 modAttrs.sq_depth = CC_QP_NO_ATTR_CHANGE;
02691 modAttrs.rq_depth = CC_QP_NO_ATTR_CHANGE;
02692 modAttrs.stream_message_buffer = NULL;
02693 modAttrs.stream_message_length = 0;
02694 rtn = cc_qp_modify(contextBlock->rnic, node->qp, &modAttrs);
02695 if (rtn != CC_OK) {
02696
02697 MACHSTATE2(5, "reestablishQPConnection() - ERROR: Unable to Modify QP State: %d, \"%s\"", rtn, cc_status_to_string(rtn));
02698 }
02699
02700 rtn = cc_qp_query(contextBlock->rnic, node->qp, &(node->qp_attrs));
02701 if (rtn != CC_OK) {
02702 MACHSTATE2(5, "reestablishQPConnection() - ERROR: Unable to Query Queue Pair (1): %d, \"%s\"", rtn, cc_status_to_string(rtn));
02703 }
02704 MACHSTATE2(1, "reestablishQPConnection() - INFO: Transition results for node %d: QP state = \"%s\"...", node->myNode, cc_qp_state_to_string(node->qp_attrs.qp_state));
02705
02706 closeQPConnection(node, 0);
02707 establishQPConnection(node, 1);
02708 }
02709
02710
02711
02712
02713
02714
02715 void closeQPConnection(OtherNode node, int destroyQPFlag) {
02716
02717 MACHSTATE(2, "closeQPConnection() - INFO: Called...");
02718
02719
02720
02721
02722
02723
02724
02725
02726
02727
02728
02729
02730
02731
02732
02733
02734
02735
02736
02737
02738
02739
02740
02741
02742 }
02743
02744
02745 char* cc_status_to_string(cc_status_t errorCode) {
02746
02747 switch (errorCode) {
02748
02749 case CC_OK: return "OK";
02750 case CCERR_INSUFFICIENT_RESOURCES: return "Insufficient Resources";
02751 case CCERR_INVALID_MODIFIER: return "Invalid Modifier";
02752 case CCERR_INVALID_MODE: return "Invalid Mode";
02753 case CCERR_IN_USE: return "In Use";
02754 case CCERR_INVALID_RNIC: return "Invalid RNIC";
02755 case CCERR_INTERRUPTED_OPERATION: return "Interrupted Operation";
02756 case CCERR_INVALID_EH: return "Invalid EH";
02757 case CCERR_INVALID_CQ: return "Invalid CQ";
02758 case CCERR_CQ_EMPTY: return "CQ Empty";
02759 case CCERR_NOT_IMPLEMENTED: return "Not Implemented";
02760 case CCERR_CQ_DEPTH_TOO_SMALL: return "CQ Depth Too Small";
02761 case CCERR_PD_IN_USE: return "PD In Use";
02762 case CCERR_INVALID_PD: return "Invalid PD";
02763 case CCERR_INVALID_SRQ: return "Invalid SRQ";
02764 case CCERR_INVALID_ADDRESS: return "Invalid Address";
02765 case CCERR_INVALID_NETMASK: return "Invalid Netmask";
02766 case CCERR_INVALID_QP: return "Invalid QP";
02767 case CCERR_INVALID_QP_STATE: return "Invalid QP State";
02768 case CCERR_TOO_MANY_WRS_POSTED: return "Too Many WRs Posted";
02769 case CCERR_INVALID_WR_TYPE: return "Invalid WR Type";
02770 case CCERR_INVALID_SGL_LENGTH: return "Invalid SGL Length";
02771 case CCERR_INVALID_SQ_DEPTH: return "Invalid SQ Depth";
02772 case CCERR_INVALID_RQ_DEPTH: return "Invalid RQ Depth";
02773 case CCERR_INVALID_ORD: return "Invalid ORD";
02774 case CCERR_INVALID_IRD: return "Invalid IRD";
02775 case CCERR_QP_ATTR_CANNOT_CHANGE: return "QP_ATTR_CANNON_CHANGE";
02776 case CCERR_INVALID_STAG: return "Invalid STag";
02777 case CCERR_QP_IN_USE: return "QP In Use";
02778 case CCERR_OUTSTANDING_WRS: return "Outstanding WRs";
02779
02780 case CCERR_STAG_IN_USE: return "STag In Use";
02781 case CCERR_INVALID_STAG_INDEX: return "Invalid STag Index";
02782 case CCERR_INVALID_SGL_FORMAT: return "Invalid SGL Format";
02783 case CCERR_ADAPTER_TIMEOUT: return "Adapter Timeout";
02784 case CCERR_INVALID_CQ_DEPTH: return "Invalid CQ Depth";
02785 case CCERR_INVALID_PRIVATE_DATA_LENGTH: return "Invalid Private Data Length";
02786 case CCERR_INVALID_EP: return "Invalid EP";
02787 case CCERR_FLUSHED: return "Flushed";
02788 case CCERR_INVALID_WQE: return "Invalid WQE";
02789 case CCERR_LOCAL_QP_CATASTROPHIC_ERROR: return "Local QP Catastrophic Error";
02790 case CCERR_REMOTE_TERMINATION_ERROR: return "Remote Termination Error";
02791 case CCERR_BASE_AND_BOUNDS_VIOLATION: return "Base and Bounds Violation";
02792 case CCERR_ACCESS_VIOLATION: return "Access Violation";
02793 case CCERR_INVALID_PD_ID: return "Invalid PD ID";
02794 case CCERR_WRAP_ERROR: return "Wrap Error";
02795 case CCERR_INV_STAG_ACCESS_ERROR: return "Invalid STag Access Error";
02796 case CCERR_ZERO_RDMA_READ_RESOURCES: return "Zero RDMA Read Resources";
02797 case CCERR_QP_NOT_PRIVILEGED: return "QP Not Privileged";
02798 case CCERR_STAG_STATE_NOT_INVALID: return "STag State Not Invalid";
02799 case CCERR_INVALID_PAGE_SIZE: return "Invalid Page Size";
02800 case CCERR_INVALID_BUFFER_SIZE: return "Invalid Buffer Size";
02801 case CCERR_INVALID_PBE: return "Invalid PBE";
02802 case CCERR_INVALID_FBO: return "Invalid FBO";
02803 case CCERR_INVALID_LENGTH: return "Invalid Length";
02804 case CCERR_INVALID_ACCESS_RIGHTS: return "Invalid Access Rights";
02805 case CCERR_PBL_TOO_BIG: return "PBL Too Big";
02806 case CCERR_INVALID_VA: return "Invalid VA";
02807 case CCERR_INVALID_REGION: return "Invalid Region";
02808 case CCERR_INVALID_WINDOW: return "Invalid Window";
02809 case CCERR_TOTAL_LENGTH_TOO_BIG: return "Total Length Too Big";
02810 case CCERR_INVALID_QP_ID: return "Invalid QP ID";
02811 case CCERR_ADDR_IN_USE: return "Address In Use";
02812 case CCERR_ADDR_NOT_AVAIL: return "Address Not Available";
02813 case CCERR_NET_DOWN: return "Network Down";
02814 case CCERR_NET_UNREACHABLE: return "Network Unreachable";
02815 case CCERR_CONN_ABORTED: return "Connection Aborted";
02816 case CCERR_CONN_RESET: return "Connection Reset";
02817 case CCERR_NO_BUFS: return "No Buffers";
02818 case CCERR_CONN_TIMEDOUT: return "Connection Timed-Out";
02819 case CCERR_CONN_REFUSED: return "Connection Refused";
02820 case CCERR_HOST_UNREACHABLE: return "Host Unreachable";
02821 case CCERR_INVALID_SEND_SGL_DEPTH: return "Invalid Send SGL Depth";
02822 case CCERR_INVALID_RECV_SGL_DEPTH: return "Invalid Receive SGL Depth";
02823 case CCERR_INVALID_RDMA_WRITE_SGL_DEPTH: return "Ivalid RDMA Write SGL Depth";
02824 case CCERR_INSUFFICIENT_PRIVILEGES: return "Insufficient Privileges";
02825 case CCERR_STACK_ERROR: return "Stack Error";
02826 case CCERR_INVALID_VERSION: return "Invalid Version";
02827 case CCERR_INVALID_MTU: return "Invalid MTU";
02828 case CCERR_INVALID_IMAGE: return "Invalid Image";
02829 case CCERR_PENDING: return "(PENDING: Internal to Adapter... Hopefully you aren't reading this...)";
02830 case CCERR_DEFER: return "(DEFER: Internal to Adapter... Hopefully you aren't reading this...)";
02831 case CCERR_FAILED_WRITE: return "Failed Write";
02832 case CCERR_FAILED_ERASE: return "Failed Erase";
02833 case CCERR_FAILED_VERIFICATION: return "Failed Verification";
02834 case CCERR_NOT_FOUND: return "Not Found";
02835 default: return "Unknown Error Code";
02836 }
02837 }
02838
02839
02840
02841 char* cc_conn_error_to_string(cc_connect_status_t errorCode) {
02842
02843 switch (errorCode) {
02844 case CC_CONN_STATUS_SUCCESS: return "Success";
02845 case CC_CONN_STATUS_NO_MEM: return "No Memory";
02846 case CC_CONN_STATUS_TIMEDOUT: return "Timed-Out";
02847 case CC_CONN_STATUS_REFUSED: return "Refused";
02848 case CC_CONN_STATUS_NETUNREACH: return "Network Unreachable";
02849 case CC_CONN_STATUS_HOSTUNREACH: return "Host Unreachable";
02850 case CC_CONN_STATUS_INVALID_RNIC: return "Invalid RNIC";
02851 case CC_CONN_STATUS_INVALID_QP: return "Invalid QP";
02852 case CC_CONN_STATUS_INVALID_QP_STATE: return "Invalid QP State";
02853 case CC_CONN_STATUS_REJECTED: return "Rejected";
02854 default: return (cc_status_to_string((cc_status_t)errorCode));
02855 }
02856 }
02857
02858 void displayQueueQuery(cc_qp_handle_t qp, cc_qp_query_attrs_t *attrs) {
02859
02860
02861 cc_status_t rtn;
02862 char buf[1024];
02863
02864 OtherNode node = getNodeFromQPHandle(qp);
02865 if (node != NULL) {
02866 MACHSTATE1(2, "displayQueueQuery() - Called for node %d", node->myNode);
02867 } else {
02868 MACHSTATE(2, "displayQueueQuery() - Called for unknown node");
02869 }
02870
02871
02872 rtn = cc_qp_query(contextBlock->rnic, qp, attrs);
02873 if (rtn != CC_OK) {
02874
02875 MACHSTATE2(5, "displayQueueQuery() - ERROR: Unable to query queue: %d, \"%s\"", rtn, cc_status_to_string(rtn));
02876 return;
02877 }
02878
02879
02880
02881 MACHSTATE2(1, "displayQueueQuery() - qp_state = %d, \"%s\"", attrs->qp_state, cc_qp_state_to_string(attrs->qp_state));
02882 if (attrs->terminate_message_length > 0) {
02883 memcpy(buf, attrs->terminate_message, attrs->terminate_message_length);
02884 buf[attrs->terminate_message_length] = '\0';
02885 MACHSTATE1(1, "displayQueueQuery() - terminate_message = \"%s\"", buf);
02886 } else {
02887 MACHSTATE(1, "displayQueueQuery() - terminate_message = NULL");
02888 }
02889 }
02890
02891 char* cc_qp_state_to_string(cc_qp_state_t qpState) {
02892
02893 switch (qpState) {
02894 case CC_QP_STATE_IDLE: return "IDLE";
02895 case CC_QP_STATE_CONNECTING: return "CONNECTED";
02896 case CC_QP_STATE_RTS: return "RTS";
02897 case CC_QP_STATE_CLOSING: return "CLOSING";
02898 case CC_QP_STATE_TERMINATE: return "TERMINATE";
02899 case CC_QP_STATE_ERROR: return "ERROR";
02900 default: return "unknown";
02901 }
02902 }
02903
02904 char* cc_event_id_to_string(cc_event_id_t id) {
02905
02906 switch(id) {
02907 case CCAE_REMOTE_SHUTDOWN: return "Remote Shutdown";
02908 case CCAE_ACTIVE_CONNECT_RESULTS: return "Active Connect Results";
02909 case CCAE_CONNECTION_REQUEST: return "Connection Request";
02910 case CCAE_LLP_CLOSE_COMPLETE: return "LLP Close Complete";
02911 case CCAE_TERMINATE_MESSAGE_RECEIVED: return "Terminate Message Received";
02912 case CCAE_LLP_CONNECTION_RESET: return "LLP Connection Reset";
02913 case CCAE_LLP_CONNECTION_LOST: return "LLP Connection Lost";
02914 case CCAE_LLP_SEGMENT_SIZE_INVALID: return "Segment Size Invalid";
02915 case CCAE_LLP_INVALID_CRC: return "LLP Invalid CRC";
02916 case CCAE_LLP_BAD_FPDU: return "LLP Bad FPDU";
02917 case CCAE_INVALID_DDP_VERSION: return "Invalid DDP Version";
02918 case CCAE_INVALID_RDMA_VERSION: return "Invalid RMDA Version";
02919 case CCAE_UNEXPECTED_OPCODE: return "Unexpected Opcode";
02920 case CCAE_INVALID_DDP_QUEUE_NUMBER: return "Invalid DDP Queue Number";
02921 case CCAE_RDMA_READ_NOT_ENABLED: return "RDMA Read Not Enabled";
02922 case CCAE_RDMA_WRITE_NOT_ENABLED: return "RDMA Write Not Enabled";
02923 case CCAE_RDMA_READ_TOO_SMALL: return "RDMA Read Too Small";
02924 case CCAE_NO_L_BIT: return "No L Bit";
02925 case CCAE_TAGGED_INVALID_STAG: return "Tagged Invalid STag";
02926 case CCAE_TAGGED_BASE_BOUNDS_VIOLATION: return "Tagged Base Bounds Violation";
02927 case CCAE_TAGGED_ACCESS_RIGHTS_VIOLATION: return "Tagged Access Rights Violation";
02928 case CCAE_TAGGED_INVALID_PD: return "Tagged Invalid PD";
02929 case CCAE_WRAP_ERROR: return "Wrap Error";
02930 case CCAE_BAD_CLOSE: return "Bad Close";
02931 case CCAE_BAD_LLP_CLOSE: return "Bad LLP Close";
02932 case CCAE_INVALID_MSN_RANGE: return "Invalid MSN Range";
02933 case CCAE_INVALID_MSN_GAP: return "Invalid MSN Gap";
02934 case CCAE_IRRQ_OVERFLOW: return "IRRQ Overflow";
02935 case CCAE_IRRQ_MSN_GAP: return "IRRQ MSG Gap";
02936 case CCAE_IRRQ_MSN_RANGE: return "IRRQ MSN Range";
02937 case CCAE_IRRQ_INVALID_STAG: return "IRRQ Invalid STag";
02938 case CCAE_IRRQ_BASE_BOUNDS_VIOLATION: return "IRRQ Base Bounds Violation";
02939 case CCAE_IRRQ_ACCESS_RIGHTS_VIOLATION: return "IRRQ Access Rights Violation";
02940 case CCAE_IRRQ_INVALID_PD: return "IRRQ Invalid PD";
02941 case CCAE_IRRQ_WRAP_ERROR: return "IRRQ Wrap Error";
02942 case CCAE_CQ_SQ_COMPLETION_OVERFLOW: return "CQ SQ Completion Overflow";
02943 case CCAE_CQ_RQ_COMPLETION_ERROR: return "CQ RQ Completion Overflow";
02944 case CCAE_QP_SRQ_WQE_ERROR: return "QP SRQ WQE Error";
02945 case CCAE_QP_LOCAL_CATASTROPHIC_ERROR: return "QP Local Catastrophic Error";
02946 case CCAE_CQ_OVERFLOW: return "CQ Overflow";
02947 case CCAE_CQ_OPERATION_ERROR: return "CQ Operation Error";
02948 case CCAE_SRQ_LIMIT_REACHED: return "SRQ Limit Reached";
02949 case CCAE_QP_RQ_LIMIT_REACHED: return "QP RQ Limit Reached";
02950 case CCAE_SRQ_CATASTROPHIC_ERROR: return "SRQ Catastrophic Error";
02951 case CCAE_RNIC_CATASTROPHIC_ERROR: return "RNIC Catastrophic Error";
02952 default: return "Unknown Event ID";
02953 }
02954 }
02955
02956 char* cc_connect_status_to_string(cc_connect_status_t status) {
02957
02958 switch (status) {
02959 case CC_CONN_STATUS_SUCCESS: return "Success";
02960 case CC_CONN_STATUS_TIMEDOUT: return "Timedout";
02961 case CC_CONN_STATUS_REFUSED: return "Refused";
02962 case CC_CONN_STATUS_NETUNREACH: return "Network Unreachable";
02963 default: return "Unknown";
02964 }
02965 }