00001 #include <stdlib.h>
00002 #include <string.h>
00003 #include "queueing.h"
00004 #include <converse.h>
00005
00006 static void CpmLSend(int pe, int len, void *msg)
00007 {
00008 if (pe==CpmALL) CmiSyncBroadcastAllAndFree(len, msg);
00009 else if (pe==CpmOTHERS) CmiSyncBroadcastAndFree(len, msg);
00010 else CmiSyncSendAndFree(pe,len,msg);
00011 }
00012
00013
00014
00015
00016
00017
00018
/*
 * Control record for destinations that only need a target PE.
 *
 *   sendfn   function invoked with the record itself, a length and a
 *            message pointer
 *   envsize  byte size of the data the send function stores through
 *            CpmEnv(msg) (0 when nothing is stored)
 *   pe       destination value later handed to CpmLSend
 */
struct CpmDestinationSend_s
{
  void *(*sendfn)(struct CpmDestinationSend_s *, int, void *);
  int envsize;
  int pe;
};

/* Pointer form used in the send-function signatures. */
typedef struct CpmDestinationSend_s *CpmDestinationSend;

/* Value form used when declaring the Cpv control variables. */
typedef struct CpmDestinationSend_s DestinationSend;
00028
00029 void *CpmSend1(CpmDestinationSend ctrl, int len, void *msg)
00030 {
00031 CpmLSend(ctrl->pe, len, msg);
00032 return (void *) 0;
00033 }
00034
00035 CpvStaticDeclare(DestinationSend, ctrlSend);
00036
00037 CpmDestination CpmSend(int pe)
00038 {
00039 CpvAccess(ctrlSend).envsize = 0;
00040 CpvAccess(ctrlSend).sendfn = CpmSend1;
00041 CpvAccess(ctrlSend).pe = pe;
00042 return (CpmDestination)&CpvAccess(ctrlSend);
00043 }
00044
00045
00046
00047
00048
00049
00050
/*
 * Control record for destinations that enqueue with an explicit
 * queueing strategy and priority.
 *
 *   sendfn    function invoked with the record itself, a length and a
 *             message pointer
 *   envsize   byte size of the data the send function stores through
 *             CpmEnv(msg)
 *   pe        destination value later handed to CpmLSend
 *   qs        queueing strategy value forwarded to CsdEnqueueGeneral
 *   priobits  number of priority bits
 *   prioptr   pointer to the priority words that get copied into the
 *             message
 */
struct CpmDestinationEnq_s
{
  void *(*sendfn)(struct CpmDestinationEnq_s *, int, void *);
  int envsize;
  int pe;
  int qs;
  int priobits;
  unsigned int *prioptr;
};

/* Pointer form used in the send-function signature. */
typedef struct CpmDestinationEnq_s *CpmDestinationEnq;

