Skip to content

Commit ea3bc89

Browse files
fix a bunch of docs
1 parent fe8b443 commit ea3bc89

File tree

2 files changed

+78
-62
lines changed

2 files changed

+78
-62
lines changed

include/nbl/system/IAsyncQueueDispatcher.h

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,18 @@ class IAsyncQueueDispatcherBase
3030
{
3131
static_assert(std::atomic_uint32_t::is_always_lock_free);
3232
}
33-
~request_base_t() = default;
33+
~request_base_t()
34+
{
35+
// must have been consumed before exit !
36+
const auto atExit = state.load();
37+
assert(atExit==ES_INITIAL);
38+
}
39+
40+
// in case you need it (which you won't
41+
E_STATE queryState() const {return static_cast<E_STATE>(state.load());}
3442

3543
// lock when overwriting the request
36-
void reset()
44+
void start()
3745
{
3846
transition(ES_INITIAL,ES_RECORDING);
3947
}
@@ -58,16 +66,21 @@ class IAsyncQueueDispatcherBase
5866
assert(prev==ES_EXECUTING);
5967
state.notify_one();
6068
}
69+
70+
// non-blocking query
71+
bool is_ready() const
72+
{
73+
return state.load()==ES_READY;
74+
}
6175
// to call to await the request to finish processing
62-
void wait_ready()
76+
void wait_ready() const
6377
{
6478
wait_for(ES_READY);
6579
}
6680
// to call after done reading the request and its memory can be recycled
6781
void discard_storage()
6882
{
69-
const auto prev = state.exchange(ES_INITIAL);
70-
assert(prev==ES_READY);
83+
transition(ES_READY,ES_INITIAL);
7184
state.notify_one();
7285
}
7386

@@ -82,7 +95,7 @@ class IAsyncQueueDispatcherBase
8295
}
8396
assert(expected==from);
8497
}
85-
void wait_for(const E_STATE waitVal)
98+
void wait_for(const E_STATE waitVal) const
8699
{
87100
uint32_t current;
88101
while ((current=state.load())!=waitVal)
@@ -95,11 +108,21 @@ class IAsyncQueueDispatcherBase
95108
}
96109

97110
/**
111+
* Required accessible methods of class being CRTP parameter:
112+
*
113+
* void init(internal_state_t* state); // required only in case of custom internal state
114+
*
115+
* void exit(internal_state_t* state); // optional, no `state` parameter in case of no internal state
116+
*
117+
* void request_impl(request_t& req, ...); // `...` are parameteres forwarded from request(), the request's state is locked with a mutex during the call
118+
* void process_request(request_t& req, internal_state_t& state); // no `state` parameter in case of no internal state
119+
* void background_work() // optional, does nothing if not provided
120+
*
121+
*
98122
* Provided RequestType shall define 5 methods:
99-
* T reset();
100-
* void finalize(T&&);
123+
* void start();
124+
* void finalize();
101125
* T wait_for_work();
102-
* T wait_for_result();
103126
* T notify_all_ready(T&&);
104127
* TODO: [outdated docs] lock() will be called just before processing the request, and unlock() will be called just after processing the request.
105128
* Those are to enable safe external write access to the request struct for user-defined purposes.
@@ -110,18 +133,20 @@ class IAsyncQueueDispatcherBase
110133
* notify_all_ready() takes an r-value reference to an already locked mutex and notifies any waiters then releases the lock
111134
*/
112135
template <typename CRTP, typename RequestType, uint32_t BufferSize = 256u, typename InternalStateType = void>
113-
class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, public impl::IAsyncQueueDispatcherBase
136+
class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, protected impl::IAsyncQueueDispatcherBase
114137
{
115138
static_assert(std::is_base_of_v<impl::IAsyncQueueDispatcherBase::request_base_t,RequestType>, "Request type must derive from request_base_t!");
116139
static_assert(BufferSize>0u, "BufferSize must not be 0!");
117140
static_assert(core::isPoT(BufferSize), "BufferSize must be power of two!");
118141

119142
protected:
120143
using base_t = IThreadHandler<CRTP,InternalStateType>;
121-
friend base_t;
144+
friend base_t; // TODO: remove, some functions should just be protected
145+
122146
private:
123147
constexpr static inline uint32_t MaxRequestCount = BufferSize;
124148

149+
// maybe one day we'll abstract this into a lockless queue
125150
using atomic_counter_t = std::atomic_uint64_t;
126151
using counter_t = atomic_counter_t::value_type;
127152

@@ -135,11 +160,9 @@ class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, pu
135160
return x & Mask;
136161
}
137162

138-
139163
public:
140-
141-
IAsyncQueueDispatcher() {};
142-
~IAsyncQueueDispatcher() {};
164+
inline IAsyncQueueDispatcher() {}
165+
inline ~IAsyncQueueDispatcher() {}
143166

144167
using mutex_t = typename base_t::mutex_t;
145168
using lock_t = typename base_t::lock_t;
@@ -148,20 +171,10 @@ class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, pu
148171

