00001
/* Round x up to the next multiple of 8, yielding an int (only defined here if
   no earlier header already provides ALIGN8). */
#ifndef ALIGN8
#define ALIGN8(x) (int)((((x)+7))&(~7))
#endif

/* Byte limit for memory registered with the RNIC -- presumably compared with
   contextBlock->pinnedMemory elsewhere; not referenced in this chunk. */
#define MAX_PINNED_MEMORY 100000000

/* Iteration count of the busy-wait CC_POST_CHECK performs between retries. */
#define WASTE_TIME 600
00034
/*
 * CC_POST_CHECK(routine, args, nodeTo)
 *
 * Evaluate `routine args` (a cc_qp_post_* call) and pass its status to
 * ammasso_check_post_err().  While that helper returns 1 (send queue full and
 * retry budget not yet exhausted) the call is repeated after a short
 * busy-wait; every other failure aborts inside the helper.
 *
 * The expansion refers to a variable named `nWR` that must exist in the
 * caller's scope; its address is passed both to the routine (via `args`) and
 * to ammasso_check_post_err().
 *
 * Wrapped in do { } while (0) so the macro behaves as exactly one statement.
 * The retry budget shrinks by one per attempt.  The spin counter is volatile
 * so the compiler cannot delete the delay loop.
 */
#define CC_POST_CHECK(routine,args,nodeTo) do { \
  int retry = 10000000; \
  while (ammasso_check_post_err(routine args, #routine, __LINE__, &nWR, nodeTo, retry) == 1) { \
    volatile int cc_spin_; \
    retry--; \
    for (cc_spin_ = 0; cc_spin_ <= WASTE_TIME; ++cc_spin_) { } \
  } \
} while (0)

/*
 * CC_CHECK(routine, args)
 *
 * Evaluate `routine args` and abort through ammasso_check_err() unless it
 * returned CC_OK.  The trailing semicolon is kept so existing call sites
 * continue to compile unchanged.
 */
