00001 #include <stdlib.h>
00002 #include <string.h>
00003 #include "queueing.h"
00004 #include <converse.h>
00005
00006 static void CpmLSend(int pe, int len, void *msg)
00007 {
00008 if (pe==CpmALL) CmiSyncBroadcastAllAndFree(len, msg);
00009 else if (pe==CpmOTHERS) CmiSyncBroadcastAndFree(len, msg);
00010 else CmiSyncSendAndFree(pe,len,msg);
00011 }
00012
00013
00014
00015
00016
00017
00018
/*
 * Control record for destinations that only need a target processor.
 * In this file it backs CpmSend, CpmEnqueueFIFO, CpmEnqueueLIFO and
 * CpmMakeThread.  Pointers to it are cast to CpmDestination, so the
 * field order must not change.
 */
struct CpmDestinationSend
{
  void *(*sendfn)();  /* routine that performs the actual send */
  int envsize;        /* envelope bytes requested by this destination */
  int pe;             /* destination selector handed to CpmLSend */
};

/* Pointer alias (same identifier as the struct tag) used by the senders. */
typedef struct CpmDestinationSend *CpmDestinationSend;

/* Value alias used when declaring the Cpv storage. */
typedef struct CpmDestinationSend DestinationSend;
00028
00029 void CpmSend1(CpmDestinationSend ctrl, int len, void *msg)
00030 {
00031 CpmLSend(ctrl->pe, len, msg);
00032 }
00033
00034 CpvStaticDeclare(DestinationSend, ctrlSend);
00035
00036 CpmDestination CpmSend(int pe)
00037 {
00038 CpvAccess(ctrlSend).envsize = 0;
00039 CpvAccess(ctrlSend).sendfn = (CpmSender)CpmSend1;
00040 CpvAccess(ctrlSend).pe = pe;
00041 return (CpmDestination)&CpvAccess(ctrlSend);
00042 }
00043
00044
00045
00046
00047
00048
00049
/*
 * Control record for destinations that enqueue with an explicit queueing
 * strategy and priority (see CpmEnqueue).  Pointers to it are cast to
 * CpmDestination, so the field order must not change.
 */
struct CpmDestinationEnq
{
  void *(*sendfn)();      /* routine that performs the actual send */
  int envsize;            /* envelope bytes requested by this destination */
  int pe;                 /* destination selector handed to CpmLSend */
  int qs;                 /* queueing strategy, copied into env[1] */
  int priobits;           /* priority length in bits, copied into env[2] */
  unsigned int *prioptr;  /* caller-supplied priority words */
};

/* Pointer alias (same identifier as the struct tag) used by CpmEnqueue1. */
typedef struct CpmDestinationEnq *CpmDestinationEnq;

