@@ -644,7 +644,7 @@ void SelectionVisitor::visitField(const peg::ast_node& field)
644644
645645 _values.push ({
646646 std::move (alias),
647- itr->second ({ _state, arguments, selection, _fragments, _variables })
647+ itr->second ({ _state, std::move ( arguments) , selection, _fragments, _variables })
648648 });
649649}
650650
@@ -998,6 +998,11 @@ void OperationDefinitionVisitor::visit(const peg::ast_node& operationDefinition)
998998 {
999999 operation = " query" ;
10001000 }
1001+ else if (operation == " subscription" )
1002+ {
1003+ // Skip subscription operations, they should use subscribe instead of resolve.
1004+ return ;
1005+ }
10011006
10021007 auto position = operationDefinition.begin ();
10031008 std::string name;
@@ -1060,56 +1065,55 @@ void OperationDefinitionVisitor::visit(const peg::ast_node& operationDefinition)
10601065 throw schema_exception ({ error.str () });
10611066 }
10621067
1063- _result = std::async (std::launch::deferred,
1064- [this , &operationDefinition](std::shared_ptr<RequestState> state, std::shared_ptr<Object> operationObject)
1065- {
1066- response::Value operationVariables (response::Type::Map);
1068+ response::Value operationVariables (response::Type::Map);
10671069
1068- peg::on_first_child<peg::variable_definitions>(operationDefinition,
1069- [this , &operationVariables](const peg::ast_node& child)
1070+ peg::on_first_child<peg::variable_definitions>(operationDefinition,
1071+ [this , &operationVariables](const peg::ast_node& child)
1072+ {
1073+ peg::for_each_child<peg::variable>(child,
1074+ [this , &operationVariables](const peg::ast_node& variable)
10701075 {
1071- peg::for_each_child<peg::variable>(child,
1072- [this , &operationVariables](const peg::ast_node& variable)
1073- {
1074- std::string variableName;
1076+ std::string variableName;
10751077
1076- peg::on_first_child<peg::variable_name>(variable,
1077- [&variableName](const peg::ast_node& name)
1078- {
1079- // Skip the $ prefix
1080- variableName = name.content ().c_str () + 1 ;
1081- });
1078+ peg::on_first_child<peg::variable_name>(variable,
1079+ [&variableName](const peg::ast_node& name)
1080+ {
1081+ // Skip the $ prefix
1082+ variableName = name.content ().c_str () + 1 ;
1083+ });
10821084
1083- auto itrVar = _variables.find (variableName);
1084- response::Value valueVar;
1085+ auto itrVar = _variables.find (variableName);
1086+ response::Value valueVar;
10851087
1086- if (itrVar != _variables.get <const response::MapType&>().cend ())
1087- {
1088- valueVar = response::Value (itrVar->second );
1089- }
1090- else
1088+ if (itrVar != _variables.get <const response::MapType&>().cend ())
1089+ {
1090+ valueVar = response::Value (itrVar->second );
1091+ }
1092+ else
1093+ {
1094+ peg::on_first_child<peg::default_value>(variable,
1095+ [this , &valueVar](const peg::ast_node& defaultValue)
10911096 {
1092- peg::on_first_child<peg::default_value>(variable,
1093- [this , &valueVar](const peg::ast_node& defaultValue)
1094- {
1095- ValueVisitor visitor (_variables);
1097+ ValueVisitor visitor (_variables);
10961098
1097- visitor.visit (*defaultValue.children .front ());
1098- valueVar = visitor.getValue ();
1099- });
1100- }
1099+ visitor.visit (*defaultValue.children .front ());
1100+ valueVar = visitor.getValue ();
1101+ });
1102+ }
11011103
1102- operationVariables.emplace_back (std::move (variableName), std::move (valueVar));
1103- });
1104+ operationVariables.emplace_back (std::move (variableName), std::move (valueVar));
11041105 });
1106+ });
11051107
1108+ _result = std::async (std::launch::deferred,
1109+ [](std::future<response::Value> data)
1110+ {
11061111 response::Value document (response::Type::Map);
1107- auto data = operationObject->resolve (state, *operationDefinition.children .back (), _fragments, operationVariables);
11081112
11091113 document.emplace_back (" data" , data.get ());
11101114
11111115 return document;
1112- }, _state, itr->second );
1116+ }, itr->second -> resolve (_state, *operationDefinition. children . back (), _fragments, operationVariables) );
11131117 }
11141118 catch (const schema_exception& ex)
11151119 {
@@ -1124,6 +1128,138 @@ void OperationDefinitionVisitor::visit(const peg::ast_node& operationDefinition)
11241128 }
11251129}
11261130
1131+ // SubscriptionDefinitionVisitor visits the AST collects the fields referenced in the subscription at the point
1132+ // where we create a subscription.
1133+ class SubscriptionDefinitionVisitor
1134+ {
1135+ public:
1136+ SubscriptionDefinitionVisitor (SubscriptionParams&& params, SubscriptionCallback&& callback, FragmentMap&& fragments);
1137+
1138+ const peg::ast_node& getRoot () const ;
1139+ SubscriptionRegistration getRegistration ();
1140+
1141+ void visit (const peg::ast_node& operationDefinition);
1142+
1143+ private:
1144+ SubscriptionParams _params;
1145+ SubscriptionCallback _callback;
1146+ FragmentMap _fragments;
1147+ std::unique_ptr<SubscriptionRegistration> _result;
1148+ };
1149+
1150+ SubscriptionDefinitionVisitor::SubscriptionDefinitionVisitor (SubscriptionParams&& params, SubscriptionCallback&& callback, FragmentMap&& fragments)
1151+ : _params(std::move(params))
1152+ , _callback(std::move(callback))
1153+ , _fragments(std::move(fragments))
1154+ {
1155+ }
1156+
1157+ const peg::ast_node& SubscriptionDefinitionVisitor::getRoot () const
1158+ {
1159+ return *_params.query ->root ;
1160+ }
1161+
1162+ SubscriptionRegistration SubscriptionDefinitionVisitor::getRegistration ()
1163+ {
1164+ if (!_result)
1165+ {
1166+ std::ostringstream error;
1167+
1168+ error << " Missing operation" ;
1169+
1170+ if (!_params.operationName .empty ())
1171+ {
1172+ error << " name: " << _params.operationName ;
1173+ }
1174+
1175+ throw schema_exception ({ error.str () });
1176+ }
1177+
1178+ auto result = std::move (*_result);
1179+
1180+ _result.reset ();
1181+
1182+ return result;
1183+ }
1184+
1185+ void SubscriptionDefinitionVisitor::visit (const peg::ast_node& operationDefinition)
1186+ {
1187+ std::string operation;
1188+
1189+ peg::on_first_child<peg::operation_type>(operationDefinition,
1190+ [&operation](const peg::ast_node& child)
1191+ {
1192+ operation = child.content ();
1193+ });
1194+
1195+ if (operation != " subscription" )
1196+ {
1197+ // Skip operations other than subscription.
1198+ return ;
1199+ }
1200+
1201+ auto position = operationDefinition.begin ();
1202+ std::string name;
1203+
1204+ peg::on_first_child<peg::operation_name>(operationDefinition,
1205+ [&name](const peg::ast_node& child)
1206+ {
1207+ name = child.content ();
1208+ });
1209+
1210+ if (!_params.operationName .empty ()
1211+ && name != _params.operationName )
1212+ {
1213+ // Skip the subscriptions that don't match the name
1214+ return ;
1215+ }
1216+
1217+ if (_result)
1218+ {
1219+ std::ostringstream error;
1220+
1221+ if (_params.operationName .empty ())
1222+ {
1223+ error << " No operationName specified with extra subscription" ;
1224+ }
1225+ else
1226+ {
1227+ error << " Duplicate subscription" ;
1228+ }
1229+
1230+ if (!name.empty ())
1231+ {
1232+ error << " name: " << name;
1233+ }
1234+
1235+ error << " line: " << position.line
1236+ << " column: " << position.byte_in_line ;
1237+
1238+ throw schema_exception ({ error.str () });
1239+ }
1240+
1241+ const auto & selection = *operationDefinition.children .back ();
1242+ std::unordered_set<SubscriptionName> fieldNames;
1243+
1244+ peg::for_each_child<peg::field>(selection,
1245+ [this , &fieldNames](const peg::ast_node& field)
1246+ {
1247+ peg::on_first_child<peg::field_name>(field,
1248+ [&fieldNames](const peg::ast_node& child)
1249+ {
1250+ fieldNames.insert (child.content ());
1251+ });
1252+ });
1253+
1254+ _result.reset (new SubscriptionRegistration {
1255+ std::move (_params),
1256+ std::move (_callback),
1257+ selection,
1258+ std::move (fieldNames),
1259+ std::move (_fragments)
1260+ });
1261+ }
1262+
11271263Request::Request (TypeMap&& operationTypes)
11281264 : _operations(std::move(operationTypes))
11291265{
@@ -1151,6 +1287,112 @@ std::future<response::Value> Request::resolve(const std::shared_ptr<RequestState
11511287 return operationVisitor.getValue ();
11521288}
11531289
1290+ SubscriptionKey Request::subscribe (SubscriptionParams&& params, SubscriptionCallback&& callback)
1291+ {
1292+ FragmentDefinitionVisitor fragmentVisitor;
1293+
1294+ peg::for_each_child<peg::fragment_definition>(*params.query ->root ,
1295+ [&fragmentVisitor](const peg::ast_node& child)
1296+ {
1297+ fragmentVisitor.visit (child);
1298+ });
1299+
1300+ auto fragments = fragmentVisitor.getFragments ();
1301+ SubscriptionDefinitionVisitor subscriptionVisitor (std::move (params), std::move (callback), std::move (fragments));
1302+
1303+ peg::for_each_child<peg::operation_definition>(subscriptionVisitor.getRoot (),
1304+ [&subscriptionVisitor](const peg::ast_node& child)
1305+ {
1306+ subscriptionVisitor.visit (child);
1307+ });
1308+
1309+ auto registration = subscriptionVisitor.getRegistration ();
1310+ auto key = _nextKey++;
1311+
1312+ for (const auto & name : registration.fieldNames )
1313+ {
1314+ _listeners[name].insert (key);
1315+ }
1316+
1317+ _subscriptions.emplace (key, std::move (registration));
1318+
1319+ return key;
1320+ }
1321+
1322+ void Request::unsubscribe (SubscriptionKey key)
1323+ {
1324+ auto itrSubscription = _subscriptions.find (key);
1325+
1326+ if (itrSubscription == _subscriptions.cend ())
1327+ {
1328+ return ;
1329+ }
1330+
1331+ for (const auto & name : itrSubscription->second .fieldNames )
1332+ {
1333+ auto itrListener = _listeners.find (name);
1334+
1335+ itrListener->second .erase (key);
1336+ if (itrListener->second .empty ())
1337+ {
1338+ _listeners.erase (itrListener);
1339+ }
1340+ }
1341+
1342+ _subscriptions.erase (itrSubscription);
1343+
1344+ if (_subscriptions.empty ())
1345+ {
1346+ _nextKey = 0 ;
1347+ }
1348+ else
1349+ {
1350+ _nextKey = _subscriptions.crbegin ()->first + 1 ;
1351+ }
1352+ }
1353+
1354+ void Request::deliver (const SubscriptionName& name, const std::shared_ptr<Object>& subscriptionObject) const
1355+ {
1356+ auto itrListeners = _listeners.find (name);
1357+
1358+ if (itrListeners == _listeners.cend ())
1359+ {
1360+ return ;
1361+ }
1362+
1363+ for (const auto & key : itrListeners->second )
1364+ {
1365+ auto itrSubscription = _subscriptions.find (key);
1366+ const auto & registration = itrSubscription->second ;
1367+ std::future<response::Value> result;
1368+
1369+ try
1370+ {
1371+ result = std::async (std::launch::deferred,
1372+ [](std::future<response::Value> data)
1373+ {
1374+ response::Value document (response::Type::Map);
1375+
1376+ document.emplace_back (" data" , data.get ());
1377+
1378+ return document;
1379+ }, subscriptionObject->resolve (registration.params .state , registration.selection , registration.fragments , registration.params .variables ));
1380+ }
1381+ catch (const schema_exception& ex)
1382+ {
1383+ std::promise<response::Value> promise;
1384+ response::Value document (response::Type::Map);
1385+
1386+ document.emplace_back (" data" , response::Value ());
1387+ document.emplace_back (" errors" , response::Value (ex.getErrors ()));
1388+
1389+ result = promise.get_future ();
1390+ }
1391+
1392+ registration.callback (std::move (result));
1393+ }
1394+ }
1395+
11541396} /* namespace service */
11551397} /* namespace graphql */
11561398} /* namespace facebook */
0 commit comments