00001 #include <vector>
00002 #include "armci_impl.h"
00003
00004 using namespace std;
00005
00006 int **_armciRednLookupTable;
00007
00008
00009
00010
00011 extern "C" void armciLibStart(void) {
00012 int argc=CkGetArgc();
00013 char **argv=CkGetArgv();
00014 ARMCI_Main_cpp(argc, argv);
00015 }
00016
00017 _ARMCI_GENERATE_POLYMORPHIC_REDUCTION(sum,ret[i]+=value[i];)
00018 _ARMCI_GENERATE_POLYMORPHIC_REDUCTION(product,ret[i]*=value[i];)
00019 _ARMCI_GENERATE_POLYMORPHIC_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
00020 _ARMCI_GENERATE_POLYMORPHIC_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
00021 _ARMCI_GENERATE_ABS_REDUCTION()
00022
00023 static int armciLibStart_idx = -1;
00024
00025 void armciNodeInit(void) {
00026 CmiAssert(armciLibStart_idx == -1);
00027 armciLibStart_idx = TCHARM_Register_thread_function((TCHARM_Thread_data_start_fn)armciLibStart);
00028
00029
00030 _armciRednLookupTable = new int*[_ARMCI_NUM_REDN_OPS];
00031 for (int ops=0; ops<_ARMCI_NUM_REDN_OPS; ops++) {
00032 _armciRednLookupTable[ops] = new int[ARMCI_NUM_DATATYPES];
00033 }
00034
00035
00036 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(sum,_ARMCI_REDN_OP_SUM);
00037 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(product,_ARMCI_REDN_OP_SUM);
00038 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(max,_ARMCI_REDN_OP_MAX);
00039 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(min,_ARMCI_REDN_OP_MIN);
00040 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(absmax,_ARMCI_REDN_OP_ABSMAX);
00041 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(absmin,_ARMCI_REDN_OP_ABSMIN);
00042 }
00043
00044
00045
00046
00047 static void ArmciDefaultSetup(void) {
00048
00049 TCHARM_Create(TCHARM_Get_num_chunks(), armciLibStart_idx);
00050 }
00051
00052 CtvDeclare(ArmciVirtualProcessor *, _armci_ptr);
00053
00054
00055 void armciProcInit(void) {
00056 CtvInitialize(ArmciVirtualProcessor, _armci_ptr);
00057 CtvAccess(_armci_ptr) = NULL;
00058
00059
00060 TCHARM_Set_fallback_setup(ArmciDefaultSetup);
00061 }
00062
00063 ArmciVirtualProcessor::ArmciVirtualProcessor(const CProxy_TCharm &_thr_proxy)
00064 : TCharmClient1D(_thr_proxy) {
00065 thisProxy = this;
00066 tcharmClientInit();
00067 thread->semaPut(ARMCI_TCHARM_SEMAID,this);
00068 memBlock = CmiIsomallocBlockListNew(thread->getThread());
00069 thisProxy = CProxy_ArmciVirtualProcessor(thisArrayID);
00070 addressReply = NULL;
00071
00072 }
00073
00074 ArmciVirtualProcessor::ArmciVirtualProcessor(CkMigrateMessage *m)
00075 : TCharmClient1D(m)
00076 {
00077
00078 thread = NULL;
00079 addressReply = NULL;
00080 }
00081
00082 ArmciVirtualProcessor::~ArmciVirtualProcessor()
00083 {
00084 #if !CMK_USE_MEMPOOL_ISOMALLOC
00085 CmiIsomallocBlockListDelete(memBlock);
00086 #endif
00087 if (addressReply) {delete addressReply;}
00088 }
00089
00090 void ArmciVirtualProcessor::setupThreadPrivate(CthThread forThread) {
00091 CtvAccessOther(forThread, _armci_ptr) = this;
00092 armci_nproc = thread->getNumElements();
00093 }
00094
00095 void ArmciVirtualProcessor::getAddresses(AddressMsg *msg) {
00096 addressReply = msg;
00097 thread->resume();
00098 }
00099
00100
00101 void ArmciVirtualProcessor::put(pointer src, pointer dst,
00102 int nbytes, int dst_proc) {
00103
00104
00105
00106
00107 int hdl = hdlList.size();
00108 Armci_Hdl* entry = new Armci_Hdl(ARMCI_BPUT, dst_proc, nbytes, src, dst);
00109 hdlList.push_back(entry);
00110
00111 ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00112 memcpy(msg->data, src, nbytes);
00113 thisProxy[dst_proc].putData(msg);
00114
00115 }
00116
00117 int ArmciVirtualProcessor::nbput(pointer src, pointer dst,
00118 int nbytes, int dst_proc) {
00119
00120
00121
00122
00123 int hdl = hdlList.size();
00124 Armci_Hdl* entry = new Armci_Hdl(ARMCI_PUT, dst_proc, nbytes, src, dst);
00125 hdlList.push_back(entry);
00126
00127 ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00128 memcpy(msg->data, src, nbytes);
00129 thisProxy[dst_proc].putData(msg);
00130
00131 return hdl;
00132 }
00133
00134 void ArmciVirtualProcessor::nbput_implicit(pointer src, pointer dst,
00135 int nbytes, int dst_proc) {
00136 int hdl = hdlList.size();
00137 Armci_Hdl* entry = new Armci_Hdl(ARMCI_IPUT, dst_proc, nbytes, src, dst);
00138 hdlList.push_back(entry);
00139
00140 ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00141 memcpy(msg->data, src, nbytes);
00142 thisProxy[dst_proc].putData(msg);
00143 }
00144
00145 void ArmciVirtualProcessor::putData(pointer dst, int nbytes, char *data,
00146 int src_proc, int hdl) {
00147 memcpy(dst, data, nbytes);
00148 thisProxy[src_proc].putAck(hdl);
00149 }
00150
00151 void ArmciVirtualProcessor::putData(ArmciMsg *m) {
00152 memcpy(m->dst, m->data, m->nbytes);
00153 thisProxy[m->src_proc].putAck(m->hdl);
00154 delete m;
00155 }
00156
00157 void ArmciVirtualProcessor::putAck(int hdl) {
00158 if(hdl != -1) {
00159 hdlList[hdl]->acked = 1;
00160 if (hdlList[hdl]->wait == 1) {
00161 hdlList[hdl]->wait = 0;
00162 thread->resume();
00163 }
00164 }
00165 thread->resume();
00166 }
00167
00168 void ArmciVirtualProcessor::get(pointer src, pointer dst,
00169 int nbytes, int src_proc) {
00170
00171
00172
00173
00174 thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, -1);
00175
00176 thread->suspend();
00177 }
00178
00179 int ArmciVirtualProcessor::nbget(pointer src, pointer dst,
00180 int nbytes, int src_proc) {
00181
00182
00183
00184
00185
00186 int hdl = hdlList.size();
00187 Armci_Hdl* entry = new Armci_Hdl(ARMCI_GET, src_proc, nbytes, src, dst);
00188 hdlList.push_back(entry);
00189
00190 thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, hdl);
00191
00192 return hdl;
00193 }
00194
00195 void ArmciVirtualProcessor::nbget_implicit(pointer src, pointer dst,
00196 int nbytes, int src_proc) {
00197 int hdl = hdlList.size();
00198 Armci_Hdl* entry = new Armci_Hdl(ARMCI_IGET, src_proc, nbytes, src, dst);
00199 hdlList.push_back(entry);
00200
00201 thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, hdl);
00202 }
00203
00204 void ArmciVirtualProcessor::wait(int hdl){
00205 if(hdl == -1) return;
00206 while (1) {
00207 if(hdlList[hdl]->acked != 0)
00208 break;
00209 else
00210 thread->suspend();
00211 }
00212 }
00213
00214
00215
00216
00217
00218
00219
00220
00221 void ArmciVirtualProcessor::waitmulti(vector<int> procs){
00222 for(int i=0;i<procs.size();i++){
00223 wait(procs[i]);
00224 }
00225 }
00226
00227 void ArmciVirtualProcessor::waitproc(int proc){
00228 vector<int> procs;
00229 for(int i=0;i<hdlList.size();i++){
00230 if((hdlList[i]->acked == 0) &&
00231 (hdlList[i]->proc == proc) &&
00232 ((hdlList[i]->op & IMPLICIT_MASK) != 0)) {
00233 hdlList[i]->wait = 1;
00234 procs.push_back(i);
00235 }
00236 }
00237 waitmulti(procs);
00238 }
00239
00240 void ArmciVirtualProcessor::waitall(){
00241 vector<int> procs;
00242 for(int i=0;i<hdlList.size();i++){
00243 if((hdlList[i]->acked == 0) &&
00244 ((hdlList[i]->op & IMPLICIT_MASK) != 0)) {
00245 hdlList[i]->wait = 1;
00246 procs.push_back(i);
00247 }
00248 }
00249 waitmulti(procs);
00250 }
00251
00252 void ArmciVirtualProcessor::fence(int proc){
00253 vector<int> procs;
00254 for(int i=0;i<hdlList.size();i++){
00255 if((hdlList[i]->acked == 0) &&
00256 ((hdlList[i]->op & BLOCKING_MASK) != 0) &&
00257 (hdlList[i]->proc == proc))
00258 procs.push_back(i);
00259 }
00260 waitmulti(procs);
00261 }
00262 void ArmciVirtualProcessor::allfence(){
00263 vector<int> procs;
00264 for(int i=0;i<hdlList.size();i++){
00265 if((hdlList[i]->acked == 0) &&
00266 ((hdlList[i]->op & BLOCKING_MASK) != 0))
00267 procs.push_back(i);
00268 }
00269 waitmulti(procs);
00270 }
00271 void ArmciVirtualProcessor::barrier(){
00272 allfence();
00273 CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00274 contribute(0,NULL,CkReduction::sum_int,cb);
00275 thread->suspend();
00276 }
00277
00278 void ArmciVirtualProcessor::resumeThread(void){
00279 thread->resume();
00280 }
00281
00282 int ArmciVirtualProcessor::test(int hdl){
00283 if(hdl == -1) return 1;
00284 return hdlList[hdl]->acked;
00285 }
00286
00287 void ArmciVirtualProcessor::requestFromGet(pointer src, pointer dst, int nbytes,
00288 int dst_proc, int hdl) {
00289 ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,-1,hdl);
00290 memcpy(msg->data, src, nbytes);
00291 thisProxy[dst_proc].putDataFromGet(msg);
00292 }
00293
00294
00295
00296
00297 void ArmciVirtualProcessor::putDataFromGet(pointer dst, int nbytes, char *data, int hdl) {
00298 memcpy(dst, data, nbytes);
00299 if(hdl != -1) {
00300 hdlList[hdl]->acked = 1;
00301 if (hdlList[hdl]->wait == 1) {
00302 hdlList[hdl]->wait = 0;
00303 thread->resume();
00304 }
00305 }
00306 thread->resume();
00307 }
00308
00309 void ArmciVirtualProcessor::putDataFromGet(ArmciMsg *m) {
00310 memcpy(m->dst, m->data, m->nbytes);
00311 if(m->hdl != -1) {
00312 hdlList[m->hdl]->acked = 1;
00313 if (hdlList[m->hdl]->wait == 1) {
00314 hdlList[m->hdl]->wait = 0;
00315 thread->resume();
00316 }
00317 }
00318 delete m;
00319 thread->resume();
00320 }
00321
00322 void ArmciVirtualProcessor::puts(pointer src_ptr, int src_stride_ar[],
00323 pointer dst_ptr, int dst_stride_ar[],
00324 int count[], int stride_levels, int dst_proc){
00325 int nbytes = 1;
00326 for(int i=0;i<stride_levels+1;i++)
00327 nbytes *= count[i];
00328
00329
00330
00331
00332
00333
00334
00335
00336 int hdl = hdlList.size();
00337 Armci_Hdl* entry = new Armci_Hdl(ARMCI_BPUT, dst_proc, nbytes, src_ptr, dst_ptr);
00338 hdlList.push_back(entry);
00339
00340 ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00341
00342 memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00343 memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00344 stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00345 thisProxy[dst_proc].putsData(m);
00346 }
00347
00348 int ArmciVirtualProcessor::nbputs(pointer src_ptr, int src_stride_ar[],
00349 pointer dst_ptr, int dst_stride_ar[],
00350 int count[], int stride_levels, int dst_proc){
00351 int nbytes = 1;
00352 for(int i=0;i<stride_levels+1;i++)
00353 nbytes *= count[i];
00354
00355
00356
00357
00358
00359
00360
00361
00362 int hdl = hdlList.size();
00363 Armci_Hdl* entry = new Armci_Hdl(ARMCI_PUT, dst_proc, nbytes, src_ptr, dst_ptr);
00364 hdlList.push_back(entry);
00365
00366 ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00367
00368 memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00369 memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00370 stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00371 thisProxy[dst_proc].putsData(m);
00372 return hdl;
00373 }
00374
00375 void ArmciVirtualProcessor::nbputs_implicit(pointer src_ptr,
00376 int src_stride_ar[],
00377 pointer dst_ptr,
00378 int dst_stride_ar[],
00379 int count[], int stride_levels,
00380 int dst_proc){
00381 int nbytes = 1;
00382 for(int i=0;i<stride_levels+1;i++)
00383 nbytes *= count[i];
00384 int hdl = hdlList.size();
00385 Armci_Hdl* entry = new Armci_Hdl(ARMCI_IPUT, dst_proc, nbytes,
00386 src_ptr, dst_ptr);
00387 hdlList.push_back(entry);
00388
00389 ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00390
00391 memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00392 memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00393 stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00394 thisProxy[dst_proc].putsData(m);
00395 }
00396
00397 void ArmciVirtualProcessor::putsData(pointer dst_ptr, int dst_stride_ar[],
00398 int count[], int stride_levels,
00399 int nbytes, char *data, int src_proc, int hdl){
00400 stridedCopy(dst_ptr, data, dst_stride_ar, count, stride_levels, 0);
00401 thisProxy[src_proc].putAck(hdl);
00402 }
00403
00404 void ArmciVirtualProcessor::putsData(ArmciStridedMsg *m){
00405 stridedCopy(m->dst, m->data, m->dst_stride_ar, m->count, m->stride_levels, 0);
00406 thisProxy[m->src_proc].putAck(m->hdl);
00407 delete m;
00408 }
00409
00410 void ArmciVirtualProcessor::gets(pointer src_ptr, int src_stride_ar[],
00411 pointer dst_ptr, int dst_stride_ar[],
00412 int count[], int stride_levels, int src_proc){
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424 thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar,
00425 count, stride_levels, thisIndex, -1);
00426
00427 thread->suspend();
00428 }
00429
00430 int ArmciVirtualProcessor::nbgets(pointer src_ptr, int src_stride_ar[],
00431 pointer dst_ptr, int dst_stride_ar[],
00432 int count[], int stride_levels, int src_proc){
00433 int hdl = hdlList.size();
00434 int nbytes = 1;
00435 for(int i=0;i<stride_levels+1;i++)
00436 nbytes *= count[i];
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446 Armci_Hdl* entry = new Armci_Hdl(ARMCI_GET, src_proc, nbytes, src_ptr, dst_ptr);
00447 hdlList.push_back(entry);
00448
00449 thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar,
00450 count, stride_levels, thisIndex, hdl);
00451
00452 return hdl;
00453 }
00454
00455 void ArmciVirtualProcessor::nbgets_implicit(pointer src_ptr,
00456 int src_stride_ar[],
00457 pointer dst_ptr,
00458 int dst_stride_ar[],
00459 int count[], int stride_levels,
00460 int src_proc) {
00461 int hdl = hdlList.size();
00462 int nbytes = 1;
00463 for(int i=0;i<stride_levels+1;i++)
00464 nbytes *= count[i];
00465
00466 Armci_Hdl* entry = new Armci_Hdl(ARMCI_IGET, src_proc, nbytes, src_ptr, dst_ptr);
00467 hdlList.push_back(entry);
00468
00469 thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar,
00470 count, stride_levels, thisIndex, hdl);
00471 }
00472
00473 void ArmciVirtualProcessor::requestFromGets(pointer src_ptr, int src_stride_ar[],
00474 pointer dst_ptr, int dst_stride_ar[], int count[], int stride_levels, int dst_proc, int hdl){
00475 int nbytes = 1;
00476 for(int i=0;i<stride_levels+1;i++)
00477 nbytes *= count[i];
00478
00479 ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00480
00481 memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00482 memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00483 stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00484 thisProxy[dst_proc].putDataFromGets(m);
00485 }
00486 void ArmciVirtualProcessor::putDataFromGets(pointer dst_ptr, int dst_stride_ar[],
00487 int count[], int stride_levels, int nbytes, char *data, int hdl){
00488 stridedCopy(dst_ptr, data, dst_stride_ar, count, stride_levels, 0);
00489 if(hdl != -1) {
00490 hdlList[hdl]->acked = 1;
00491 if (hdlList[hdl]->wait == 1) {
00492 hdlList[hdl]->wait = 0;
00493 thread->resume();
00494 }
00495 }
00496 thread->resume();
00497 }
00498
00499 void ArmciVirtualProcessor::putDataFromGets(ArmciStridedMsg *m){
00500 stridedCopy(m->dst, m->data, m->dst_stride_ar, m->count, m->stride_levels, 0);
00501 if(m->hdl != -1) {
00502 hdlList[m->hdl]->acked = 1;
00503 if (hdlList[m->hdl]->wait == 1) {
00504 hdlList[m->hdl]->wait = 0;
00505 thread->resume();
00506 }
00507 }
00508 delete m;
00509 thread->resume();
00510 }
00511
00512 void ArmciVirtualProcessor::notify(int proc){
00513 thisProxy[proc].sendNote(thisIndex);
00514 }
00515 void ArmciVirtualProcessor::sendNote(int proc){
00516
00517
00518
00519 int hasNote = -1;
00520 for(int i=0;i<noteList.size();i++){
00521 if(noteList[i]->proc == proc){
00522 hasNote = i;
00523 break;
00524 }
00525 }
00526 if(hasNote!=-1){
00527 (noteList[hasNote]->notified)++;
00528 } else {
00529 Armci_Note* newNote = new Armci_Note(proc, 0, 1);
00530 noteList.push_back(newNote);
00531 hasNote = noteList.size() - 1;
00532 }
00533 if(noteList[hasNote]->notified >= noteList[hasNote]->waited){
00534
00535
00536
00537
00538 thread->resume();
00539 }
00540 }
00541 void ArmciVirtualProcessor::notify_wait(int proc){
00542
00543
00544
00545 int hasNote = -1;
00546 for(int i=0;i<noteList.size();i++){
00547 if(noteList[i]->proc == proc){
00548 hasNote = i;
00549 break;
00550 }
00551 }
00552 if(hasNote!=-1){
00553 (noteList[hasNote]->waited)++;
00554 } else {
00555 Armci_Note* newNote = new Armci_Note(proc, 1, 0);
00556 noteList.push_back(newNote);
00557 hasNote = noteList.size() - 1;
00558 }
00559 if(noteList[hasNote]->notified < noteList[hasNote]->waited){
00560 thread->suspend();
00561 }
00562 }
00563
00564 void ArmciVirtualProcessor::pup(PUP::er &p) {
00565 TCharmClient1D::pup(p);
00566
00567 #if CMK_USE_MEMPOOL_ISOMALLOC
00568 pup_bytes(&p, &memBlock, sizeof(CmiIsomallocBlockList*));
00569 #else
00570 CmiIsomallocBlockListPup(&p, &memBlock, NULL);
00571 #endif
00572 p|thisProxy;
00573 p|hdlList;
00574 p|noteList;
00575 CkPupMessage(p, (void **)&addressReply, 1);
00576 }
00577
00578
00579 void ArmciVirtualProcessor::requestAddresses(pointer ptr, pointer ptr_arr[], int bytes) {
00580 int thisPE = armci_me;
00581 int numPE = armci_nproc;
00582
00583 addressReply = NULL;
00584 addressPair *pair = new addressPair;
00585 pair->pe = thisPE;
00586 pair->ptr = ptr;
00587
00588 CkCallback cb(CkIndex_ArmciVirtualProcessor::mallocClient(NULL),CkArrayIndex1D(0),thisProxy);
00589 contribute(sizeof(addressPair), pair, CkReduction::concat, cb);
00590
00591 while(addressReply==NULL) thread->suspend();
00592
00593
00594 for (int i=0; i<numPE; i++) {
00595 ptr_arr[i] = addressReply->addresses[i];
00596 }
00597 delete addressReply;
00598 addressReply = NULL;
00599 }
00600
00601 void ArmciVirtualProcessor::stridedCopy(void *base, void *buffer_ptr,
00602 int *stride, int *count, int stride_levels, bool flatten) {
00603 if (stride_levels == 0) {
00604 if (flatten) {
00605 memcpy(buffer_ptr, base, count[stride_levels]);
00606 } else {
00607 memcpy(base, buffer_ptr, count[stride_levels]);
00608 }
00609 } else {
00610 int mystride = 1;
00611 for(int i=0;i<stride_levels;i++)
00612 mystride *= count[i];
00613 for (int i=0; i<count[stride_levels]; i++) {
00614 stridedCopy((void *)((char *)base + stride[stride_levels-1]*i),
00615 (void *)((char *)buffer_ptr + mystride*i), stride, count, stride_levels-1, flatten);
00616 }
00617 }
00618 }
00619
00620
00621 void ArmciVirtualProcessor::mallocClient(CkReductionMsg *msg) {
00622 int numBlocks = msg->getSize()/sizeof(addressPair);
00623 addressPair *dataBlocks = (addressPair *)msg->getData();
00624 AddressMsg *addrmsg = new(numBlocks, 0) AddressMsg;
00625
00626 for (int i=0; i<numBlocks; i++) {
00627 addrmsg->addresses[dataBlocks[i].pe] = dataBlocks[i].ptr;
00628 }
00629
00630 thisProxy.getAddresses(addrmsg);
00631 delete msg;
00632 }
00633
00634
00635
00636
00637
00638
00639
00640
00641 void ArmciVirtualProcessor::msgBcast(void *buffer, int len, int root) {
00642 int me;
00643 ARMCI_Myid(&me);
00644 if (me == root) {
00645 thisProxy.recvMsgBcast(len, (char *)buffer, root);
00646 } else {
00647
00648 collectiveTmpBufferPtr = buffer;
00649 thread->suspend();
00650 }
00651 }
00652
00653
00654 void ArmciVirtualProcessor::recvMsgBcast(int len, char *buffer, int root) {
00655 int me;
00656 ARMCI_Myid(&me);
00657 if (me != root) {
00658
00659
00660
00661 collectiveTmpBufferPtr = memcpy(collectiveTmpBufferPtr, buffer, len);
00662 collectiveTmpBufferPtr = NULL;
00663 thread->resume();
00664 }
00665 }
00666
00667
00668
00669
00670 void ArmciVirtualProcessor::msgGop(void *x, int n, char *op, int type) {
00671 CkReduction::reducerType reducer;
00672 if (strcmp(op,"+") == 0) {
00673 } else if (strcmp(op,"*") == 0) {
00674 } else if (strcmp(op,"min") == 0) {
00675 } else if (strcmp(op,"max") == 0) {
00676 } else if (strcmp(op,"absmin") == 0) {
00677 } else if (strcmp(op,"absmax") == 0) {
00678 } else {
00679 CkPrintf("Operator %s not supported\n",op);
00680 CmiAbort("ARMCI ERROR: msgGop - Unknown operator\n");
00681 }
00682 switch (type) {
00683 case ARMCI_INT:
00684
00685 break;
00686 case ARMCI_LONG:
00687 break;
00688 case ARMCI_LONG_LONG:
00689 break;
00690 case ARMCI_FLOAT:
00691 break;
00692 case ARMCI_DOUBLE:
00693 break;
00694 default:
00695 CkPrintf("ARMCI Type %d not supported\n", type);
00696 CmiAbort("ARMCI ERROR: msgGop - Unknown type\n");
00697 }
00698 }
00699
00700
00701 class ckptClientStruct {
00702 public:
00703 const char *dname;
00704 ArmciVirtualProcessor *vp;
00705 ckptClientStruct(const char *s, ArmciVirtualProcessor *p): dname(s), vp(p) {}
00706 };
00707
00708 static void checkpointClient(void *param,void *msg)
00709 {
00710 ckptClientStruct *client = (ckptClientStruct*)param;
00711 const char *dname = client->dname;
00712 ArmciVirtualProcessor *vp = client->vp;
00713 vp->checkpoint(strlen(dname), dname);
00714 delete client;
00715 }
00716
00717 void ArmciVirtualProcessor::startCheckpoint(const char* dname){
00718 if (thisIndex==0) {
00719 ckptClientStruct *clientData = new ckptClientStruct(dname, this);
00720 CkCallback cb(checkpointClient, clientData);
00721 contribute(0, NULL, CkReduction::sum_int, cb);
00722 } else {
00723 contribute(0, NULL, CkReduction::sum_int);
00724 }
00725 thread->suspend();
00726 }
00727 void ArmciVirtualProcessor::checkpoint(int len, const char* dname){
00728 if (len == 0) {
00729 CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00730 CkStartMemCheckpoint(cb);
00731 } else {
00732 char dirname[256];
00733 strncpy(dirname,dname,len);
00734 dirname[len]='\0';
00735 CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00736 CkStartCheckpoint(dirname,cb);
00737 }
00738 }
00739
00740 #include "armci.def.h"
00741