00001
00007 #include "RingMulticastStrategy.h"
00008
00009
00010 void RingMulticastStrategy::createObjectOnSrcPe(ComlibSectionHashObject *obj, int npes, ComlibMulticastIndexCount *pelist) {
00011
00012 int next_pe = CkNumPes();
00013 int acount = 0;
00014 int min_dest = CkNumPes();
00015
00016
00017
00018
00019 for(acount = 0; acount < npes; acount++){
00020
00021 int p = pelist[acount].pe;
00022
00023
00024 if(p < min_dest) min_dest = p;
00025
00026
00027
00028 if(p > CkMyPe() && next_pe > p) next_pe = p;
00029
00030 }
00031
00032
00033 if(next_pe == CkNumPes() && min_dest != CkMyPe()) next_pe = min_dest;
00034
00035 if(next_pe == CkNumPes()) next_pe = -1;
00036
00037 if(next_pe != -1) {
00038 obj->pelist = new int[1];
00039 obj->npes = 1;
00040 obj->pelist[0] = next_pe;
00041 }
00042 else {
00043 obj->pelist = NULL;
00044 obj->npes = 0;
00045 }
00046 }
00047
00048 void RingMulticastStrategy::createObjectOnIntermediatePe(ComlibSectionHashObject *obj, int npes, ComlibMulticastIndexCount *counts, int src_pe) {
00049
00050 obj->pelist = new int[1];
00051 obj->npes = 1;
00052 obj->pelist[0] = CkMyPe();
00053
00054
00055 for (int i=0; i<npes; ++i) {
00056 if (obj->pelist[0] > CkMyPe()) {
00057
00058 if (counts[i].pe > CkMyPe() && counts[i].pe < obj->pelist[0])
00059 obj->pelist[0] = counts[i].pe;
00060 } else {
00061
00062 if (counts[i].pe < obj->pelist[0] || counts[i].pe > CkMyPe())
00063 obj->pelist[0] = counts[i].pe;
00064 }
00065 }
00066
00067
00068 if(obj->npes > 0 && isEndOfRing(*obj->pelist, src_pe)) {
00069 delete [] obj->pelist;
00070 obj->pelist = NULL;
00071 obj->npes = 0;
00072 }
00073 }
00074
00075
00076
00077
00078
00079
00080 int RingMulticastStrategy::isEndOfRing(int next_pe, int src_pe){
00081
00082 if(next_pe < 0)
00083 return 1;
00084
00085 ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
00086
00087 if(next_pe > CkMyPe()){
00088 if(src_pe <= next_pe && src_pe > CkMyPe())
00089 return 1;
00090
00091 return 0;
00092 }
00093
00094 if(src_pe > CkMyPe() || src_pe <= next_pe)
00095 return 1;
00096
00097 return 0;
00098 }
00099