arch/net/machine-ammasso.c

Go to the documentation of this file.
00001 
/* ALIGN8(x): round x up to the next multiple of 8, yielding an int.
 * Used, e.g., as the per-element stride of the token and SGL arrays in
 * TokenAlloc() and to place message data at the end of a send buffer. */
#ifndef ALIGN8
#define ALIGN8(x)   ((int)(((x) + 7) & (~7)))
#endif

/* DYNAMIC ALLOCATOR: Limit of the allowed pinned memory */
#define MAX_PINNED_MEMORY   100000000
/* DYNAMIC ALLOCATOR END */
00032 
/* Number of empty spin iterations performed between two attempts of a post
 * that was rejected because the send queue was full. */
#define WASTE_TIME 600

/* CC_POST_CHECK(routine, args, nodeTo)
 *   Execute `routine args` (a cc_qp_post_sq call) and check the outcome with
 *   ammasso_check_post_err().  While that checker returns 1 (queue full, send
 *   completion queue of nodeTo drained), spin for WASTE_TIME iterations and
 *   post again; at most 10000000 attempts are made, after which (or on any
 *   other error) the checker aborts.
 *   In order to use CC_POST_CHECK, the last argument to cc_qp_post_sq must be
 *   "nWR": the macro hands &nWR to the checker.  nWR is reset to 0 before each
 *   attempt so the checker never reads an uninitialized count in case the post
 *   fails without writing it.
 *   The spin counter is volatile: a plain counting loop with no side effects
 *   may be removed entirely by an optimizing compiler, which would eliminate
 *   the intended back-off.
 *   Local names carry a trailing underscore so they cannot shadow variables the
 *   caller references inside `args`.
 */
