00001
00002
00003 #ifndef MSA_DISTPAGEMGR_H
00004 #define MSA_DISTPAGEMGR_H
00005
00006 #include <charm++.h>
00007 #include <string.h>
00008 #include <list>
00009 #include <stack>
00010 #include <map>
00011 #include <set>
00012 #include <vector>
00013 #include "msa-common.h"
00014
00015
00016
00018 struct MSA_WriteSpan_t {
00019 int start,end;
00020 inline void pup(PUP::er &p) {
00021 p|start; p|end;
00022 }
00023 };
00024
00025 template <class ENTRY, class MERGER,
00026 unsigned int ENTRIES_PER_PAGE>
00027 class MSA_PageT;
00028 #include "msa.decl.h"
00029
00030
00031
00032
/// Abstract interface for an object that wants to be notified about
/// page-related events.
class MSA_Listener {
public:
    MSA_Listener() { }

    /// Defined out of line (not in this header).
    virtual ~MSA_Listener();

    /// Called when this listener is registered with a listener list.
    virtual void add(void) = 0;

    /// Called when the event the listener waits for occurs.
    /// @param pageNo  page number associated with the event
    virtual void signal(unsigned int pageNo) = 0;
};
00043
00045 class MSA_Listeners {
00046 CkVec<MSA_Listener *> listeners;
00047 public:
00048 MSA_Listeners();
00049 ~MSA_Listeners();
00050
00052 void add(MSA_Listener *l);
00053
00055 unsigned int size(void) const {return listeners.size();}
00056
00058 void signal(unsigned int pageNo);
00059 };
00060
00061
00063 class MSA_Thread_Listener : public MSA_Listener {
00064 CthThread thread;
00065 int count;
00066 public:
00067 MSA_Thread_Listener() :thread(0), count(0) {}
00068
00070 void add(void);
00071
00073 void suspend(void);
00074
00076 void signal(unsigned int pageNo);
00077 };
00078
00079
00080
/// Fixed-size bit vector holding NUM_BITS bits packed into an array of
/// `store_t` words. The storage is public so callers can inspect or clear
/// whole words at a time.
///
/// Bit indices are not range-checked; callers must pass i < NUM_BITS.
template <unsigned int NUM_BITS>
class fixedlength_bitvector {
public:
    /// Word type used for storage.
    typedef unsigned long store_t;
    /// Number of bits held by one storage word.
    enum { store_bits = 8 * sizeof(store_t) };

    /// Number of storage words needed for NUM_BITS bits (rounded up).
    enum { len = (NUM_BITS + (store_bits - 1)) / store_bits };
    store_t store[len];

    /// All bits start out cleared.
    fixedlength_bitvector() { reset(); }

    /// Overwrite every storage word with `s`.
    void fill(store_t s) {
        for (int w = 0; w < len; w++) store[w] = s;
    }

    /// Clear every bit.
    void reset(void) { fill(0); }

    /// Set bit `i`.
    void set(unsigned int i) { store[i / store_bits] |= bitMask(i); }

    /// Clear bit `i`.
    void reset(unsigned int i) { store[i / store_bits] &= ~bitMask(i); }

    /// Return true when bit `i` is set. Const so that it can be used on
    /// const bit vectors as well.
    bool test(unsigned int i) const {
        return (store[i / store_bits] & bitMask(i)) != 0;
    }

private:
    /// Single-bit mask for bit `i` within its storage word. Built from
    /// store_t so the shift width always matches the storage type.
    static store_t bitMask(unsigned int i) {
        return static_cast<store_t>(1) << (i % store_bits);
    }
};
00111
00114 template <class ENTRY, unsigned int ENTRIES_PER_PAGE>
00115 class MSA_Page_StateT
00116 {
00123 typedef fixedlength_bitvector<ENTRIES_PER_PAGE> writes_t;
00124 writes_t writes;
00125 typedef typename writes_t::store_t writes_store_t;
00126 enum {writes_bits=writes_t::store_bits};
00127
00131 typedef fixedlength_bitvector<writes_t::len> writes2_t;
00132 writes2_t writes2;
00133 typedef typename writes2_t::store_t writes2_store_t;
00134 enum {writes2_bits=writes2_t::store_bits*writes_t::store_bits};
00135
00136 public:
00138 MSA_Page_Fault_t state;
00139
00142 bool locked;
00143
00145 MSA_Listeners readRequests;
00147 MSA_Listeners writeRequests;
00148
00150 bool canPageOut(void) const {
00151 return (!locked) && canDelete();
00152 }
00153
00155 bool canDelete(void) const {
00156 return (readRequests.size()==0) && (writeRequests.size()==0);
00157 }
00158
00159 MSA_Page_StateT()
00160 : writes(), writes2(), state(Uninit_State), locked(false),
00161 readRequests(), writeRequests()
00162 { }
00163
00165 void write(unsigned int i) {
00166 writes.set(i);
00167 writes2.set(i/writes_t::store_bits);
00168 }
00169
00171 void writeClear(void) {
00172 for (int i2=0;i2<writes2_t::len;i2++)
00173 if (writes2.store[i2]) {
00174 int o=i2*writes2_t::store_bits;
00175 for (int i=0;i<writes_t::len;i++)
00176 writes.store[o+i]=0;
00177 writes2.store[i2]=0;
00178 }
00179 }
00180
00182 inline int roundUp(int v,int m) {
00183 return (v+m-1)/m*m;
00184 }
00185
00188 int writeSpans(MSA_WriteSpan_t *span) {
00189 int nSpans=0;
00190
00191 int cur=0;
00192 while (true) {
00193
00194 while (true) {
00195 if (writes2.store[cur/writes2_bits]==(writes2_store_t)0)
00196 cur=roundUp(cur+1,writes2_bits);
00197 else if (writes.store[cur/writes_bits]==(writes_store_t)0)
00198 cur=roundUp(cur+1,writes_bits);
00199 else if (writes.test(cur)==false)
00200 cur++;
00201 else
00202 break;
00203 if (cur>=ENTRIES_PER_PAGE) return nSpans;
00204 }
00205
00206 span[nSpans].start=cur;
00207
00208 while (true) {
00209
00210
00211
00212
00213 if (writes.store[cur/writes_bits]==~(writes_store_t)0)
00214 cur=roundUp(cur+1,writes_bits);
00215 else if (writes.test(cur)==true)
00216 cur++;
00217 else
00218 break;
00219 if (cur>=ENTRIES_PER_PAGE) {
00220 span[nSpans++].end=ENTRIES_PER_PAGE;
00221 return nSpans;
00222 }
00223 }
00224
00225 span[nSpans++].end=cur;
00226 }
00227 }
00228 };
00229
00230
00231
00232
00233
/// Interface for strategies that decide which cached page to evict.
///
/// Implementations are created with `new` and used through a pointer to this
/// base, so the destructor is virtual to make deletion through that pointer
/// well defined.
template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
class MSA_PageReplacementPolicy
{
public:
    virtual ~MSA_PageReplacementPolicy() { }

