00001
00009 #ifndef PIPELINE_CONVERSE
00010 #define PIPELINE_CONVERSE
00011 #include "ckhashtable.h"
00012 #include "charm++.h"
00013 #include "convcomlibstrategy.h"
00014 #include "convcomlibmanager.h"
00015
/**
 * Value used for the `size` argument of the PipelineStrategy constructor
 * when the caller does not supply one.
 *
 * NOTE(review): 8196 is not a power of two (8192 is) -- confirm that the
 * value is intentional before relying on it.
 */
#define DEFAULT_PIPE 8196
00017
/**
 * Plain record of pipeline bookkeeping values.
 *
 * Member order and types are deliberately left exactly as they were so the
 * object layout does not change.
 *
 * NOTE(review): the per-field remarks below are inferred from the field
 * names only -- confirm against the code that fills in this struct.
 */
struct PipelineInfo {
    short bcastPe;     // presumably a processor number related to the broadcast
    short seqNumber;   // presumably the sequence number of the message
    int   chunkSize;   // presumably the size of one chunk
    int   chunkNumber; // presumably the index of this chunk
    int   messageSize; // presumably the size of the whole message
    short srcPe;       // presumably the source processor number
};
00026
00027 class PipelineHashKey{
00028 public:
00029
00030 int srcPe;
00031 int seq;
00032 PipelineHashKey(int _pe, int _seq):srcPe(_pe), seq(_seq){};
00033
00034
00035
00036 CkHashCode hash(void) const;
00037 static CkHashCode staticHash(const void *a,size_t);
00038 int compare(const PipelineHashKey &ind) const;
00039 static int staticCompare(const void *a,const void *b,size_t);
00040 };
00041
00042
00043 inline CkHashCode PipelineHashKey::hash(void) const
00044 {
00045 register int _seq = seq;
00046 register int _pe = srcPe;
00047
00048 register CkHashCode ret = (_seq << 16) + _pe;
00049 return ret;
00050 }
00051
00052 inline int PipelineHashKey::compare(const PipelineHashKey &k2) const
00053 {
00054 if(seq == k2.seq && srcPe == k2.srcPe)
00055 return 1;
00056
00057 return 0;
00058 }
00059
/**
 * Value stored in the `fragments` table of PipelineStrategy: a message
 * pointer together with two integer counters.
 *
 * The class only stores the pointer it is given; it neither copies nor
 * releases the buffer, so the lifetime of `message` is managed by the caller.
 */
class PipelineHashObj {
public:
    char *message;  ///< Pointer passed as the third constructor argument.
    int dimension;  ///< Value passed as the first constructor argument.
    int remaining;  ///< Value passed as the second constructor argument.

    /**
     * @param dim value stored in `dimension`.
     * @param rem value stored in `remaining`.
     * @param msg pointer stored in `message` (not copied).
     *
     * The initializer list follows the declaration order of the members,
     * which is the order in which C++ actually initializes them.
     */
    PipelineHashObj(int dim, int rem, char *msg)
        : message(msg), dimension(dim), remaining(rem) {}
};
00068
00069 typedef const UInt constUInt;
00070 typedef void (*setFunction)(char*, constUInt);
00071
00072 class PipelineStrategy : public Strategy {
00073 protected:
00074
00075 int pipeSize;
00076
00077 int seqNumber;
00078 CkQ <MessageHolder*> *messageBuf;
00079 CkHashtableT<PipelineHashKey, PipelineHashObj *> fragments;
00080 int deliverHandle;
00081
00082 public:
00083 PipelineStrategy(int size=DEFAULT_PIPE, Strategy* st=NULL);
00084 PipelineStrategy(CkMigrateMessage *) {};
00085 int getPipeSize() { return pipeSize; };
00086 void commonInit();
00087 void deliverer(char *msg, int dim);
00088 void storing(char *msg);
00089
00090 void conversePipeline(char *env, int size, int destination);
00091 void insertMessage(MessageHolder *msg);
00092 void doneInserting();
00093
00094 virtual void pup(PUP::er &p);
00095 PUPable_decl(PipelineStrategy);
00096 };
00097
00098 #endif
00099