00001
00012 #ifndef MESH_STREAMING_STRATEGY
00013 #define MESH_STREAMING_STRATEGY
00014
00015 #include <math.h>
00016
00017 #include "convcomlibmanager.h"
00018
00019 #define DEFAULT_FLUSH_PERIOD 10 // milliseconds
00020 #define DEFAULT_MAX_BUCKET_SIZE 1000 // number of messages
00021
00022 CkpvExtern(int, streaming_column_handler_id);
00023 extern void streaming_column_handler(void *msg);
00024
00027 struct MeshStreamingHeader {
00028 char conv_hdr[CmiMsgHeaderSizeBytes];
00029 int strategy_id;
00030 int num_msgs;
00031 };
00032
00033 PUPbytes(MeshStreamingHeader)
00034
00035
00074 class MeshStreamingStrategy : public Strategy
00075 {
00076
00077 public:
00078 MeshStreamingStrategy (int period=DEFAULT_FLUSH_PERIOD,
00079 int bucket_size=DEFAULT_MAX_BUCKET_SIZE);
00080 MeshStreamingStrategy (CkMigrateMessage *m) : Strategy(m){ }
00081
00082 void insertMessage (MessageHolder *msg);
00083 void doneInserting ();
00084
00085 void RegisterPeriodicFlush (void);
00086 void FlushColumn (int column);
00087 void FlushRow (int row);
00088 void FlushBuffers (void);
00089 void InsertIntoRowBucket (int row, char *msg);
00090 int GetRowLength (void);
00091 virtual void pup (PUP::er &p);
00092 PUPable_decl (MeshStreamingStrategy);
00093
00094 virtual void handleMessage(void *msg) {
00095 CmiAbort("[%d] MeshStreamingStrategy::handleMessage should never be called\n");
00096 }
00097
00098
00099
00100
00101
00102 private:
00103
00104 int num_pe;
00105 int num_columns;
00106 int num_rows;
00107 int row_length;
00108
00109 int my_pe;
00110 int my_column;
00111 int my_row;
00112
00113 int flush_period;
00114 int max_bucket_size;
00115
00116
00117
00118
00119
00120 CkQ<char *> *column_bucket;
00121 CkQ<int> *column_destQ;
00122
00123 int *column_bytes;
00124 CkQ<char *> *row_bucket;
00125 };
00126 #endif
00127