Skip to content

Commit 4a11105

Browse files
the draft and the docs improve
1 parent afa9dec commit 4a11105

File tree

2 files changed

+137
-133
lines changed

2 files changed

+137
-133
lines changed

include/nbl/system/IAsyncQueueDispatcher.h

Lines changed: 59 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ namespace impl
1313
{
1414
class IAsyncQueueDispatcherBase
1515
{
16-
public:
16+
protected:
1717
struct future_base_t;
1818
// dont want to play around with relaxed memory ordering yet
19-
struct request_base_t
19+
struct request_base_t // TODO: protect to anyone but inheritor
2020
{
2121
public:
2222
enum class STATE : uint32_t
@@ -92,6 +92,8 @@ class IAsyncQueueDispatcherBase
9292
LOCKED=4
9393
};
9494

95+
protected:
96+
friend struct request_base_t;
9597
//! REQUESTING THREAD: done as part of filling out the request
9698
virtual inline void associate_request(request_base_t* req)
9799
{
@@ -111,7 +113,6 @@ class IAsyncQueueDispatcherBase
111113
state.exchangeNotify<true>(STATE::READY,STATE::EXECUTING);
112114
}
113115

114-
protected:
115116
// the base class is not directly usable
116117
virtual inline ~future_base_t()
117118
{
@@ -129,7 +130,7 @@ class IAsyncQueueDispatcherBase
129130
atomic_state_t<STATE,STATE::INITIAL> state= {};
130131
};
131132

132-
protected:
133+
// not meant for direct usage
133134
IAsyncQueueDispatcherBase() = default;
134135
~IAsyncQueueDispatcherBase() = default;
135136

@@ -145,7 +146,7 @@ class IAsyncQueueDispatcherBase
145146
}
146147

147148
public:
148-
inline future_t() = default;
149+
inline future_t() {}
149150
inline ~future_t()
150151
{
151152
discard();
@@ -230,7 +231,7 @@ class IAsyncQueueDispatcherBase
230231
}
231232

232233
//! Can only be called once! If returns false means has been cancelled and nothing happened
233-
inline bool move_into(T& dst)
234+
[[nodiscard]] inline bool move_into(T& dst)
234235
{
235236
T* pSrc = acquire();
236237
if (!pSrc)
@@ -240,6 +241,15 @@ class IAsyncQueueDispatcherBase
240241
return true;
241242
}
242243

244+
//! Utility to write less code, WILL ASSERT IF IT FAILS TO ACQUIRE! So don't use on futures that might be cancelled or fail.
245+
inline T copy_out()
246+
{
247+
T retval;
248+
const bool success = move_into(retval);
249+
assert(success);
250+
return retval;
251+
}
252+
243253
//!
244254
virtual inline void discard()
245255
{
@@ -250,14 +260,13 @@ class IAsyncQueueDispatcherBase
250260
protected:
251261
// construct the retval element
252262
template <typename... Args>
253-
inline void notify(Args&&... args)
263+
inline void construct(Args&&... args)
254264
{
255265
storage_t::construct(std::forward<Args>(args)...);
256-
future_base_t::notify();
257266
}
258267
};
259268
template<typename T>
260-
struct cancellable_future_t final : public future_t<T>
269+
struct cancellable_future_t : public future_t<T>
261270
{
262271
using base_t = future_t<T>;
263272
std::atomic<request_base_t*> request = nullptr;
@@ -267,7 +276,7 @@ class IAsyncQueueDispatcherBase
267276
inline bool cancel()
268277
{
269278
auto expected = base_t::STATE::ASSOCIATED;
270-
if (state.tryTransition(STATE::EXECUTING,expected))
279+
if (base_t::state.tryTransition(base_t::STATE::EXECUTING,expected))
271280
{
272281
// Since we're here we've managed to move from ASSOCIATED to fake "EXECUTING" this means that the Request is either:
273282
// 1. RECORDING but after returning from `base_t::associate_request`
@@ -278,7 +287,7 @@ class IAsyncQueueDispatcherBase
278287
request.exchange(nullptr)->cancel();
279288

280289
// after doing everything, we can mark ourselves as cleaned up
281-
state.exchangeNotify<false>(STATE::INITIAL,STATE::EXECUTING);
290+
base_t::state.exchangeNotify<false>(base_t::STATE::INITIAL, base_t::STATE::EXECUTING);
282291
return true;
283292
}
284293
// we're here because either:
@@ -288,7 +297,7 @@ class IAsyncQueueDispatcherBase
288297
// - request is ready
289298
// - storage is locked/acquired
290299
// sanity check (there's a tiny gap between transitioning to EXECUTING and disassociating request)
291-
assert(expected==STATE::EXECUTING || request==nullptr);
300+
assert(expected==base_t::STATE::EXECUTING || request==nullptr);
292301
return false;
293302
}
294303

@@ -303,14 +312,14 @@ class IAsyncQueueDispatcherBase
303312
}
304313

305314
private:
306-
inline void associate_request(request_base_t* req) override
315+
inline void associate_request(request_base_t* req) override final
307316
{
308317
base_t::associate_request(req);
309318
request_base_t* prev = request.exchange(req);
310319
// sanity check
311320
assert(prev==nullptr);
312321
}
313-
inline bool disassociate_request() override
322+
inline bool disassociate_request() override final
314323
{
315324
if (base_t::disassociate_request())
316325
{
@@ -357,44 +366,39 @@ inline void IAsyncQueueDispatcherBase::request_base_t::notify()
357366
* void init(internal_state_t* state); // required only in case of custom internal state
358367
*
359368
* void exit(internal_state_t* state); // optional, no `state` parameter in case of no internal state
360-
*
361-
* void request_impl(request_t& req, ...); // `...` are parameters forwarded from request(), the request's state is locked with a mutex during the call
362-
* void process_request(request_t& req, internal_state_t& state); // no `state` parameter in case of no internal state
363-
* void background_work() // optional, does nothing if not provided
364369
*
370+
* // no `state` parameter in case of no internal state
371+
* void process_request(request_metadata_t& req, internal_state_t& state);
372+
*
373+
* void background_work() // optional, does nothing if not provided
365374
*
366-
* Provided RequestType shall define 5 methods:
367-
* void start();
368-
* void finalize();
369-
* bool wait();
370-
* void notify();
371-
* TODO: [outdated docs] lock() will be called just before processing the request, and unlock() will be called just after processing the request.
372-
* Those are to enable safe external write access to the request struct for user-defined purposes.
373-
*
374-
* wait_for_result() will wait until the Async queue completes processing the request and notifies us that the request is ready,
375-
* the request will remain locked upon return (so nothing overwrites its address on the circular buffer)
376375
*
377-
* notify_all_ready() takes an r-value reference to an already locked mutex and notifies any waiters then releases the lock
376+
* The `lock()` will be called just before calling into `background_work()` and processing any requests via `process_request()`,
377+
* `unlock()` will be called just after processing the request (if any).
378378
*/
379-
template <typename CRTP, typename RequestType, uint32_t BufferSize = 256u, typename InternalStateType = void>
379+
template<typename CRTP, typename request_metadata_t, uint32_t BufferSize=256u, typename InternalStateType=void>
380380
class IAsyncQueueDispatcher : public IThreadHandler<CRTP,InternalStateType>, protected impl::IAsyncQueueDispatcherBase
381381
{
382-
static_assert(std::is_base_of_v<impl::IAsyncQueueDispatcherBase::request_base_t,RequestType>, "Request type must derive from request_base_t!");
383382
static_assert(BufferSize>0u, "BufferSize must not be 0!");
384383
static_assert(core::isPoT(BufferSize), "BufferSize must be power of two!");
385384

386385
protected:
387386
using base_t = IThreadHandler<CRTP,InternalStateType>;
388387
friend base_t; // TODO: remove, some functions should just be protected
389388

389+
struct request_t : public request_base_t
390+
{
391+
request_metadata_t m_metadata;
392+
};
393+
390394
private:
391395
constexpr static inline uint32_t MaxRequestCount = BufferSize;
392396

393397
// maybe one day we'll abstract this into a lockless queue
394398
using atomic_counter_t = std::atomic_uint64_t;
395399
using counter_t = atomic_counter_t::value_type;
396400

397-
RequestType request_pool[MaxRequestCount];
401+
request_t request_pool[MaxRequestCount];
398402
atomic_counter_t cb_begin = 0u;
399403
atomic_counter_t cb_end = 0u;
400404

@@ -413,39 +417,41 @@ class IAsyncQueueDispatcher : public IThreadHandler<CRTP,InternalStateType>, pro
413417
using cvar_t = typename base_t::cvar_t;
414418
using internal_state_t = typename base_t::internal_state_t;
415419

416-
using request_t = RequestType;
420+
template<typename T>
421+
using future_t = impl::IAsyncQueueDispatcherBase::future_t<T>;
422+
template<typename T>
423+
using cancellable_future_t = impl::IAsyncQueueDispatcherBase::cancellable_future_t<T>;
417424

418-
// Returns a reference to a request's storage in the circular buffer after processing the moved arguments
419-
// YOU MUST CONSUME THE REQUEST by calling `discard_storage()` on it EXACTLY ONCE!
420-
// YOU MUST CALL IT EVEN IF THERE'S NO DATA YOU WISH TO GET BACK FROM IT!
421-
// (if you don't the queue will deadlock because of an unresolved overflow)
422-
template <typename... Args>
423-
request_t& request(Args&&... args)
425+
//! Constructs the request's `request_metadata_t` from `args` on the circular buffer once there's enough space to accommodate it.
426+
//! Then it associates the request to a future passed in as the first argument.
427+
template<typename T, typename... Args>
428+
void request(future_t<T>* _future, Args&&... args)
424429
{
425-
auto virtualIx = cb_end++;
426-
auto safe_begin = virtualIx<MaxRequestCount ? static_cast<counter_t>(0) : (virtualIx-MaxRequestCount+1u);
427-
428-
for (counter_t old_begin; (old_begin = cb_begin.load()) < safe_begin; )
430+
// get next output index
431+
const auto virtualIx = cb_end++;
432+
// protect against overflow by waiting for the worker to catch up
433+
const auto safe_begin = virtualIx<MaxRequestCount ? static_cast<counter_t>(0) : (virtualIx-MaxRequestCount+1u);
434+
for (counter_t old_begin; (old_begin=cb_begin.load())<safe_begin; )
429435
cb_begin.wait(old_begin);
430436

437+
// get actual storage index now
431438
const auto r_id = wrapAround(virtualIx);
432439

433440
request_t& req = request_pool[r_id];
434441
req.start();
435-
static_cast<CRTP*>(this)->request_impl(req, std::forward<Args>(args)...);
436-
req.finalize();
442+
req.m_metadata = request_metadata_t(std::forward<Args>(args)...);
443+
req.finalize(_future);
437444

438445
{
439446
auto global_lk = base_t::createLock();
440447
// wake up queue thread (needs to happen under a lock to not miss a wakeup)
441448
base_t::m_cvar.notify_one();
442449
}
443-
return req;
444450
}
445451

446452
protected:
447453
inline ~IAsyncQueueDispatcher() {}
448-
void background_work() {}
454+
inline void background_work() {}
449455

450456
private:
451457
template <typename... Args>
@@ -463,17 +469,16 @@ class IAsyncQueueDispatcher : public IThreadHandler<CRTP,InternalStateType>, pro
463469

464470
request_t& req = request_pool[r_id];
465471
// do NOT allow cancelling or modification of the request while working on it
466-
if (req.wait_for_work())
472+
if (req.wait())
467473
{
468-
// if the request supports cancelling and got cancelled, the wait_for_work function may return false
469-
static_cast<CRTP*>(this)->process_request(req, optional_internal_state...);
474+
// if the request supports cancelling and got cancelled, then `wait()` function may return false
475+
static_cast<CRTP*>(this)->process_request(req.m_metadata,optional_internal_state...);
476+
req.notify();
470477
}
471478
// wake the waiter up
472-
req.notify_ready();
473479
cb_begin++;
474-
#if __cplusplus >= 202002L
475-
cb_begin.notify_one();
476-
#endif
480+
// this does not need to happen under a lock, because it's not a condvar
481+
cb_begin.notify_one();
477482
}
478483
lock.lock();
479484
}

0 commit comments

Comments
 (0)