
00001 00007 #include "MeshStreamingStrategy.h" 00008 #include "pup_cmialloc.h" 00009 //#include "MsgPacker.h" 00010 00011 /**** not needed any-more after pup_CmiAlloc 00012 // These macros are taken directly from convcore.c. 00013 #define SIZEFIELD(m) (((CmiChunkHeader *)(m))[-1].size) 00014 #define REFFIELD(m) (((CmiChunkHeader *)(m))[-1].ref) 00015 #define BLKSTART(m) (((CmiChunkHeader *)(m))-1) 00016 ***/ 00017 00018 // These externs are defined inside ComlibManager.C. 00019 //CkpvExtern(CkGroupID, cmgrID); 00020 //CkpvExtern(int, RecvmsgHandle); 00021 00022 CkpvDeclare(int, streaming_column_handler_id); 00023 00024 /************************************************************************** 00025 ** This handler is invoked automatically when the processor goes idle. 00026 ** 00027 ** The idle handler automatically re-registers itself, so there is no need 00028 ** to re-register it from here. 00029 ** 00030 ** If nothing else is going on anyway, we might as well flush the buffers 00031 ** now instead of waiting for the flush period. 00032 */ 00033 void idle_flush_handler (void *ptr, double curT) 00034 { 00035 ComlibPrintf ("[%d] idle_flush_handler() invoked.\n", CkMyPe()); 00036 00037 MeshStreamingStrategy *classptr = (MeshStreamingStrategy *) ptr; 00038 classptr->FlushBuffers (); 00039 } 00040 00041 00042 00043 /************************************************************************** 00044 ** This handler is invoked automatically after a timeout occurs. 00045 ** 00046 ** The periodic handler does not automatically re-register itself, so it 00047 ** calls RegisterPeriodicFlush() to do so after it finishes flushing 00048 ** buffers. 
00049 */ 00050 void periodic_flush_handler (void *ptr, double curT) 00051 { 00052 ComlibPrintf ("[%d] periodic_flush_handler() invoked.\n", CkMyPe()); 00053 00054 MeshStreamingStrategy *classptr = (MeshStreamingStrategy *) ptr; 00055 classptr->FlushBuffers (); 00056 classptr->RegisterPeriodicFlush (); 00057 } 00058 00059 00060 00061 /************************************************************************** 00062 ** This handler is invoked automatically when a packed message for a column 00063 ** is received. 00064 ** 00065 ** The layout of the message received is shown in the diagram below. 00066 ** 00067 ** \ / 00068 ** +------------||---------------------------------------------+ 00069 ** |Conv| I | # || dest || size | ref || Converse || user | ... 00070 ** |hdr | D | || PE || | cnt || header || data | ... 00071 ** +------------||---------------------------------------------+ 00072 ** / \ 00073 ** 00074 ** The function first retrieves the strategy ID and the number of messages 00075 ** in the packed message and then uses the strategy ID to obtain a pointer 00076 ** to the MeshStreamingStrategy class. It also obtains the row length by 00077 ** calling GetRowLength(). 00078 ** 00079 ** The function then iterates through the messages in the packed message. 00080 ** For each message within, it allocates space by calling CmiAlloc() and 00081 ** then copies the message from the packed buffer into the new message 00082 ** buffer. It is also able to obtain the destination PE for the message 00083 ** because this information is included in the packed message data for 00084 ** each packed message. If the destination PE is the current PE, the 00085 ** message is delivered immediately via a call to CmiSyncSendAndFree(). 00086 ** This routine calls CmiFree() on the message, which is appropriate 00087 ** since it was allocated with CmiAlloc(). Otherwise, the message is 00088 ** inserted into the row bucket for the necessary row by calling 00089 ** InsertIntoRowBucket(). 
When messages are delivered from the row 00090 ** bucket, they are freed by CmiFree(). 00091 */ 00092 void streaming_column_handler (void *msg) 00093 { 00094 int dest_row; 00095 int my_pe; 00096 //int num_msgs; 00097 int row_length; 00098 //int strategy_id; 00099 //char *msgptr; 00100 char *newmsg; 00101 MeshStreamingStrategy *classptr; 00102 00103 ComlibPrintf ("[%d] column_handler() invoked.\n", CkMyPe()); 00104 00105 my_pe = CkMyPe (); 00106 00107 //PUP_cmialloc mem lets us use the converse reference counting 00108 //black magic in a transparent way. PUP_fromCmiAllocMem lets sub 00109 //messages in a messages be used freely in the program as messages. 00110 PUP_fromCmiAllocMem fp(msg); 00111 MeshStreamingHeader mhdr; 00112 00113 //Read the header from the message 00114 fp | mhdr; 00115 00116 //strategy_id = ((int *) (msg + CmiMsgHeaderSizeBytes))[0]; 00117 //num_msgs = ((int *) (msg + CmiMsgHeaderSizeBytes))[1]; 00118 00119 classptr = (MeshStreamingStrategy *)ConvComlibGetStrategy(mhdr.strategy_id); 00120 // CProxy_ComlibManager (CkpvAccess (cmgrID)). 00121 // ckLocalBranch()->getStrategy (mhdr.strategy_id); 00122 00123 row_length = classptr->GetRowLength (); 00124 00125 //msgptr = (char *) (msg + CmiMsgHeaderSizeBytes + 2 * sizeof(int)); 00126 00127 for (int i = 0; i < mhdr.num_msgs; i++) { 00128 /* 00129 dest_pe = ((int *) msgptr)[0]; 00130 msgsize = ((int *) msgptr)[1]; 00131 00132 newmsg = (char *) CmiAlloc (msgsize); 00133 00134 memcpy (newmsg, (msgptr + 3 * sizeof(int)), msgsize); 00135 00136 if (dest_pe == my_pe) { 00137 CmiSyncSendAndFree (my_pe, msgsize, newmsg); 00138 } else { 00139 dest_row = dest_pe / row_length; 00140 classptr->InsertIntoRowBucket (dest_row, newmsg); 00141 } 00142 00143 msgptr += msgsize + 3 * sizeof(int); 00144 */ 00145 00146 int dest_pe; 00147 fp | dest_pe; 00148 00149 //Returns a part of a message as an independent message and 00150 //updates the reference count of the container message. 
00151 fp.pupCmiAllocBuf((void **)&newmsg); 00152 int msgsize = SIZEFIELD(newmsg);// ((envelope*)newmsg)->getTotalsize(); 00153 00154 if (dest_pe == my_pe) { 00155 CmiSyncSendAndFree (my_pe, msgsize, newmsg); 00156 } else { 00157 dest_row = dest_pe / row_length; 00158 classptr->InsertIntoRowBucket (dest_row, newmsg); 00159 } 00160 } 00161 00162 CmiFree (msg); 00163 } 00164 00165 00166 00167 /************************************************************************** 00168 ** This is the MeshStreamingStrategy constructor. 00169 ** 00170 ** The period and bucket_size have default values specified in the .h file. 00171 ** 00172 ** The constructor is invoked when the client code instantiates this 00173 ** strategy. The constructor executes on a SINGLE PROCESS in the 00174 ** computation, so it cannot do things like determine an individual 00175 ** process's position within the mesh. 00176 ** 00177 ** After the constructor is invoked, the communications library creates 00178 ** instances that get pup'ed and shipped to each processor in the 00179 ** computation. To that end, the process that instantiates this strategy 00180 ** (most likely PE 0) will then use pup to pack copies of the strategy 00181 ** and then ship them off to other processes. They will be un-pup'ed 00182 ** there. Finally, beginProcessing() will be called on EACH instance on 00183 ** its target processor. 
00184 */ 00185 MeshStreamingStrategy::MeshStreamingStrategy (int period, int bucket_size) 00186 : Strategy() 00187 { 00188 ComlibPrintf ("[%d] MeshStreamingStrategy::MeshStreamingStrategy() invoked.\n", CkMyPe()); 00189 00190 num_pe = CkNumPes (); 00191 00192 num_columns = (int) (ceil (sqrt ((double) num_pe))); 00193 num_rows = num_columns; 00194 row_length = num_columns; 00195 00196 flush_period = period; 00197 max_bucket_size = bucket_size; 00198 00199 column_bucket = new CkQ<char *>[num_columns]; 00200 column_destQ = new CkQ<int>[num_columns]; 00201 column_bytes = new int[num_columns]; 00202 row_bucket = new CkQ<char *>[num_rows]; 00203 00204 //shortMsgPackingFlag = CmiFalse; 00205 } 00206 00207 00208 00209 /************************************************************************** 00210 ** This method is called when the communications library sends a message 00211 ** from one PE to another PE. This could be due to a direct message being 00212 ** sent, or due to a method invocation with marshalled parameters. 00213 ** 00214 ** The method begins by getting the destination PE from the 00215 ** CharmMessageHolder that is passed in (and from this, computing the 00216 ** destination column) and getting a pointer to the User data for the 00217 ** message (and from this, computing the Envelope pointer and the Block 00218 ** pointer). The following diagram shows the layout of the message. 00219 ** 00220 ** +----------------------------------------------------+ 00221 ** | size | refcount || Converse || user | 00222 ** | | || header || data | 00223 ** +----------------------------------------------------+ 00224 ** ^ ^ ^ 00225 ** | | | 00226 ** blk (Block) msg usr (User) 00227 ** 00228 ** All Converse messages are allocated by CmiAlloc() which prepends two ints to 00229 ** all memory regions to hold a size field and a refcount field. BLKSTART() is a 00230 ** macro that gets the start of a block from the envelope pointer. 
**
** If the destination PE is our current PE, we just deliver the message
** immediately.
**
** Otherwise, if the destination PE is in the same column as our PE, we
** deposit the message pointer itself (no copy is made) into the row
** bucket for the destination row.  (The row buckets are queues of
** pointers to memory regions exactly like the diagram above.  All
** entries in the row bucket are allocated with CmiAlloc() and must be
** deallocated with CmiFree()!)
**
** Otherwise, the destination PE must be in a different column from our
** PE.  We enqueue the message pointer on the column bucket for the
** destination column and the destination PE on the parallel
** column_destQ queue.  When the column is flushed, every message is
** packed preceded by its destination PE, which gives each packed entry
** the layout shown in the diagram below.
**
** +------------------------------------------------------------+
** | dest || size | refcount || Converse || user                |
** | PE   ||      |          || header   || data                |
** +------------------------------------------------------------+
** ^       ^                  ^            ^
** |       |                  |            |
** newmsg  blk (Block)        msg          usr (User)
**
** (The column buckets hold the original CmiAlloc()'ed message pointers;
** FlushColumn() releases them with CmiFree() once they have been packed.)
00258 */ 00259 00260 void MeshStreamingStrategy::insertMessage (MessageHolder *cmsg) 00261 { 00262 int dest_pe; 00263 int dest_row; 00264 int dest_col; 00265 int msg_size; 00266 int total_size; 00267 char *msg; 00268 //char *env; 00269 //char *blk; 00270 //char *newmsg; 00271 00272 ComlibPrintf ("[%d] MeshStreamingStrategy::insertMessage() invoked.\n", 00273 CkMyPe()); 00274 00275 dest_pe = cmsg->dest_proc; 00276 dest_col = dest_pe % num_columns; 00277 msg = cmsg->getMessage (); 00278 //env = (char *) UsrToEnv (usr); 00279 00280 //blk = (char *) BLKSTART (env); 00281 msg_size = SIZEFIELD(msg);//((envelope *)env)->getTotalsize(); 00282 00283 //misc_size = (env - blk); 00284 total_size = sizeof (int) + sizeof(CmiChunkHeader) + msg_size; 00285 00286 if (dest_pe == my_pe) { 00287 CmiSyncSend (my_pe, msg_size, msg); 00288 } else if (dest_col == my_column) { 00289 //newmsg = (char *) CmiAlloc (env_size); 00290 //memcpy (newmsg, env, env_size); 00291 //newmsg = env; 00292 00293 dest_row = dest_pe / row_length; 00294 00295 InsertIntoRowBucket (dest_row, msg); 00296 } else { 00297 //newmsg = new char[total_size]; 00298 //((int *) newmsg)[0] = dest_pe; 00299 //memcpy ( (void *) &(((int *) newmsg)[1]), blk, misc_size + env_size); 00300 00301 column_bucket[dest_col].enq (msg); 00302 column_destQ[dest_col].enq(dest_pe); 00303 column_bytes[dest_col] += total_size; 00304 00305 if (column_bucket[dest_col].length() > max_bucket_size) { 00306 FlushColumn (dest_col); 00307 } 00308 } 00309 00310 delete cmsg; 00311 } 00312 00313 00314 00315 /************************************************************************** 00316 ** This method is not used for streaming strategies. 00317 */ 00318 void MeshStreamingStrategy::doneInserting () 00319 { 00320 ComlibPrintf ("[%d] MeshStreamingStrategy::doneInserting() invoked.\n", CkMyPe()); 00321 // Empty for this strategy. 
00322 00323 //FlushBuffers(); 00324 //Only want to flush local outgoing messages 00325 for (int column = 0; column < num_columns; column++) { 00326 FlushColumn ((column+my_column)%num_columns); 00327 } 00328 } 00329 00330 00331 /* ************************************************************************* 00332 ** This method is invoked prior to any processing taking place in the 00333 ** class. Various initializations take place here that cannot take place 00334 ** in the class constructor due to the communications library itself not 00335 ** being totally initialized. 00336 ** 00337 ** See MeshStreamingStrategy::MeshStreamingStrategy() for more details. 00338 */ 00339 /* 00340 void MeshStreamingStrategy::beginProcessing (int ignored) { 00341 ComlibPrintf ("[%d] MeshStreamingStrategy::beginProcessing() invoked.\n", CkMyPe()); 00342 00343 //strategy_id = myInstanceID; 00344 00345 my_pe = CkMyPe (); 00346 00347 my_column = my_pe % num_columns; 00348 my_row = my_pe / row_length; 00349 00350 //column_bucket = new CkQ<char *>[num_columns]; 00351 //column_bytes = new int[num_columns]; 00352 00353 for (int i = 0; i < num_columns; i++) { 00354 column_bytes[i] = 0; 00355 } 00356 00357 row_bucket = new CkQ<char *>[num_rows]; 00358 00359 column_handler_id = CkRegisterHandler ((CmiHandler) column_handler); 00360 00361 CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE, idle_flush_handler, 00362 (void *) this, CkMyPe()); 00363 RegisterPeriodicFlush (); 00364 } 00365 */ 00366 00367 00368 /************************************************************************** 00369 ** This method exists so periodic_flush_handler() can re-register itself to 00370 ** be invoked periodically to flush buffers. 
00371 */ 00372 void MeshStreamingStrategy::RegisterPeriodicFlush (void) 00373 { 00374 ComlibPrintf ("[%d] MeshStreamingStrategy::RegisterPeriodicFlush() invoked.\n", CkMyPe()); 00375 00376 CcdCallFnAfterOnPE(periodic_flush_handler, (void *) this, flush_period, CkMyPe()); 00377 } 00378 00379 00380 00381 /************************************************************************** 00382 ** This method is used to flush a specified column bucket, either as the 00383 ** result of the column bucket reaching its maximum capacity, as a result 00384 ** of the periodic flush handler being invoked, or as a result of the 00385 ** processor going idle. 00386 ** 00387 ** The method first finds the destination PE for the column. This is the 00388 ** PE in the target column that is within the same row as the current PE. 00389 ** 00390 ** If there are actually messages in the bucket, then space is allocated 00391 ** to hold the new message which will pack all of the messages in the 00392 ** column bucket together. The layout of this message is shown below: 00393 ** 00394 ** \ / 00395 ** +------------||-------------------------------------------+ 00396 ** |Conv| I | # || dest || size | ref || Converse || user | ... 00397 ** |hdr | D | || PE || | cnt || header || data | ... 00398 ** +------------||-------------------------------------------+ 00399 ** / \ 00400 ** 00401 ** Since the buffer represents a Converse message, it must begin with a 00402 ** Converse header. After the header is an int representing the Commlib 00403 ** strategy ID for this strategy. This is needed only so that the 00404 ** column_handler() can get a pointer to the MeshStreamingStrategy class 00405 ** later. Next, comes an int containing the number of messages within 00406 ** the packed message. Finally, the messages are removed from the column 00407 ** bucket and appended one after another into the buffer. 
00408 ** 00409 ** After packing, a handler is set on the message to cause it to invoke 00410 ** column_handler() on the destination PE and the message is finally 00411 ** sent with CmiSyncSendAndFree(). 00412 ** 00413 ** The buffer that is allocated in this message is used as a Converse 00414 ** message, so it is allocated with CmiAlloc() so the send routine can 00415 ** properly free it with CmiFree(). Therefore it has two ints for size 00416 ** and ref count at the beginning of the buffer. These are not shown in 00417 ** the diagram above since they are basically irrelevant to this software. 00418 */ 00419 00420 void MeshStreamingStrategy::FlushColumn (int column) 00421 { 00422 int dest_column_pe; 00423 int num_msgs; 00424 int newmsgsize; 00425 char *newmsg; 00426 00427 CmiAssert (column < num_columns); 00428 00429 dest_column_pe = column + (my_row * row_length); 00430 if (dest_column_pe >= num_pe) { 00431 // This means that there is a hole in the mesh. 00432 //dest_column_pe = column + ((my_row % (num_rows - 1) - 1) * row_length); 00433 int new_row = my_column % (my_row + 1); 00434 if(new_row >= my_row) 00435 new_row = 0; 00436 00437 dest_column_pe = column + new_row * row_length; 00438 } 00439 00440 num_msgs = column_bucket[column].length (); 00441 00442 if(num_msgs == 0) 00443 return; 00444 00445 ComlibPrintf ("[%d] MeshStreamingStrategy::FlushColumn() invoked. 
to %d\n", 00446 CkMyPe(), dest_column_pe); 00447 00448 PUP_cmiAllocSizer sp; 00449 int i = 0; 00450 MeshStreamingHeader mhdr; 00451 00452 mhdr.strategy_id = getInstance(); 00453 mhdr.num_msgs = num_msgs; 00454 sp | mhdr; 00455 00456 for (i = 0; i < num_msgs; i++) { 00457 void *msg = column_bucket[column][i]; 00458 int size = SIZEFIELD(msg);//((envelope *)msg)->getTotalsize(); 00459 00460 int destpe = column_destQ[column][i]; 00461 sp | destpe; 00462 sp.pupCmiAllocBuf((void **)&msg, size); 00463 } 00464 00465 newmsgsize = sp.size(); 00466 newmsg = (char *) CmiAlloc (newmsgsize); 00467 00468 //((int *) (newmsg + CmiMsgHeaderSizeBytes))[0] = strategy_id; 00469 //((int *) (newmsg + CmiMsgHeaderSizeBytes))[1] = num_msgs; 00470 00471 PUP_toCmiAllocMem mp(newmsg); 00472 //make a structure header 00473 mp | mhdr; 00474 00475 /* 00476 newmsgptr = (char *) (newmsg + CmiMsgHeaderSizeBytes + 2 * sizeof (int)); 00477 for (int i = 0; i < num_msgs; i++) { 00478 msgptr = column_bucket[column].deq (); 00479 msgsize = ((int *) msgptr)[1] + (3 * sizeof (int)); 00480 memcpy (newmsgptr, msgptr, msgsize); 00481 00482 newmsgptr += msgsize; 00483 00484 delete [] msgptr; 00485 } 00486 */ 00487 00488 for (i = 0; i < num_msgs; i++) { 00489 void *msg = column_bucket[column][i]; 00490 int destpe = column_destQ[column][i]; 00491 int size = SIZEFIELD(msg);//((envelope*)msg)->getTotalsize(); 00492 00493 mp | destpe; 00494 mp.pupCmiAllocBuf((void **)&msg, size); 00495 } 00496 00497 for (i = 0; i < num_msgs; i++) { 00498 void *msg = column_bucket[column].deq(); 00499 CmiFree(msg); 00500 00501 column_destQ[column].deq(); 00502 } 00503 00504 column_bytes[column] = 0; 00505 CmiSetHandler (newmsg, CkpvAccess(streaming_column_handler_id)); 00506 CmiSyncSendAndFree (dest_column_pe, newmsgsize, newmsg); 00507 } 00508 00509 00510 /************************************************************************** 00511 ** This method is used to flush a specified row bucket, either as the 00512 ** result of the row 
** bucket reaching its maximum capacity, as a result
** of the periodic flush handler being invoked, or as a result of the
** processor going idle.
**
** The method first finds the destination PE for the row.  The method then
** iterates through the messages in the row bucket and constructs an array
** sizes[] of the message sizes and an array msgComps[] of pointers to
** the messages in the row bucket.  (Setting the handler of each message
** to "RecvmsgHandle", the handler for multi-message sends, is currently
** commented out.)  Finally, the method calls CmiMultipleSend() to
** send all messages to the destination PE in one go.
**
** After the row bucket is emptied, the method calls CmiFree() to
** deallocate space for the individual messages.  Since each message was
** allocated via CmiAlloc() this is appropriate.
**
** Each message in the row bucket has the layout shown in the diagram
** below.
00530 ** 00531 ** +----------------------------------------------------+ 00532 ** | size | refcount || Converse || user | 00533 ** | | || header || data | 00534 ** +----------------------------------------------------+ 00535 ** ^ 00536 ** | 00537 ** msg 00538 ** 00539 */ 00540 00541 void MeshStreamingStrategy::FlushRow (int row) 00542 { 00543 int dest_pe; 00544 int num_msgs; 00545 int *sizes; 00546 char *msg; 00547 char **msgComps; 00548 int i; 00549 00550 ComlibPrintf ("[%d] MeshStreamingStrategy::FlushRow() invoked.\n", 00551 CkMyPe()); 00552 00553 CmiAssert (row < num_rows); 00554 00555 dest_pe = my_column + (row * row_length); 00556 00557 num_msgs = row_bucket[row].length (); 00558 if (num_msgs > 0) { 00559 00560 //Strip charm++ envelopes from messages 00561 /* 00562 if(shortMsgPackingFlag) { 00563 MsgPacker mpack(row_bucket[row], num_msgs); 00564 CombinedMessage *msg; 00565 int size; 00566 mpack.getMessage(msg, size); 00567 00568 CmiSyncSendAndFree(dest_pe, size, (char *)msg); 00569 return; 00570 } 00571 */ 00572 //Send messages without short message packing 00573 sizes = new int[num_msgs]; 00574 msgComps = new char *[num_msgs]; 00575 00576 for (i = 0; i < num_msgs; i++) { 00577 msg = row_bucket[row].deq (); 00578 //CmiSetHandler (msg, CkpvAccess(RecvmsgHandle)); 00579 sizes[i] = SIZEFIELD(msg);//((envelope *)msg)->getTotalsize(); 00580 msgComps[i] = msg; 00581 } 00582 00583 CmiMultipleSend (dest_pe, num_msgs, sizes, msgComps); 00584 00585 for (i = 0; i < num_msgs; i++) { 00586 CmiFree (msgComps[i]); 00587 } 00588 00589 delete [] sizes; 00590 delete [] msgComps; 00591 } 00592 } 00593 00594 00595 00596 /************************************************************************** 00597 ** This method exists so various handlers can easily trigger all column 00598 ** buckets and row buckets to flush. 
00599 */ 00600 void MeshStreamingStrategy::FlushBuffers (void) 00601 { 00602 ComlibPrintf ("[%d] MeshStreamingStrategy::PeriodicFlush() invoked.\n", 00603 CkMyPe()); 00604 00605 for (int column = 0; column < num_columns; column++) { 00606 FlushColumn ((column+my_column)%num_columns); 00607 } 00608 00609 for (int row = 0; row < num_rows; row++) { 00610 FlushRow ((row+my_row)%num_rows); 00611 } 00612 } 00613 00614 00615 00616 /************************************************************************** 00617 ** This method exists primarily so column_handler() can insert messages 00618 ** into a specified row bucket. 00619 */ 00620 void MeshStreamingStrategy::InsertIntoRowBucket (int row, char *msg) 00621 { 00622 ComlibPrintf ("[%d] MeshStreamingStrategy::InsertIntoRowBucket() invoked.\n", CkMyPe()); 00623 00624 CmiAssert (row < num_rows); 00625 00626 row_bucket[row].enq (msg); 00627 if (row_bucket[row].length() > max_bucket_size) { 00628 FlushRow (row); 00629 } 00630 } 00631 00632 00633 00634 /************************************************************************** 00635 ** This method exists only so column_handler() can get the length of a row 00636 ** in the mesh. Since it is outside of the MeshStreamingStrategy class, it 00637 ** does not have direct access to the class variables. 00638 */ 00639 int MeshStreamingStrategy::GetRowLength (void) 00640 { 00641 ComlibPrintf ("[%d] MeshStreamingStrategy::GetRowLength() invoked.\n", CkMyPe()); 00642 00643 return (row_length); 00644 } 00645 00646 00647 00648 /************************************************************************** 00649 ** This is a very complicated pack/unpack method. 00650 ** 00651 ** This method must handle the column_bucket[] and row_bucket[] data 00652 ** structures. These are arrays of queues of (char *). To pack these, 00653 ** we must iterate through the data structures and pack the sizes of 00654 ** each message (char *) pointed to by each queue entry. 
00655 */ 00656 void MeshStreamingStrategy::pup (PUP::er &p) 00657 { 00658 00659 ComlibPrintf ("[%d] MeshStreamingStrategy::pup() invoked.\n", CkMyPe()); 00660 00661 // Call the superclass method -- easy. 00662 Strategy::pup (p); 00663 00664 // Pup the instance variables -- easy. 00665 p | num_pe; 00666 p | num_columns; 00667 p | num_rows; 00668 p | row_length; 00669 00670 //p | my_pe; 00671 //p | my_column; 00672 //p | my_row; 00673 00674 p | max_bucket_size; 00675 p | flush_period; 00676 //p | strategy_id; 00677 //p | column_handler_id; 00678 00679 //p | shortMsgPackingFlag; 00680 00681 // Handle the column_bucket[] data structure. 00682 // For each element in column_bucket[], pup the length of the queue 00683 // at that element followed by the contents of that queue. For each 00684 // queue, pup the size of the message pointed to by the (char *) 00685 // entry, followed by the memory for the (char *) entry. 00686 if (p.isUnpacking ()) { 00687 column_bucket = new CkQ<char *>[num_columns]; 00688 column_destQ = new CkQ<int>[num_columns]; 00689 } 00690 00691 /*In correct code, will only be useful for checkpointing though 00692 for (i = 0; i < num_columns; i++) { 00693 int length = column_bucket[i].length (); 00694 00695 p | length; 00696 00697 for (int j = 0; j < length; j++) { 00698 char *msg = column_bucket[i].deq (); 00699 int size = sizeof (int) + ((int *) msg)[1]; 00700 p | size; 00701 p(msg, size); 00702 } 00703 } 00704 */ 00705 00706 // Handle the column_bytes[] data structure. 00707 // This is a straightforward packing of an int array. 00708 if (p.isUnpacking ()) { 00709 column_bytes = new int[num_columns]; 00710 } 00711 00712 p(column_bytes, num_columns); 00713 00714 // Handle the row_bucket[] data structure. 00715 // This works exactly like the column_bucket[] above. 
00716 if (p.isUnpacking ()) { 00717 row_bucket = new CkQ<char *>[num_rows]; 00718 } 00719 00720 /* In correct code, will only be useful for checkpointing though 00721 for (i = 0; i < num_rows; i++) { 00722 int length = row_bucket[i].length (); 00723 00724 p | length; 00725 00726 for (int j = 0; j < length; j++) { 00727 char *msg = row_bucket[i].deq (); 00728 int size = ((int *) msg)[0]; 00729 p | size; 00730 p(msg, size); 00731 } 00732 } 00733 */ 00734 00735 my_pe = CkMyPe (); 00736 00737 my_column = my_pe % num_columns; 00738 my_row = my_pe / row_length; 00739 00740 //column_bucket = new CkQ<char *>[num_columns]; 00741 //column_bytes = new int[num_columns]; 00742 00743 for (int i = 0; i < num_columns; i++) { 00744 column_bytes[i] = 0; 00745 } 00746 00747 // packing called once on processor 0, unpacking called once on all processors except 0 00748 if (p.isPacking() || p.isUnpacking()) { 00749 //column_handler_id = CkRegisterHandler ((CmiHandler) column_handler); 00750 00751 CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE, idle_flush_handler, 00752 (void *) this, CkMyPe()); 00753 RegisterPeriodicFlush (); 00754 } 00755 } 00756 00757 PUPable_def(MeshStreamingStrategy) 00758 00759
1.5.5