#define CC_CHECK(routine,args) \
  ammasso_check_err(routine args, #routine, __LINE__);
00046
00047 static void ammasso_check_err(cc_status_t returnCode,const char *routine,int line) {
00048 if (returnCode!=CC_OK) {
00049 char buf[128];
00050 char *errMsg = cc_status_to_string(returnCode);
00051
00052
00053 cc_rnic_close(contextBlock->rnic);
00054
00055
00056 MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine, __FILE__, line);
00057 MACHSTATE2(5," Description: %d, %s\n",returnCode,errMsg);
00058 sprintf(buf,"Fatal CC error while executing %s at %s:%d\n"
00059 " Description: %d, %s\n", routine, __FILE__, line,returnCode,errMsg);
00060 CmiAbort(buf);
00061 }
00062 }
00063
00064
00065
00066 static int ammasso_check_post_err(cc_status_t returnCode,const char *routine,int line, int *nWR, int nodeTo, int retry) {
00067 if (returnCode == CCERR_TOO_MANY_WRS_POSTED && *nWR != 1 && retry>0) {
00068 cc_wc_t wc;
00069
00070 while (cc_cq_poll(contextBlock->rnic, nodes[nodeTo].send_cq, &wc) == CC_OK) {
00071 MACHSTATE1(5, "Error posting send request - INFO: Send completed with node %d... now waiting for acknowledge...", nodeTo);
00072 }
00073 MACHSTATE(5, "Error posting send request - Retrying...");
00074 return 1;
00075 }
00076
00077 if (returnCode != CC_OK || *nWR != 1) {
00078 char buf[128];
00079 char *errMsg = cc_status_to_string(returnCode);
00080
00081
00082 cc_rnic_close(contextBlock->rnic);
00083
00084
00085 MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine, __FILE__, line);
00086 MACHSTATE3(5," Description: %d, %s (nWR = %d)\n",returnCode,errMsg,nWR);
00087 sprintf(buf,"Fatal CC error while executing %s at %s:%d\n"
00088 " Description: %d, %s (nWR = %d)\n", routine, __FILE__, line,returnCode,errMsg,*nWR);
00089 CmiAbort(buf);
00090 }
00091 return 0;
00092 }
00093
00094
00096
00097
00098 void CmiMachineInit(char **argv);
00099
00100 void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port);
00101
00102 void AmmassoDoIdle();
00103 void CmiNotifyIdle(void);
00104 static CmiIdleState* CmiNotifyGetState(void);
00105 static void CmiNotifyBeginIdle(CmiIdleState *s);
00106 static void CmiNotifyStillIdle(CmiIdleState *s);
00107
00108 void sendAck(OtherNode node);
00109 AmmassoToken *getQPSendToken(OtherNode node);
00110 int sendDataOnQP(char* data, int len, OtherNode node, char flags);
00111 void DeliverViaNetwork(OutgoingMsg msg, OtherNode otherNode, int rank, unsigned int broot, int copy);
00112 static void CommunicationServer(int withDelayMs, int where);
00113 void CmiMachineExit();
00114
00115 void AsynchronousEventHandler(cc_rnic_handle_t rnic, cc_event_record_t *eventRecord, void *cb);
00116 void CheckRecvBufForMessage(OtherNode node);
00117
00118
00119
00120 void CmiAmmassoOpenQueuePairs();
00121
00122 void processAmmassoControlMessage(char* msg, int len, Tailer *tail, OtherNode from);
00123 int ProcessMessage(char* msg, int len, Tailer *tail, OtherNode from);
00124
00125 OtherNode getNodeFromQPId(cc_qp_id_t qp_id);
00126 OtherNode getNodeFromQPHandle(cc_qp_handle_t qp);
00127
00128 void establishQPConnection(OtherNode node, int reuseQPFlag);
00129 void reestablishQPConnection(OtherNode node);
00130 void closeQPConnection(OtherNode node, int destroyQPFlag);
00131
00132 void BufferAlloc(int n);
00133 void TokenAlloc(int n);
00134 void RequestTokens(OtherNode node, int n);
00135 void GrantTokens(OtherNode node, int n);
00136 void RequestReleaseTokens(OtherNode node, int n);
00137 void ReleaseTokens(OtherNode node, int n);
00138
00140
00141
00142
00143 void AllocatorCheck () {
00144 int i, limit;
00145 char buf[24];
00146 for (i=0; i<contextBlock->numNodes; ++i) {
00147 if (i==contextBlock->myNode) continue;
00148 limit = nodes[i].num_sendTokens - nodes[i].max_used_tokens - 10;
00149 CmiPrintf("[%d] AllocatorCheck called: node %d, limit %d\n",CmiMyPe(),i,limit);
00150 if (limit > 0) {
00151 ReleaseTokens(&nodes[i], limit);
00152 CmiPrintf("[%d] Releasing %d tokens to %d\n", CmiMyPe(), limit, i);
00153 nodes[i].max_used_tokens = 0;
00154 }
00155 }
00156 }
00157
00158
00159 void BufferAlloc(int n) {
00160 int i;
00161 char buf[128];
00162 cc_stag_index_t newStagIndex;
00163 AmmassoBuffer *newBuffers;
00164
00165 MACHSTATE1(3, "Allocating %d new Receive Buffers",n);
00166
00167
00168 newBuffers = (AmmassoBuffer*) CmiAlloc(n*sizeof(AmmassoBuffer));
00169
00170 if (newBuffers == NULL) {
00171
00172
00173 cc_rnic_close(contextBlock->rnic);
00174
00175
00176 MACHSTATE(5, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
00177 sprintf(buf, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
00178 CmiAbort(buf);
00179 }
00180
00181 contextBlock->pinnedMemory += n*sizeof(AmmassoBuffer);
00182 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
00183 CC_ADDR_TYPE_VA_BASED,
00184 (cc_byte_t*)newBuffers,
00185 n*sizeof(AmmassoBuffer),
00186 contextBlock->pd_id,
00187 0, 0,
00188 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE | CC_ACF_REMOTE_WRITE,
00189 &newStagIndex)
00190 );
00191
00192 for (i=0; i<n; ++i) {
00193 newBuffers[i].tail.length = 0;
00194 newBuffers[i].next = &(newBuffers[i+1]);
00195 newBuffers[i].stag = newStagIndex;
00196 }
00197 newBuffers[n-1].next = NULL;
00198 if (contextBlock->freeRecvBuffers == NULL) {
00199 contextBlock->freeRecvBuffers = newBuffers;
00200 } else {
00201 contextBlock->last_freeRecvBuffers->next = newBuffers;
00202 }
00203 contextBlock->last_freeRecvBuffers = &newBuffers[n-1];
00204 contextBlock->num_freeRecvBuffers += n;
00205 }
00206
00207 void TokenAlloc(int n) {
00208 int i;
00209 char buf[128];
00210 cc_stag_index_t newStagIndex;
00211 AmmassoToken *sendToken, *tokenScanner;
00212 cc_data_addr_t *sendSgl;
00213 AmmassoBuffer *sendBuffer;
00214
00215 MACHSTATE1(3, "Allocating %d new Tokens",n);
00216
00217
00218 sendBuffer = (AmmassoBuffer*) CmiAlloc(n*sizeof(AmmassoBuffer));
00219
00220 if (sendBuffer == NULL) {
00221
00222
00223 cc_rnic_close(contextBlock->rnic);
00224
00225
00226 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
00227 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
00228 CmiAbort(buf);
00229 }
00230
00231 contextBlock->pinnedMemory += n*sizeof(AmmassoBuffer);
00232 CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
00233 CC_ADDR_TYPE_VA_BASED,
00234 (cc_byte_t*)sendBuffer,
00235 n*sizeof(AmmassoBuffer),
00236 contextBlock->pd_id,
00237 0, 0,
00238 CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
00239 &newStagIndex)
00240 );
00241
00242
00243 sendToken = (AmmassoToken*) CmiAlloc(n*ALIGN8(sizeof(AmmassoToken)));
00244
00245 if (sendToken == NULL) {
00246
00247
00248 cc_rnic_close(contextBlock->rnic);
00249
00250
00251 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
00252 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
00253 CmiAbort(buf);
00254 }
00255
00256 sendSgl = (cc_data_addr_t*) CmiAlloc(n*ALIGN8(sizeof(cc_data_addr_t)));
00257
00258 if (sendSgl == NULL) {
00259
00260
00261 cc_rnic_close(contextBlock->rnic);
00262
00263
00264 MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
00265 sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
00266 CmiAbort(buf);
00267 }
00268
00269 tokenScanner = sendToken;
00270 for (i=0; i<n; ++i) {
00271 sendSgl->stag = newStagIndex;
00272 sendSgl->length = AMMASSO_BUFSIZE + sizeof(Tailer);
00273 sendSgl->to = (unsigned long)&(sendBuffer[i]);
00274 tokenScanner->wr.wr_id = (unsigned long)tokenScanner;
00275 tokenScanner->wr.wr_type = CC_WR_TYPE_RDMA_WRITE;
00276 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_count = 1;
00277 tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_list = sendSgl;
00278 tokenScanner->wr.signaled = 1;
00279 tokenScanner->localBuf = (AmmassoBuffer*)&(sendBuffer[i]);
00280 LIST_ENQUEUE(contextBlock->,freeTokens,tokenScanner);
00281 sendSgl = (cc_data_addr_t*)(((char*)sendSgl)+ALIGN8(sizeof(cc_data_addr_t)));
00282 tokenScanner = (AmmassoToken*)(((char*)tokenScanner)+ALIGN8(sizeof(AmmassoToken)));
00283 }
00284 }
00285
00286 void RequestTokens(OtherNode node, int n) {
00287 char buf[24];
00288 *((int*)buf) = n;
00289 sendDataOnQP(buf, sizeof(int), node, AMMASSO_MOREBUFFERS);
00290 }
00291
00292 void GrantTokens(OtherNode node, int n) {
00293 int i;
00294 char *buf;
00295 AmmassoBuffer *buffer;
00296 AmmassoBuffer *prebuffer;
00297 AmmassoTokenDescription *tokenDesc;
00298 if (node->pending != NULL) return;
00299 if (n*sizeof(AmmassoTokenDescription) + sizeof(int) > AMMASSO_BUFSIZE) {
00300 n = (AMMASSO_BUFSIZE-sizeof(int)) / sizeof(AmmassoTokenDescription);
00301 }
00302 if (contextBlock->num_freeRecvBuffers < n) {
00303 int quantity = (n - contextBlock->num_freeRecvBuffers + 1023) & (~1023);
00304 BufferAlloc(quantity);
00305 }
00306 buf = (char*) CmiAlloc(n*sizeof(AmmassoTokenDescription) + sizeof(int));
00307 *((int*)buf) = n;
00308 tokenDesc = (AmmassoTokenDescription*)(buf+sizeof(int));
00309 buffer = contextBlock->freeRecvBuffers;
00310 for (i=0; i<n; ++i) {
00311 tokenDesc[i].stag = buffer->stag;
00312 tokenDesc[i].to = (unsigned long)buffer;
00313 prebuffer = buffer;
00314 buffer = buffer->next;
00315 }
00316 node->pending = contextBlock->freeRecvBuffers;
00317 node->last_pending = prebuffer;
00318 node->num_pending = n;
00319 prebuffer->next = NULL;
00320 contextBlock->num_freeRecvBuffers -= n;
00321 contextBlock->freeRecvBuffers = buffer;
00322 sendDataOnQP(buf, n*sizeof(AmmassoTokenDescription) + sizeof(int), node, AMMASSO_ALLOCATE);
00323 CmiFree(buf);
00324 }
00325
00326 void RequestReleaseTokens(OtherNode node, int n) {
00327 char buf[24];
00328 *((int*)buf) = n;
00329 sendDataOnQP(buf, sizeof(int), node, AMMASSO_RELEASE);
00330 }
00331
00332 void ReleaseTokens(OtherNode node, int n) {
00333 int i, nWR;
00334 AmmassoToken *token;
00335 AmmassoBuffer *tokenBuf;
00336 cc_data_addr_t *tokenSgl;
00337
00338 if (node->num_sendTokens < n) n = node->num_sendTokens - 1;
00339 if (n <= 0) return;
00340 token = node->sendTokens;
00341 tokenBuf = token->localBuf;
00342
00343 tokenBuf->tail.length = 1;
00344 tokenBuf->tail.ack = 0;
00345 tokenBuf->tail.flags = AMMASSO_RELEASED;
00346
00347
00348 tokenSgl = token->wr.wr_u.rdma_write.local_sgl.sge_list;
00349 tokenSgl->length = sizeof(Tailer);
00350 tokenSgl->to = (unsigned long)&tokenBuf->tail;
00351 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
00352
00353 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
00354
00355 if (contextBlock->freeTokens == NULL) {
00356 contextBlock->freeTokens = node->sendTokens;
00357 } else {
00358 contextBlock->last_freeTokens->next = node->sendTokens;
00359 }
00360 for (i=1; i<n; ++i) token = token->next;
00361 contextBlock->last_freeTokens = token;
00362 node->sendTokens = token->next;
00363 token->next = NULL;
00364 contextBlock->num_freeTokens += n;
00365 node->num_sendTokens -= n;
00366 }
00367
00368
00369
00370
00371 void CmiMachineInit(char **argv) {
00372
00373 char buf[128];
00374 cc_status_t rtn;
00375
00376 AMMASSO_STATS_INIT
00377
00378 AMMASSO_STATS_START(MachineInit)
00379
00380 MACHSTATE(2, "CmiMachineInit() - INFO: (***** Ammasso Specific*****) - Called... Initializing RNIC...");
00381 MACHSTATE1(1, "CmiMachineInit() - INFO: Cmi_charmrun_pid = %d", Cmi_charmrun_pid);
00382
00383
00384
00385
00386
00387
00388 if (contextBlock != NULL) {
00389 MACHSTATE(5, "CmiMachineInit() - ERROR: contextBlock != NULL");
00390 sprintf(buf, "CmiMachineInit() - ERROR: contextBlock != NULL");
00391 CmiAbort(buf);
00392 }
00393 contextBlock = (mycb_t*)malloc(sizeof(mycb_t));
00394 if (contextBlock == NULL) {
00395 MACHSTATE(5, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
00396 sprintf(buf, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
00397 CmiAbort(buf);
00398 }
00399
00400
00401 memset(contextBlock, 0, sizeof(mycb_t));
00402 contextBlock->rnic = -1;
00403
00404 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-OPEN_RNIC)");
00405
00406
00407 if (Cmi_charmrun_pid != 0) {
00408
00409
00410
00411
00412 rtn = cc_rnic_open(0, CC_PBL_PAGE_MODE, contextBlock, &(contextBlock->rnic));
00413 if (rtn != CC_OK) {
00414 MACHSTATE2(5, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
00415 sprintf(buf, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
00416 CmiAbort(buf);
00417 }
00418
00419 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-SET-ASYNC-HANDLER)");
00420
00421
00422 CC_CHECK(cc_eh_set_async_handler,(contextBlock->rnic, AsynchronousEventHandler, contextBlock));
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432 MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-PD-ALLOC)");
00433
00434
00435 CC_CHECK(cc_pd_alloc,(contextBlock->rnic, &(contextBlock->pd_id)));
00436
00437 MACHSTATE(1, "CmiMachineInit() - INFO: RNIC Open For Business!!!");
00438
00439 } else {
00440
00441
00442 contextBlock->rnic = -1;
00443 }
00444
00445 MACHSTATE(2, "CmiMachineInit() - INFO: Completed Successfully !!!");
00446
00447 AMMASSO_STATS_END(MachineInit)
00448 }
00449
/* Communication-init hook: this machine layer performs no work here, so the
   argument vector is deliberately ignored. */
void CmiCommunicationInit(char **argv)
{
  (void)argv;
}
00453
00454 void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port) {
00455
00456
00457
00458
00459
00460
00461 MACHSTATE1(2, "CmiNodeAddressesStoreHandler() - INFO: pe = %d", pe);
00462 MACHSTATE1(1, " addr = { sin_family = %d,", addr->sin_family);
00463 MACHSTATE1(1, " sin_port = %d,", addr->sin_port);
00464 MACHSTATE4(1, " sin_addr.s_addr = %d.%d.%d.%d }", (addr->sin_addr.s_addr & 0xFF), ((addr->sin_addr.s_addr >> 8) & 0xFF), ((addr->sin_addr.s_addr >> 16) & 0xFF), ((addr->sin_addr.s_addr >> 24) & 0xFF));
00465 MACHSTATE1(1, " port = %d", port);
00466 }
00467
00468
00469 void AmmassoDoIdle() {
00470
00471 int i;
00472 cc_wc_t wc;
00473
00474 AMMASSO_STATS_START(AmmassoDoIdle)
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484 for (i = 0; i < contextBlock->numNodes; i++) {
00485 if (i == contextBlock->myNode) continue;
00486 CheckRecvBufForMessage(&(nodes[i]));
00487 while (cc_cq_poll(contextBlock->rnic, nodes[i].send_cq, &wc) == CC_OK) {
00488 MACHSTATE1(3, "AmmassoDoIdle() - INFO: Send completed with node %d... now waiting for acknowledge...", i);
00489 }
00490 }
00491
00492 AMMASSO_STATS_END(AmmassoDoIdle)
00493 }
00494
00495 void CmiNotifyIdle(void) {
00496 AmmassoDoIdle();
00497 }
00498
00499 static CmiIdleState* CmiNotifyGetState(void) {
00500 return NULL;
00501 }
00502
00503 static void CmiNotifyBeginIdle(CmiIdleState *s) {
00504 AmmassoDoIdle();
00505 }
00506
00507 static void CmiNotifyStillIdle(CmiIdleState *s) {
00508 AmmassoDoIdle();
00509 }
00510
00511
00512
00513 void sendAck(OtherNode node) {
00514
00515 int nWR;
00516
00517 AMMASSO_STATS_START(sendAck)
00518
00519 MACHSTATE2(3, "sendAck() - Ammasso - INFO: Called... sending ACK %d to node %d", *node->remoteAck, node->myNode);
00520
00521 if (*node->remoteAck < ACK_MASK) {
00522
00523
00524 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, node->ack_sq_wr, 1, &nWR),node->myNode);
00525
00526 } else {
00527
00528
00529
00530 AmmassoToken *token;
00531 AmmassoBuffer *tokenBuf;
00532 int tmp_ack = *node->remoteAck;
00533 *node->remoteAck = 0;
00534 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, node->ack_sq_wr, 1, &nWR),node->myNode);
00535
00536 sleep(1);
00537
00538 token = getQPSendToken(node);
00539 tokenBuf = token->localBuf;
00540 tokenBuf->tail.ack = tmp_ack;
00541 tokenBuf->tail.flags = ACK_WRAPPING;
00542 tokenBuf->tail.length = 1;
00543 token->wr.wr_u.rdma_write.local_sgl.sge_list->length = sizeof(Tailer);
00544 token->wr.wr_u.rdma_write.local_sgl.sge_list->to = (unsigned long)&tokenBuf->tail;
00545 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
00546 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
00547 LIST_ENQUEUE(node->,usedTokens,token);
00548 node->max_used_tokens = (node->num_usedTokens>node->max_used_tokens)?node->num_usedTokens:node->max_used_tokens;
00549 *node->remoteAck = tmp_ack & ACK_MASK;
00550 }
00551
00552 node->messagesNotYetAcknowledged = 0;
00553
00554 AMMASSO_STATS_END(sendAck)
00555 }
00556
00557
00558
00559
00560
00561 AmmassoToken *getQPSendToken(OtherNode node) {
00562 AmmassoToken *token;
00563 int i;
00564 cc_wc_t wc;
00565 ammasso_ack_t newAck;
00566 while (node->connectionState != QP_CONN_STATE_CONNECTED ||
00567 node->sendTokens == NULL) {
00568
00569
00570
00571 MACHSTATE(3, "getQPSendBuffer() - INFO: No tokens available");
00572 if (*node->directAck > node->localAck) {
00573 newAck = *node->directAck;
00574 for (i=node->localAck; i<newAck; ++i) {
00575 LIST_DEQUEUE(node->,usedTokens,token);
00576 LIST_ENQUEUE(node->,sendTokens,token);
00577 }
00578 node->localAck = newAck;
00579 }
00580
00581 CheckRecvBufForMessage(node);
00582
00583 while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
00584 MACHSTATE1(3, "getQPSendBuffer() - INFO: Send completed with node %d... now waiting for acknowledge...", node->myNode);
00585 }
00586 }
00587 LIST_DEQUEUE(node->,sendTokens,token);
00588 return token;
00589 }
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682 int sendDataOnQP(char* data, int len, OtherNode node, char flags) {
00683
00684 AmmassoToken *sendBufToken;
00685 AmmassoBuffer *tokenBuf;
00686 cc_data_addr_t *tokenSgl;
00687 int toSendLength;
00688 cc_wc_t wc;
00689 cc_uint32_t nWR;
00690 char isFirst = 1;
00691 char *origMsgStart = data;
00692 char *sendBufBegin;
00693
00694 int origSize = len;
00695
00696 if (origSize <= 1024) {
00697 AMMASSO_STATS_START(sendDataOnQP_1024)
00698 } else if (origSize <= 2048) {
00699 AMMASSO_STATS_START(sendDataOnQP_2048)
00700 } else if (origSize <= 4096) {
00701 AMMASSO_STATS_START(sendDataOnQP_4096)
00702 } else if (origSize <= 16384) {
00703 AMMASSO_STATS_START(sendDataOnQP_16384)
00704 } else {
00705 AMMASSO_STATS_START(sendDataOnQP_over)
00706 }
00707
00708 AMMASSO_STATS_START(sendDataOnQP)
00709
00710
00711
00712 while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
00713 MACHSTATE1(3, "sendDataOnQP() - INFO: Send completed with node %d... now waiting for acknowledge...", node->myNode);
00714 }
00715
00716
00717 MACHSTATE2(2, "sendDataOnQP() - Ammasso - INFO: Called (send to node %d, len = %d)...", node->myNode, len);
00718
00719
00720 CmiAssert(flags==0 || len<=AMMASSO_BUFSIZE);
00721
00722
00723
00724
00725
00726 while (len > 0) {
00727
00728 AMMASSO_STATS_START(sendDataOnQP_pre_send)
00729
00730
00731 sendBufToken = getQPSendToken(node);
00732 tokenBuf = sendBufToken->localBuf;
00733
00734
00735 LIST_ENQUEUE(node->,usedTokens,sendBufToken);
00736 node->max_used_tokens = (node->num_usedTokens>node->max_used_tokens)?node->num_usedTokens:node->max_used_tokens;
00737
00738
00739
00740
00741
00742
00743
00744
00745 if (isFirst) {
00746
00747 toSendLength = len > AMMASSO_BUFSIZE ? AMMASSO_BUFSIZE : len;
00748 sendBufBegin = tokenBuf->buf + AMMASSO_BUFSIZE - ALIGN8(toSendLength);
00749
00750 memcpy(sendBufBegin, data, toSendLength);
00751
00752 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending 1st Fragment - toSendLength = %d...", toSendLength);
00753
00754 } else {
00755
00756 toSendLength = len > (AMMASSO_BUFSIZE - DGRAM_HEADER_SIZE) ? AMMASSO_BUFSIZE : (len+DGRAM_HEADER_SIZE);
00757 sendBufBegin = tokenBuf->buf + AMMASSO_BUFSIZE - ALIGN8(toSendLength);
00758
00759 memcpy(sendBufBegin+DGRAM_HEADER_SIZE, data, toSendLength-DGRAM_HEADER_SIZE);
00760
00761
00762
00763
00764
00765
00766
00767
00768 memcpy(sendBufBegin, origMsgStart, DGRAM_HEADER_SIZE);
00769
00770 ((DgramHeader*)sendBufBegin)->seqno = node->send_next;
00771 node->send_next = ((node->send_next+1) & DGRAM_SEQNO_MASK);
00772
00773 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending Continuation Fragment - toSendLength = %d...", toSendLength);
00774 }
00775
00776
00777
00778 tokenBuf->tail.length = toSendLength;
00779 node->messagesNotYetAcknowledged = 0;
00780 tokenBuf->tail.ack = *node->remoteAck;
00781 if (*node->remoteAck > ACK_MASK) {
00782
00783 tokenBuf->tail.ack = 0;
00784
00785
00786
00787
00788
00789
00790 sendAck(node);
00791 }
00792 tokenBuf->tail.flags = flags;
00793
00794
00795 tokenSgl = sendBufToken->wr.wr_u.rdma_write.local_sgl.sge_list;
00796 tokenSgl->length = ALIGN8(toSendLength) + sizeof(Tailer);
00797 tokenSgl->to = (unsigned long)sendBufBegin;
00798 sendBufToken->wr.wr_u.rdma_write.remote_to = (unsigned long)(((char*)sendBufToken->remoteBuf)+AMMASSO_BUFSIZE-ALIGN8(toSendLength));
00799
00800
00801
00802 AMMASSO_STATS_END(sendDataOnQP_pre_send)
00803 AMMASSO_STATS_START(sendDataOnQP_send)
00804
00805 MACHSTATE(3, "sendDataOnQP() - Ammasso - INFO: Enqueuing RDMA Write WR...");
00806
00807 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tokenSgl->to = %p", tokenSgl->to);
00808 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: sendBufToken->wr.wr_u.rdma_write.remote_to = %p", sendBufToken->wr.wr_u.rdma_write.remote_to);
00809 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.ack = %d", tokenBuf->tail.ack);
00810 MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.flags = %d", tokenBuf->tail.flags);
00811
00812 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &sendBufToken->wr, 1, &nWR),node->myNode);
00813
00814 MACHSTATE(1, "sendDataOnQP() - Ammasso - INFO: RDMA Write WR Enqueue Completed");
00815
00816 AMMASSO_STATS_END(sendDataOnQP_send)
00817 AMMASSO_STATS_START(sendDataOnQP_post_send)
00818
00819
00820 data += toSendLength;
00821 len -= toSendLength;
00822 if (isFirst == 0) {
00823 data -= DGRAM_HEADER_SIZE;
00824 len += DGRAM_HEADER_SIZE;
00825 }
00826 isFirst = 0;
00827
00828 AMMASSO_STATS_END(sendDataOnQP_post_send)
00829 }
00830
00831 AMMASSO_STATS_END(sendDataOnQP)
00832
00833 if (origSize <= 1024) {
00834 AMMASSO_STATS_END(sendDataOnQP_1024)
00835 } else if (origSize <= 2048) {
00836 AMMASSO_STATS_END(sendDataOnQP_2048)
00837 } else if (origSize <= 4096) {
00838 AMMASSO_STATS_END(sendDataOnQP_4096)
00839 } else if (origSize <= 16384) {
00840 AMMASSO_STATS_END(sendDataOnQP_16384)
00841 } else {
00842 AMMASSO_STATS_END(sendDataOnQP_over)
00843 }
00844
00845 }
00846
00847
00848
00849
00850
00851 void DeliverViaNetwork(OutgoingMsg msg, OtherNode otherNode, int rank, unsigned int broot, int copy) {
00852
00853 cc_status_t rtn;
00854 cc_stag_index_t stag;
00855 cc_data_addr_t sgl;
00856 cc_sq_wr_t wr;
00857 cc_uint32_t WRsPosted;
00858
00859 AMMASSO_STATS_START(DeliverViaNetwork)
00860
00861 MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Called...");
00862
00863
00864
00865
00866
00867
00868
00869 AMMASSO_STATS_START(DeliverViaNetwork_post_lock)
00870
00871 DgramHeaderMake(msg->data, rank, msg->src, Cmi_charmrun_pid, otherNode->send_next, broot);
00872 otherNode->send_next = ((otherNode->send_next+1) & DGRAM_SEQNO_MASK);
00873
00874 MACHSTATE1(1, "DeliverViaNetwork() - INFO: Sending message to node %d", otherNode->myNode);
00875 MACHSTATE1(1, "DeliverViaNetwork() - INFO: rank %d", rank);
00876 MACHSTATE1(1, "DeliverViaNetwork() - INFO: broot %d", broot);
00877
00878 AMMASSO_STATS_START(DeliverViaNetwork_send)
00879
00880 sendDataOnQP(msg->data, msg->size, otherNode, 0);
00881
00882 AMMASSO_STATS_END(DeliverViaNetwork_send)
00883
00884
00885 MACHSTATE(1, "DeliverViaNetwork() - INFO: Post-send_next_lock");
00886
00887
00888
00889
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907
00908
00909
00910
00911
00912
00913
00914
00915
00916
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928
00929
00930
00931
00932
00933
00934
00935
00936
00937 MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Completed.");
00938
00939 AMMASSO_STATS_END(DeliverViaNetwork_post_lock)
00940 AMMASSO_STATS_END(DeliverViaNetwork)
00941 }
00942
00943
00944
00945
00946
00947
00948
00949
00950
00951
00952
00953
00954
00955 int CheckSocketsReady(int withDelayMs)
00956 {
00957 int nreadable;
00958 CMK_PIPE_DECL(withDelayMs);
00959
00960 CmiStdoutAdd(CMK_PIPE_SUB);
00961 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
00962
00963 nreadable=CMK_PIPE_CALL();
00964 ctrlskt_ready_read = 0;
00965 dataskt_ready_read = 0;
00966 dataskt_ready_write = 0;
00967
00968 if (nreadable == 0) {
00969 MACHSTATE(2, "} CheckSocketsReady (nothing readable)");
00970 return nreadable;
00971 }
00972 if (nreadable==-1) {
00973 CMK_PIPE_CHECKERR();
00974 MACHSTATE(3, "} CheckSocketsReady (INTERRUPTED!)");
00975 return CheckSocketsReady(0);
00976 }
00977 CmiStdoutCheck(CMK_PIPE_SUB);
00978 if (Cmi_charmrun_fd!=-1)
00979 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00980 MACHSTATE(3, "} CheckSocketsReady");
00981 return nreadable;
00982 }
00983
00984
00985
00986
00987
00988
00989
00990
00991
00992
00993
00994
00995 static void ServiceCharmrun_nolock() {
00996
00997 int again = 1;
00998
00999 MACHSTATE(2, "ServiceCharmrun_nolock begin {");
01000
01001 while (again) {
01002 again = 0;
01003
01004 CheckSocketsReady(0);
01005 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
01006 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
01007 }
01008
01009 MACHSTATE(2, "} ServiceCharmrun_nolock end");
01010 }
01011
01012 void processAmmassoControlMessage(char* msg, int len, Tailer *tail, OtherNode from) {
01013
01014 int nodeIndex, ctrlType, i, n, nWR;
01015 AmmassoToken *token, *pretoken;
01016 AmmassoBuffer *tokenBuf;
01017 cc_data_addr_t *tokenSgl;
01018 OtherNode node;
01019 AmmassoTokenDescription *tokenDesc;
01020
01021 AMMASSO_STATS_START(processAmmassoControlMessage)
01022
01023
01024
01025
01026 switch (tail->flags) {
01027
01028 case AMMASSO_READY:
01029
01030
01031 contextBlock->nodeReadyCount--;
01032 MACHSTATE1(3, "processAmmassoControlMessage() - Ammasso - INFO: Received READY packet... still waiting for %d more...", contextBlock->nodeReadyCount);
01033
01034 break;
01035
01036 case AMMASSO_ALLOCATE:
01037
01038 token = getQPSendToken(from);
01039 tokenBuf = token->localBuf;
01040
01041 tokenBuf->tail.length = 1;
01042 tokenBuf->tail.ack = 0;
01043 tokenBuf->tail.flags = AMMASSO_ALLOCATED;
01044
01045
01046 tokenSgl = token->wr.wr_u.rdma_write.local_sgl.sge_list;
01047 tokenSgl->length = sizeof(Tailer);
01048 tokenSgl->to = (unsigned long)&tokenBuf->tail;
01049 token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
01050
01051 CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
01052
01053 LIST_ENQUEUE(from->,usedTokens,token);
01054 from->max_used_tokens = (from->num_usedTokens>from->