File tree Expand file tree Collapse file tree 4 files changed +514
-10
lines changed
Expand file tree Collapse file tree 4 files changed +514
-10
lines changed Original file line number Diff line number Diff line change @@ -148,18 +148,32 @@ enum class ResolverContext
148148 NotifyUnsubscribe,
149149};
150150
151- // Resume coroutine execution on a worker thread.
151+ // Resume coroutine execution on a new worker thread any time co_await is called. This emulates the
152+ // behavior of std::async when passing std::launch::async.
152153struct await_worker_thread : coro::suspend_always
153154{
154- void await_suspend (coro::coroutine_handle<> h) const
155- {
156- std::thread (
157- [](coro::coroutine_handle<>&& h) noexcept {
158- h.resume ();
159- },
160- std::move (h))
161- .detach ();
162- }
155+ GRAPHQLSERVICE_EXPORT void await_suspend (coro::coroutine_handle<> h) const ;
156+ };
157+
158+ // Queue coroutine execution on a single dedicated worker thread any time co_await is called from
159+ // the thread which created it.
160+ struct await_worker_queue : coro::suspend_always
161+ {
162+ GRAPHQLSERVICE_EXPORT await_worker_queue ();
163+ GRAPHQLSERVICE_EXPORT ~await_worker_queue ();
164+
165+ GRAPHQLSERVICE_EXPORT bool await_ready () const ;
166+ GRAPHQLSERVICE_EXPORT void await_suspend (coro::coroutine_handle<> h);
167+
168+ private:
169+ void resumePending ();
170+
171+ const std::thread::id _startId;
172+ std::mutex _mutex {};
173+ std::condition_variable _cv {};
174+ std::list<coro::coroutine_handle<>> _pending {};
175+ bool _shutdown = false ;
176+ std::thread _worker;
163177};
164178
165179// Type-erased awaitable.
Original file line number Diff line number Diff line change @@ -164,6 +164,72 @@ response::Value schema_exception::getErrors()
164164 return buildErrorValues (std::move (_structuredErrors));
165165}
166166
167+ void await_worker_thread::await_suspend (coro::coroutine_handle<> h) const
168+ {
169+ std::thread (
170+ [](coro::coroutine_handle<>&& h) {
171+ h.resume ();
172+ },
173+ std::move (h))
174+ .detach ();
175+ }
176+
177+ await_worker_queue::await_worker_queue ()
178+ : _startId { std::this_thread::get_id () }
179+ , _worker { [this ]() {
180+ resumePending ();
181+ } }
182+ {
183+ }
184+
185+ await_worker_queue::~await_worker_queue ()
186+ {
187+ std::unique_lock lock { _mutex };
188+
189+ _shutdown = true ;
190+ lock.unlock ();
191+ _cv.notify_one ();
192+
193+ _worker.join ();
194+ }
195+
196+ bool await_worker_queue::await_ready () const
197+ {
198+ return std::this_thread::get_id () != _startId;
199+ }
200+
201+ void await_worker_queue::await_suspend (coro::coroutine_handle<> h)
202+ {
203+ std::unique_lock lock { _mutex };
204+
205+ _pending.push_back (std::move (h));
206+ lock.unlock ();
207+ _cv.notify_one ();
208+ }
209+
210+ void await_worker_queue::resumePending ()
211+ {
212+ std::unique_lock lock { _mutex };
213+
214+ while (!_shutdown)
215+ {
216+ _cv.wait (lock, [this ]() {
217+ return _shutdown || !_pending.empty ();
218+ });
219+
220+ auto pending = std::move (_pending);
221+
222+ lock.unlock ();
223+
224+ for (auto h : pending)
225+ {
226+ h.resume ();
227+ }
228+
229+ lock.lock ();
230+ }
231+ }
232+
167233FieldParams::FieldParams (SelectionSetParams&& selectionSetParams, Directives directives)
168234 : SelectionSetParams(std::move(selectionSetParams))
169235 , fieldDirectives(std::move(directives))
Original file line number Diff line number Diff line change @@ -23,6 +23,14 @@ target_link_libraries(today_tests PRIVATE
2323add_bigobj_flag(today_tests)
2424gtest_add_tests(TARGET today_tests)
2525
26+ add_executable (coroutine_tests CoroutineTests.cpp)
27+ target_link_libraries (coroutine_tests PRIVATE
28+ todaygraphql
29+ graphqljson
30+ GTest::GTest
31+ GTest::Main)
32+ gtest_add_tests(TARGET coroutine_tests)
33+
2634add_executable (client_tests ClientTests.cpp)
2735target_link_libraries (client_tests PRIVATE
2836 todaygraphql
You can’t perform that action at this time.
0 commit comments