@@ -1264,20 +1264,6 @@ class OperationDefinitionVisitor
12641264 std::optional<AwaitableResolver> _result;
12651265};
12661266
1267- SubscriptionData::SubscriptionData (std::shared_ptr<OperationData> data, SubscriptionName&& field,
1268- response::Value arguments, Directives fieldDirectives, peg::ast&& query,
1269- std::string&& operationName, SubscriptionCallback&& callback, const peg::ast_node& selection)
1270- : data(std::move(data))
1271- , field(std::move(field))
1272- , arguments(std::move(arguments))
1273- , fieldDirectives(std::move(fieldDirectives))
1274- , query(std::move(query))
1275- , operationName(std::move(operationName))
1276- , callback(std::move(callback))
1277- , selection(selection)
1278- {
1279- }
1280-
12811267OperationDefinitionVisitor::OperationDefinitionVisitor (ResolverContext resolverContext,
12821268 await_async launch, std::shared_ptr<RequestState> state, const TypeMap& operations,
12831269 response::Value&& variables, FragmentMap&& fragments)
@@ -1372,6 +1358,20 @@ void OperationDefinitionVisitor::visit(
13721358 _params->variables ));
13731359}
13741360
1361+ SubscriptionData::SubscriptionData (std::shared_ptr<OperationData> data, SubscriptionName&& field,
1362+ response::Value arguments, Directives fieldDirectives, peg::ast&& query,
1363+ std::string&& operationName, SubscriptionCallback&& callback, const peg::ast_node& selection)
1364+ : data(std::move(data))
1365+ , field(std::move(field))
1366+ , arguments(std::move(arguments))
1367+ , fieldDirectives(std::move(fieldDirectives))
1368+ , query(std::move(query))
1369+ , operationName(std::move(operationName))
1370+ , callback(std::move(callback))
1371+ , selection(selection)
1372+ {
1373+ }
1374+
13751375// SubscriptionDefinitionVisitor visits the AST collects the fields referenced in the subscription
13761376// at the point where we create a subscription.
13771377class SubscriptionDefinitionVisitor
@@ -1609,6 +1609,8 @@ std::list<schema_error> Request::validate(peg::ast& query) const
16091609
16101610 if (!query.validated )
16111611 {
1612+ const std::lock_guard lock { _validationMutex };
1613+
16121614 _validation->visit (*query.root );
16131615 errors = _validation->getStructuredErrors ();
16141616 query.validated = errors.empty ();
@@ -1746,14 +1748,14 @@ response::AwaitableValue Request::resolve(RequestResolveParams params) const
17461748AwaitableSubscribe Request::subscribe (RequestSubscribeParams params)
17471749{
17481750 const auto spThis = shared_from_this ();
1749- auto launch = params. launch ;
1751+ std::unique_lock lock { spThis-> _subscriptionMutex } ;
17501752 const auto key = spThis->addSubscription (std::move (params));
17511753 const auto itrOperation = spThis->_operations .find (strSubscription);
17521754
17531755 if (itrOperation != spThis->_operations .end ())
17541756 {
1755- const auto & operation = itrOperation->second ;
1756- const auto & registration = spThis->_subscriptions .at (key);
1757+ const auto operation = itrOperation->second ;
1758+ const auto registration = spThis->_subscriptions .at (key);
17571759 const SelectionSetParams selectionSetParams {
17581760 ResolverContext::NotifySubscribe,
17591761 registration->data ->state ,
@@ -1762,19 +1764,23 @@ AwaitableSubscribe Request::subscribe(RequestSubscribeParams params)
17621764 std::make_shared<FragmentSpreadDirectiveStack>(),
17631765 std::make_shared<FragmentSpreadDirectiveStack>(),
17641766 {},
1765- launch,
1767+ params. launch ,
17661768 };
17671769
1770+ lock.unlock ();
1771+
17681772 try
17691773 {
1770- co_await launch;
1774+ co_await params. launch ;
17711775 co_await operation->resolve (selectionSetParams,
17721776 registration->selection ,
17731777 registration->data ->fragments ,
17741778 registration->data ->variables );
17751779 }
17761780 catch (const std::exception& ex)
17771781 {
1782+ lock.lock ();
1783+
17781784 // Rethrow the exception, but don't leave it subscribed if the resolver failed.
17791785 spThis->removeSubscription (key);
17801786 throw ex;
@@ -1787,12 +1793,13 @@ AwaitableSubscribe Request::subscribe(RequestSubscribeParams params)
17871793AwaitableUnsubscribe Request::unsubscribe (RequestUnsubscribeParams params)
17881794{
17891795 const auto spThis = shared_from_this ();
1796+ std::unique_lock lock { spThis->_subscriptionMutex };
17901797 const auto itrOperation = spThis->_operations .find (strSubscription);
17911798
17921799 if (itrOperation != spThis->_operations .end ())
17931800 {
1794- const auto & operation = itrOperation->second ;
1795- const auto & registration = spThis->_subscriptions .at (params.key );
1801+ const auto operation = itrOperation->second ;
1802+ const auto registration = spThis->_subscriptions .at (params.key );
17961803 const SelectionSetParams selectionSetParams {
17971804 ResolverContext::NotifyUnsubscribe,
17981805 registration->data ->state ,
@@ -1804,11 +1811,15 @@ AwaitableUnsubscribe Request::unsubscribe(RequestUnsubscribeParams params)
18041811 params.launch ,
18051812 };
18061813
1814+ lock.unlock ();
1815+
18071816 co_await params.launch ;
18081817 co_await operation->resolve (selectionSetParams,
18091818 registration->selection ,
18101819 registration->data ->fragments ,
18111820 registration->data ->variables );
1821+
1822+ lock.lock ();
18121823 }
18131824
18141825 spThis->removeSubscription (params.key );
@@ -1990,6 +2001,7 @@ std::vector<std::shared_ptr<SubscriptionData>> Request::collectRegistrations(
19902001 std::string_view field, RequestDeliverFilter&& filter) const noexcept
19912002{
19922003 std::vector<std::shared_ptr<SubscriptionData>> registrations;
2004+ const std::lock_guard lock { _subscriptionMutex };
19932005 const auto itrListeners = _listeners.find (field);
19942006
19952007 if (itrListeners != _listeners.end ())
0 commit comments