@@ -148,22 +148,36 @@ enum class ResolverContext
148148 NotifyUnsubscribe,
149149};
150150
151- // Resume coroutine execution on a worker thread.
151+ // Resume coroutine execution on a new worker thread any time co_await is called. This emulates the
152+ // behavior of std::async when passing std::launch::async.
152153struct await_worker_thread : coro::suspend_always
153154{
154- void await_suspend (coro::coroutine_handle<> h) const
155- {
156- std::thread (
157- [](coro::coroutine_handle<>&& h) noexcept {
158- h.resume ();
159- },
160- std::move (h))
161- .detach ();
162- }
155+ GRAPHQLSERVICE_EXPORT void await_suspend (coro::coroutine_handle<> h) const ;
156+ };
157+
158+ // Queue coroutine execution on a single dedicated worker thread any time co_await is called from
159+ // the thread which created it.
160+ struct await_worker_queue : coro::suspend_always
161+ {
162+ GRAPHQLSERVICE_EXPORT await_worker_queue ();
163+ GRAPHQLSERVICE_EXPORT ~await_worker_queue ();
164+
165+ GRAPHQLSERVICE_EXPORT bool await_ready () const ;
166+ GRAPHQLSERVICE_EXPORT void await_suspend (coro::coroutine_handle<> h);
167+
168+ private:
169+ void resumePending ();
170+
171+ const std::thread::id _startId;
172+ std::mutex _mutex {};
173+ std::condition_variable _cv {};
174+ std::list<coro::coroutine_handle<>> _pending {};
175+ bool _shutdown = false ;
176+ std::thread _worker;
163177};
164178
165179// Type-erased awaitable.
166- class await_async : public coro ::suspend_always
180+ class await_async final
167181{
168182private:
169183 struct Concept
@@ -202,7 +216,7 @@ class await_async : public coro::suspend_always
202216 std::shared_ptr<T> _pimpl;
203217 };
204218
205- const std::shared_ptr<Concept> _pimpl;
219+ const std::shared_ptr<const Concept> _pimpl;
206220
207221public:
208222 // Type-erased explicit constructor for a custom awaitable.
@@ -213,36 +227,14 @@ class await_async : public coro::suspend_always
213227 }
214228
215229 // Default to immediate synchronous execution.
216- await_async ()
217- : _pimpl { std::static_pointer_cast<Concept>(
218- std::make_shared<Model<coro::suspend_never>>(std::make_shared<coro::suspend_never>())) }
219- {
220- }
230+ GRAPHQLSERVICE_EXPORT await_async ();
221231
222232 // Implicitly convert a std::launch parameter used with std::async to an awaitable.
223- await_async (std::launch launch)
224- : _pimpl { ((launch & std::launch::async) == std::launch::async)
225- ? std::static_pointer_cast<Concept>(std::make_shared<Model<await_worker_thread>>(
226- std::make_shared<await_worker_thread>()))
227- : std::static_pointer_cast<Concept>(std::make_shared<Model<coro::suspend_never>>(
228- std::make_shared<coro::suspend_never>())) }
229- {
230- }
231-
232- bool await_ready () const
233- {
234- return _pimpl->await_ready ();
235- }
233+ GRAPHQLSERVICE_EXPORT await_async (std::launch launch);
236234
237- void await_suspend (coro::coroutine_handle<> h) const
238- {
239- _pimpl->await_suspend (std::move (h));
240- }
241-
242- void await_resume () const
243- {
244- _pimpl->await_resume ();
245- }
235+ GRAPHQLSERVICE_EXPORT bool await_ready () const ;
236+ GRAPHQLSERVICE_EXPORT void await_suspend (coro::coroutine_handle<> h) const ;
237+ GRAPHQLSERVICE_EXPORT void await_resume () const ;
246238};
247239
248240// Directive order matters, and some of them are repeatable. So rather than passing them in a
@@ -826,7 +818,7 @@ struct ModifiedResult
826818 typename std::conditional_t <std::is_base_of_v<Object, U>, std::shared_ptr<U>, U>;
827819
828820 using future_type = typename std::conditional_t <std::is_base_of_v<Object, U>,
829- AwaitableObject<std::shared_ptr<Object>>, AwaitableScalar<type>>;
821+ AwaitableObject<std::shared_ptr<const Object>>, AwaitableScalar<type>>;
830822 };
831823
832824 // Convert a single value of the specified type to JSON.
@@ -849,7 +841,7 @@ struct ModifiedResult
849841 co_await params.launch ;
850842
851843 auto awaitedResult = co_await ModifiedResult<Object>::convert (
852- std::static_pointer_cast<Object>(co_await result),
844+ std::static_pointer_cast<const Object>(co_await result),
853845 std::move (params));
854846
855847 co_return std::move (awaitedResult);
@@ -1155,7 +1147,7 @@ GRAPHQLSERVICE_EXPORT AwaitableResolver ModifiedResult<response::Value>::convert
11551147 AwaitableScalar<response::Value> result, ResolverParams params);
11561148template <>
11571149GRAPHQLSERVICE_EXPORT AwaitableResolver ModifiedResult<Object>::convert(
1158- AwaitableObject<std::shared_ptr<Object>> result, ResolverParams params);
1150+ AwaitableObject<std::shared_ptr<const Object>> result, ResolverParams params);
11591151
11601152// Export all of the scalar value validation methods
11611153template <>
@@ -1260,10 +1252,10 @@ struct RequestDeliverParams
12601252 await_async launch {};
12611253
12621254 // Optional override for the default Subscription operation object.
1263- std::shared_ptr<Object> subscriptionObject {};
1255+ std::shared_ptr<const Object> subscriptionObject {};
12641256};
12651257
1266- using TypeMap = internal::string_view_map<std::shared_ptr<Object>>;
1258+ using TypeMap = internal::string_view_map<std::shared_ptr<const Object>>;
12671259
12681260// State which is captured and kept alive until all pending futures have been resolved for an
12691261// operation. Note: SelectionSet is the other parameter that gets passed to the top level Object,
@@ -1328,12 +1320,14 @@ class Request : public std::enable_shared_from_this<Request>
13281320private:
13291321 SubscriptionKey addSubscription (RequestSubscribeParams&& params);
13301322 void removeSubscription (SubscriptionKey key);
1331- std::vector<std::shared_ptr<SubscriptionData>> collectRegistrations (
1323+ std::vector<std::shared_ptr<const SubscriptionData>> collectRegistrations (
13321324 std::string_view field, RequestDeliverFilter&& filter) const noexcept ;
13331325
13341326 const TypeMap _operations;
1335- std::unique_ptr<ValidateExecutableVisitor> _validation;
1336- internal::sorted_map<SubscriptionKey, std::shared_ptr<SubscriptionData>> _subscriptions;
1327+ mutable std::mutex _validationMutex {};
1328+ const std::unique_ptr<ValidateExecutableVisitor> _validation;
1329+ mutable std::mutex _subscriptionMutex {};
1330+ internal::sorted_map<SubscriptionKey, std::shared_ptr<const SubscriptionData>> _subscriptions;
13371331 internal::sorted_map<SubscriptionName, internal::sorted_set<SubscriptionKey>> _listeners;
13381332 SubscriptionKey _nextKey = 0 ;
13391333};
0 commit comments