00001 #include <stdlib.h> 00002 #include <string.h> 00003 #include "converse.h" 00004 00005 typedef struct Cfuture_data_s 00006 { 00007 void *value; 00008 int ready; 00009 CthThread waiters; 00010 } 00011 *futdata; 00012 00013 typedef struct CfutureValue_s 00014 { 00015 char core[CmiMsgHeaderSizeBytes]; 00016 struct Cfuture_data_s *data; 00017 int valsize; 00018 double rest[1]; 00019 } 00020 *CfutureValue; 00021 00022 #define field_offset(t, f) ((size_t)(((t)0)->f)) 00023 #define void_to_value(v) ((CfutureValue)(((char*)v)-field_offset(CfutureValue,rest))) 00024 00025 CpvDeclare(int, CfutureStoreIndex); 00026 00027 Cfuture CfutureCreate(void) 00028 { 00029 futdata data = (futdata)malloc(sizeof(struct Cfuture_data_s)); 00030 Cfuture result; 00031 _MEMCHECK(data); 00032 data->value = 0; 00033 data->ready = 0; 00034 data->waiters = 0; 00035 result.pe = CmiMyPe(); 00036 result.data = data; 00037 return result; 00038 } 00039 00040 static void CfutureAwaken(futdata data, CfutureValue val) 00041 { 00042 CthThread t; 00043 data->value = val; 00044 data->ready = 1; 00045 for (t=data->waiters; t; t=CthGetNext(t)) 00046 CthAwaken(t); 00047 data->waiters=0; 00048 } 00049 00050 static void CfutureStore(CfutureValue m) 00051 { 00052 CfutureAwaken(m->data, m); 00053 } 00054 00055 void *CfutureCreateBuffer(int bytes) 00056 { 00057 int valsize = sizeof(struct CfutureValue_s) + bytes; 00058 CfutureValue m = (CfutureValue)CmiAlloc(valsize); 00059 CmiSetHandler(m, CpvAccess(CfutureStoreIndex)); 00060 m->valsize = valsize; 00061 return (void*)(m->rest); 00062 } 00063 00064 void CfutureDestroyBuffer(void *v) 00065 { 00066 CmiFree(v); 00067 } 00068 00069 void CfutureStoreBuffer(Cfuture f, void *value) 00070 { 00071 CfutureValue m = void_to_value(value); 00072 if (f.pe == CmiMyPe()) { 00073 CfutureAwaken(f.data, m); 00074 } else { 00075 m->data = f.data; 00076 CmiSyncSendAndFree(f.pe, m->valsize, m); 00077 } 00078 } 00079 00080 void CfutureSet(Cfuture f, void *value, int len) 00081 { 00082 void *copy = CfutureCreateBuffer(len); 00083 memcpy(copy, value, len); 00084 CfutureStoreBuffer(f, copy); 00085 } 00086 00087 void *CfutureWait(Cfuture f) 00088 { 00089 CthThread self; CfutureValue value; futdata data; 00090 if (f.pe != CmiMyPe()) { 00091 CmiPrintf("error: CfutureWait: future not local.\n"); 00092 exit(1); 00093 } 00094 data = f.data; 00095 if (data->ready == 0) { 00096 self = CthSelf(); 00097 CthSetNext(self, data->waiters); 00098 data->waiters = self; 00099 CthSuspend(); 00100 } 00101 value = (CfutureValue)data->value; 00102 return (void*)(value->rest); 00103 } 00104 00105 void CfutureDestroy(Cfuture f) 00106 { 00107 if (f.pe != CmiMyPe()) { 00108 CmiPrintf("error: CfutureDestroy: future not local.\n"); 00109 exit(1); 00110 } 00111 if (f.data->waiters) { 00112 CmiPrintf("error: CfutureDestroy: destroying an active future.\n"); 00113 exit(1); 00114 } 00115 if (f.data->value) CmiFree(f.data->value); 00116 free(f.data); 00117 } 00118 00119 extern "C" void CfutureModuleInit(void) 00120 { 00121 CpvInitialize(int, CfutureStoreIndex); 00122 CpvAccess(CfutureStoreIndex) = CmiRegisterHandler((CmiHandler)CfutureStore); 00123 }