00001 #ifndef MSG_Q_H
00002 #define MSG_Q_H
00003
00004 #include <deque>
00005 #include <queue>
00006 #include <ostream>
00007 #include <functional>
00008 #include <limits>
00009
00010 #include <unordered_map>
00011
00012 namespace conv {
00013
00014
00015
00016 typedef void msg_t;
00017
00035 #if !CMK_NO_MSG_PRIOS
00036 template <typename P = int>
00037 class msgQ
00038 {
00039 public:
00041 typedef P prio_t;
00042
00044 msgQ(): qSize(0) {}
00045
00047 void enq(const msg_t *msg, const prio_t &prio = prio_t(), const bool isFifo = true);
00048
00050 const msg_t* deq();
00051
00053 inline size_t size() const { return qSize; }
00054 inline size_t max_size() const { return std::numeric_limits<size_t>::max(); }
00055
00057 inline bool empty() const { return (0 == qSize); }
00058
00060 void enumerate(msg_t **first, msg_t **last) const;
00061
00063 friend std::ostream& operator<< (std::ostream &out, const msgQ &q)
00064 {
00065 out <<"\nmsgQ[" << q.qSize << "]:";
00066 out<<"\n";
00067 return out;
00068 }
00069
00070 private:
00071 #if CMK_RANDOMIZED_MSGQ
00073 std::vector<const msg_t*> randQ;
00074 #endif
00075
00077 typedef std::deque<const msg_t*> bkt_t;
00079 typedef typename std::pair<prio_t, bkt_t*> prioidx_t;
00081 typedef typename std::unordered_map<prio_t, bkt_t> bktmap_t;
00082
00084 size_t qSize;
00086 bktmap_t msgbuckets;
00088 std::priority_queue<prioidx_t, std::vector<prioidx_t>, std::greater<prioidx_t> > prioQ;
00089 };
00090
00091
00092
00093 template <typename P>
00094 void msgQ<P>::enq(const msg_t *msg, const prio_t &prio, const bool isFifo)
00095 {
00096 #if ! CMK_RANDOMIZED_MSGQ
00097
00098 bkt_t &bkt = msgbuckets[prio];
00099
00100 if (bkt.empty())
00101 prioQ.push( std::make_pair(prio, &bkt) );
00102
00103 isFifo ? bkt.push_back(msg) : bkt.push_front(msg);
00104 #else
00105 randQ.push_back(msg);
00106 #endif
00107
00108
00109 qSize++;
00110 }
00111
00112
00113
00114 #if ! CMK_RANDOMIZED_MSGQ // charm NOT built with a randomized queue
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137 #define CMK_ITERATIVE_MSG_PRIOS 1
00138
00139 template <typename P>
00140 const msg_t* msgQ<P>::deq()
00141 {
00142 if (empty())
00143 return NULL;
00144
00145 bkt_t &bkt = * prioQ.top().second;
00146
00147 const msg_t *msg = bkt.front();
00148 bkt.pop_front();
00149
00150 if (bkt.empty())
00151 {
00152 #if ! CMK_ITERATIVE_MSG_PRIOS
00153
00154 msgbuckets.erase( prioQ.top().first );
00155 #endif
00156
00157 prioQ.pop();
00158 }
00159
00160 qSize--;
00161 return msg;
00162 }
00163
00164 #else // If charm is built with a randomized msg queue
00165
00173 template <typename P>
00174 const msg_t* msgQ<P>::deq()
00175 {
00176 if (empty())
00177 return NULL;
00178
00179 #if defined(_WIN64) || defined(_WIN32)
00180 long rnd = rand() % randQ.size();
00181 #else
00182 long rnd = lrand48() % randQ.size();
00183 #endif
00184 const msg_t *msg = randQ[rnd];
00185 randQ[rnd] = randQ[randQ.size()-1];
00186 randQ.pop_back();
00187
00188
00189 qSize--;
00190 return msg;
00191 }
00192
00193 #endif // CMK_RANDOMIZED_MSGQ
00194
00195 template <typename P>
00196 void msgQ<P>::enumerate(msg_t **first, msg_t **last) const
00197 {
00198 if (first >= last)
00199 return;
00200
00201 msg_t **ptr = first;
00202
00203 #if !CMK_RANDOMIZED_MSGQ
00204 for (typename bktmap_t::const_iterator bktitr = msgbuckets.begin();
00205 ptr != last && bktitr != msgbuckets.end(); bktitr++)
00206 {
00207 bkt_t::const_iterator msgitr = bktitr->second.begin();
00208 while (ptr != last && msgitr != bktitr->second.end())
00209 *ptr++ = (msg_t*) *msgitr++;
00210 }
00211 #else
00212 for (size_t i = 0; i < randQ.size() && ptr != last; ++i, ++ptr)
00213 *ptr = (msg_t*) randQ[i];
00214 #endif
00215 }
00216
00217 #else //CMK_NO_MSG_PRIOS
00218 template <typename P = int>
00219 class msgQ
00220 {
00221 public:
00222 typedef P prio_t;
00223 msgQ() {}
00224 inline size_t size() const { return bkt.size(); }
00225 inline size_t max_size() const { return bkt.max_size(); }
00226 inline bool empty() const { return bkt.empty(); }
00227 void enumerate(msg_t **first, msg_t **last) const {}
00228 friend std::ostream& operator<< (std::ostream &out, const msgQ &q) { return out; }
00229
00230 inline void enq(const msg_t *msg, const prio_t &prio = prio_t(), const bool isFifo = true) {
00231 bkt.push_back(msg);
00232 }
00233
00234 inline const msg_t* deq() {
00235 if (bkt.empty())
00236 return NULL;
00237 const msg_t *msg = bkt.front();
00238 bkt.pop_front();
00239 return msg;
00240 }
00241
00242 private:
00243 std::deque<const msg_t*> bkt;
00244 };
00245 #endif //CMK_NO_MSG_PRIOS
00246
00247 }
00248
00249 #endif // MSG_Q_H
00250