/* Value alias used when declaring the Cpv storage. */
typedef struct CpmDestinationEnq DestinationEnq;
00060
00061 CpvStaticDeclare(DestinationEnq, ctrlEnq);
00062
00063 CpvDeclare(int, CpmEnqueue2_Index);
00064
00065 void CpmEnqueue2(void *msg)
00066 {
00067 int *env;
00068 env = (int *)CpmEnv(msg);
00069 CmiSetHandler(msg, env[0]);
00070 CsdEnqueueGeneral(msg, env[1], env[2], (unsigned int *)(env+3));
00071 }
00072
00073 void *CpmEnqueue1(CpmDestinationEnq ctrl, int len, void *msg)
00074 {
00075 int *env = (int *)CpmEnv(msg);
00076 int intbits = sizeof(int)*8;
00077 int priobits = ctrl->priobits;
00078 int prioints = (priobits+intbits-1) / intbits;
00079 env[0] = CmiGetHandler(msg);
00080 env[1] = ctrl->qs;
00081 env[2] = ctrl->priobits;
00082 memcpy(env+3, ctrl->prioptr, prioints*sizeof(int));
00083 CmiSetHandler(msg, CpvAccess(CpmEnqueue2_Index));
00084 CpmLSend(ctrl->pe, len, msg);
00085 return (void *) 0;
00086 }
00087
00088 CpmDestination CpmEnqueue(int pe, int qs, int priobits,unsigned int *prioptr)
00089 {
00090 int intbits = sizeof(int)*8;
00091 int prioints = (priobits+intbits-1) / intbits;
00092 CpvAccess(ctrlEnq).envsize = (3+prioints)*sizeof(int);
00093 CpvAccess(ctrlEnq).sendfn = CpmEnqueue1;
00094 CpvAccess(ctrlEnq).pe = pe; CpvAccess(ctrlEnq).qs = qs;
00095 CpvAccess(ctrlEnq).priobits = priobits; CpvAccess(ctrlEnq).prioptr = prioptr;
00096 return (CpmDestination)&CpvAccess(ctrlEnq);
00097 }
00098
00099 CpvStaticDeclare(unsigned int, fiprio);
00100
00101 CpmDestination CpmEnqueueIFIFO(int pe, int prio)
00102 {
00103 CpvAccess(fiprio) = prio;
00104 return CpmEnqueue(pe, CQS_QUEUEING_IFIFO, sizeof(int)*8, &CpvAccess(fiprio));
00105 }
00106
00107 CpvStaticDeclare(unsigned int, liprio);
00108
00109 CpmDestination CpmEnqueueILIFO(int pe, int prio)
00110 {
00111 CpvAccess(liprio) = prio;
00112 return CpmEnqueue(pe, CQS_QUEUEING_ILIFO, sizeof(int)*8, &CpvAccess(liprio));
00113 }
00114
00115 CpmDestination CpmEnqueueBFIFO(int pe, int priobits,unsigned int *prioptr)
00116 {
00117 return CpmEnqueue(pe, CQS_QUEUEING_BFIFO, priobits, prioptr);
00118 }
00119
00120 CpmDestination CpmEnqueueBLIFO(int pe, int priobits,unsigned int *prioptr)
00121 {
00122 return CpmEnqueue(pe, CQS_QUEUEING_BLIFO, priobits, prioptr);
00123 }
00124
00125 CpmDestination CpmEnqueueLFIFO(int pe, int priobits,unsigned int *prioptr)
00126 {
00127 return CpmEnqueue(pe, CQS_QUEUEING_LFIFO, priobits, prioptr);
00128 }
00129
00130 CpmDestination CpmEnqueueLLIFO(int pe, int priobits,unsigned int *prioptr)
00131 {
00132 return CpmEnqueue(pe, CQS_QUEUEING_LLIFO, priobits, prioptr);
00133 }
00134
00135
00136
00137
00138
00139
00140
00141 CpvDeclare(int, CpmEnqueueFIFO2_Index);
00142
00143 void CpmEnqueueFIFO2(void *msg)
00144 {
00145 int *env;
00146 env = (int *)CpmEnv(msg);
00147 CmiSetHandler(msg, env[0]);
00148 CsdEnqueueFifo(msg);
00149 }
00150
00151 void *CpmEnqueueFIFO1(CpmDestinationSend ctrl, int len, void *msg)
00152 {
00153 int *env = (int *)CpmEnv(msg);
00154 env[0] = CmiGetHandler(msg);
00155 CmiSetHandler(msg, CpvAccess(CpmEnqueueFIFO2_Index));
00156 CpmLSend(ctrl->pe, len, msg);
00157 return (void *) 0;
00158 }
00159
00160 CpvStaticDeclare(DestinationSend, ctrlFIFO);
00161
00162 CpmDestination CpmEnqueueFIFO(int pe)
00163 {
00164 CpvAccess(ctrlFIFO).envsize = sizeof(int);
00165 CpvAccess(ctrlFIFO).sendfn = CpmEnqueueFIFO1;
00166 CpvAccess(ctrlFIFO).pe = pe;
00167 return (CpmDestination)&CpvAccess(ctrlFIFO);
00168 }
00169
00170
00171
00172
00173
00174
00175
00176 CpvDeclare(int, CpmEnqueueLIFO2_Index);
00177
00178 void CpmEnqueueLIFO2(void *msg)
00179 {
00180 int *env;
00181 env = (int *)CpmEnv(msg);
00182 CmiSetHandler(msg, env[0]);
00183 CsdEnqueueLifo(msg);
00184 }
00185
00186 void *CpmEnqueueLIFO1(CpmDestinationSend ctrl, int len, void *msg)
00187 {
00188 int *env = (int *)CpmEnv(msg);
00189 env[0] = CmiGetHandler(msg);
00190 CmiSetHandler(msg, CpvAccess(CpmEnqueueLIFO2_Index));
00191 CpmLSend(ctrl->pe, len, msg);
00192 return (void *) 0;
00193 }
00194
00195 CpvStaticDeclare(DestinationSend, ctrlLIFO);
00196
00197 CpmDestination CpmEnqueueLIFO(int pe)
00198 {
00199 CpvAccess(ctrlLIFO).envsize = sizeof(int);
00200 CpvAccess(ctrlLIFO).sendfn = CpmEnqueueLIFO1;
00201 CpvAccess(ctrlLIFO).pe = pe;
00202 return (CpmDestination)&CpvAccess(ctrlLIFO);
00203 }
00204
00205
00206
00207
00208
00209
00210
00211 CpvDeclare(int, CpmThread2_Index);
00212
00213 void CpmThread3(void *msg)
00214 {
00215 int *env = (int *)CpmEnv(msg);
00216 CmiHandlerInfo *h=&CmiHandlerToInfo(env[0]);
00217 (h->hdlr)(msg,h->userPtr);
00218 CmiFree(msg);
00219 CthFree(CthSelf()); CthSuspend();
00220 }
00221
00222 void CpmThread2(void *msg)
00223 {
00224 CthThread t;
00225 t = CthCreate(CpmThread3, msg, 0);
00226 CthSetStrategyDefault(t); CthAwaken(t);
00227 }
00228
00229 void CpmThread1(CpmDestinationSend ctrl, int len, void *msg)
00230 {
00231 int *env = (int *)CpmEnv(msg);
00232 env[0] = CmiGetHandler(msg);
00233 CmiSetHandler(msg, CpvAccess(CpmThread2_Index));
00234 CpmLSend(ctrl->pe, len, msg);
00235 }
00236
00237 CpvStaticDeclare(DestinationSend, ctrlThread);
00238
00239 CpmDestination CpmMakeThread(int pe)
00240 {
00241 CpvAccess(ctrlThread).envsize = sizeof(int);
00242 CpvAccess(ctrlThread).sendfn = (CpmSender)CpmThread1;
00243 CpvAccess(ctrlThread).pe = pe;
00244 return (CpmDestination)&CpvAccess(ctrlThread);
00245 }
00246
00247
00248
00249
00250
00251
00252
00253 CpvDeclare(int, CpmThreadSize2_Index);
00254
00255 typedef struct CpmDestinationThreadSize
00256 {
00257 void *(*sendfn)();
00258 int envsize;
00259 int pe;
00260 int size;
00261 }
00262 *CpmDestinationThreadSize;
00263
00264 typedef struct CpmDestinationThreadSize DestinationThreadSize;
00265
00266 void CpmThreadSize2(void *msg)
00267 {
00268 int *env = (int *)CpmEnv(msg);
00269 CthThread t;
00270 t = CthCreate(CpmThread3, msg, env[1]);
00271 CthSetStrategyDefault(t); CthAwaken(t);
00272 }
00273
00274 void CpmThreadSize1(CpmDestinationThreadSize ctrl, int len, void *msg)
00275 {
00276 int *env = (int *)CpmEnv(msg);
00277 env[0] = CmiGetHandler(msg);
00278 env[1] = ctrl->size;
00279 CmiSetHandler(msg, CpvAccess(CpmThreadSize2_Index));
00280 CpmLSend(ctrl->pe, len, msg);
00281 }
00282
00283 CpvStaticDeclare(DestinationThreadSize, ctrlThreadSize);
00284
00285 CpmDestination CpmMakeThreadSize(int pe, int size)
00286 {
00287 CpvAccess(ctrlThreadSize).envsize = 2*sizeof(int);
00288 CpvAccess(ctrlThreadSize).sendfn = (CpmSender)CpmThreadSize1;
00289 CpvAccess(ctrlThreadSize).pe = pe;
00290 CpvAccess(ctrlThreadSize).size = size;
00291 return (CpmDestination)&CpvAccess(ctrlThreadSize);
00292 }
00293
00294
00295
00296
00297
00298
00299
00300 void CpmModuleInit()
00301 {
00302 CpvInitialize(int, CpmThread2_Index);
00303 CpvAccess(CpmThread2_Index) = CmiRegisterHandler(CpmThread2);
00304 CpvInitialize(int, CpmThreadSize2_Index);
00305 CpvAccess(CpmThreadSize2_Index) = CmiRegisterHandler(CpmThreadSize2);
00306 CpvInitialize(int, CpmEnqueueFIFO2_Index);
00307 CpvAccess(CpmEnqueueFIFO2_Index) = CmiRegisterHandler(CpmEnqueueFIFO2);
00308 CpvInitialize(int, CpmEnqueueLIFO2_Index);
00309 CpvAccess(CpmEnqueueLIFO2_Index) = CmiRegisterHandler(CpmEnqueueLIFO2);
00310 CpvInitialize(int, CpmEnqueue2_Index);
00311 CpvAccess(CpmEnqueue2_Index) = CmiRegisterHandler(CpmEnqueue2);
00312 CpvInitialize(DestinationSend, ctrlSend);
00313 CpvInitialize(DestinationEnq, ctrlEnq);
00314 CpvInitialize(DestinationSend, ctrlFIFO);
00315 CpvInitialize(DestinationSend, ctrlLIFO);
00316 CpvInitialize(DestinationSend, ctrlThread);
00317 CpvInitialize(DestinationThreadSize, ctrlThreadSize);
00318 CpvInitialize(unsigned int, fiprio);
00319 CpvInitialize(unsigned int, liprio);
00320 }
00321