149172
using request_t = RequestType;
150173

151-
///////
152-
// Required accessible methods of class being CRTP parameter:
153-
154-
//void init(internal_state_t* state); // required only in case of custom internal state
155-
156-
//void exit(internal_state_t* state); // optional, no `state` parameter in case of no internal state
157-
158-
//void request_impl(request_t& req, ...); // `...` are parameteres forwarded from request(), the request's state is locked with a mutex during the call
159-
//void process_request(request_t& req, internal_state_t& state); // no `state` parameter in case of no internal state
160-
//void background_work() // optional, does nothing if not provided
161-
///////
162-
163-
using base_t::base_t;
164-
174+
// Returns a reference to a request's storage in the circular buffer after processing the moved arguments
175+
// YOU MUST CONSUME THE REQUEST by calling `discard_storage()` on it EXACTLY ONCE!
176+
// YOU MUST CALL IT EVEN IF THERE'S NO DATA YOU WISH TO GET BACK FROM IT!
177+
// (if you don't the queue will deadlock because of an unresolved overflow)
165178
template <typename... Args>
166179
request_t& request(Args&&... args)
167180
{
@@ -174,7 +187,7 @@ class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, pu
174187
const auto r_id = wrapAround(virtualIx);
175188

176189
request_t& req = request_pool[r_id];
177-
req.reset();
190+
req.start();
178191
static_cast<CRTP*>(this)->request_impl(req, std::forward<Args>(args)...);
179192
req.finalize();
180193

include/nbl/system/ICancellableAsyncQueueDispatcher.h

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,37 @@ class ICancellableAsyncQueueDispatcherBase
4545
friend future_base_t;
4646
friend ICancellableAsyncQueueDispatcherBase;
4747

48-
//! Atomically cancels this request
49-
bool set_cancel();
48+
//! Atomically cancels this request, returns false if we haven't cancelled the request in time before it was executed
49+
bool set_cancel()
50+
{
51+
// double cancellation
52+
if (!future)
53+
return false;
54+
// wait in case of processing
55+
uint32_t expected = ES_PENDING;
56+
while (!state.compare_exchange_strong(expected, ES_INITIAL))
57+
{
58+
if (expected == ES_READY)
59+
{
60+
transition(ES_READY, ES_INITIAL);
61+
return true;
62+
}
63+
else if (expected == ES_INITIAL) // cancel after await
64+
{
65+
return false;
66+
}
67+
// was executing, we didnt get here on time
68+
assert(expected == ES_EXECUTING);
69+
state.wait(expected);
70+
expected = ES_PENDING;
71+
}
72+
// we've actually cancelled a pending request, and need to cleanup the future
73+
if (future)
74+
future->request = nullptr;
75+
future = nullptr;
76+
return true;
77+
}
78+
//!
5079
bool query_cancel() const
5180
{
5281
return state.load()==ES_INITIAL||future==nullptr;
@@ -83,6 +112,7 @@ class ICancellableAsyncQueueDispatcherBase
83112

84113
bool cancel()
85114
{
115+
// atomic exchange of pointer to ensure only one thread gets to cancel at once
86116
request_base_t* req = request.exchange(nullptr);
87117
if (req)
88118
return req->set_cancel();
@@ -106,7 +136,10 @@ class ICancellableAsyncQueueDispatcherBase
106136
// all the data is stored inside the future during the request execution, so we dont need access to the request struct after its done executing
107137
// could have used wait_ready() && discard_storate() but its more efficient that way
108138
if (req)
139+
{
109140
req->transition(impl::IAsyncQueueDispatcherBase::request_base_t::ES_READY,impl::IAsyncQueueDispatcherBase::request_base_t::ES_INITIAL);
141+
req->state.notify_one();
142+
}
110143
}
111144
};
112145

@@ -220,36 +253,6 @@ class ICancellableAsyncQueueDispatcher : public IAsyncQueueDispatcher<CRTP, Requ
220253
}
221254
};
222255

223-
// returns false if we haven't cancelled the request in time before it was executed
224-
inline bool impl::ICancellableAsyncQueueDispatcherBase::request_base_t::set_cancel()
225-
{
226-
// double cancellation
227-
if (!future)
228-
return false;
229-
// wait in case of processing
230-
uint32_t expected = ES_PENDING;
231-
while (!state.compare_exchange_strong(expected,ES_INITIAL))
232-
{
233-
if (expected==ES_READY)
234-
{
235-
transition(ES_READY,ES_INITIAL);
236-
return true;
237-
}
238-
else if (expected==ES_INITIAL) // cancel after await
239-
{
240-
return false;
241-
}
242-
// was executing, we didnt get here on time
243-
state.wait(expected);
244-
expected = ES_PENDING;
245-
}
246-
// we've actually cancelled a pending request, and need to cleanup the future
247-
if (future)
248-
future->request = nullptr;
249-
future = nullptr;
250-
return true;
251-
}
252-
253256
}
254257

255258
#endif

0 commit comments

Comments
 (0)