/*
 * Response callback handed to CmsProcessResponses() (cast to CmsConsumerFn).
 *
 * r      - the response being delivered; only its `result` field is read.
 * refnum - the reference number printed alongside the result (presumably
 *          the one given to CmsFireTask() for the matching task -- confirm
 *          against the Cms documentation).
 *
 * Prints the response and returns 0. The function is declared to return
 * int, so it must return a value: falling off the end would leave the
 * result indeterminate for any caller that inspects it.
 */
int consumer(Response * r, int refnum)
{
    CmiPrintf("consumer: response with refnum = %d is %f\n", refnum,
              r->result);
    return 0;
}
/*
 * Master-slave demo driver.
 *
 * Every processor initialises the Cms layer with `worker` as the task
 * function. Processor 0 then acts as the manager in two modes:
 *   1. batch mode    - fire a batch of tasks, wait for all responses,
 *                      then fetch each response by its reference number;
 *   2. consumer mode - fire tasks and let `consumer` handle each response.
 *
 * NOTE(review): `t`, `worker` and `Response` are not declared in this
 * function; they are expected to be defined at file scope elsewhere.
 */
int main(int argc, char *argv[])
{
    enum {
        MAX_PENDING_TASKS = 20, /* max number of tasks fired before
                                 * "awaitResponses" */
        NUM_PHASES        = 3,  /* number of iterations or phases */
        TASKS_PER_PHASE   = 5,  /* tasks fired in each batch-mode phase */
        CONSUMER_TASKS    = 3   /* tasks fired in consumer mode */
    };
    int i, j, ref;

    CmsInit((CmsWorkerFn)worker, MAX_PENDING_TASKS);

    if (CmiMyPe() == 0) { /* I am the manager */
        CmiPrintf("manager inited\n");
        for (i = 0; i < NUM_PHASES; i++) {
            /* Prepare the next generation of problems to solve, then
             * fire the next batch of tasks for the workers. */
            for (j = 0; j < TASKS_PER_PHASE; j++) {
                t.a = 10 * i + j;
                /* A ref number to associate with the task, so that the
                 * response for this task can be identified. */
                ref = j;
                CmsFireTask(ref, &t, sizeof(t));
            }

            /* Now wait for the responses; this also allows proc 0 to be
             * used as a worker. */
            CmsAwaitResponses();

            /* Now extract the responses from the system. */
            for (j = 0; j < TASKS_PER_PHASE; j++) {
                Response *r = (Response *) CmsGetResponse(j);
                CmiPrintf("Response %d is: %f \n", j, r->result);
            }

            /* End of one master-slave phase. */
            CmiPrintf("End of phase %d\n", i);
        }
    }

    /* Printed by every processor, not only the manager. */
    CmiPrintf("Now the consumerFunction mode\n");

    if (CmiMyPe() == 0) { /* I am the manager */
        for (i = 0; i < CONSUMER_TASKS; i++) {
            t.a = 5 + i;
            CmsFireTask(i, &t, sizeof(t));
        }
        /* Also allows proc 0 to be used as a worker. In addition,
         * responses will be processed on processor 0 via the "consumer"
         * function as soon as they are available. */
        CmsProcessResponses((CmsConsumerFn)consumer);
    }

    CmsExit();
    return 0;
}