2323#ifndef __MT_REQUEST_QUEUE_H__
2424#define __MT_REQUEST_QUEUE_H__
2525
26- #include < queue >
26+ #include < deque >
2727#include " sys/Thread.h"
2828#include " sys/ConditionVar.h"
2929#include " sys/Mutex.h"
@@ -51,21 +51,38 @@ namespace mt
5151template <typename T>
5252struct RequestQueue
5353{
54+ public:
5455 // ! Default constructor
5556 RequestQueue () :
5657 mAvailableSpace (&mQueueLock ),
5758 mAvailableItems (&mQueueLock )
5859 {
5960 }
6061
61- // Put a (copy of, unless T is a pointer) request on the queue
62+ // ! Puts the request at the front of the queue
63+ void priorityEnqueue (T request)
64+ {
65+ #ifdef THREAD_DEBUG
66+ dbg_printf (" Locking (enqueue)\n " );
67+ #endif
68+ mQueueLock .lock ();
69+ mRequestQueue .push_front (request);
70+ #ifdef THREAD_DEBUG
71+ dbg_printf (" Unlocking (enqueue), new size [%d]\n " , mRequestQueue .size ());
72+ #endif
73+ mQueueLock .unlock ();
74+
75+ mAvailableItems .signal ();
76+ }
77+
78+ // ! Put a (copy of, unless T is a pointer) request on the queue
6279 void enqueue (T request)
6380 {
6481#ifdef THREAD_DEBUG
6582 dbg_printf (" Locking (enqueue)\n " );
6683#endif
6784 mQueueLock .lock ();
68- mRequestQueue .push (request);
85+ mRequestQueue .push_back (request);
6986#ifdef THREAD_DEBUG
7087 dbg_printf (" Unlocking (enqueue), new size [%d]\n " , mRequestQueue .size ());
7188#endif
@@ -74,7 +91,7 @@ struct RequestQueue
7491 mAvailableItems .signal ();
7592 }
7693
77- // Retrieve (by reference) T from the queue. blocks until ok
94+ // ! Retrieve (by reference) T from the queue. blocks until ok
7895 void dequeue (T& request)
7996 {
8097#ifdef THREAD_DEBUG
@@ -87,22 +104,70 @@ struct RequestQueue
87104 }
88105
89106 request = mRequestQueue .front ();
90- mRequestQueue .pop ();
107+ mRequestQueue .pop_front ();
91108
92109#ifdef THREAD_DEBUG
93110 dbg_printf (" Unlocking (dequeue), new size [%d]\n " , mRequestQueue .size ());
94111#endif
95112 mQueueLock .unlock ();
96113 mAvailableSpace .signal ();
97114 }
115+
116+ // ! Retrieves a copy of the n'th item from the front of the queue (0 = first item) without removing it
117+ T peek (size_t n = 0 )
118+ {
119+ T request;
120+ #ifdef THREAD_DEBUG
121+ dbg_printf (" Locking (peek)\n " );
122+ #endif
123+ mQueueLock .lock ();
124+ if (mRequestQueue .size () > n)
125+ {
126+ request = mRequestQueue [n];
127+ }
128+ else
129+ {
130+ mQueueLock .unlock ();
131+ throw except::Exception (Ctxt (" Request queue cannot peek beyond end of queue" ));
132+ }
133+ mQueueLock .unlock ();
134+ #ifdef THREAD_DEBUG
135+ dbg_printf (" Unlocking (peek)\n " );
136+ #endif
137+
138+ return request;
139+ }
98140
99- // Check to see if its empty
141+ // ! Lets the n'th request from the front cut in line and dequeue
142+ void cutAndDequeue (size_t n, T& request)
143+ {
144+ #ifdef THREAD_DEBUG
145+ dbg_printf (" Locking (peek)\n " );
146+ #endif
147+ mQueueLock .lock ();
148+ if (mRequestQueue .size () > n)
149+ {
150+ request = mRequestQueue [n];
151+ mRequestQueue .erase (mRequestQueue .begin ()+n);
152+ }
153+ else
154+ {
155+ mQueueLock .unlock ();
156+ throw except::Exception (Ctxt (" Request queue cannot access beyond end of queue" ));
157+ }
158+ mQueueLock .unlock ();
159+ #ifdef THREAD_DEBUG
160+ dbg_printf (" Unlocking (peek)\n " );
161+ #endif
162+ }
163+
164+ // ! Check to see if its empty
100165 bool isEmpty () const
101166 {
102167 return mRequestQueue .empty ();
103168 }
104169
105- // Check the length
170+ // ! Check the length
106171 int length () const
107172 {
108173 return mRequestQueue .size ();
@@ -116,7 +181,7 @@ struct RequestQueue
116181 mQueueLock .lock ();
117182 while (!isEmpty ())
118183 {
119- mRequestQueue .pop ();
184+ mRequestQueue .pop_front ();
120185 }
121186
122187#ifdef THREAD_DEBUG
@@ -126,12 +191,53 @@ struct RequestQueue
126191 mAvailableSpace .signal ();
127192 }
128193
194+ // ! Aggregates ProcFunctor of all of the elements of the queue
195+ template <typename ProcFunctor, typename AggregateType>
196+ AggregateType aggregate (const ProcFunctor& aggregate, const AggregateType& initial)
197+ {
198+ mQueueLock .lock ();
199+ AggregateType cumulative = initial;
200+ for (typename std::deque<T>::iterator iter = mRequestQueue .begin ();
201+ iter != mRequestQueue .end ();
202+ ++iter)
203+ {
204+ cumulative = aggregate (*iter, cumulative);
205+ }
206+
207+ mQueueLock .unlock ();
208+ return cumulative;
209+ }
210+
211+ // ! Remove the given request from the queue
212+ // Does nothing if the given request is not in the queue
213+ // \return true if an item was removed, false otherwise
214+ template <typename CmpFunctor>
215+ bool removeRequest (const CmpFunctor& compare)
216+ {
217+ mQueueLock .lock ();
218+ for (typename std::deque<T>::iterator iter = mRequestQueue .begin ();
219+ iter != mRequestQueue .end ();
220+ ++iter)
221+ {
222+ if (compare (*iter))
223+ {
224+ mRequestQueue .erase (iter);
225+ mQueueLock .unlock ();
226+ mAvailableSpace .signal ();
227+ return true ;
228+ }
229+ }
230+ mQueueLock .unlock ();
231+ return false ;
232+ }
233+
234+ private:
129235 RequestQueue (const RequestQueue&) = delete ;
130236 RequestQueue& operator =(const RequestQueue&) = delete ;
131237
132238private:
133239 // ! The internal data structure
134- std::queue <T> mRequestQueue ;
240+ std::deque <T> mRequestQueue ;
135241 // ! The synchronizer
136242 sys::Mutex mQueueLock ;
137243 // ! This condition is "is there space?"
0 commit comments