00001 #include <vector>
00002 #include "armci_impl.h"
00003
00004 using namespace std;
00005
// Two-level table of ints. armciNodeInit() allocates it as
// _ARMCI_NUM_REDN_OPS rows of ARMCI_NUM_DATATYPES entries each.
// Presumably filled by the _ARMCI_REGISTER_POLYMORPHIC_REDUCTION macro with
// one reducer id per (operation, datatype) pair -- TODO confirm in armci_impl.h.
int **_armciRednLookupTable;
00007
00008
00009
00010
00011 extern "C" void armciLibStart(void) {
00012 int argc=CkGetArgc();
00013 char **argv=CkGetArgv();
00014 ARMCI_Main_cpp(argc, argv);
00015 }
00016
// Instantiate the ARMCI reduction functions. For the polymorphic macro the
// first argument names the operation and the second is the per-element
// statement combining value[i] into ret[i] (add, multiply, keep larger, keep
// smaller). The macros themselves are defined outside this file; presumably
// they expand to one reducer per supported datatype -- TODO confirm.
_ARMCI_GENERATE_POLYMORPHIC_REDUCTION(sum,ret[i]+=value[i];)
_ARMCI_GENERATE_POLYMORPHIC_REDUCTION(product,ret[i]*=value[i];)
_ARMCI_GENERATE_POLYMORPHIC_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
_ARMCI_GENERATE_POLYMORPHIC_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
// Presumably generates the absmax/absmin reducers registered in
// armciNodeInit() -- TODO confirm against the macro definition.
_ARMCI_GENERATE_ABS_REDUCTION()
00022
// Value returned by TCHARM_Register_thread_function() for armciLibStart.
// Stays -1 until armciNodeInit() performs the registration (which asserts
// that it is still -1 beforehand).
static int armciLibStart_idx = -1;