    /// Notify the policy that `page` has just been accessed.
    virtual void pageAccessed(unsigned int page) = 0;

    /// Ask the policy for the number of a page to evict.
    /// @return the selected page number, or an implementation-defined
    ///         "no page" value when nothing can be evicted
    virtual unsigned int selectPage() = 0;
};
00249
00261 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
00262 class vmLRUReplacementPolicy : public MSA_PageReplacementPolicy <ENTRY_TYPE, ENTRIES_PER_PAGE>
00263 {
00264 protected:
00265 unsigned int nPages;
00266 const std::vector<ENTRY_TYPE *> &pageTable;
00267 typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
00268 const std::vector<pageState_t *> &pageState;
00269 std::list<unsigned int> stackOfPages;
00270 unsigned int lastPageAccessed;
00271
00272 public:
00273 inline vmLRUReplacementPolicy(unsigned int nPages_,
00274 const std::vector<ENTRY_TYPE *> &pageTable_,
00275 const std::vector<pageState_t *> &pageState_)
00276 : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), lastPageAccessed(MSA_INVALID_PAGE_NO) {}
00277
00278 inline void pageAccessed(unsigned int page)
00279 {
00280 if(page != lastPageAccessed)
00281 {
00282 lastPageAccessed = page;
00283
00284
00285 std::list<unsigned int>::iterator i;
00286 for(i = stackOfPages.begin(); i != stackOfPages.end(); i++)
00287 if(*i == page)
00288 i = stackOfPages.erase(i);
00289
00290 stackOfPages.push_back(page);
00291 }
00292 }
00293
00294 inline unsigned int selectPage()
00295 {
00296 if(stackOfPages.size() == 0)
00297 return MSA_INVALID_PAGE_NO;
00298
00299
00300 std::list<unsigned int>::iterator i = stackOfPages.begin();
00301 while(i != stackOfPages.end())
00302 {
00303 if(pageTable[*i] == NULL) i = stackOfPages.erase(i);
00304 else if(!pageState[*i]->canPageOut()) i++;
00305 else break;
00306 }
00307
00308 if(i != stackOfPages.end())
00309 return *i;
00310 else
00311 return MSA_INVALID_PAGE_NO;
00312 }
00313 };
00314
00328 template <class ENTRY_TYPE, unsigned int ENTRIES_PER_PAGE>
00329 class vmNRUReplacementPolicy : public MSA_PageReplacementPolicy <ENTRY_TYPE, ENTRIES_PER_PAGE>
00330 {
00331 protected:
00332 unsigned int nPages;
00333 const std::vector<ENTRY_TYPE *> &pageTable;
00334 typedef MSA_Page_StateT<ENTRY_TYPE, ENTRIES_PER_PAGE> pageState_t;
00335 const std::vector<pageState_t *> &pageState;
00336 enum {K=5};
00337 unsigned int last[K];
00338 unsigned int Klast;
00339
00340 unsigned int victim;
00341
00342 bool recentlyUsed(unsigned int page) {
00343 for (int k=0;k<K;k++) if (page==last[k]) return true;
00344 return false;
00345 }
00346
00347 public:
00348 inline vmNRUReplacementPolicy(unsigned int nPages_,
00349 const std::vector<ENTRY_TYPE *> &pageTable_,
00350 const std::vector<pageState_t *> &pageState_)
00351 : nPages(nPages_), pageTable(pageTable_), pageState(pageState_), Klast(0), victim(0)
00352 {
00353 for (int k=0;k<K;k++) last[k]=MSA_INVALID_PAGE_NO;
00354 }
00355
00356 inline void pageAccessed(unsigned int page)
00357 {
00358 if (page!=last[Klast]) {
00359 Klast++; if (Klast>=K) Klast=0;
00360 last[Klast]=page;
00361 }
00362 }
00363
00364 inline unsigned int selectPage() {
00365 unsigned int last_victim=victim;
00366 do {
00367 victim++; if (victim>=nPages) victim=0;
00368 if (pageTable[victim]
00369 &&pageState[victim]->canPageOut()
00370 &&!recentlyUsed(victim)) {
00371
00372 return victim;
00373 }
00374 } while (victim!=last_victim);
00375 return MSA_INVALID_PAGE_NO;
00376 }
00377 };
00378
00379
00380
00385 template <
00386 class ENTRY,
00387 class MERGER=DefaultEntry<ENTRY>,
00388 unsigned int ENTRIES_PER_PAGE=MSA_DEFAULT_ENTRIES_PER_PAGE
00389 >
00390 class MSA_PageT {
00391 unsigned int n;
00393 ENTRY *data;
00395 MERGER m;
00396 bool duplicate;
00397
00398 public:
00399
00400 MSA_PageT()
00401 : n(ENTRIES_PER_PAGE), data(new ENTRY[ENTRIES_PER_PAGE]), duplicate(false)
00402 {
00403 for (int i=0;i<ENTRIES_PER_PAGE;i++){
00404 data[i]=m.getIdentity();
00405 }
00406 }
00407
00408
00409
00410
00411
00412
00413 MSA_PageT(ENTRY *d):data(d), duplicate(true), n(ENTRIES_PER_PAGE) {
00414 }
00415 MSA_PageT(ENTRY *d, unsigned int n_):data(d), duplicate(true), n(n_) {
00416 }
00417 virtual ~MSA_PageT() {
00418 if (!duplicate) {
00419 delete [] data;
00420 }
00421 }
00422
00423 virtual void pup(PUP::er &p) {
00424 p | n;
00425
00426
00427
00428
00429
00430 bool nulldata = false;
00431 if(!p.isUnpacking()){
00432 nulldata = (data == NULL);
00433 }
00434 p | nulldata;
00435 if(nulldata){
00436 data = NULL;
00437 return;
00438 }
00439 if(p.isUnpacking()){
00440 data = new ENTRY[n];
00441 }
00442 for (int i=0;i<n;i++){
00443 p|data[i];
00444 }
00445 }
00446
00447 virtual void merge(MSA_PageT<ENTRY, MERGER, ENTRIES_PER_PAGE> &otherPage) {
00448 for (int i=0;i<ENTRIES_PER_PAGE;i++)
00449 m.accumulate(data[i],otherPage.data[i]);
00450 }
00451
00452
00453 inline ENTRY &operator[](int i) {return data[i];}
00454 inline const ENTRY &operator[](int i) const {return data[i];}
00455 inline ENTRY *getData() { return data; }
00456 };
00457
00458
00459
00460 template <class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE>
00461 class MSA_CacheGroup : public CBase_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE>
00462 {
00463 typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
00464
00465 protected:
00466 ENTRY_OPS_CLASS *entryOpsObject;
00467 unsigned int numberOfWorkerThreads;
00468
00469 unsigned int numberLocalWorkerThreads;
00470 unsigned int numberLocalWorkerThreadsActive;
00471 unsigned int enrollDoneq;
00472 MSA_Listeners enrollWaiters;
00473 MSA_Listeners syncWaiters;
00474 std::set<int> enrolledPEs;
00475
00476 unsigned int nPages;
00477 std::vector<ENTRY_TYPE*> pageTable;
00478 typedef MSA_Page_StateT<ENTRY_TYPE,ENTRIES_PER_PAGE> pageState_t;
00479 std::vector<pageState_t *> pageStateStorage;
00480
00481 std::stack<ENTRY_TYPE*> pagePool;
00482
00483 typedef vmNRUReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> vmPageReplacementPolicy;
00484 MSA_PageReplacementPolicy<ENTRY_TYPE, ENTRIES_PER_PAGE> *replacementPolicy;
00485
00486
00487 typedef struct { unsigned int begin; unsigned int end; } writebounds_t;
00488
00489
00490 typedef std::list<writebounds_t> writelist_t;
00491
00492 writelist_t** writes;
00493
00494 unsigned int resident_pages;
00495 unsigned int max_resident_pages;
00496 unsigned int nEntries;
00497 unsigned int syncAckCount;
00498 int outOfBufferInPrefetch;
00499
00500 int syncThreadCount;
00501
00502
00503
00504 MSA_WriteSpan_t writeSpans[ENTRIES_PER_PAGE];
00505 ENTRY_TYPE writeEntries[ENTRIES_PER_PAGE];
00506
00507 typedef CProxy_MSA_PageArray<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_PageArray_t;
00508 CProxy_PageArray_t pageArray;
00509 typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
00510
00511 std::map<CthThread, MSA_Thread_Listener *> threadList;
00512
00513 bool clear;
00514
00516 inline pageState_t *stateN(unsigned int pageNo) {
00517 return pageStateStorage[pageNo];
00518 }
00519
00521 pageState_t *state(unsigned int pageNo)
00522 {
00523 pageState_t *ret=pageStateStorage[pageNo];
00524 if (ret==NULL)
00525 {
00526 ret=new pageState_t;
00527 pageStateStorage[pageNo]=ret;
00528 }
00529 return ret;
00530 }
00531
00533 MSA_Thread_Listener *getListener(void) {
00534 CthThread t=CthSelf();
00535 MSA_Thread_Listener *l=threadList[t];
00536 if (l==NULL) {
00537 l=new MSA_Thread_Listener;
00538 threadList[t]=l;
00539 }
00540 return l;
00541 }
00543 void addAndSuspend(MSA_Listeners &dest) {
00544 MSA_Thread_Listener *l=getListener();
00545 dest.add(l);
00546 l->suspend();
00547 }
00548
00549
00552
00553
00554 inline void IncrementPagesWaiting(unsigned int page)
00555 {
00556 state(page)->readRequests.add(getListener());
00557 }
00558
00559 inline void IncrementChangesWaiting(unsigned int page)
00560 {
00561 state(page)->writeRequests.add(getListener());
00562 }
00563
00564
00565
00568 inline ENTRY_TYPE* tryBuffer(int async=0)
00569 {
00570 ENTRY_TYPE* nu = NULL;
00571
00572
00573 if(!pagePool.empty())
00574 {
00575 nu = pagePool.top();
00576 CkAssert(nu != NULL);
00577 pagePool.pop();
00578 }
00579
00580
00581 if(nu == NULL && resident_pages < max_resident_pages)
00582 {
00583 nu = new ENTRY_TYPE[ENTRIES_PER_PAGE];
00584 resident_pages++;
00585 }
00586
00587
00588 if(nu == NULL)
00589 {
00590 int pageToSwap = replacementPolicy->selectPage();
00591 if(pageToSwap != MSA_INVALID_PAGE_NO)
00592 {
00593 CkAssert(pageTable[pageToSwap] != NULL);
00594 CkAssert(state(pageToSwap)->canPageOut() == true);
00595
00596 relocatePage(pageToSwap, async);
00597 nu = pageTable[pageToSwap];
00598 pageTable[pageToSwap] = 0;
00599 delete pageStateStorage[pageToSwap];
00600 pageStateStorage[pageToSwap]=0;
00601 }
00602 }
00603
00604
00605
00606 return nu;
00607 }
00608
00611 inline ENTRY_TYPE* makePage(unsigned int page)
00612 {
00613 ENTRY_TYPE* nu=pageTable[page];
00614 if (nu==0) {
00615 nu=tryBuffer();
00616 if (nu==0) CkAbort("MSA: No available space to create pages.\n");
00617 pageTable[page]=nu;
00618 }
00619 return nu;
00620 }
00621
00624 ENTRY_TYPE* destroyPage(unsigned int page)
00625 {
00626 ENTRY_TYPE* nu=pageTable[page];
00627 pageTable[page] = 0;
00628 if (pageStateStorage[page]->canDelete()) {
00629 delete pageStateStorage[page];
00630 pageStateStorage[page]=0;
00631 }
00632 resident_pages--;
00633 return nu;
00634 }
00635
00636
00637 void pageFault(unsigned int page, MSA_Page_Fault_t why)
00638 {
00639
00640 state(page)->state = why;
00641 if(why == Read_Fault)
00642 {
00643
00644 if (stateN(page)->readRequests.size()==0) {
00645 pageArray[page].GetPage(CkMyPe());
00646
00647 } else {
00648 ;
00649 }
00650 MSA_Thread_Listener *l=getListener();
00651 stateN(page)->readRequests.add(l);
00652 l->suspend();
00653 }
00654 else {
00655
00656 ENTRY_TYPE* nu = makePage(page);
00657 writeIdentity(nu);
00658 }
00659 }
00660
00662
00663 inline void accessPage(unsigned int page,MSA_Page_Fault_t access)
00664 {
00665 if (pageTable[page] == 0) {
00666
00667 pageFault(page, access);
00668 }
00669 #if CMK_ERROR_CHECKING
00670 if (stateN(page)->state!=access) {
00671 CkPrintf("page=%d mode=%d pagestate=%d", page, access, stateN(page)->state);
00672 CkAbort("MSA Runtime error: Attempting to access a page that is still in another mode.");
00673 }
00674 #endif
00675 replacementPolicy->pageAccessed(page);
00676 }
00677
00678
00679
00680
00681 void writeIdentity(ENTRY_TYPE* pagePtr)
00682 {
00683 for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
00684 pagePtr[i] = entryOpsObject->getIdentity();
00685 }
00686
00687
00688 bool shouldWriteback(unsigned int page) {
00689 if (!pageTable[page]) return false;
00690 return (stateN(page)->state == Write_Fault || stateN(page)->state == Accumulate_Fault);
00691 }
00692
00693 inline void relocatePage(unsigned int page, int async)
00694 {
00695
00696 if(shouldWriteback(page))
00697 {
00698
00699
00700 sendChangesToPageArray(page, async);
00701 }
00702 }
00703
00704 inline void sendChangesToPageArray(const unsigned int page, const int async)
00705 {
00706 sendRLEChangesToPageArray(page);
00707
00708 MSA_Thread_Listener *l=getListener();
00709 state(page)->writeRequests.add(l);
00710 if (!async)
00711 l->suspend();
00712
00713 }
00714
00715
00716
00717 inline void sendNonRLEChangesToPageArray(const unsigned int page)
00718 {
00719 pageArray[page].PAReceivePage(pageTable[page], ENTRIES_PER_PAGE, CkMyPe(), stateN(page)->state);
00720 }
00721
00722
00723
00724 inline void sendRLEChangesToPageArray(const unsigned int page)
00725 {
00726 ENTRY_TYPE *writePage=pageTable[page];
00727 int nSpans=stateN(page)->writeSpans(writeSpans);
00728 if (nSpans==1)
00729 {
00730 int nEntries=writeSpans[0].end-writeSpans[0].start;
00731 if (entryOpsObject->pupEveryElement()) {
00732 pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
00733 page_t(&writePage[writeSpans[0].start],nEntries),nEntries,
00734 CkMyPe(),stateN(page)->state);
00735 } else {
00736 pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
00737 &writePage[writeSpans[0].start], nEntries,
00738 CkMyPe(),stateN(page)->state);
00739 }
00740 }
00741 else
00742 {
00743 int nEntries=0;
00744 for (int s=0;s<nSpans;s++) {
00745 for (int i=writeSpans[s].start;i<writeSpans[s].end;i++)
00746 writeEntries[nEntries++]=writePage[i];
00747 }
00748 if (entryOpsObject->pupEveryElement()) {
00749 pageArray[page].PAReceiveRLEPageWithPup(writeSpans,nSpans,
00750 page_t(writeEntries,nEntries),nEntries,
00751 CkMyPe(),stateN(page)->state);
00752 } else {
00753 pageArray[page].PAReceiveRLEPage(writeSpans,nSpans,
00754 writeEntries,nEntries,
00755 CkMyPe(),stateN(page)->state);
00756 }
00757 }
00758 }
00759
00760
00761 public:
00762
00763
00764
00765 inline MSA_CacheGroup(unsigned int nPages_, CkArrayID pageArrayID,
00766 unsigned int max_bytes_, unsigned int nEntries_,
00767 unsigned int numberOfWorkerThreads_)
00768 : numberOfWorkerThreads(numberOfWorkerThreads_),
00769 nPages(nPages_),
00770 nEntries(nEntries_),
00771 pageTable(nPages, NULL),
00772 pageStateStorage(nPages, NULL),
00773 pageArray(pageArrayID),
00774 max_resident_pages(max_bytes_/(sizeof(ENTRY_TYPE)*ENTRIES_PER_PAGE)),
00775 entryOpsObject(new ENTRY_OPS_CLASS),
00776 replacementPolicy(new vmPageReplacementPolicy(nPages, pageTable, pageStateStorage)),
00777 outOfBufferInPrefetch(0), syncAckCount(0),syncThreadCount(0),
00778 resident_pages(0), numberLocalWorkerThreads(0),
00779 numberLocalWorkerThreadsActive(0), enrollDoneq(0),
00780 clear(false)
00781 {
00782 MSADEBPRINT(printf("MSA_CacheGroup nEntries %d \n",nEntries););
00783 }
00784
00785
00786 inline ~MSA_CacheGroup()
00787 {
00788 FreeMem();
00789 }
00790
00791
00792 inline void changeEntryOpsObject(ENTRY_OPS_CLASS *e) {
00793 entryOpsObject = e;
00794 pageArray.changeEntryOpsObject(e);
00795 }
00796
00797
00798 inline const ENTRY_TYPE* readablePage(unsigned int page)
00799 {
00800 accessPage(page,Read_Fault);
00801
00802 return pageTable[page];
00803 }
00804
00805
00806
00807
00808 inline const void* readablePage2(unsigned int page)
00809 {
00810 return pageTable[page];
00811 }
00812
00813
00814
00815 inline ENTRY_TYPE* writeablePage(unsigned int page, unsigned int offset)
00816 {
00817 accessPage(page,Write_Fault);
00818
00819
00820
00821
00822
00823
00824
00825 stateN(page)->write(offset);
00826
00827
00828 return pageTable[page];
00829 }
00830
00831
00832 inline ENTRY_TYPE &accumulate(unsigned int page, unsigned int offset)
00833 {
00834 accessPage(page,Accumulate_Fault);
00835 stateN(page)->write(offset);
00836 return pageTable[page][offset];
00837 }
00838
00841 inline void ReceivePageWithPUP(unsigned int page, page_t &pageData, int size)
00842 {
00843 ReceivePage(page, pageData.getData(), size);
00844 }
00845
00846 inline void ReceivePage(unsigned int page, ENTRY_TYPE* pageData, int size)
00847 {
00848 CkAssert(0==size || ENTRIES_PER_PAGE == size);
00849
00850 ENTRY_TYPE *nu=makePage(page);
00851 if(size!=0)
00852 {
00853 for(unsigned int i = 0; i < size; i++)
00854 nu[i] = pageData[i];
00855 }
00856 else
00857 {
00858
00859 writeIdentity(nu);
00860 }
00861
00862 state(page)->readRequests.signal(page);
00863 }
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874 inline void AckPage(unsigned int page)
00875 {
00876 state(page)->writeRequests.signal(page);
00877 }
00878
00879
00880
00881 inline void SyncReq(int single, bool clear_)
00882 {
00883 clear = clear || clear_;
00884 MSADEBPRINT(printf("SyncReq single %d\n",single););
00885 if(single)
00886 {
00887
00888
00889
00890 SingleSync();
00891 EmptyCache();
00892
00893 getListener()->suspend();
00894 }
00895 else{
00896 Sync(clear_);
00897 }
00898 }
00899
00900
00901 inline void FlushCache()
00902 {
00903
00904
00905 for(unsigned int i = 0; i < nPages; i++)
00906 {
00907 if(shouldWriteback(i)) {
00908
00909 sendChangesToPageArray(i, 1);
00910 }
00911 }
00912 }
00913
00914
00915 void EmptyCache()
00916 {
00917
00918
00919 for(unsigned int i = 0; i < nPages; i++)
00920 {
00921 if(pageTable[i]) pagePool.push(destroyPage(i));
00922 }
00923 }
00924
00925
00927
00928 inline void enroll(unsigned int num_workers)
00929 {
00930 CkAssert(num_workers == numberOfWorkerThreads);
00931 CkAssert(enrollDoneq == 0);
00932 numberLocalWorkerThreads++;
00933 numberLocalWorkerThreadsActive++;
00934
00935
00936
00937 this->thisProxy[0].enrollAck(CkMyPe());
00938
00939 addAndSuspend(enrollWaiters);
00940
00941
00942 CkAssert(enrollDoneq == 1);
00943 return;
00944 }
00945
00946
00947
00948
00949 void enroll()
00950 {
00951 enroll(numberOfWorkerThreads);
00952 }
00953
00955 inline void enrollAck(int originator)
00956 {
00957 CkAssert(CkMyPe() == 0);
00958 CkAssert(enrollDoneq == 0);
00959
00960 syncAckCount++;
00961 enrolledPEs.insert(originator);
00962
00963 if(syncAckCount == numberOfWorkerThreads) {
00964
00965 syncAckCount = 0;
00966 enrollDoneq = 1;
00967
00968
00969 this->thisProxy.enrollDone();
00970 }
00971 }
00972
00974 inline void enrollDone()
00975 {
00976
00977
00978
00979 enrollDoneq = 1;
00980 enrollWaiters.signal(0);
00981 }
00982
00983
00984
00985 inline void SingleSync()
00986 {
00987
00988
00989 FlushCache();
00990 }
00991
00992 void SyncRelease()
00993 {
00994 numberLocalWorkerThreadsActive--;
00995
00996 syncDebug();
00997
00998 if(syncThreadCount < numberLocalWorkerThreadsActive)
00999 {
01000 return;
01001 }
01002
01003 this->thisProxy[CkMyPe()].FinishSync();
01004 }
01005
01006 void syncDebug()
01007 {
01008 MSADEBPRINT(printf("Sync (Total threads: %d, Active: %d, Synced: %d)\n",
01009 numberLocalWorkerThreads, numberLocalWorkerThreadsActive, syncThreadCount));
01010 }
01011
01012 void activate()
01013 {
01014 numberLocalWorkerThreadsActive++;
01015
01016 CkAssert(numberLocalWorkerThreadsActive <= numberLocalWorkerThreads);
01017 }
01018
01019 void FinishSync()
01020 {
01021
01022
01023
01024 FlushCache();
01025
01026
01027
01028 EmptyCache();
01029
01030
01031
01032
01033 MSADEBPRINT(printf("Sync calling suspend on getListener\n"););
01034 getListener()->suspend();
01035 MSADEBPRINT(printf("Sync awakening after suspend\n"););
01036
01037
01038
01039
01040
01041
01042
01043
01044
01045 if(CkMyPe() != 0)
01046 {
01047 this->thisProxy[0].SyncAck(clear);
01048 }
01049 else
01050 {
01051 SyncAck(clear);
01052 }
01053 MSADEBPRINT(printf("Sync all local threads done, going to addAndSuspend\n"););
01054
01055 addAndSuspend(syncWaiters);
01056
01057 MSADEBPRINT(printf("Sync all local threads done waking up after addAndSuspend\n"););
01058
01059 }
01060
01061
01062 inline void Sync(bool clear_)
01063 {
01064 syncThreadCount++;
01065
01066
01067
01068
01069 syncDebug();
01070
01071 clear |= clear_;
01072
01073
01074
01075
01076
01077 MSADEBPRINT(printf("Sync (Total threads: %d, Active: %d, Synced: %d)\n",
01078 numberLocalWorkerThreads, numberLocalWorkerThreadsActive, syncThreadCount));
01079 if(syncThreadCount < numberLocalWorkerThreadsActive)
01080 {
01081 MSADEBPRINT(printf("Sync addAndSuspend\n"));
01082 addAndSuspend(syncWaiters);
01083 return;
01084 }
01085
01086 FinishSync();
01087 }
01088
01089 inline unsigned int getNumEntries() { return nEntries; }
01090 inline CProxy_PageArray_t getArray() { return pageArray; }
01091
01092
01093
01094 inline void SyncAck(bool clear_)
01095 {
01096 CkAssert(CkMyPe() == 0);
01097 syncAckCount++;
01098 clear = clear || clear_;
01099
01100
01101
01102 if (syncAckCount == enrolledPEs.size()) {
01103 MSADEBPRINT(printf("SyncAck starting reduction on pageArray of size %d number of pages %d\n",
01104 nEntries, nPages););
01105 pageArray.Sync(clear);
01106 }
01107 }
01108
01109 inline void SyncDone(CkReductionMsg *m)
01110 {
01111 delete m;
01112
01113
01114
01115 syncThreadCount = 0;
01116 syncAckCount = 0;
01117 clear = false;
01118 MSADEBPRINT(printf("SyncDone syncWaiters signal to be called\n"););
01119 syncWaiters.signal(0);
01120 }
01121
01122 inline void FreeMem()
01123 {
01124 for(unsigned int i = 0; i < nPages; i++)
01125 {
01126 if(pageTable[i]) delete [] destroyPage(i);
01127 }
01128
01129 while(!pagePool.empty())
01130 {
01131 delete [] pagePool.top();
01132 pagePool.pop();
01133 }
01134
01135 resident_pages=0;
01136 }
01137
01142 inline void unroll() {
01143 numberLocalWorkerThreads--;
01144 if(numberLocalWorkerThreads == 0){
01145 FreeMem();
01146 }
01147 }
01148
01153 inline void Prefetch(unsigned int pageStart, unsigned int pageEnd)
01154 {
01155
01156
01157
01158 if(!outOfBufferInPrefetch)
01159 {
01160
01161 for(unsigned int p = pageStart; p <= pageEnd; p++)
01162 {
01163 if(NULL == pageTable[p])
01164 {
01165
01166
01167 ENTRY_TYPE* nu = tryBuffer(1);
01168 if(NULL == nu)
01169 {
01170
01171 outOfBufferInPrefetch = 1;
01172 break;
01173 }
01174
01175 pageTable[p] = nu;
01176 state(p)->state = Read_Fault;
01177
01178 pageArray[p].GetPage(CkMyPe());
01179 IncrementPagesWaiting(p);
01180
01181
01182 }
01183
01184
01185 state(p)->locked = true;
01186 }
01187 }
01188 }
01189
01194 inline int WaitAll(void)
01195 {
01196 if(outOfBufferInPrefetch)
01197 {
01198
01199 outOfBufferInPrefetch = 0;
01200 getListener()->suspend();
01201 UnlockPages();
01202 return 1;
01203 }
01204 else
01205 {
01206
01207
01208 outOfBufferInPrefetch = 0;
01209 getListener()->suspend();
01210 return 0;
01211 }
01212 }
01213
01214 inline void UnlockPage(unsigned int page) {
01215 pageState_t *s=stateN(page);
01216 if(s && s->locked) {
01217 replacementPolicy->pageAccessed(page);
01218 s->locked = false;
01219 }
01220 }
01221
01225 inline void UnlockPages()
01226 {
01227
01228 for(unsigned int page = 0; page < nPages; page++)
01229 UnlockPage(page);
01230 }
01231
01236 inline void UnlockPages(unsigned int startPage, unsigned int endPage)
01237 {
01238 for(unsigned int page = startPage; page <= endPage; page++)
01239 UnlockPage(page);
01240 }
01241
01243 inline void emitBufferValue(int ID, unsigned int pageNum, unsigned int offset)
01244 {
01245 CkAssert( pageNum < nPages );
01246 CkAssert( offset < ENTRIES_PER_PAGE );
01247
01248
01249
01250
01251
01252
01253 }
01254 };
01255
01256
01257
01258
01259
01260 template<class ENTRY_TYPE, class ENTRY_OPS_CLASS,unsigned int ENTRIES_PER_PAGE>
01261 class MSA_PageArray : public CBase_MSA_PageArray<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE>
01262 {
01263 typedef CProxy_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> CProxy_CacheGroup_t;
01264 typedef MSA_PageT<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE> page_t;
01265
01266 protected:
01267 ENTRY_TYPE *epage;
01268 ENTRY_OPS_CLASS entryOpsObject;
01269 CProxy_CacheGroup_t cache;
01270
01271 unsigned int pageNo() { return this->thisIndex; }
01272
01273 inline void allocatePage(MSA_Page_Fault_t access)
01274 {
01275 if(epage == NULL)
01276 {
01277 epage = new ENTRY_TYPE[ENTRIES_PER_PAGE];
01278 writeIdentity();
01279 }
01280 }
01281
01282
01283 inline void set(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
01284 {
01285
01286 for(unsigned int i = 0; i < (end - begin); i++) {
01287 epage[begin + i] = buffer[i];
01288
01289 }
01290 }
01291
01292
01293 inline void combine(const ENTRY_TYPE* buffer, unsigned int begin, unsigned int end)
01294 {
01295 ENTRY_TYPE* pagePtr = epage + begin;
01296 for(unsigned int i = 0; i < (end - begin); i++)
01297 entryOpsObject.accumulate(pagePtr[i], buffer[i]);
01298 }
01299
01300
01301 inline void writeIdentity()
01302 {
01303 for(unsigned int i = 0; i < ENTRIES_PER_PAGE; i++)
01304 epage[i] = entryOpsObject.getIdentity();
01305 }
01306
01307 public:
01308 inline MSA_PageArray() : epage(NULL) { }
01309 inline MSA_PageArray(CkMigrateMessage* m) { delete m; }
01310
01311 void setCacheProxy(CProxy_CacheGroup_t &cache_)
01312 {
01313 cache=cache_;
01314 }
01315
01316 void pup(PUP::er& p)
01317 {
01318 int epage_present=(epage!=0);
01319 p|epage_present;
01320 if (epage_present) {
01321 if(p.isUnpacking())
01322 allocatePage(Write_Fault);
01323 for (int i=0;i<ENTRIES_PER_PAGE;i++)
01324 p|epage[i];
01325 }
01326 }
01327
01328 inline ~MSA_PageArray()
01329 {
01330 if(epage) delete [] epage;
01331 }
01332
01335 inline void GetPage(int pe)
01336 {
01337 if(epage == NULL) {
01338
01339 if (entryOpsObject.pupEveryElement())
01340 cache[pe].ReceivePageWithPUP(pageNo(), page_t((ENTRY_TYPE*)NULL), 0);
01341 else
01342 cache[pe].ReceivePage(pageNo(), (ENTRY_TYPE*)NULL, 0);
01343 } else {
01344
01345 if (entryOpsObject.pupEveryElement())
01346 cache[pe].ReceivePageWithPUP(pageNo(), page_t(epage), ENTRIES_PER_PAGE);
01347 else
01348 cache[pe].ReceivePage(pageNo(), epage, ENTRIES_PER_PAGE);
01349 }
01350 }
01351
01353
01354 inline void PAReceivePage(ENTRY_TYPE *pageData,
01355 int pe, MSA_Page_Fault_t pageState)
01356 {
01357 allocatePage(pageState);
01358
01359 if(pageState == Write_Fault)
01360 set(pageData, 0, ENTRIES_PER_PAGE);
01361 else
01362 combine(pageData, 0, ENTRIES_PER_PAGE);
01363
01364
01365
01366 cache[pe].AckPage(this->thisIndex);
01367 }
01368
01370 inline void PAReceiveRLEPageWithPup(
01371 const MSA_WriteSpan_t *spans, unsigned int nSpans,
01372 page_t &entries, unsigned int nEntries,
01373 int pe, MSA_Page_Fault_t pageState)
01374 {
01375 PAReceiveRLEPage(spans, nSpans, entries.getData(), nEntries, pe, pageState);
01376 }
01377
01378
01379 inline void PAReceiveRLEPage(
01380 const MSA_WriteSpan_t *spans, unsigned int nSpans,
01381 const ENTRY_TYPE *entries, unsigned int nEntries,
01382 int pe, MSA_Page_Fault_t pageState)
01383 {
01384 allocatePage(pageState);
01385
01386
01387 int e=0;
01388 for (int s=0;s<nSpans;s++) {
01389 if(pageState == Write_Fault)
01390 set(&entries[e], spans[s].start,spans[s].end);
01391 else
01392 combine(&entries[e], spans[s].start,spans[s].end);
01393 e+=spans[s].end-spans[s].start;
01394 }
01395
01396
01397
01398 cache[pe].AckPage(this->thisIndex);
01399 }
01400
01401
01402 inline void Sync(bool clear)
01403 {
01404 if (clear && epage)
01405 writeIdentity();
01406 MSADEBPRINT(printf("MSA_PageArray::Sync about to call contribute \n"););
01407 CkCallback cb(CkIndex_MSA_CacheGroup<ENTRY_TYPE, ENTRY_OPS_CLASS, ENTRIES_PER_PAGE>::SyncDone(NULL), cache);
01408 this->contribute(0, NULL, CkReduction::concat, cb);
01409 }
01410
01411 inline void emit(int ID, int index)
01412 {
01413
01414
01415
01416
01417
01418 }
01419 };
01420
01421 #define CK_TEMPLATES_ONLY
01422 #include "msa.def.h"
01423 #undef CK_TEMPLATES_ONLY
01424
01425 #endif