00001
00008 #ifndef PIPE_BROADCAST_CONVERSE
00009 #define PIPE_BROADCAST_CONVERSE
00010 #include "ckhashtable.h"
00011
00012
00013 #include "convcomlibmanager.h"
00014
00015 #define DEFAULT_PIPE 8196
00016
00017 CkpvExtern(int, pipeline_handler);
00018 extern void PipelineHandler(void *msg);
00019 CkpvExtern(int, pipeline_frag_handler);
00020 extern void PipelineFragmentHandler(void *msg);
00021
00026 struct PipeBcastInfo {
00027 short bcastPe;
00028 short seqNumber;
00029 int chunkSize;
00030 int chunkNumber;
00031 int messageSize;
00032 short srcPe;
00033 };
00034
00040 class PipeBcastHashKey {
00041 public:
00042
00043 int srcPe;
00044 int seq;
00045 PipeBcastHashKey(int _pe, int _seq):srcPe(_pe), seq(_seq){};
00046
00047
00048
00049 CkHashCode hash(void) const;
00050 static CkHashCode staticHash(const void *a,size_t);
00051 int compare(const PipeBcastHashKey &ind) const;
00052 static int staticCompare(const void *a,const void *b,size_t);
00053 };
00054
00056 inline CkHashCode PipeBcastHashKey::hash(void) const {
00057 register int _seq = seq;
00058 register int _pe = srcPe;
00059
00060 register CkHashCode ret = (_seq << 16) + _pe;
00061 return ret;
00062 }
00063
00064 inline int PipeBcastHashKey::compare(const PipeBcastHashKey &k2) const {
00065 if(seq == k2.seq && srcPe == k2.srcPe)
00066 return 1;
00067
00068 return 0;
00069 }
00070
00074 class PipeBcastHashObj {
00075 public:
00076 char *message;
00077 int dimension;
00078 int remaining;
00079 PipeBcastHashObj (int dim, int rem, char *msg) :dimension(dim),remaining(rem),message(msg) {};
00080
00081 };
00082
00083
00084
00085
00094 class PipeBroadcastConverse : public Strategy {
00095 protected:
00096
00097 int pipeSize;
00098 short topology;
00099
00100 CmiUInt2 seqNumber;
00101
00102
00104 CkHashtableT<PipeBcastHashKey, PipeBcastHashObj *> fragments;
00105
00106
00107
00108
00109
00110
00112 virtual CmiFragmentHeader *getFragmentHeader(char *msg);
00113
00114 public:
00115 PipeBroadcastConverse(short top=USE_HYPERCUBE, int size=DEFAULT_PIPE);
00116 PipeBroadcastConverse(CkMigrateMessage *m): Strategy(m) {};
00117 int getPipeSize() { return pipeSize; };
00118
00119
00120
00121 virtual void deliver(char *msg, int dim);
00122
00123 void handleMessage(void*) {
00124 CmiAbort("PipeBroadcastConverse::handleMessage, this should never be used!\n");
00125 }
00126
00129 void store(char *msg);
00130
00132 void propagate(char *msg, int isFrag);
00133
00134
00135 void insertMessage(MessageHolder *msg);
00136
00137
00138 virtual void pup(PUP::er &p);
00139 PUPable_decl(PipeBroadcastConverse);
00140 };
00141
00142 #endif
00143