#if CMK_TRACE_ENABLED
// Only pulled in when tracing is compiled in: armciNodeInit() inserts
// (function name, user event id) pairs into tcharm_funcmap.
#include "register.h"
CsvExtern(funcmap*, tcharm_funcmap);
#endif
00029
00030 void armciNodeInit(void) {
00031 #if CMK_TRACE_ENABLED
00032 TCharm::nodeInit();
00033 int funclength = sizeof(funclist)/sizeof(char*);
00034 for (int i=0; i<funclength; i++) {
00035 int event_id = traceRegisterUserEvent(funclist[i], -1);
00036 CsvAccess(tcharm_funcmap)->insert(std::pair<std::string, int>(funclist[i], event_id));
00037 }
00038
00039
00040
00041 for (int i=0; i<_chareTable.size(); i++){
00042 if (strcmp(_chareTable[i]->name, "dummy_thread_chare") == 0)
00043 _chareTable[i]->name = "ARMCI";
00044 }
00045 for (int i=0; i<_entryTable.size(); i++){
00046 if (strcmp(_entryTable[i]->name, "dummy_thread_ep") == 0)
00047 _entryTable[i]->setName("thread");
00048 }
00049 #endif
00050 CmiAssert(armciLibStart_idx == -1);
00051 armciLibStart_idx = TCHARM_Register_thread_function((TCHARM_Thread_data_start_fn)armciLibStart);
00052
00053
00054 _armciRednLookupTable = new int*[_ARMCI_NUM_REDN_OPS];
00055 for (int ops=0; ops<_ARMCI_NUM_REDN_OPS; ops++) {
00056 _armciRednLookupTable[ops] = new int[ARMCI_NUM_DATATYPES];
00057 }
00058
00059
00060 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(sum,_ARMCI_REDN_OP_SUM);
00061 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(product,_ARMCI_REDN_OP_SUM);
00062 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(max,_ARMCI_REDN_OP_MAX);
00063 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(min,_ARMCI_REDN_OP_MIN);
00064 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(absmax,_ARMCI_REDN_OP_ABSMAX);
00065 _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(absmin,_ARMCI_REDN_OP_ABSMIN);
00066 }
00067
00068
00069
00070
00071 static void ArmciDefaultSetup(void) {
00072
00073 TCHARM_Create(TCHARM_Get_num_chunks(), armciLibStart_idx);
00074 }
00075
00076 CtvDeclare(ArmciVirtualProcessor *, _armci_ptr);
00077
00078
00079 void armciProcInit(void) {
00080 CtvInitialize(ArmciVirtualProcessor, _armci_ptr);
00081 CtvAccess(_armci_ptr) = NULL;
00082
00083
00084 TCHARM_Set_fallback_setup(ArmciDefaultSetup);
00085 }
00086
00087 ArmciVirtualProcessor::ArmciVirtualProcessor(const CProxy_TCharm &_thr_proxy)
00088 : TCharmClient1D(_thr_proxy) {
00089 thisProxy = this;
00090 tcharmClientInit();
00091 thread->semaPut(ARMCI_TCHARM_SEMAID,this);
00092 memBlock = CmiIsomallocBlockListNew();
00093 thisProxy = CProxy_ArmciVirtualProcessor(thisArrayID);
00094 addressReply = NULL;
00095
00096 }
00097
00098 ArmciVirtualProcessor::ArmciVirtualProcessor(CkMigrateMessage *m)
00099 : TCharmClient1D(m)
00100 {
00101
00102 thread = NULL;
00103 addressReply = NULL;
00104 }
00105
00106 ArmciVirtualProcessor::~ArmciVirtualProcessor()
00107 {
00108 CmiIsomallocBlockListDelete(memBlock);
00109 if (addressReply) {delete addressReply;}
00110 }
00111
00112 void ArmciVirtualProcessor::setupThreadPrivate(CthThread forThread) {
00113 CtvAccessOther(forThread, _armci_ptr) = this;
00114 armci_nproc = thread->getNumElements();
00115 }
00116
00117 void ArmciVirtualProcessor::getAddresses(AddressMsg *msg) {
00118 addressReply = msg;
00119 thread->resume();
00120 }
00121
00122
00123 void ArmciVirtualProcessor::put(pointer src, pointer dst,
00124 int nbytes, int dst_proc) {
00125
00126
00127
00128
00129 int hdl = hdlList.size();
00130 Armci_Hdl* entry = new Armci_Hdl(ARMCI_BPUT, dst_proc, nbytes, src, dst);
00131 hdlList.push_back(entry);
00132
00133 ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00134 memcpy(msg->data, src, nbytes);
00135 thisProxy[dst_proc].putData(msg);
00136
00137 }
00138
00139 int ArmciVirtualProcessor::nbput(pointer src, pointer dst,
00140 int nbytes, int dst_proc) {
00141
00142
00143
00144
00145 int hdl = hdlList.size();
00146 Armci_Hdl* entry = new Armci_Hdl(ARMCI_PUT, dst_proc, nbytes, src, dst);
00147 hdlList.push_back(entry);
00148
00149 ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00150 memcpy(msg->data, src, nbytes);
00151 thisProxy[dst_proc].putData(msg);
00152
00153 return hdl;
00154 }
00155
00156 void ArmciVirtualProcessor::nbput_implicit(pointer src, pointer dst,
00157 int nbytes, int dst_proc) {
00158 int hdl = hdlList.size();
00159 Armci_Hdl* entry = new Armci_Hdl(ARMCI_IPUT, dst_proc, nbytes, src, dst);
00160 hdlList.push_back(entry);
00161
00162 ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00163 memcpy(msg->data, src, nbytes);
00164 thisProxy[dst_proc].putData(msg);
00165 }
00166
00167 void ArmciVirtualProcessor::putData(pointer dst, int nbytes, char *data,
00168 int src_proc, int hdl) {
00169 memcpy(dst, data, nbytes);
00170 thisProxy[src_proc].putAck(hdl);
00171 }
00172
00173 void ArmciVirtualProcessor::putData(ArmciMsg *m) {
00174 memcpy(m->dst, m->data, m->nbytes);
00175 thisProxy[m->src_proc].putAck(m->hdl);
00176 delete m;
00177 }
00178
00179 void ArmciVirtualProcessor::putAck(int hdl) {
00180 if(hdl != -1) {
00181 hdlList[hdl]->acked = 1;
00182 if (hdlList[hdl]->wait == 1) {
00183 hdlList[hdl]->wait = 0;
00184 thread->resume();
00185 }
00186 }
00187 thread->resume();
00188 }
00189
00190 void ArmciVirtualProcessor::get(pointer src, pointer dst,
00191 int nbytes, int src_proc) {
00192
00193
00194
00195
00196 thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, -1);
00197
00198 thread->suspend();
00199 }
00200
00201 int ArmciVirtualProcessor::nbget(pointer src, pointer dst,
00202 int nbytes, int src_proc) {
00203
00204
00205
00206
00207
00208 int hdl = hdlList.size();
00209 Armci_Hdl* entry = new Armci_Hdl(ARMCI_GET, src_proc, nbytes, src, dst);
00210 hdlList.push_back(entry);
00211
00212 thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, hdl);
00213
00214 return hdl;
00215 }
00216
00217 void ArmciVirtualProcessor::nbget_implicit(pointer src, pointer dst,
00218 int nbytes, int src_proc) {
00219 int hdl = hdlList.size();
00220 Armci_Hdl* entry = new Armci_Hdl(ARMCI_IGET, src_proc, nbytes, src, dst);
00221 hdlList.push_back(entry);
00222
00223 thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, hdl);
00224 }
00225
00226 void ArmciVirtualProcessor::wait(int hdl){
00227 if(hdl == -1) return;
00228 while (1) {
00229 if(hdlList[hdl]->acked != 0)
00230 break;
00231 else
00232 thread->suspend();
00233 }
00234 }
00235
00236
00237
00238
00239
00240
00241
00242
00243 void ArmciVirtualProcessor::waitmulti(vector<int> procs){
00244 for(int i=0;i<procs.size();i++){
00245 wait(procs[i]);
00246 }
00247 }
00248
00249 void ArmciVirtualProcessor::waitproc(int proc){
00250 vector<int> procs;
00251 for(int i=0;i<hdlList.size();i++){
00252 if((hdlList[i]->acked == 0) &&
00253 (hdlList[i]->proc == proc) &&
00254 ((hdlList[i]->op & IMPLICIT_MASK) != 0)) {
00255 hdlList[i]->wait = 1;
00256 procs.push_back(i);
00257 }
00258 }
00259 waitmulti(procs);
00260 }
00261
00262 void ArmciVirtualProcessor::waitall(){
00263 vector<int> procs;
00264 for(int i=0;i<hdlList.size();i++){
00265 if((hdlList[i]->acked == 0) &&
00266 ((hdlList[i]->op & IMPLICIT_MASK) != 0)) {
00267 hdlList[i]->wait = 1;
00268 procs.push_back(i);
00269 }
00270 }
00271 waitmulti(procs);
00272 }
00273
00274 void ArmciVirtualProcessor::fence(int proc){
00275 vector<int> procs;
00276 for(int i=0;i<hdlList.size();i++){
00277 if((hdlList[i]->acked == 0) &&
00278 ((hdlList[i]->op & BLOCKING_MASK) != 0) &&
00279 (hdlList[i]->proc == proc))
00280 procs.push_back(i);
00281 }
00282 waitmulti(procs);
00283 }
00284 void ArmciVirtualProcessor::allfence(){
00285 vector<int> procs;
00286 for(int i=0;i<hdlList.size();i++){
00287 if((hdlList[i]->acked == 0) &&
00288 ((hdlList[i]->op & BLOCKING_MASK) != 0))
00289 procs.push_back(i);
00290 }
00291 waitmulti(procs);
00292 }
00293 void ArmciVirtualProcessor::barrier(){
00294 allfence();
00295 CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00296 contribute(0,NULL,CkReduction::sum_int,cb);
00297 thread->suspend();
00298 }
00299
00300 void ArmciVirtualProcessor::resumeThread(void){
00301 thread->resume();
00302 }
00303
00304 int ArmciVirtualProcessor::test(int hdl){
00305 if(hdl == -1) return 1;
00306 return hdlList[hdl]->acked;
00307 }
00308
00309 void ArmciVirtualProcessor::requestFromGet(pointer src, pointer dst, int nbytes,
00310 int dst_proc, int hdl) {
00311 ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,-1,hdl);
00312 memcpy(msg->data, src, nbytes);
00313 thisProxy[dst_proc].putDataFromGet(msg);
00314 }
00315
00316
00317
00318
00319 void ArmciVirtualProcessor::putDataFromGet(pointer dst, int nbytes, char *data, int hdl) {
00320 memcpy(dst, data, nbytes);
00321 if(hdl != -1) {
00322 hdlList[hdl]->acked = 1;
00323 if (hdlList[hdl]->wait == 1) {
00324 hdlList[hdl]->wait = 0;
00325 thread->resume();
00326 }
00327 }
00328 thread->resume();
00329 }
00330
00331 void ArmciVirtualProcessor::putDataFromGet(ArmciMsg *m) {
00332 memcpy(m->dst, m->data, m->nbytes);
00333 if(m->hdl != -1) {
00334 hdlList[m->hdl]->acked = 1;
00335 if (hdlList[m->hdl]->wait == 1) {
00336 hdlList[m->hdl]->wait = 0;
00337 thread->resume();
00338 }
00339 }
00340 delete m;
00341 thread->resume();
00342 }
00343
00344 void ArmciVirtualProcessor::puts(pointer src_ptr, int src_stride_ar[],
00345 pointer dst_ptr, int dst_stride_ar[],
00346 int count[], int stride_levels, int dst_proc){
00347 int nbytes = 1;
00348 for(int i=0;i<stride_levels+1;i++)
00349 nbytes *= count[i];
00350
00351
00352
00353
00354
00355
00356
00357
00358 int hdl = hdlList.size();
00359 Armci_Hdl* entry = new Armci_Hdl(ARMCI_BPUT, dst_proc, nbytes, src_ptr, dst_ptr);
00360 hdlList.push_back(entry);
00361
00362 ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00363
00364 memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00365 memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00366 stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00367 thisProxy[dst_proc].putsData(m);
00368 }
00369
00370 int ArmciVirtualProcessor::nbputs(pointer src_ptr, int src_stride_ar[],
00371 pointer dst_ptr, int dst_stride_ar[],
00372 int count[], int stride_levels, int dst_proc){
00373 int nbytes = 1;
00374 for(int i=0;i<stride_levels+1;i++)
00375 nbytes *= count[i];
00376
00377
00378
00379
00380
00381
00382
00383
00384 int hdl = hdlList.size();
00385 Armci_Hdl* entry = new Armci_Hdl(ARMCI_PUT, dst_proc, nbytes, src_ptr, dst_ptr);
00386 hdlList.push_back(entry);
00387
00388 ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00389
00390 memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00391 memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00392 stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00393 thisProxy[dst_proc].putsData(m);
00394 return hdl;
00395 }
00396
00397 void ArmciVirtualProcessor::nbputs_implicit(pointer src_ptr,
00398 int src_stride_ar[],
00399 pointer dst_ptr,
00400 int dst_stride_ar[],
00401 int count[], int stride_levels,
00402 int dst_proc){
00403 int nbytes = 1;
00404 for(int i=0;i<stride_levels+1;i++)
00405 nbytes *= count[i];
00406 int hdl = hdlList.size();
00407 Armci_Hdl* entry = new Armci_Hdl(ARMCI_IPUT, dst_proc, nbytes,
00408 src_ptr, dst_ptr);
00409 hdlList.push_back(entry);
00410
00411 ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00412
00413 memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00414 memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00415 stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00416 thisProxy[dst_proc].putsData(m);
00417 }
00418
00419 void ArmciVirtualProcessor::putsData(pointer dst_ptr, int dst_stride_ar[],
00420 int count[], int stride_levels,
00421 int nbytes, char *data, int src_proc, int hdl){
00422 stridedCopy(dst_ptr, data, dst_stride_ar, count, stride_levels, 0);
00423 thisProxy[src_proc].putAck(hdl);
00424 }
00425
00426 void ArmciVirtualProcessor::putsData(ArmciStridedMsg *m){
00427 stridedCopy(m->dst, m->data, m->dst_stride_ar, m->count, m->stride_levels, 0);
00428 thisProxy[m->src_proc].putAck(m->hdl);
00429 delete m;
00430 }
00431
00432 void ArmciVirtualProcessor::gets(pointer src_ptr, int src_stride_ar[],
00433 pointer dst_ptr, int dst_stride_ar[],
00434 int count[], int stride_levels, int src_proc){
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446 thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar,
00447 count, stride_levels, thisIndex, -1);
00448
00449 thread->suspend();
00450 }
00451
00452 int ArmciVirtualProcessor::nbgets(pointer src_ptr, int src_stride_ar[],
00453 pointer dst_ptr, int dst_stride_ar[],
00454 int count[], int stride_levels, int src_proc){
00455 int hdl = hdlList.size();
00456 int nbytes = 1;
00457 for(int i=0;i<stride_levels+1;i++)
00458 nbytes *= count[i];
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468 Armci_Hdl* entry = new Armci_Hdl(ARMCI_GET, src_proc, nbytes, src_ptr, dst_ptr);
00469 hdlList.push_back(entry);
00470
00471 thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar,
00472 count, stride_levels, thisIndex, hdl);
00473
00474 return hdl;
00475 }
00476
00477 void ArmciVirtualProcessor::nbgets_implicit(pointer src_ptr,
00478 int src_stride_ar[],
00479 pointer dst_ptr,
00480 int dst_stride_ar[],
00481 int count[], int stride_levels,
00482 int src_proc) {
00483 int hdl = hdlList.size();
00484 int nbytes = 1;
00485 for(int i=0;i<stride_levels+1;i++)
00486 nbytes *= count[i];
00487
00488 Armci_Hdl* entry = new Armci_Hdl(ARMCI_IGET, src_proc, nbytes, src_ptr, dst_ptr);
00489 hdlList.push_back(entry);
00490
00491 thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar,
00492 count, stride_levels, thisIndex, hdl);
00493 }
00494
00495 void ArmciVirtualProcessor::requestFromGets(pointer src_ptr, int src_stride_ar[],
00496 pointer dst_ptr, int dst_stride_ar[], int count[], int stride_levels, int dst_proc, int hdl){
00497 int nbytes = 1;
00498 for(int i=0;i<stride_levels+1;i++)
00499 nbytes *= count[i];
00500
00501 ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00502
00503 memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00504 memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00505 stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00506 thisProxy[dst_proc].putDataFromGets(m);
00507 }
00508 void ArmciVirtualProcessor::putDataFromGets(pointer dst_ptr, int dst_stride_ar[],
00509 int count[], int stride_levels, int nbytes, char *data, int hdl){
00510 stridedCopy(dst_ptr, data, dst_stride_ar, count, stride_levels, 0);
00511 if(hdl != -1) {
00512 hdlList[hdl]->acked = 1;
00513 if (hdlList[hdl]->wait == 1) {
00514 hdlList[hdl]->wait = 0;
00515 thread->resume();
00516 }
00517 }
00518 thread->resume();
00519 }
00520
00521 void ArmciVirtualProcessor::putDataFromGets(ArmciStridedMsg *m){
00522 stridedCopy(m->dst, m->data, m->dst_stride_ar, m->count, m->stride_levels, 0);
00523 if(m->hdl != -1) {
00524 hdlList[m->hdl]->acked = 1;
00525 if (hdlList[m->hdl]->wait == 1) {
00526 hdlList[m->hdl]->wait = 0;
00527 thread->resume();
00528 }
00529 }
00530 delete m;
00531 thread->resume();
00532 }
00533
00534 void ArmciVirtualProcessor::notify(int proc){
00535 thisProxy[proc].sendNote(thisIndex);
00536 }
00537 void ArmciVirtualProcessor::sendNote(int proc){
00538
00539
00540
00541 int hasNote = -1;
00542 for(int i=0;i<noteList.size();i++){
00543 if(noteList[i]->proc == proc){
00544 hasNote = i;
00545 break;
00546 }
00547 }
00548 if(hasNote!=-1){
00549 (noteList[hasNote]->notified)++;
00550 } else {
00551 Armci_Note* newNote = new Armci_Note(proc, 0, 1);
00552 noteList.push_back(newNote);
00553 hasNote = noteList.size() - 1;
00554 }
00555 if(noteList[hasNote]->notified >= noteList[hasNote]->waited){
00556
00557
00558
00559
00560 thread->resume();
00561 }
00562 }
00563 void ArmciVirtualProcessor::notify_wait(int proc){
00564
00565
00566
00567 int hasNote = -1;
00568 for(int i=0;i<noteList.size();i++){
00569 if(noteList[i]->proc == proc){
00570 hasNote = i;
00571 break;
00572 }
00573 }
00574 if(hasNote!=-1){
00575 (noteList[hasNote]->waited)++;
00576 } else {
00577 Armci_Note* newNote = new Armci_Note(proc, 1, 0);
00578 noteList.push_back(newNote);
00579 hasNote = noteList.size() - 1;
00580 }
00581 if(noteList[hasNote]->notified < noteList[hasNote]->waited){
00582 thread->suspend();
00583 }
00584 }
00585
00586 void ArmciVirtualProcessor::pup(PUP::er &p) {
00587 TCharmClient1D::pup(p);
00588 CmiIsomallocBlockListPup(&p, &memBlock);
00589 p|thisProxy;
00590 p|hdlList;
00591 p|noteList;
00592 CkPupMessage(p, (void **)&addressReply, 1);
00593 }
00594
00595
00596 void ArmciVirtualProcessor::requestAddresses(pointer ptr, pointer ptr_arr[], int bytes) {
00597 int thisPE = armci_me;
00598 int numPE = armci_nproc;
00599
00600 addressReply = NULL;
00601 addressPair *pair = new addressPair;
00602 pair->pe = thisPE;
00603 pair->ptr = ptr;
00604
00605 CkCallback cb(CkIndex_ArmciVirtualProcessor::mallocClient(NULL),CkArrayIndex1D(0),thisProxy);
00606 contribute(sizeof(addressPair), pair, CkReduction::concat, cb);
00607
00608 while(addressReply==NULL) thread->suspend();
00609
00610
00611 for (int i=0; i<numPE; i++) {
00612 ptr_arr[i] = addressReply->addresses[i];
00613 }
00614 delete addressReply;
00615 addressReply = NULL;
00616 }
00617
00618 void ArmciVirtualProcessor::stridedCopy(void *base, void *buffer_ptr,
00619 int *stride, int *count, int stride_levels, bool flatten) {
00620 if (stride_levels == 0) {
00621 if (flatten) {
00622 memcpy(buffer_ptr, base, count[stride_levels]);
00623 } else {
00624 memcpy(base, buffer_ptr, count[stride_levels]);
00625 }
00626 } else {
00627 int mystride = 1;
00628 for(int i=0;i<stride_levels;i++)
00629 mystride *= count[i];
00630 for (int i=0; i<count[stride_levels]; i++) {
00631 stridedCopy((void *)((char *)base + stride[stride_levels-1]*i),
00632 (void *)((char *)buffer_ptr + mystride*i), stride, count, stride_levels-1, flatten);
00633 }
00634 }
00635 }
00636
00637
00638 void ArmciVirtualProcessor::mallocClient(CkReductionMsg *msg) {
00639 int numBlocks = msg->getSize()/sizeof(addressPair);
00640 addressPair *dataBlocks = (addressPair *)msg->getData();
00641 AddressMsg *addrmsg = new(numBlocks, 0) AddressMsg;
00642
00643 for (int i=0; i<numBlocks; i++) {
00644 addrmsg->addresses[dataBlocks[i].pe] = dataBlocks[i].ptr;
00645 }
00646
00647 thisProxy.getAddresses(addrmsg);
00648 delete msg;
00649 }
00650
00651
00652
00653
00654
00655
00656
00657
00658 void ArmciVirtualProcessor::msgBcast(void *buffer, int len, int root) {
00659 int me;
00660 ARMCI_Myid(&me);
00661 if (me == root) {
00662 thisProxy.recvMsgBcast(len, (char *)buffer, root);
00663 } else {
00664
00665 collectiveTmpBufferPtr = buffer;
00666 thread->suspend();
00667 }
00668 }
00669
00670
00671 void ArmciVirtualProcessor::recvMsgBcast(int len, char *buffer, int root) {
00672 int me;
00673 ARMCI_Myid(&me);
00674 if (me != root) {
00675
00676
00677
00678 collectiveTmpBufferPtr = memcpy(collectiveTmpBufferPtr, buffer, len);
00679 collectiveTmpBufferPtr = NULL;
00680 thread->resume();
00681 }
00682 }
00683
00684
00685
00686
00687 void ArmciVirtualProcessor::msgGop(void *x, int n, char *op, int type) {
00688 CkReduction::reducerType reducer;
00689 if (strcmp(op,"+") == 0) {
00690 } else if (strcmp(op,"*") == 0) {
00691 } else if (strcmp(op,"min") == 0) {
00692 } else if (strcmp(op,"max") == 0) {
00693 } else if (strcmp(op,"absmin") == 0) {
00694 } else if (strcmp(op,"absmax") == 0) {
00695 } else {
00696 CkPrintf("Operator %s not supported\n",op);
00697 CmiAbort("ARMCI ERROR: msgGop - Unknown operator\n");
00698 }
00699 switch (type) {
00700 case ARMCI_INT:
00701
00702 break;
00703 case ARMCI_LONG:
00704 break;
00705 case ARMCI_LONG_LONG:
00706 break;
00707 case ARMCI_FLOAT:
00708 break;
00709 case ARMCI_DOUBLE:
00710 break;
00711 default:
00712 CkPrintf("ARMCI Type %d not supported\n", type);
00713 CmiAbort("ARMCI ERROR: msgGop - Unknown type\n");
00714 }
00715 }
00716
00717
00718 class ckptClientStruct {
00719 public:
00720 const char *dname;
00721 ArmciVirtualProcessor *vp;
00722 ckptClientStruct(const char *s, ArmciVirtualProcessor *p): dname(s), vp(p) {}
00723 };
00724
00725 static void checkpointClient(void *param,void *msg)
00726 {
00727 ckptClientStruct *client = (ckptClientStruct*)param;
00728 const char *dname = client->dname;
00729 ArmciVirtualProcessor *vp = client->vp;
00730 vp->checkpoint(strlen(dname), dname);
00731 delete client;
00732 }
00733
00734 void ArmciVirtualProcessor::startCheckpoint(const char* dname){
00735 if (thisIndex==0) {
00736 ckptClientStruct *clientData = new ckptClientStruct(dname, this);
00737 CkCallback cb(checkpointClient, clientData);
00738 contribute(0, NULL, CkReduction::sum_int, cb);
00739 } else {
00740 contribute(0, NULL, CkReduction::sum_int);
00741 }
00742 thread->suspend();
00743 }
00744 void ArmciVirtualProcessor::checkpoint(int len, const char* dname){
00745 if (len == 0) {
00746 CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00747 CkStartMemCheckpoint(cb);
00748 } else {
00749 char dirname[256];
00750 strncpy(dirname,dname,len);
00751 dirname[len]='\0';
00752 CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00753 CkStartCheckpoint(dirname,cb);
00754 }
00755 }
00756
00757 #include "armci.def.h"
00758