Skip to content

Commit afa9dec

Browse files
draft complete
1 parent 418db0d commit afa9dec

File tree

5 files changed

+164
-183
lines changed

5 files changed

+164
-183
lines changed

include/nbl/core/StorageTrivializer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct alignas(T) StorageTrivializer
2424
T* getStorage() {return reinterpret_cast<T*>(storage); }
2525
const T* getStorage() const {return reinterpret_cast<const T*>(storage);}
2626

27+
template<typename... Args>
2728
void construct(Args&&... args)
2829
{
2930
new (getStorage()) T(std::forward<Args>(args)...);

include/nbl/system/IAsyncQueueDispatcher.h

Lines changed: 64 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
#ifndef __NBL_I_ASYNC_QUEUE_DISPATCHER_H_INCLUDED__
2-
#define __NBL_I_ASYNC_QUEUE_DISPATCHER_H_INCLUDED__
1+
#ifndef _NBL_I_ASYNC_QUEUE_DISPATCHER_H_INCLUDED_
2+
#define _NBL_I_ASYNC_QUEUE_DISPATCHER_H_INCLUDED_
33

4-
#include <atomic>
54
#include "nbl/core/declarations.h"
5+
66
#include "nbl/system/IThreadHandler.h"
7+
#include "nbl/system/atomic_state.h"
78

89
namespace nbl::system
910
{
@@ -12,74 +13,7 @@ namespace impl
1213
{
1314
class IAsyncQueueDispatcherBase
1415
{
15-
protected:
16-
IAsyncQueueDispatcherBase() = default;
17-
~IAsyncQueueDispatcherBase() = default;
18-
19-
template<class STATE, STATE kInitial=static_cast<STATE>(0u)>
20-
class atomic_state_t
21-
{
22-
static_assert(std::is_enum_v<STATE>);
23-
24-
public:
25-
~atomic_state_t()
26-
{
27-
static_assert(std::atomic_uint32_t::is_always_lock_free);
28-
// must have been consumed before exit !
29-
const auto atExit = state.load();
30-
assert(static_cast<STATE>(atExit)==kInitial);
31-
}
32-
33-
inline STATE query() const {return static_cast<STATE>(state.load());}
34-
// TODO: improve
35-
inline void wait(const STATE targetState) const
36-
{
37-
uint32_t current;
38-
while ((current=state.load()) != targetState)
39-
state.wait(current);
40-
}
41-
42-
[[nodiscard]] inline bool tryTransition(STATE& expected, const STATE to)
43-
{
44-
return state.compare_exchange_strong(reinterpret_cast<uint32_t&>(from),static_cast<uint32_t>(to));
45-
}
46-
47-
inline void waitTransition(const STATE from, const STATE to)
48-
{
49-
STATE expected = from;
50-
while (!tryTransition(expected,to))
51-
{
52-
state.wait(static_cast<uint32_t>(expected));
53-
expected = from;
54-
}
55-
assert(expected==from);
56-
}
57-
58-
[[nodiscard]] inline bool waitAbortableTransition(const STATE from, const STATE to, const STATE abortState)
59-
{
60-
uint32_t expected = static_cast<uint32_t>(from);
61-
while (!state.compare_exchange_strong(expected,static_cast<uint32_t>(to)))
62-
{
63-
state.wait(expected);
64-
if (expected==static_cast<uint32_t>(abortState))
65-
return false;
66-
expected = from;
67-
}
68-
assert(expected==from);
69-
return true;
70-
}
71-
// TODO: improve (assert and notify one vs all)
72-
inline void exchangeNotify(const STATE expected, const STATE to)
73-
{
74-
const auto prev = state.exchange(static_cast<uint32_t>(to));
75-
assert(static_cast<STATE>(prev)==expected);
76-
state.notify_one();
77-
}
78-
79-
private:
80-
std::atomic_uint32_t state = static_cast<uint32_t>(kInitial);
81-
};
82-
16+
public:
8317
struct future_base_t;
8418
// dont want to play around with relaxed memory ordering yet
8519
struct request_base_t
@@ -100,7 +34,7 @@ class IAsyncQueueDispatcherBase
10034
//! REQUESTING THREAD: lock when overwriting the request's data
10135
inline void start()
10236
{
103-
state.waitTransition(STATE::INITIAL,STATE::RECORDING);
37+
state.waitTransition(STATE::RECORDING,STATE::INITIAL);
10438
// previous thing cleaned up after itself
10539
assert(!future);
10640
}
@@ -115,8 +49,8 @@ class IAsyncQueueDispatcherBase
11549
//! ANY THREAD [except worker]: via cancellable_future_t::cancel
11650
inline void cancel()
11751
{
118-
const auto prev = state.exchangeNotify(STATE::CANCELLED);
119-
// If we were in EXECUTING then worker thread is definitely stuck `base_t::disassociate_request` spinlock
52+
const auto prev = state.exchangeNotify<false>(STATE::CANCELLED);
53+
// If we were in EXECUTING then worker thread is definitely stuck in `base_t::disassociate_request` spinlock
12054
assert(prev==STATE::PENDING || prev==STATE::EXECUTING);
12155
// sanity check, but its not our job to set it to nullptr
12256
assert(future);
@@ -164,25 +98,25 @@ class IAsyncQueueDispatcherBase
16498
// sanity check
16599
assert(req->getState().query()==request_base_t::STATE::RECORDING);
166100
// if not initial state then wait until it gets moved, etc.
167-
state.waitTransition(STATE::INITIAL,STATE::ASSOCIATED);
101+
state.waitTransition(STATE::ASSOCIATED,STATE::INITIAL);
168102
}
169103
//! WORKER THREAD: done as part of execution at the very start, after we want to begin work
170104
[[nodiscard]] virtual inline bool disassociate_request()
171105
{
172-
return state.waitAbortableTransition(STATE::ASSOCIATED,STATE::EXECUTING,STATE::INITIAL);
106+
return state.waitAbortableTransition(STATE::EXECUTING,STATE::ASSOCIATED,STATE::INITIAL);
173107
}
174108
//! WORKER THREAD: done as part of execution at the very end, after object is constructed
175109
inline void notify()
176110
{
177-
state.exchangeNotify(STATE::EXECUTING,STATE::READY);
111+
state.exchangeNotify<true>(STATE::READY,STATE::EXECUTING);
178112
}
179113

180114
protected:
181115
// the base class is not directly usable
182116
virtual inline ~future_base_t()
183117
{
184118
// non-cancellable future just need to get to this state, and cancellable will move here
185-
state.wait(STATE::INITIAL);
119+
state.wait([](const STATE _query)->bool{return _query!=STATE::INITIAL;});
186120
}
187121
// future_t is non-copyable and non-movable because request needs a pointer to it
188122
future_base_t(const future_base_t&) = delete;
@@ -195,10 +129,21 @@ class IAsyncQueueDispatcherBase
195129
atomic_state_t<STATE,STATE::INITIAL> state= {};
196130
};
197131

132+
protected:
133+
IAsyncQueueDispatcherBase() = default;
134+
~IAsyncQueueDispatcherBase() = default;
135+
198136
public:
199137
template<typename T>
200-
struct future_t : private core::StorageTrivializer<T>, protected future_base_t
138+
class future_t : private core::StorageTrivializer<T>, protected future_base_t
201139
{
140+
using storage_t = core::StorageTrivializer<T>;
141+
inline void discard_common()
142+
{
143+
storage_t::destruct();
144+
state.exchangeNotify<true>(STATE::INITIAL,STATE::LOCKED);
145+
}
146+
202147
public:
203148
inline future_t() = default;
204149
inline ~future_t()
@@ -222,25 +167,27 @@ class IAsyncQueueDispatcherBase
222167
}
223168

224169
//! Returns after waiting till `ready()` would be true or after
225-
inline bool wait()
170+
inline bool wait() const
226171
{
227-
while (true)
228-
{
229-
switch (state.query())
172+
bool retval = false;
173+
state.wait([&retval](const STATE _query)->bool{
174+
switch (_query)
230175
{
231-
case STATE::INITIAL:
232-
return false;
233-
break;
234-
case STATE::READY:
235-
[[fallthrough]];
236-
case STATE::LOCKED:
237-
return true;
238-
break;
239-
default:
240-
break;
176+
case STATE::INITIAL:
177+
return false;
178+
break;
179+
case STATE::READY:
180+
[[fallthrough]];
181+
case STATE::LOCKED:
182+
retval = true;
183+
return false;
184+
break;
185+
default:
186+
break;
241187
}
242-
}
243-
assert(false);
188+
return true;
189+
});
190+
return retval;
244191
}
245192

246193
//! NOTE: Deliberately named `...acquire` instead of `...lock` to make them incompatible with `unique_lock`
@@ -250,36 +197,36 @@ class IAsyncQueueDispatcherBase
250197
[[nodiscard]] inline T* try_acquire()
251198
{
252199
auto expected = STATE::READY;
253-
if (state.tryTransition(expected,STATE::LOCKED))
254-
return getStorage();
200+
if (state.tryTransition(STATE::LOCKED,expected))
201+
return storage_t::getStorage();
255202
return nullptr;
256203
}
257204
//! ANY THREAD [except WORKER]: Wait till we're either in READY and move us to LOCKED or bail on INITIAL
258205
// this accounts for being cancelled or consumed while waiting
259206
[[nodiscard]] inline T* acquire()
260207
{
261-
if (state.waitAbortableTransition(STATE::READY,STATE::LOCKED,STATE::INITIAL))
262-
return getStorage();
208+
if (state.waitAbortableTransition(STATE::LOCKED,STATE::READY,STATE::INITIAL))
209+
return storage_t::getStorage();
263210
return nullptr;
264211
}
265212
//! ANY THREAD [except WORKER]: Release an acquired lock
266213
inline void release()
267214
{
268-
state.exchangeNotify(STATE::LOCKED,STATE::READY);
215+
state.exchangeNotify<true>(STATE::READY,STATE::LOCKED);
269216
}
270217

271218
//! NOTE: You're in charge of ensuring future doesn't transition back to INITIAL (e.g. lock or use sanely!)
272219
inline const T* get() const
273220
{
274221
if (ready())
275-
return getStorage();
222+
return storage_t::getStorage();
276223
return nullptr;
277224
}
278225
inline T* get()
279226
{
280227
if (future_base_t::state.query() != future_base_t::STATE::LOCKED)
281228
return nullptr;
282-
return getStorage();
229+
return storage_t::getStorage();
283230
}
284231

285232
//! Can only be called once! If returns false means has been cancelled and nothing happened
@@ -288,7 +235,7 @@ class IAsyncQueueDispatcherBase
288235
T* pSrc = acquire();
289236
if (!pSrc)
290237
return false;
291-
dst = std::move(*T);
238+
dst = std::move(*pSrc);
292239
discard_common();
293240
return true;
294241
}
@@ -305,16 +252,9 @@ class IAsyncQueueDispatcherBase
305252
template <typename... Args>
306253
inline void notify(Args&&... args)
307254
{
308-
new (getStorage()) T(std::forward<Args>(args)...);
255+
storage_t::construct(std::forward<Args>(args)...);
309256
future_base_t::notify();
310257
}
311-
312-
private:
313-
inline discard_common()
314-
{
315-
destruct();
316-
state.exchangeNotify(STATE::LOCKED, STATE::INITIAL);
317-
}
318258
};
319259
template<typename T>
320260
struct cancellable_future_t final : public future_t<T>
@@ -326,8 +266,8 @@ class IAsyncQueueDispatcherBase
326266
//! ANY THREAD [except WORKER]: Cancel pending request if we can, returns whether we actually managed to cancel
327267
inline bool cancel()
328268
{
329-
STATE expected = STATE::ASSOCIATED;
330-
if (state.tryTransition(expected,STATE::EXECUTING))
269+
auto expected = base_t::STATE::ASSOCIATED;
270+
if (state.tryTransition(STATE::EXECUTING,expected))
331271
{
332272
// Since we're here we've managed to move from ASSOCIATED to fake "EXECUTING" this means that the Request is either:
333273
// 1. RECORDING but after returning from `base_t::associate_request`
@@ -338,7 +278,7 @@ class IAsyncQueueDispatcherBase
338278
request.exchange(nullptr)->cancel();
339279

340280
// after doing everything, we can mark ourselves as cleaned up
341-
state.exchangeNotify(STATE::EXECUTING,STATE::INITIAL);
281+
state.exchangeNotify<false>(STATE::INITIAL,STATE::EXECUTING);
342282
return true;
343283
}
344284
// we're here because either:
@@ -372,12 +312,11 @@ class IAsyncQueueDispatcherBase
372312
}
373313
inline bool disassociate_request() override
374314
{
375-
assert(request.load()->getState().query()==request_base_t::STATE::EXECUTING);
376315
if (base_t::disassociate_request())
377316
{
378317
// only assign if we didn't get cancelled mid-way, otherwise will mess up `associate_request` sanity checks
379318
request_base_t* prev = request.exchange(nullptr);
380-
assert(prev);
319+
assert(prev && prev->getState().query()==request_base_t::STATE::EXECUTING);
381320
return true;
382321
}
383322
return false;
@@ -389,16 +328,16 @@ inline void IAsyncQueueDispatcherBase::request_base_t::finalize(future_base_t* f
389328
{
390329
future = fut;
391330
future->associate_request(this);
392-
state.exchangeNotify(STATE::RECORDING,STATE::PENDING);
331+
state.exchangeNotify<false>(STATE::PENDING,STATE::RECORDING);
393332
}
394333

395334
inline bool IAsyncQueueDispatcherBase::request_base_t::wait()
396335
{
397-
if (state.waitAbortableTransition(STATE::PENDING,STATE::EXECUTING,STATE::CANCELLED) && future->disassociate_request())
336+
if (state.waitAbortableTransition(STATE::EXECUTING,STATE::PENDING,STATE::CANCELLED) && future->disassociate_request())
398337
return true;
399338
//assert(future->cancellable);
400339
future = nullptr;
401-
state.exchangeNotify(STATE::CANCELLED,STATE::INITIAL);
340+
state.exchangeNotify<false>(STATE::INITIAL,STATE::CANCELLED);
402341
return false;
403342
}
404343
inline void IAsyncQueueDispatcherBase::request_base_t::notify()
@@ -407,7 +346,7 @@ inline void IAsyncQueueDispatcherBase::request_base_t::notify()
407346
// cleanup
408347
future = nullptr;
409348
// allow to be recycled
410-
state.exchangeNotify(STATE::EXECUTING,STATE::INITIAL);
349+
state.exchangeNotify<false>(STATE::INITIAL,STATE::EXECUTING);
411350
}
412351

413352
}
@@ -438,7 +377,7 @@ inline void IAsyncQueueDispatcherBase::request_base_t::notify()
438377
* notify_all_ready() takes an r-value reference to an already locked mutex and notifies any waiters then releases the lock
439378
*/
440379
template <typename CRTP, typename RequestType, uint32_t BufferSize = 256u, typename InternalStateType = void>
441-
class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, protected impl::IAsyncQueueDispatcherBase
380+
class IAsyncQueueDispatcher : public IThreadHandler<CRTP,InternalStateType>, protected impl::IAsyncQueueDispatcherBase
442381
{
443382
static_assert(std::is_base_of_v<impl::IAsyncQueueDispatcherBase::request_base_t,RequestType>, "Request type must derive from request_base_t!");
444383
static_assert(BufferSize>0u, "BufferSize must not be 0!");
@@ -467,7 +406,7 @@ class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, pr
467406

468407
public:
469408
inline IAsyncQueueDispatcher() {}
470-
inline ~IAsyncQueueDispatcher() {}
409+
inline IAsyncQueueDispatcher(base_t::start_on_construction_t) : base_t(base_t::start_on_construction_t) {}
471410

472411
using mutex_t = typename base_t::mutex_t;
473412
using lock_t = typename base_t::lock_t;
@@ -505,6 +444,7 @@ class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, pr
505444
}
506445

507446
protected:
447+
inline ~IAsyncQueueDispatcher() {}
508448
void background_work() {}
509449

510450
private:
@@ -538,9 +478,8 @@ class IAsyncQueueDispatcher : public IThreadHandler<CRTP, InternalStateType>, pr
538478
lock.lock();
539479
}
540480

541-
542-
bool wakeupPredicate() const { return (cb_begin != cb_end); }
543-
bool continuePredicate() const { return (cb_begin != cb_end); }
481+
inline bool wakeupPredicate() const { return (cb_begin != cb_end); }
482+
inline bool continuePredicate() const { return (cb_begin != cb_end); }
544483
};
545484

546485
}

0 commit comments

Comments
 (0)