#define CC_POST_CHECK(routine,args,nodeTo) {\
        int ccPostRetries_ = 10000000; \
        while ((nWR = 0, ammasso_check_post_err(routine args, #routine, __LINE__, &nWR, (nodeTo), ccPostRetries_)) == 1) { \
          volatile int ccPostSpin_; \
          for (ccPostSpin_ = 0; ccPostSpin_ <= WASTE_TIME; ++ccPostSpin_) { } \
          --ccPostRetries_; \
        } \
      }

/* CC_CHECK(routine, args)
 *   Execute `routine args` and abort through ammasso_check_err() unless it
 *   returns CC_OK.
 *   NOTE(review): the expansion already ends with ';', so the usual
 *   "CC_CHECK(...);" call sites add an empty statement; kept for compatibility
 *   with existing call sites.
 */
#define CC_CHECK(routine,args) \
        ammasso_check_err(routine args, #routine, __LINE__);
00046 
00047 static void ammasso_check_err(cc_status_t returnCode,const char *routine,int line) {
00048   if (returnCode!=CC_OK) {
00049     char buf[128];
00050     char *errMsg = cc_status_to_string(returnCode);
00051 
00052     // Attempt to close the RNIC
00053     cc_rnic_close(contextBlock->rnic);
00054 
00055     // Let the user know what happened and bail
00056     MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine, __FILE__, line);
00057     MACHSTATE2(5,"  Description: %d, %s\n",returnCode,errMsg);
00058     sprintf(buf,"Fatal CC error while executing %s at %s:%d\n"
00059             "  Description: %d, %s\n", routine, __FILE__, line,returnCode,errMsg);
00060     CmiAbort(buf);
00061   }
00062 }
00063 
00064 // We pass the pointer used in the cc_qp_post_sq call as last parameter, since
00065 // when this function is called, cc_pq_post_sq has already been called
00066 static int ammasso_check_post_err(cc_status_t returnCode,const char *routine,int line, int *nWR, int nodeTo, int retry) {
00067   if (returnCode == CCERR_TOO_MANY_WRS_POSTED && *nWR != 1 && retry>0) {
00068     cc_wc_t wc;
00069     // drain the send completion queue and retry
00070     while (cc_cq_poll(contextBlock->rnic, nodes[nodeTo].send_cq, &wc) == CC_OK) {
00071       MACHSTATE1(5, "Error posting send request - INFO: Send completed with node %d... now waiting for acknowledge...", nodeTo);
00072     }
00073     MACHSTATE(5, "Error posting send request - Retrying...");
00074     return 1;
00075   }
00076 
00077   if (returnCode != CC_OK || *nWR != 1) {
00078     char buf[128];
00079     char *errMsg = cc_status_to_string(returnCode);
00080 
00081     // Attempt to close the RNIC
00082     cc_rnic_close(contextBlock->rnic);
00083 
00084     // Let the user know what happened and bail
00085     MACHSTATE3(5,"Fatal CC error while executing %s at %s:%d\n", routine, __FILE__, line);
00086     MACHSTATE3(5,"  Description: %d, %s (nWR = %d)\n",returnCode,errMsg,nWR);
00087     sprintf(buf,"Fatal CC error while executing %s at %s:%d\n"
00088             "  Description: %d, %s (nWR = %d)\n", routine, __FILE__, line,returnCode,errMsg,*nWR);
00089     CmiAbort(buf);
00090   }
00091   return 0;
00092 }
00093 
00094 
00096 // Function ProtoTypes /////////////////////////////////////////////////////////////////////////////
00097 
00098 void CmiMachineInit(char **argv);
00099 
00100 void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port);
00101 
00102 void AmmassoDoIdle();
00103 void CmiNotifyIdle(void);
00104 static CmiIdleState* CmiNotifyGetState(void);
00105 static void CmiNotifyBeginIdle(CmiIdleState *s);
00106 static void CmiNotifyStillIdle(CmiIdleState *s);
00107 
00108 void sendAck(OtherNode node);
00109 AmmassoToken *getQPSendToken(OtherNode node);
00110 int sendDataOnQP(char* data, int len, OtherNode node, char flags);
00111 void DeliverViaNetwork(OutgoingMsg msg, OtherNode otherNode, int rank, unsigned int broot, int copy);
00112 static void CommunicationServer(int withDelayMs, int where);
00113 void CmiMachineExit();
00114 
00115 void AsynchronousEventHandler(cc_rnic_handle_t rnic, cc_event_record_t *eventRecord, void *cb);
00116 void CheckRecvBufForMessage(OtherNode node);
00117 //void CompletionEventHandler(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb);
00118 //void CompletionEventHandlerWithAckFlag(cc_rnic_handle_t rnic, cc_cq_handle_t cq, void *cb, int breakOnAck);
00119 
00120 void CmiAmmassoOpenQueuePairs();
00121 
00122 void processAmmassoControlMessage(char* msg, int len, Tailer *tail, OtherNode from);
00123 int ProcessMessage(char* msg, int len, Tailer *tail, OtherNode from);
00124 
00125 OtherNode getNodeFromQPId(cc_qp_id_t qp_id);
00126 OtherNode getNodeFromQPHandle(cc_qp_handle_t qp);
00127 
00128 void establishQPConnection(OtherNode node, int reuseQPFlag);
00129 void reestablishQPConnection(OtherNode node);
00130 void closeQPConnection(OtherNode node, int destroyQPFlag);
00131 
00132 void BufferAlloc(int n);
00133 void TokenAlloc(int n);
00134 void RequestTokens(OtherNode node, int n);
00135 void GrantTokens(OtherNode node, int n);
00136 void RequestReleaseTokens(OtherNode node, int n);
00137 void ReleaseTokens(OtherNode node, int n);
00138 
00140 // Function Bodies /////////////////////////////////////////////////////////////////////////////////
00141 
00142 /* Callbacks used by the DYNAMIC ALLOCATOR */
00143 void AllocatorCheck () {
00144   int i, limit;
00145   char buf[24];
00146   for (i=0; i<contextBlock->numNodes; ++i) {
00147     if (i==contextBlock->myNode) continue;
00148     limit = nodes[i].num_sendTokens - nodes[i].max_used_tokens - 10;
00149     CmiPrintf("[%d] AllocatorCheck called: node %d, limit %d\n",CmiMyPe(),i,limit);
00150     if (limit > 0) {
00151       ReleaseTokens(&nodes[i], limit);
00152       CmiPrintf("[%d] Releasing %d tokens to %d\n", CmiMyPe(), limit, i);
00153       nodes[i].max_used_tokens = 0;
00154     }
00155   }
00156 }
00157 /* DYNAMIC ALLOCATOR END */
00158 
00159 void BufferAlloc(int n) {
00160   int i;
00161   char buf[128];
00162   cc_stag_index_t newStagIndex;
00163   AmmassoBuffer *newBuffers;
00164 
00165   MACHSTATE1(3, "Allocating %d new Receive Buffers",n);
00166 
00167   // Try to allocate the memory for n receiving buffers
00168   newBuffers = (AmmassoBuffer*) CmiAlloc(n*sizeof(AmmassoBuffer));
00169 
00170   if (newBuffers == NULL) {
00171 
00172     // Attempt to close the RNIC
00173     cc_rnic_close(contextBlock->rnic);
00174 
00175     // Let the user know what happened and bail
00176     MACHSTATE(5, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
00177     sprintf(buf, "BufferAlloc() - ERROR: Unable to allocate memory for RECV buffers");
00178     CmiAbort(buf);
00179   }
00180 
00181   contextBlock->pinnedMemory += n*sizeof(AmmassoBuffer);
00182   CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
00183                                   CC_ADDR_TYPE_VA_BASED,
00184                                   (cc_byte_t*)newBuffers,
00185                                   n*sizeof(AmmassoBuffer),
00186                                   contextBlock->pd_id,
00187                                   0, 0,
00188                                   CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE | CC_ACF_REMOTE_WRITE,
00189                                   &newStagIndex)
00190            );
00191 
00192   for (i=0; i<n; ++i) {
00193     newBuffers[i].tail.length = 0;
00194     newBuffers[i].next = &(newBuffers[i+1]);
00195     newBuffers[i].stag = newStagIndex;
00196   }
00197   newBuffers[n-1].next = NULL;
00198   if (contextBlock->freeRecvBuffers == NULL) {
00199     contextBlock->freeRecvBuffers = newBuffers;
00200   } else {
00201     contextBlock->last_freeRecvBuffers->next = newBuffers;
00202   }
00203   contextBlock->last_freeRecvBuffers = &newBuffers[n-1];
00204   contextBlock->num_freeRecvBuffers += n;
00205 }
00206 
00207 void TokenAlloc(int n) {
00208   int i;
00209   char buf[128];
00210   cc_stag_index_t newStagIndex;
00211   AmmassoToken *sendToken, *tokenScanner;
00212   cc_data_addr_t *sendSgl;
00213   AmmassoBuffer *sendBuffer;
00214 
00215   MACHSTATE1(3, "Allocating %d new Tokens",n);
00216 
00217   // Try to allocate the memory for n sending buffers
00218   sendBuffer = (AmmassoBuffer*) CmiAlloc(n*sizeof(AmmassoBuffer));
00219 
00220   if (sendBuffer == NULL) {
00221 
00222     // Attempt to close the RNIC
00223     cc_rnic_close(contextBlock->rnic);
00224 
00225     // Let the user know what happened and bail
00226     MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
00227     sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for SEND buffers");
00228     CmiAbort(buf);
00229   }
00230 
00231   contextBlock->pinnedMemory += n*sizeof(AmmassoBuffer);
00232   CC_CHECK(cc_nsmr_register_virt,(contextBlock->rnic,
00233                                   CC_ADDR_TYPE_VA_BASED,
00234                                   (cc_byte_t*)sendBuffer,
00235                                   n*sizeof(AmmassoBuffer),
00236                                   contextBlock->pd_id,
00237                                   0, 0,
00238                                   CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
00239                                   &newStagIndex)
00240            );
00241 
00242   // Allocate the send tokens
00243   sendToken = (AmmassoToken*) CmiAlloc(n*ALIGN8(sizeof(AmmassoToken)));
00244 
00245   if (sendToken == NULL) {
00246 
00247     // Attempt to close the RNIC
00248     cc_rnic_close(contextBlock->rnic);
00249 
00250     // Let the user know what happened and bail
00251     MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
00252     sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for send TOKEN buffers");
00253     CmiAbort(buf);
00254   }
00255 
00256   sendSgl = (cc_data_addr_t*) CmiAlloc(n*ALIGN8(sizeof(cc_data_addr_t)));
00257 
00258   if (sendSgl == NULL) {
00259 
00260     // Attempt to close the RNIC
00261     cc_rnic_close(contextBlock->rnic);
00262 
00263     // Let the user know what happened and bail
00264     MACHSTATE(5, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
00265     sprintf(buf, "TokenAlloc() - ERROR: Unable to allocate memory for send SGL buffers");
00266     CmiAbort(buf);
00267   }
00268 
00269   tokenScanner = sendToken;
00270   for (i=0; i<n; ++i) {
00271     sendSgl->stag = newStagIndex;
00272     sendSgl->length = AMMASSO_BUFSIZE + sizeof(Tailer);
00273     sendSgl->to = (unsigned long)&(sendBuffer[i]);
00274     tokenScanner->wr.wr_id = (unsigned long)tokenScanner;
00275     tokenScanner->wr.wr_type = CC_WR_TYPE_RDMA_WRITE;
00276     tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_count = 1;
00277     tokenScanner->wr.wr_u.rdma_write.local_sgl.sge_list = sendSgl;
00278     tokenScanner->wr.signaled = 1;
00279     tokenScanner->localBuf = (AmmassoBuffer*)&(sendBuffer[i]);
00280     LIST_ENQUEUE(contextBlock->,freeTokens,tokenScanner);
00281     sendSgl = (cc_data_addr_t*)(((char*)sendSgl)+ALIGN8(sizeof(cc_data_addr_t)));
00282     tokenScanner = (AmmassoToken*)(((char*)tokenScanner)+ALIGN8(sizeof(AmmassoToken)));
00283   }
00284 }
00285 
00286 void RequestTokens(OtherNode node, int n) {
00287   char buf[24];
00288   *((int*)buf) = n;
00289   sendDataOnQP(buf, sizeof(int), node, AMMASSO_MOREBUFFERS);
00290 }
00291 
00292 void GrantTokens(OtherNode node, int n) {
00293   int i;
00294   char *buf;
00295   AmmassoBuffer *buffer;
00296   AmmassoBuffer *prebuffer;
00297   AmmassoTokenDescription *tokenDesc;
00298   if (node->pending != NULL) return;
00299   if (n*sizeof(AmmassoTokenDescription) + sizeof(int) > AMMASSO_BUFSIZE) {
00300     n = (AMMASSO_BUFSIZE-sizeof(int)) / sizeof(AmmassoTokenDescription);
00301   }
00302   if (contextBlock->num_freeRecvBuffers < n) {
00303     int quantity = (n - contextBlock->num_freeRecvBuffers + 1023) & (~1023);
00304     BufferAlloc(quantity);
00305   }
00306   buf = (char*) CmiAlloc(n*sizeof(AmmassoTokenDescription) + sizeof(int));
00307   *((int*)buf) = n;
00308   tokenDesc = (AmmassoTokenDescription*)(buf+sizeof(int));
00309   buffer = contextBlock->freeRecvBuffers;
00310   for (i=0; i<n; ++i) {
00311     tokenDesc[i].stag = buffer->stag;
00312     tokenDesc[i].to = (unsigned long)buffer;
00313     prebuffer = buffer;
00314     buffer = buffer->next;
00315   }
00316   node->pending = contextBlock->freeRecvBuffers;
00317   node->last_pending = prebuffer;
00318   node->num_pending = n;
00319   prebuffer->next = NULL;
00320   contextBlock->num_freeRecvBuffers -= n;
00321   contextBlock->freeRecvBuffers = buffer;
00322   sendDataOnQP(buf, n*sizeof(AmmassoTokenDescription) + sizeof(int), node, AMMASSO_ALLOCATE);
00323   CmiFree(buf);
00324 }
00325 
00326 void RequestReleaseTokens(OtherNode node, int n) {
00327   char buf[24];
00328   *((int*)buf) = n;
00329   sendDataOnQP(buf, sizeof(int), node, AMMASSO_RELEASE);
00330 }
00331 
00332 void ReleaseTokens(OtherNode node, int n) {
00333   int i, nWR;
00334   AmmassoToken *token;
00335   AmmassoBuffer *tokenBuf;
00336   cc_data_addr_t *tokenSgl;
00337 
00338   if (node->num_sendTokens < n) n = node->num_sendTokens - 1;
00339   if (n <= 0) return;
00340   token = node->sendTokens;
00341   tokenBuf = token->localBuf;
00342 
00343   tokenBuf->tail.length = 1;
00344   tokenBuf->tail.ack = 0;  // do not send any ACK with this message
00345   tokenBuf->tail.flags = AMMASSO_RELEASED;
00346 
00347   // Setup the local SGL
00348   tokenSgl = token->wr.wr_u.rdma_write.local_sgl.sge_list;
00349   tokenSgl->length = sizeof(Tailer);
00350   tokenSgl->to = (unsigned long)&tokenBuf->tail;
00351   token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
00352 
00353   CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
00354 
00355   if (contextBlock->freeTokens == NULL) {
00356     contextBlock->freeTokens = node->sendTokens;
00357   } else {
00358     contextBlock->last_freeTokens->next = node->sendTokens;
00359   }
00360   for (i=1; i<n; ++i) token = token->next;
00361   contextBlock->last_freeTokens = token;
00362   node->sendTokens = token->next;
00363   token->next = NULL;
00364   contextBlock->num_freeTokens += n;
00365   node->num_sendTokens -= n;
00366 }
00367 
00368 /* CmiMachineInit()
00369  *   This is called as the node is starting up.  It does some initialization of the machine layer.
00370  */
00371 void CmiMachineInit(char **argv) {
00372 
00373   char buf[128];
00374   cc_status_t rtn;
00375 
00376   AMMASSO_STATS_INIT
00377 
00378   AMMASSO_STATS_START(MachineInit)
00379 
00380   MACHSTATE(2, "CmiMachineInit() - INFO: (***** Ammasso Specific*****) - Called... Initializing RNIC...");
00381   MACHSTATE1(1, "CmiMachineInit() - INFO: Cmi_charmrun_pid = %d", Cmi_charmrun_pid);
00382 
00383 
00384   //CcdCallOnConditionKeep(CcdPERIODIC, (CcdVoidFn)periodicFunc, NULL);
00385 
00386 
00387   // Allocate a context block that will be used throughout this machine layer
00388   if (contextBlock != NULL) {
00389     MACHSTATE(5, "CmiMachineInit() - ERROR: contextBlock != NULL");
00390     sprintf(buf, "CmiMachineInit() - ERROR: contextBlock != NULL");
00391     CmiAbort(buf);
00392   }
00393   contextBlock = (mycb_t*)malloc(sizeof(mycb_t));
00394   if (contextBlock == NULL) {
00395     MACHSTATE(5, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
00396     sprintf(buf, "CmiMachineInit() - ERROR: Unable to malloc memory for contextBlock");
00397     CmiAbort(buf);
00398   }
00399 
00400   // Initialize the contextBlock by zero-ing everything out and then setting anything special
00401   memset(contextBlock, 0, sizeof(mycb_t));
00402   contextBlock->rnic = -1;
00403 
00404   MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-OPEN_RNIC)");
00405 
00406   // Check to see if in stand-alone mode
00407   if (Cmi_charmrun_pid != 0) {
00408 
00409     // Try to Open the RNIC
00410     //   TODO : Look-up the difference between CC_PBL_PAGE_MODE and CC_PBL_BLOCK_MODE
00411     //   TODO : Would a call to cc_rnic_enum or cc_rnic_query do any good here?
00412     rtn = cc_rnic_open(0, CC_PBL_PAGE_MODE, contextBlock, &(contextBlock->rnic));
00413     if (rtn != CC_OK) {
00414       MACHSTATE2(5, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
00415       sprintf(buf, "CmiMachineInit() - ERROR: Unable to open RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
00416       CmiAbort(buf);
00417     }
00418 
00419     MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-SET-ASYNC-HANDLER)");
00420 
00421     // Set the asynchronous event handler function
00422     CC_CHECK(cc_eh_set_async_handler,(contextBlock->rnic, AsynchronousEventHandler, contextBlock));
00423 
00424     /*
00425     MACHSTATE(3, "CmiMachineInit() - INFO: (PRE-SET-CE-HANDLER)");
00426 
00427     // Set the Completion Event Handler
00428     contextBlock->eh_id = 0;
00429     CC_CHECK(cc_eh_set_ce_handler,(contextBlock->rnic, CompletionEventHandler, &(contextBlock->eh_id)));
00430     */
00431 
00432     MACHSTATE(1, "CmiMachineInit() - INFO: (PRE-PD-ALLOC)");
00433 
00434     // Allocate the Protection Domain
00435     CC_CHECK(cc_pd_alloc,(contextBlock->rnic, &(contextBlock->pd_id)));
00436 
00437     MACHSTATE(1, "CmiMachineInit() - INFO: RNIC Open For Business!!!");
00438 
00439   } else {  // Otherwise, not in stand-alone mode
00440 
00441     // Flag the rnic variable as invalid
00442     contextBlock->rnic = -1;
00443   }
00444 
00445   MACHSTATE(2, "CmiMachineInit() - INFO: Completed Successfully !!!");
00446 
00447   AMMASSO_STATS_END(MachineInit)
00448 }
00449 
/* CmiCommunicationInit()
 *   Intentionally empty in the Ammasso machine layer; argv is not used.
 */
void CmiCommunicationInit(char **argv)
{
  (void)argv;  /* unused */
}
00453 
00454 void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port) {
00455 
00456   // DMK : NOTE : The hope is that this can be used to request the RMDA addresses of the other nodes after the
00457   //              initial addresses from charmrun are given to the node.  Get the address here, use that to request
00458   //              the RDMA address, use the RDMA address to create the QP connection (in establishQPConnection(), which
00459   //              only subtracts one from the address at the moment... the way our cluster is setup).
00460 
00461   MACHSTATE1(2, "CmiNodeAddressesStoreHandler() - INFO: pe = %d", pe);
00462   MACHSTATE1(1, "                                       addr = { sin_family = %d,", addr->sin_family);
00463   MACHSTATE1(1, "                                                sin_port = %d,", addr->sin_port);
00464   MACHSTATE4(1, "                                                sin_addr.s_addr = %d.%d.%d.%d }", (addr->sin_addr.s_addr & 0xFF), ((addr->sin_addr.s_addr >> 8) & 0xFF), ((addr->sin_addr.s_addr >> 16) & 0xFF), ((addr->sin_addr.s_addr >> 24) & 0xFF));
00465   MACHSTATE1(1, "                                       port = %d", port);
00466 }
00467 
00468 
00469 void AmmassoDoIdle() {
00470 
00471   int i;
00472   cc_wc_t wc;
00473 
00474   AMMASSO_STATS_START(AmmassoDoIdle)
00475 
00476   /* DYNAMIC ALLOCATOR: Callbacks */
00477   /*if (contextBlock->conditionRegistered == 0) {
00478     CcdCallOnConditionKeep(CcdPERIODIC_1s, (CcdVoidFn) AllocatorCheck, NULL);
00479     //CcdCallFnAfter((CcdVoidFn) AllocatorCheck, NULL, 100);
00480     contextBlock->conditionRegistered = 1;
00481     }*/
00482   /* DYNAMIC ALLOCATOR END */
00483 
00484   for (i = 0; i < contextBlock->numNodes; i++) {
00485     if (i == contextBlock->myNode) continue;
00486     CheckRecvBufForMessage(&(nodes[i]));
00487     while (cc_cq_poll(contextBlock->rnic, nodes[i].send_cq, &wc) == CC_OK) {
00488       MACHSTATE1(3, "AmmassoDoIdle() - INFO: Send completed with node %d... now waiting for acknowledge...", i);
00489     }
00490   }
00491 
00492   AMMASSO_STATS_END(AmmassoDoIdle)
00493 }
00494 
00495 void CmiNotifyIdle(void) {
00496   AmmassoDoIdle();
00497 }
00498 
00499 static CmiIdleState* CmiNotifyGetState(void) {
00500   return NULL;
00501 }
00502 
00503 static void CmiNotifyBeginIdle(CmiIdleState *s) {
00504   AmmassoDoIdle();
00505 }
00506 
00507 static void CmiNotifyStillIdle(CmiIdleState *s) {
00508   AmmassoDoIdle();
00509 }
00510 
00511 /* NOTE: if the ack overflows, we cannot use this method of sending, but we need
00512    to send a special message for the purpose */
00513 void sendAck(OtherNode node) {
00514 
00515   int nWR;
00516 
00517   AMMASSO_STATS_START(sendAck)
00518 
00519   MACHSTATE2(3, "sendAck() - Ammasso - INFO: Called... sending ACK %d to node %d", *node->remoteAck, node->myNode);
00520 
00521   if (*node->remoteAck < ACK_MASK) {
00522 
00523     // Send an ACK message to the specified QP/Connection/Node
00524     CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, node->ack_sq_wr, 1, &nWR),node->myNode);
00525 
00526   } else {
00527     // Rare case: happens only after days of run! In this case, do not update
00528     // directly the ack, but zero it on the other side, wait one second to be
00529     // safe, and then send a regular message with the ACK
00530     AmmassoToken *token;
00531     AmmassoBuffer *tokenBuf;
00532     int tmp_ack = *node->remoteAck;
00533     *node->remoteAck = 0;
00534     CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, node->ack_sq_wr, 1, &nWR),node->myNode);
00535 
00536     sleep(1);
00537 
00538     token = getQPSendToken(node);
00539     tokenBuf = token->localBuf;
00540     tokenBuf->tail.ack = tmp_ack;
00541     tokenBuf->tail.flags = ACK_WRAPPING;
00542     tokenBuf->tail.length = 1; // So it will be seen by the receiver
00543     token->wr.wr_u.rdma_write.local_sgl.sge_list->length = sizeof(Tailer);
00544     token->wr.wr_u.rdma_write.local_sgl.sge_list->to = (unsigned long)&tokenBuf->tail;
00545     token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
00546     CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
00547     LIST_ENQUEUE(node->,usedTokens,token);
00548     node->max_used_tokens = (node->num_usedTokens>node->max_used_tokens)?node->num_usedTokens:node->max_used_tokens;
00549     *node->remoteAck = tmp_ack & ACK_MASK;
00550   }
00551 
00552   node->messagesNotYetAcknowledged = 0;
00553 
00554   AMMASSO_STATS_END(sendAck)
00555 }
00556 
00557 /* NOTE, even in SMP versions, only the communication server should be sending
00558    messages out, thus no locking should be necessary */
00559 /* This function returns the token usable for next communication. If no token is
00560    available, it blocks until one becomes available */
00561 AmmassoToken *getQPSendToken(OtherNode node) {
00562   AmmassoToken *token;
00563   int i;
00564   cc_wc_t wc;
00565   ammasso_ack_t newAck;
00566   while (node->connectionState != QP_CONN_STATE_CONNECTED ||
00567          node->sendTokens == NULL) {
00568     // Try to see if an ACK has been sent directly, so we free some tokens The
00569     // direct token will never be greater than ACK_MASK (by protocol
00570     // definition), so we do not need to wrap around
00571     MACHSTATE(3, "getQPSendBuffer() - INFO: No tokens available");
00572     if (*node->directAck > node->localAck) {
00573       newAck = *node->directAck;
00574       for (i=node->localAck; i<newAck; ++i) {
00575         LIST_DEQUEUE(node->,usedTokens,token);
00576         LIST_ENQUEUE(node->,sendTokens,token);
00577       }
00578       node->localAck = newAck;
00579     }
00580 
00581     CheckRecvBufForMessage(node);
00582 
00583     while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
00584       MACHSTATE1(3, "getQPSendBuffer() - INFO: Send completed with node %d... now waiting for acknowledge...", node->myNode);
00585     }
00586   }
00587   LIST_DEQUEUE(node->,sendTokens,token);
00588   return token;
00589 }
00590 
00591 /*
00592 int getQPSendBuffer(OtherNode node, char force) {
00593 
00594   int rtnBufIndex, i;
00595   cc_wc_t wc;
00596 
00597   AMMASSO_STATS_START(getQPSendBuffer)
00598 
00599   MACHSTATE1(3, "getQPSendBuffer() - Ammasso - Called (send to node %d)...", node->myNode);
00600 
00601   while (1) {
00602 
00603     AMMASSO_STATS_START(getQPSendBuffer_loop)
00604 
00605     rtnBufIndex = -1;
00606 
00607     AMMASSO_STATS_START(getQPSendBuffer_lock)
00608 
00609     MACHSTATE(3, "getQPSendBuffer() - INFO: Pre-sendBufLock");
00610     #if CMK_SHARED_VARS_UNAVAILABLE
00611       while (node->sendBufLock != 0) { usleep(1); } // Since CmiLock() is not really a lock, actually wait
00612     #endif
00613     CmiLock(node->sendBufLock);
00614 
00615     AMMASSO_STATS_END(getQPSendBuffer_lock)
00616     
00617     // If force is set, let the message use any of the available send buffers.  Otherwise, there can only
00618     // be AMMASSO_NUMMSGBUFS_PER_QP message outstanding (haven't gotten an ACK for) so wait for that.
00619     // VERY IMPORTANT !!! Only use force for sending ACKs !!!!!!!!  If this is done, it is ensured that there
00620     // will always be at least one buffer available when the force code is executed
00621     if (node->connectionState == QP_CONN_STATE_CONNECTED) {
00622       if (force) {
00623         rtnBufIndex = node->send_UseIndex;
00624         node->send_UseIndex++;
00625         if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
00626           node->send_UseIndex = 0;
00627       } else {
00628         if (node->send_InUseCounter < AMMASSO_NUMMSGBUFS_PER_QP) {
00629           rtnBufIndex = node->send_UseIndex;
00630           node->send_InUseCounter++;
00631           node->send_UseIndex++;
00632           if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
00633             node->send_UseIndex = 0;
00634         }
00635       }
00636     }
00637  
00638     CmiUnlock(node->sendBufLock);
00639     MACHSTATE3(3, "getQPSendBuffer() - INFO: Post-sendBufLock - rtnBufIndex = %d, node->connectionState = %d, node->send_UseIndex = %d", rtnBufIndex, node->connectionState, node->send_UseIndex);
00640 
00641     if (rtnBufIndex >= 0) {
00642 
00643       AMMASSO_STATS_END(getQPSendBuffer_loop)
00644       break;
00645 
00646     } else {
00647 
00648       //usleep(1);
00649 
00650       AMMASSO_STATS_START(getQPSendBuffer_CEH)
00651 
00652       CheckRecvBufForMessage(node);
00653       //CompletionEventHandlerWithAckFlag(contextBlock->rnic, node->recv_cq, node, 1);
00654 
00655       //CompletionEventHandler(contextBlock->rnic, node->send_cq, node);
00656       while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
00657         MACHSTATE1(3, "getQPSendBuffer() - INFO: Send completed with node %d... now waiting for acknowledge...", nextNode);
00658       }
00659 
00660 
00661       AMMASSO_STATS_END(getQPSendBuffer_CEH)
00662 
00663       AMMASSO_STATS_END(getQPSendBuffer_loop)
00664     }
00665   }
00666 
00668   //node->send_UseIndex++;
00669   //if (node->send_UseIndex >= AMMASSO_NUMMSGBUFS_PER_QP * 2)
00670   //  node->send_UseIndex = 0;
00671 
00672   MACHSTATE1(3, "getQPSendBuffer() - Ammasso - Finished (returning buffer index: %d)", rtnBufIndex);
00673 
00674   AMMASSO_STATS_END(getQPSendBuffer)
00675 
00676   return rtnBufIndex;
00677 
00678 }
00679 */
00680 
00681 // NOTE: The force parameter can be thought of as an "is ACK" control message flag (see comments in getQPSendBuffer())
00682 int sendDataOnQP(char* data, int len, OtherNode node, char flags) {
00683 
00684   AmmassoToken *sendBufToken;
00685   AmmassoBuffer *tokenBuf;
00686   cc_data_addr_t *tokenSgl;
00687   int toSendLength;
00688   cc_wc_t wc;
00689   cc_uint32_t nWR;
00690   char isFirst = 1;
00691   char *origMsgStart = data;
00692   char *sendBufBegin;
00693 
00694   int origSize = len;
00695 
00696   if (origSize <= 1024) {
00697     AMMASSO_STATS_START(sendDataOnQP_1024)
00698   } else if (origSize <= 2048) { 
00699     AMMASSO_STATS_START(sendDataOnQP_2048)
00700   } else if (origSize <= 4096) { 
00701     AMMASSO_STATS_START(sendDataOnQP_4096)
00702   } else if (origSize <= 16384) { 
00703     AMMASSO_STATS_START(sendDataOnQP_16384)
00704   } else {
00705     AMMASSO_STATS_START(sendDataOnQP_over)
00706   }
00707 
00708   AMMASSO_STATS_START(sendDataOnQP)
00709 
00710   //CompletionEventHandler(contextBlock->rnic, node->recv_cq, node);
00711   //CompletionEventHandler(contextBlock->rnic, node->send_cq, node);
00712   while (cc_cq_poll(contextBlock->rnic, node->send_cq, &wc) == CC_OK) {
00713     MACHSTATE1(3, "sendDataOnQP() - INFO: Send completed with node %d... now waiting for acknowledge...", node->myNode);
00714   }
00715 
00716 
00717   MACHSTATE2(2, "sendDataOnQP() - Ammasso - INFO: Called (send to node %d, len = %d)...", node->myNode, len);
00718 
00719   // Assert that control messages will not be fragmented
00720   CmiAssert(flags==0 || len<=AMMASSO_BUFSIZE);
00721 
00722   // DMK : For each message that is fragmented, attach another DGRAM header to
00723   // it, (keeping in mind that the control messages are no where near large
00724   // enough for this to occur).
00725 
00726   while (len > 0) {
00727 
00728     AMMASSO_STATS_START(sendDataOnQP_pre_send)  
00729 
00730     // Get a free send buffer (NOTE: This call will block until a send buffer is free)
00731     sendBufToken = getQPSendToken(node);
00732     tokenBuf = sendBufToken->localBuf;
00733     // Enqueue the token to the used queue immediately, so it is safe to be
00734     // interrupted by other calls
00735     LIST_ENQUEUE(node->,usedTokens,sendBufToken);
00736     node->max_used_tokens = (node->num_usedTokens>node->max_used_tokens)?node->num_usedTokens:node->max_used_tokens;
00737 
00738     // Copy the contents (up to AMMASSO_BUFSIZE worth) of data into the send buffer
00739 
00740     // The toSendLength includes the DGRAM header size. If the chunk sent is not
00741     // the first, the initial DGRAM_HEADER_SIZE bytes need to be contructed from
00742     // the DGRAM header of the original message (instead of just being copied
00743     // together with the message itself)
00744 
00745     if (isFirst) {
00746 
00747       toSendLength = len > AMMASSO_BUFSIZE ? AMMASSO_BUFSIZE : len;  // MIN of len and AMMASSO_BUFSIZE
00748       sendBufBegin = tokenBuf->buf + AMMASSO_BUFSIZE - ALIGN8(toSendLength);
00749 
00750       memcpy(sendBufBegin, data, toSendLength);
00751 
00752       MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending 1st Fragment - toSendLength = %d...", toSendLength);
00753 
00754     } else {
00755 
00756       toSendLength = len > (AMMASSO_BUFSIZE - DGRAM_HEADER_SIZE) ? AMMASSO_BUFSIZE : (len+DGRAM_HEADER_SIZE);  // MIN of len and AMMASSO_BUFSIZE
00757       sendBufBegin = tokenBuf->buf + AMMASSO_BUFSIZE - ALIGN8(toSendLength);
00758 
00759       memcpy(sendBufBegin+DGRAM_HEADER_SIZE, data, toSendLength-DGRAM_HEADER_SIZE);
00760       
00761       // This dgram header is the same of the original message, except for the
00762       // sequence number, so copy the original and just modify the sequence
00763       // number.
00764 
00765       // NOTE: If the message is large enough that fragmentation needs to
00766       // happen, the send_next_lock is already owned by the thread executing
00767       // this code.
00768       memcpy(sendBufBegin, origMsgStart, DGRAM_HEADER_SIZE);
00769 
00770       ((DgramHeader*)sendBufBegin)->seqno = node->send_next;
00771       node->send_next = ((node->send_next+1) & DGRAM_SEQNO_MASK);  // Increase the sequence number
00772 
00773       MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: Sending Continuation Fragment - toSendLength = %d...", toSendLength);
00774     }
00775 
00776     // Write the size of the message at the end of the buffer, with the ack and
00777     // flags
00778     tokenBuf->tail.length = toSendLength;
00779     node->messagesNotYetAcknowledged = 0;
00780     tokenBuf->tail.ack = *node->remoteAck;
00781     if (*node->remoteAck > ACK_MASK) {
00782       // Rare case of ACK wrapping
00783       tokenBuf->tail.ack = 0;
00784       // in the rare case that we didn't send an ack with this message because
00785       // the ack just wrapped around (rare case), send a full ACK message. It is
00786       // safe to send it now, since the queues are consistent between sender and
00787       // receiver. The fact that this ack is effectively sent before the regular
00788       // message is not important, since on the other side it will be discovered
00789       // only after this one is received
00790       sendAck(node);
00791     }
00792     tokenBuf->tail.flags = flags;
00793 
00794     // Setup the local SGL
00795     tokenSgl = sendBufToken->wr.wr_u.rdma_write.local_sgl.sge_list;
00796     tokenSgl->length = ALIGN8(toSendLength) + sizeof(Tailer);
00797     tokenSgl->to = (unsigned long)sendBufBegin;
00798     sendBufToken->wr.wr_u.rdma_write.remote_to = (unsigned long)(((char*)sendBufToken->remoteBuf)+AMMASSO_BUFSIZE-ALIGN8(toSendLength));
00799 
00800     // The remote_to and remote_stag are already fixed part of the token
00801  
00802     AMMASSO_STATS_END(sendDataOnQP_pre_send)  
00803     AMMASSO_STATS_START(sendDataOnQP_send)
00804 
00805     MACHSTATE(3, "sendDataOnQP() - Ammasso - INFO: Enqueuing RDMA Write WR...");
00806 
00807     MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tokenSgl->to = %p", tokenSgl->to);
00808     MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: sendBufToken->wr.wr_u.rdma_write.remote_to = %p", sendBufToken->wr.wr_u.rdma_write.remote_to);
00809     MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.ack = %d", tokenBuf->tail.ack);
00810     MACHSTATE1(1, "sendDataOnQP() - Ammasso - INFO: tail.flags = %d", tokenBuf->tail.flags);
00811 
00812     CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &sendBufToken->wr, 1, &nWR),node->myNode);
00813     
00814     MACHSTATE(1, "sendDataOnQP() - Ammasso - INFO: RDMA Write WR Enqueue Completed");
00815 
00816     AMMASSO_STATS_END(sendDataOnQP_send)
00817     AMMASSO_STATS_START(sendDataOnQP_post_send)  
00818 
00819     // Update the data and len variables for the next while (if fragmenting is needed)
00820     data += toSendLength;
00821     len -= toSendLength;
00822     if (isFirst == 0) {
00823       data -= DGRAM_HEADER_SIZE;
00824       len += DGRAM_HEADER_SIZE;
00825     }
00826     isFirst = 0;
00827 
00828     AMMASSO_STATS_END(sendDataOnQP_post_send)  
00829   }
00830 
00831   AMMASSO_STATS_END(sendDataOnQP)
00832 
00833   if (origSize <= 1024) {
00834     AMMASSO_STATS_END(sendDataOnQP_1024)
00835   } else if (origSize <= 2048) { 
00836     AMMASSO_STATS_END(sendDataOnQP_2048)
00837   } else if (origSize <= 4096) { 
00838     AMMASSO_STATS_END(sendDataOnQP_4096)
00839   } else if (origSize <= 16384) { 
00840     AMMASSO_STATS_END(sendDataOnQP_16384)
00841   } else {
00842     AMMASSO_STATS_END(sendDataOnQP_over)
00843   }
00844 
00845 }
00846 
00847 
00848 /* DeliverViaNetwork()
00849  *
00850  */
00851 void DeliverViaNetwork(OutgoingMsg msg, OtherNode otherNode, int rank, unsigned int broot, int copy) {
00852 
00853   cc_status_t rtn;
00854   cc_stag_index_t stag;
00855   cc_data_addr_t sgl;
00856   cc_sq_wr_t wr;
00857   cc_uint32_t WRsPosted;
00858 
00859   AMMASSO_STATS_START(DeliverViaNetwork)
00860 
00861   MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Called...");
00862 
00863   // We don't need to do this since the message data is being copied into the
00864   // send_buf, the OutgoingMsg can be free'd ASAP
00865 
00866   // The lock will be already held by the calling function in machine.c
00867   // (CommLock)
00868 
00869   AMMASSO_STATS_START(DeliverViaNetwork_post_lock)
00870 
00871   DgramHeaderMake(msg->data, rank, msg->src, Cmi_charmrun_pid, otherNode->send_next, broot);  // Set DGram Header Fields In-Place
00872   otherNode->send_next = ((otherNode->send_next+1) & DGRAM_SEQNO_MASK);  // Increase the sequence number
00873 
00874   MACHSTATE1(1, "DeliverViaNetwork() - INFO: Sending message to  node %d", otherNode->myNode);
00875   MACHSTATE1(1, "DeliverViaNetwork() - INFO:                     rank %d", rank);
00876   MACHSTATE1(1, "DeliverViaNetwork() - INFO:                    broot %d", broot);
00877 
00878   AMMASSO_STATS_START(DeliverViaNetwork_send)
00879 
00880   sendDataOnQP(msg->data, msg->size, otherNode, 0);
00881 
00882   AMMASSO_STATS_END(DeliverViaNetwork_send)
00883 
00884     //CmiUnlock(otherNode->send_next_lock);
00885   MACHSTATE(1, "DeliverViaNetwork() - INFO: Post-send_next_lock");
00886 
00887 
00888   // DMK : NOTE : I left this in as an example of how to retister the memory with the RNIC on the fly.  Since the ccil
00889   //              library has a bug which causes it not to de-pin memory that we unregister, it will probably be a better
00890   //              idea to fragment a message that is too large for a single buffer from the buffer pool.
00891   /***************************************************************
00892   // DMK : TODO : This is an important optimization area.  This is registering the memory where the outgoing message
00893   //              is located with the RNIC.  One balancing act that we will need to do is the cost of copying messages
00894   //              into memory regions already allocated VS. the cost of registering the memory.  The cost of registering
00895   //              memory might be constant as the memory doesn't have to traversed.  If the cost of doing a memcpy on a
00896   //              small message, since memcpy traverses the memory range, is less than the cost of registering the
00897   //              memory with the RNIC, it would be better to copy the message into a pre-registered memory location.
00898  
00899   // Start by registering the memory of the outgoing message with the RNIC
00900   rtn = cc_nsmr_register_virt(contextBlock->rnic,
00901                               CC_ADDR_TYPE_VA_BASED,
00902                               msg->data + DGRAM_HEADER_SIZE,
00903                               msg->size - DGRAM_HEADER_SIZE,
00904                               contextBlock->pd_id,
00905                               0, 0,
00906                               CC_ACF_LOCAL_READ | CC_ACF_LOCAL_WRITE,
00907                               &stag
00908                              );
00909   if (rtn != CC_OK) {
00910     // Let the user know what happened
00911     MACHSTATE2(3, "DeliverViaNetwork() - Ammasso - ERROR - Unable to register OutgoingMsg memory with RNIC: %d, \"%s\"", rtn, cc_status_to_string(rtn));
00912     return;
00913   }
00914 
00915   // Setup the Scatter/Gather List
00916   sgl.stag = stag;
00917   sgl.to = (cc_uint64_t)(unsigned long)(msg->data + DGRAM_HEADER_SIZE);
00918   sgl.length = msg->size - DGRAM_HEADER_SIZE;
00919 
00920   // Create the Work Request
00921   wr.wr_id = (cc_uint64_t)(unsigned long)&(wr);
00922   wr.wr_type = CC_WR_TYPE_SEND;
00923   wr.wr_u.send.local_sgl.sge_count = 1;
00924   wr.wr_u.send.local_sgl.sge_list = &sgl;
00925   wr.signaled = 1;
00926 
00927   // Post the Work Request to the Send Queue
00928   // DMK : TODO : Update this code to handle CCERR_QP_TOO_MANY_WRS_POSTED errors (pause breifly and retry)
00929   rtn = cc_qp_post_sq(contextBlock->rnic, otherNode->qp, &(wr), 1, &(WRsPosted));
00930   if (rtn != CC_OK || WRsPosted != 1) {
00931     // Let the user know what happened
00932     MACHSTATE2(3, "DeliverViaNetwork() - Ammasso - ERROR - Unable post Work Request to Send Queue: %d, \"%s\"", rtn, cc_status_to_string(rtn));
00933     displayQueueQuery(otherNode->qp, &(otherNode->qp_attrs));
00934   }
00935   ***************************************************************/
00936 
00937   MACHSTATE(2, "DeliverViaNetwork() - Ammasso - INFO: Completed.");
00938 
00939   AMMASSO_STATS_END(DeliverViaNetwork_post_lock)
00940   AMMASSO_STATS_END(DeliverViaNetwork)
00941 }
00942 
00943 
00944 /****************************************************************************
00945  *                                                                          
00946  * CheckSocketsReady
00947  *
00948  * Checks both sockets to see which are readable and which are writeable.
00949  * We check all these things at the same time since this can be done for
00950  * free with ``select.'' The result is stored in global variables, since
00951  * this is essentially global state information and several routines need it.
00952  *
00953  ***************************************************************************/
00954 
00955 int CheckSocketsReady(int withDelayMs)
00956 {   
00957   int nreadable;
00958   CMK_PIPE_DECL(withDelayMs);
00959 
00960   CmiStdoutAdd(CMK_PIPE_SUB);
00961   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
00962 
00963   nreadable=CMK_PIPE_CALL();
00964   ctrlskt_ready_read = 0;
00965   dataskt_ready_read = 0;
00966   dataskt_ready_write = 0;
00967   
00968   if (nreadable == 0) {
00969     MACHSTATE(2, "} CheckSocketsReady (nothing readable)");
00970     return nreadable;
00971   }
00972   if (nreadable==-1) {
00973     CMK_PIPE_CHECKERR();
00974     MACHSTATE(3, "} CheckSocketsReady (INTERRUPTED!)");
00975     return CheckSocketsReady(0);
00976   }
00977   CmiStdoutCheck(CMK_PIPE_SUB);
00978   if (Cmi_charmrun_fd!=-1) 
00979           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
00980   MACHSTATE(3, "} CheckSocketsReady");
00981   return nreadable;
00982 }
00983 
00984 /***********************************************************************
00985  * CommunicationServer()
00986  * 
00987  * This function does the scheduling of the tasks related to the
00988  * message sends and receives. 
00989  * It first checks the charmrun port for messages, and then polls the
00990  * network for send completions and incoming messages.
00991  *
00992  ***********************************************************************/
00993 
00994 // NOTE: Always called from interrupt
00995 static void ServiceCharmrun_nolock() {
00996 
00997   int again = 1;
00998 
00999   MACHSTATE(2, "ServiceCharmrun_nolock begin {");
01000 
01001   while (again) {
01002     again = 0;
01003 
01004     CheckSocketsReady(0);
01005     if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
01006     if (CmiStdoutNeedsService()) { CmiStdoutService(); }
01007   }
01008 
01009   MACHSTATE(2, "} ServiceCharmrun_nolock end");
01010 }
01011 
01012 void processAmmassoControlMessage(char* msg, int len, Tailer *tail, OtherNode from) {
01013 
01014   int nodeIndex, ctrlType, i, n, nWR;
01015   AmmassoToken *token, *pretoken;
01016   AmmassoBuffer *tokenBuf;
01017   cc_data_addr_t *tokenSgl;
01018   OtherNode node;
01019   AmmassoTokenDescription *tokenDesc;
01020 
01021   AMMASSO_STATS_START(processAmmassoControlMessage)
01022 
01023   // Do not check the message, the flags field was set, and this is enough
01024 
01025   // Perform an action based on the control message type
01026   switch (tail->flags) {
01027 
01028   case AMMASSO_READY:
01029 
01030     // Decrement the node ready count by one
01031     contextBlock->nodeReadyCount--;
01032     MACHSTATE1(3, "processAmmassoControlMessage() - Ammasso - INFO: Received READY packet... still waiting for %d more...", contextBlock->nodeReadyCount);
01033 
01034     break;
01035 
01036   case AMMASSO_ALLOCATE: // Sent by the receiver to allocate more tokens
01037 
01038     token = getQPSendToken(from);
01039     tokenBuf = token->localBuf;
01040 
01041     tokenBuf->tail.length = 1;
01042     tokenBuf->tail.ack = 0;  // do not send any ACK with this message
01043     tokenBuf->tail.flags = AMMASSO_ALLOCATED;
01044 
01045     // Setup the local SGL
01046     tokenSgl = token->wr.wr_u.rdma_write.local_sgl.sge_list;
01047     tokenSgl->length = sizeof(Tailer);
01048     tokenSgl->to = (unsigned long)&tokenBuf->tail;
01049     token->wr.wr_u.rdma_write.remote_to = (unsigned long)&token->remoteBuf->tail;
01050 
01051     CC_POST_CHECK(cc_qp_post_sq,(contextBlock->rnic, node->qp, &token->wr, 1, &nWR),node->myNode);
01052 
01053     LIST_ENQUEUE(from->,usedTokens,token);
01054     from->max_used_tokens = (from->num_usedTokens>from->