/* Value form used when declaring the Cpv control variable. */
typedef struct CpmDestinationEnq_s DestinationEnq;
00061
00062 CpvStaticDeclare(DestinationEnq, ctrlEnq);
00063
00064 CpvDeclare(int, CpmEnqueue2_Index);
00065
00066 void CpmEnqueue2(void *msg)
00067 {
00068 int *env;
00069 env = (int *)CpmEnv(msg);
00070 CmiSetHandler(msg, env[0]);
00071 CsdEnqueueGeneral(msg, env[1], env[2], (unsigned int *)(env+3));
00072 }
00073
00074 void *CpmEnqueue1(CpmDestinationEnq ctrl, int len, void *msg)
00075 {
00076 int *env = (int *)CpmEnv(msg);
00077 int intbits = sizeof(int)*8;
00078 int priobits = ctrl->priobits;
00079 int prioints = (priobits+intbits-1) / intbits;
00080 env[0] = CmiGetHandler(msg);
00081 env[1] = ctrl->qs;
00082 env[2] = ctrl->priobits;
00083 memcpy(env+3, ctrl->prioptr, prioints*sizeof(int));
00084 CmiSetHandler(msg, CpvAccess(CpmEnqueue2_Index));
00085 CpmLSend(ctrl->pe, len, msg);
00086 return (void *) 0;
00087 }
00088
00089 CpmDestination CpmEnqueue(int pe, int qs, int priobits,unsigned int *prioptr)
00090 {
00091 int intbits = sizeof(int)*8;
00092 int prioints = (priobits+intbits-1) / intbits;
00093 CpvAccess(ctrlEnq).envsize = (3+prioints)*sizeof(int);
00094 CpvAccess(ctrlEnq).sendfn = CpmEnqueue1;
00095 CpvAccess(ctrlEnq).pe = pe; CpvAccess(ctrlEnq).qs = qs;
00096 CpvAccess(ctrlEnq).priobits = priobits; CpvAccess(ctrlEnq).prioptr = prioptr;
00097 return (CpmDestination)&CpvAccess(ctrlEnq);
00098 }
00099
00100 CpvStaticDeclare(unsigned int, fiprio);
00101
00102 CpmDestination CpmEnqueueIFIFO(int pe, int prio)
00103 {
00104 CpvAccess(fiprio) = prio;
00105 return CpmEnqueue(pe, CQS_QUEUEING_IFIFO, sizeof(int)*8, &CpvAccess(fiprio));
00106 }
00107
00108 CpvStaticDeclare(unsigned int, liprio);
00109
00110 CpmDestination CpmEnqueueILIFO(int pe, int prio)
00111 {
00112 CpvAccess(liprio) = prio;
00113 return CpmEnqueue(pe, CQS_QUEUEING_ILIFO, sizeof(int)*8, &CpvAccess(liprio));
00114 }
00115
00116 CpmDestination CpmEnqueueBFIFO(int pe, int priobits,unsigned int *prioptr)
00117 {
00118 return CpmEnqueue(pe, CQS_QUEUEING_BFIFO, priobits, prioptr);
00119 }
00120
00121 CpmDestination CpmEnqueueBLIFO(int pe, int priobits,unsigned int *prioptr)
00122 {
00123 return CpmEnqueue(pe, CQS_QUEUEING_BLIFO, priobits, prioptr);
00124 }
00125
00126 CpmDestination CpmEnqueueLFIFO(int pe, int priobits,unsigned int *prioptr)
00127 {
00128 return CpmEnqueue(pe, CQS_QUEUEING_LFIFO, priobits, prioptr);
00129 }
00130
00131 CpmDestination CpmEnqueueLLIFO(int pe, int priobits,unsigned int *prioptr)
00132 {
00133 return CpmEnqueue(pe, CQS_QUEUEING_LLIFO, priobits, prioptr);
00134 }
00135
00136
00137
00138
00139
00140
00141
00142 CpvDeclare(int, CpmEnqueueFIFO2_Index);
00143
00144 void CpmEnqueueFIFO2(void *msg)
00145 {
00146 int *env;
00147 env = (int *)CpmEnv(msg);
00148 CmiSetHandler(msg, env[0]);
00149 CsdEnqueueFifo(msg);
00150 }
00151
00152 void *CpmEnqueueFIFO1(CpmDestinationSend ctrl, int len, void *msg)
00153 {
00154 int *env = (int *)CpmEnv(msg);
00155 env[0] = CmiGetHandler(msg);
00156 CmiSetHandler(msg, CpvAccess(CpmEnqueueFIFO2_Index));
00157 CpmLSend(ctrl->pe, len, msg);
00158 return (void *) 0;
00159 }
00160
00161 CpvStaticDeclare(DestinationSend, ctrlFIFO);
00162
00163 CpmDestination CpmEnqueueFIFO(int pe)
00164 {
00165 CpvAccess(ctrlFIFO).envsize = sizeof(int);
00166 CpvAccess(ctrlFIFO).sendfn = CpmEnqueueFIFO1;
00167 CpvAccess(ctrlFIFO).pe = pe;
00168 return (CpmDestination)&CpvAccess(ctrlFIFO);
00169 }
00170
00171
00172
00173
00174
00175
00176
00177 CpvDeclare(int, CpmEnqueueLIFO2_Index);
00178
00179 void CpmEnqueueLIFO2(void *msg)
00180 {
00181 int *env;
00182 env = (int *)CpmEnv(msg);
00183 CmiSetHandler(msg, env[0]);
00184 CsdEnqueueLifo(msg);
00185 }
00186
00187 void *CpmEnqueueLIFO1(CpmDestinationSend ctrl, int len, void *msg)
00188 {
00189 int *env = (int *)CpmEnv(msg);
00190 env[0] = CmiGetHandler(msg);
00191 CmiSetHandler(msg, CpvAccess(CpmEnqueueLIFO2_Index));
00192 CpmLSend(ctrl->pe, len, msg);
00193 return (void *) 0;
00194 }
00195
00196 CpvStaticDeclare(DestinationSend, ctrlLIFO);
00197
00198 CpmDestination CpmEnqueueLIFO(int pe)
00199 {
00200 CpvAccess(ctrlLIFO).envsize = sizeof(int);
00201 CpvAccess(ctrlLIFO).sendfn = CpmEnqueueLIFO1;
00202 CpvAccess(ctrlLIFO).pe = pe;
00203 return (CpmDestination)&CpvAccess(ctrlLIFO);
00204 }
00205
00206
00207
00208
00209
00210
00211
00212 CpvDeclare(int, CpmThread2_Index);
00213
00214 void CpmThread3(void *msg)
00215 {
00216 int *env = (int *)CpmEnv(msg);
00217 CmiHandlerInfo *h=&CmiHandlerToInfo(env[0]);
00218 (h->hdlr)(msg,h->userPtr);
00219 CthFree(CthSelf()); CthSuspend();
00220 }
00221
00222 void CpmThread2(void *msg)
00223 {
00224 CthThread t;
00225 t = CthCreate(CpmThread3, msg, 0);
00226 CthSetStrategyDefault(t); CthAwaken(t);
00227 }
00228
00229 void *CpmThread1(CpmDestinationSend ctrl, int len, void *msg)
00230 {
00231 int *env = (int *)CpmEnv(msg);
00232 env[0] = CmiGetHandler(msg);
00233 CmiSetHandler(msg, CpvAccess(CpmThread2_Index));
00234 CpmLSend(ctrl->pe, len, msg);
00235 return (void *) 0;
00236 }
00237
00238 CpvStaticDeclare(DestinationSend, ctrlThread);
00239
00240 CpmDestination CpmMakeThread(int pe)
00241 {
00242 CpvAccess(ctrlThread).envsize = sizeof(int);
00243 CpvAccess(ctrlThread).sendfn = CpmThread1;
00244 CpvAccess(ctrlThread).pe = pe;
00245 return (CpmDestination)&CpvAccess(ctrlThread);
00246 }
00247
00248
00249
00250
00251
00252
00253
00254 CpvDeclare(int, CpmThreadSize2_Index);
00255
00256 typedef struct CpmDestinationThreadSize_s
00257 {
00258 void *(*sendfn)(struct CpmDestinationThreadSize_s *, int, void *);
00259 int envsize;
00260 int pe;
00261 int size;
00262 }
00263 *CpmDestinationThreadSize;
00264
00265 typedef struct CpmDestinationThreadSize_s DestinationThreadSize;
00266
00267 void CpmThreadSize2(void *msg)
00268 {
00269 int *env = (int *)CpmEnv(msg);
00270 CthThread t;
00271 t = CthCreate(CpmThread3, msg, env[1]);
00272 CthSetStrategyDefault(t); CthAwaken(t);
00273 }
00274
00275 void *CpmThreadSize1(CpmDestinationThreadSize ctrl, int len, void *msg)
00276 {
00277 int *env = (int *)CpmEnv(msg);
00278 env[0] = CmiGetHandler(msg);
00279 env[1] = ctrl->size;
00280 CmiSetHandler(msg, CpvAccess(CpmThreadSize2_Index));
00281 CpmLSend(ctrl->pe, len, msg);
00282 return (void *) 0;
00283 }
00284
00285 CpvStaticDeclare(DestinationThreadSize, ctrlThreadSize);
00286
00287 CpmDestination CpmMakeThreadSize(int pe, int size)
00288 {
00289 CpvAccess(ctrlThreadSize).envsize = 2*sizeof(int);
00290 CpvAccess(ctrlThreadSize).sendfn = CpmThreadSize1;
00291 CpvAccess(ctrlThreadSize).pe = pe;
00292 CpvAccess(ctrlThreadSize).size = size;
00293 return (CpmDestination)&CpvAccess(ctrlThreadSize);
00294 }
00295
00296
00297
00298
00299
00300
00301
00302 extern "C" void CpmModuleInit(void)
00303 {
00304 CpvInitialize(int, CpmThread2_Index);
00305 CpvAccess(CpmThread2_Index) = CmiRegisterHandler(CpmThread2);
00306 CpvInitialize(int, CpmThreadSize2_Index);
00307 CpvAccess(CpmThreadSize2_Index) = CmiRegisterHandler(CpmThreadSize2);
00308 CpvInitialize(int, CpmEnqueueFIFO2_Index);
00309 CpvAccess(CpmEnqueueFIFO2_Index) = CmiRegisterHandler(CpmEnqueueFIFO2);
00310 CpvInitialize(int, CpmEnqueueLIFO2_Index);
00311 CpvAccess(CpmEnqueueLIFO2_Index) = CmiRegisterHandler(CpmEnqueueLIFO2);
00312 CpvInitialize(int, CpmEnqueue2_Index);
00313 CpvAccess(CpmEnqueue2_Index) = CmiRegisterHandler(CpmEnqueue2);
00314 CpvInitialize(DestinationSend, ctrlSend);
00315 CpvInitialize(DestinationEnq, ctrlEnq);
00316 CpvInitialize(DestinationSend, ctrlFIFO);
00317 CpvInitialize(DestinationSend, ctrlLIFO);
00318 CpvInitialize(DestinationSend, ctrlThread);
00319 CpvInitialize(DestinationThreadSize, ctrlThreadSize);
00320 CpvInitialize(unsigned int, fiprio);
00321 CpvInitialize(unsigned int, liprio);
00322 }
00323