Skip to content

Commit b419af0

Browse files
committed
subscriptions: No backpressure to notify queue
1 parent ce8336e commit b419af0

File tree

9 files changed

+81
-67
lines changed

9 files changed

+81
-67
lines changed

graph/src/components/store.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,8 @@ pub struct StoreEventStream<S> {
623623
pub type StoreEventStreamBox =
624624
StoreEventStream<Box<dyn Stream<Item = Arc<StoreEvent>, Error = ()> + Send>>;
625625

626+
pub type UnitStream = Box<dyn futures03::Stream<Item = ()> + Unpin + Send + Sync>;
627+
626628
impl<S> Stream for StoreEventStream<S>
627629
where
628630
S: Stream<Item = Arc<StoreEvent>, Error = ()> + Send,
@@ -661,6 +663,8 @@ where
661663
/// on the returned stream as a single `StoreEvent`; the events are
662664
/// combined by using the maximum of all sources and the concatenation
663665
/// of the changes of the `StoreEvents` received during the interval.
666+
//
667+
// Currently unused, needs to be made compatible with `subscribe_no_payload`.
664668
pub async fn throttle_while_syncing(
665669
self,
666670
logger: &Logger,
@@ -849,6 +853,9 @@ pub trait SubscriptionManager: Send + Sync + 'static {
849853
///
850854
/// Returns a stream of store events that match the input arguments.
851855
fn subscribe(&self, entities: Vec<SubscriptionFilter>) -> StoreEventStreamBox;
856+
857+
/// If the payload is not required, use for a more efficient subscription mechanism backed by a watcher.
858+
fn subscribe_no_payload(&self, entities: Vec<SubscriptionFilter>) -> UnitStream;
852859
}
853860

854861
/// An internal identifer for the specific instance of a deployment. The

graph/src/data/subscription/result.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::prelude::QueryResult;
2-
use std::marker::Unpin;
2+
use std::pin::Pin;
33
use std::sync::Arc;
44

55
/// A stream of query results for a subscription.
66
pub type QueryResultStream =
7-
Box<dyn futures03::stream::Stream<Item = Arc<QueryResult>> + Send + Unpin>;
7+
Pin<Box<dyn futures03::stream::Stream<Item = Arc<QueryResult>> + Send>>;
88

99
/// The result of running a subscription, if successful.
1010
pub type SubscriptionResult = QueryResultStream;

graphql/src/execution/resolver.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::collections::HashMap;
22

33
use crate::execution::ExecutionContext;
4-
use graph::prelude::{async_trait, q, s, tokio, Error, QueryExecutionError, StoreEventStreamBox};
4+
use graph::components::store::UnitStream;
5+
use graph::prelude::{async_trait, q, s, tokio, Error, QueryExecutionError};
56
use graph::{
67
data::graphql::{ext::DocumentExt, ObjectOrInterface},
78
prelude::{r, QueryResult},
@@ -109,12 +110,12 @@ pub trait Resolver: Sized + Send + Sync + 'static {
109110
}
110111

111112
// Resolves a change stream for a given field.
112-
async fn resolve_field_stream(
113+
fn resolve_field_stream(
113114
&self,
114115
_schema: &s::Document,
115116
_object_type: &s::ObjectType,
116117
_field: &q::Field,
117-
) -> Result<StoreEventStreamBox, QueryExecutionError> {
118+
) -> Result<UnitStream, QueryExecutionError> {
118119
Err(QueryExecutionError::NotSupported(String::from(
119120
"Resolving field streams is not supported by this resolver",
120121
)))

graphql/src/runner.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,6 @@ where
347347
result_size: self.result_size.clone(),
348348
},
349349
)
350-
.await
351350
}
352351

353352
fn load_manager(&self) -> Arc<LoadManager> {

graphql/src/store/resolver.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::store::query::collect_entities_from_query_field;
1919
/// A resolver that fetches entities from a `Store`.
2020
#[derive(Clone)]
2121
pub struct StoreResolver {
22+
#[allow(dead_code)]
2223
logger: Logger,
2324
pub(crate) store: Arc<dyn QueryStore>,
2425
subscription_manager: Arc<dyn SubscriptionManager>,
@@ -298,25 +299,17 @@ impl Resolver for StoreResolver {
298299
}
299300
}
300301

301-
async fn resolve_field_stream(
302+
fn resolve_field_stream(
302303
&self,
303304
schema: &s::Document,
304305
object_type: &s::ObjectType,
305306
field: &q::Field,
306-
) -> result::Result<StoreEventStreamBox, QueryExecutionError> {
307+
) -> result::Result<UnitStream, QueryExecutionError> {
307308
// Collect all entities involved in the query field
308309
let entities = collect_entities_from_query_field(schema, object_type, field);
309310

310311
// Subscribe to the store and return the entity change stream
311-
Ok(self
312-
.subscription_manager
313-
.subscribe(entities)
314-
.throttle_while_syncing(
315-
&self.logger,
316-
self.store.clone(),
317-
*SUBSCRIPTION_THROTTLE_INTERVAL,
318-
)
319-
.await)
312+
Ok(self.subscription_manager.subscribe_no_payload(entities))
320313
}
321314

322315
fn post_process(&self, result: &mut QueryResult) -> Result<(), anyhow::Error> {

graphql/src/subscription/mod.rs

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::iter;
33
use std::result::Result;
44
use std::time::{Duration, Instant};
55

6+
use graph::components::store::UnitStream;
67
use graph::{components::store::SubscriptionManager, prelude::*};
78

89
use crate::runner::ResultSizeMetrics;
@@ -40,7 +41,7 @@ pub struct SubscriptionExecutionOptions {
4041
pub result_size: Arc<ResultSizeMetrics>,
4142
}
4243

43-
pub async fn execute_subscription(
44+
pub fn execute_subscription(
4445
subscription: Subscription,
4546
schema: Arc<ApiSchema>,
4647
options: SubscriptionExecutionOptions,
@@ -53,10 +54,10 @@ pub async fn execute_subscription(
5354
options.max_complexity,
5455
options.max_depth,
5556
)?;
56-
execute_prepared_subscription(query, options).await
57+
execute_prepared_subscription(query, options)
5758
}
5859

59-
pub(crate) async fn execute_prepared_subscription(
60+
pub(crate) fn execute_prepared_subscription(
6061
query: Arc<crate::execution::Query>,
6162
options: SubscriptionExecutionOptions,
6263
) -> Result<SubscriptionResult, SubscriptionError> {
@@ -72,15 +73,15 @@ pub(crate) async fn execute_prepared_subscription(
7273
"query" => &query.query_text,
7374
);
7475

75-
let source_stream = create_source_event_stream(query.clone(), &options).await?;
76+
let source_stream = create_source_event_stream(query.clone(), &options)?;
7677
let response_stream = map_source_to_response_stream(query, options, source_stream);
7778
Ok(response_stream)
7879
}
7980

80-
async fn create_source_event_stream(
81+
fn create_source_event_stream(
8182
query: Arc<crate::execution::Query>,
8283
options: &SubscriptionExecutionOptions,
83-
) -> Result<StoreEventStreamBox, SubscriptionError> {
84+
) -> Result<UnitStream, SubscriptionError> {
8485
let resolver = StoreResolver::for_subscription(
8586
&options.logger,
8687
query.schema.id().clone(),
@@ -123,35 +124,31 @@ async fn create_source_event_stream(
123124
let field = fields.1[0];
124125
let argument_values = coerce_argument_values(&ctx.query, subscription_type.as_ref(), field)?;
125126

126-
resolve_field_stream(&ctx, &subscription_type, field, argument_values).await
127+
resolve_field_stream(&ctx, &subscription_type, field, argument_values)
127128
}
128129

129-
async fn resolve_field_stream(
130+
fn resolve_field_stream(
130131
ctx: &ExecutionContext<impl Resolver>,
131132
object_type: &s::ObjectType,
132133
field: &q::Field,
133134
_argument_values: HashMap<&str, r::Value>,
134-
) -> Result<StoreEventStreamBox, SubscriptionError> {
135+
) -> Result<UnitStream, SubscriptionError> {
135136
ctx.resolver
136137
.resolve_field_stream(&ctx.query.schema.document(), object_type, field)
137-
.await
138138
.map_err(SubscriptionError::from)
139139
}
140140

141141
fn map_source_to_response_stream(
142142
query: Arc<crate::execution::Query>,
143143
options: SubscriptionExecutionOptions,
144-
source_stream: StoreEventStreamBox,
144+
source_stream: UnitStream,
145145
) -> QueryResultStream {
146146
// Create a stream with a single empty event. By chaining this in front
147147
// of the real events, we trick the subscription into executing its query
148148
// at least once. This satisfies the GraphQL over Websocket protocol
149149
// requirement of "respond[ing] with at least one GQL_DATA message", see
150150
// https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_data
151-
let trigger_stream = futures03::stream::iter(vec![Ok(Arc::new(StoreEvent {
152-
tag: 0,
153-
changes: Default::default(),
154-
}))]);
151+
let trigger_stream = futures03::stream::once(async {});
155152

156153
let SubscriptionExecutionOptions {
157154
logger,
@@ -165,43 +162,34 @@ fn map_source_to_response_stream(
165162
result_size,
166163
} = options;
167164

168-
Box::new(
169-
trigger_stream
170-
.chain(source_stream.compat())
171-
.then(move |res| match res {
172-
Err(()) => {
173-
futures03::future::ready(Arc::new(QueryExecutionError::EventStreamError.into()))
174-
.boxed()
175-
}
176-
Ok(event) => execute_subscription_event(
177-
logger.clone(),
178-
store.clone(),
179-
subscription_manager.cheap_clone(),
180-
query.clone(),
181-
event,
182-
timeout,
183-
max_first,
184-
max_skip,
185-
result_size.cheap_clone(),
186-
)
187-
.boxed(),
188-
}),
189-
)
165+
trigger_stream
166+
.chain(source_stream)
167+
.then(move |()| {
168+
execute_subscription_event(
169+
logger.clone(),
170+
store.clone(),
171+
subscription_manager.cheap_clone(),
172+
query.clone(),
173+
timeout,
174+
max_first,
175+
max_skip,
176+
result_size.cheap_clone(),
177+
)
178+
.boxed()
179+
})
180+
.boxed()
190181
}
191182

192183
async fn execute_subscription_event(
193184
logger: Logger,
194185
store: Arc<dyn QueryStore>,
195186
subscription_manager: Arc<dyn SubscriptionManager>,
196187
query: Arc<crate::execution::Query>,
197-
event: Arc<StoreEvent>,
198188
timeout: Option<Duration>,
199189
max_first: u32,
200190
max_skip: u32,
201191
result_size: Arc<ResultSizeMetrics>,
202192
) -> Arc<QueryResult> {
203-
debug!(logger, "Execute subscription event"; "event" => format!("{:?}", event));
204-
205193
let resolver = match StoreResolver::at_block(
206194
&logger,
207195
store,

graphql/tests/query.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -887,9 +887,8 @@ fn query_complexity_subscriptions() {
887887

888888
// This query is exactly at the maximum complexity.
889889
// FIXME: Not collecting the stream because that will hang the test.
890-
let _ignore_stream = execute_subscription(Subscription { query }, schema.clone(), options)
891-
.await
892-
.unwrap();
890+
let _ignore_stream =
891+
execute_subscription(Subscription { query }, schema.clone(), options).unwrap();
893892

894893
let query = Query::new(
895894
graphql_parser::parse_query(
@@ -934,7 +933,7 @@ fn query_complexity_subscriptions() {
934933
};
935934

936935
// The extra introspection causes the complexity to go over.
937-
let result = execute_subscription(Subscription { query }, schema, options).await;
936+
let result = execute_subscription(Subscription { query }, schema, options);
938937
match result {
939938
Err(SubscriptionError::GraphQLError(e)) => match e[0] {
940939
QueryExecutionError::TooComplex(1_010_200, _) => (), // Expected
@@ -1314,9 +1313,7 @@ fn subscription_gets_result_even_without_events() {
13141313
};
13151314
// Execute the subscription and expect at least one result to be
13161315
// available in the result stream
1317-
let stream = execute_subscription(Subscription { query }, schema, options)
1318-
.await
1319-
.unwrap();
1316+
let stream = execute_subscription(Subscription { query }, schema, options).unwrap();
13201317
let results: Vec<_> = stream
13211318
.take(1)
13221319
.collect()

node/src/manager/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use graph::{
2-
components::store::SubscriptionManager,
2+
components::store::{SubscriptionManager, UnitStream},
33
prelude::{StoreEventStreamBox, SubscriptionFilter},
44
};
55

@@ -15,4 +15,8 @@ impl SubscriptionManager for PanicSubscriptionManager {
1515
fn subscribe(&self, _: Vec<SubscriptionFilter>) -> StoreEventStreamBox {
1616
panic!("we were never meant to call `subscribe`");
1717
}
18+
19+
fn subscribe_no_payload(&self, _: Vec<SubscriptionFilter>) -> UnitStream {
20+
panic!("we were never meant to call `subscribe_no_payload`");
21+
}
1822
}

store/postgres/src/store_events.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ use graph::tokio_stream::wrappers::ReceiverStream;
33
use std::sync::{atomic::Ordering, Arc, RwLock};
44
use std::{collections::HashMap, sync::atomic::AtomicUsize};
55
use tokio::sync::mpsc::{channel, Sender};
6+
use tokio::sync::watch;
67
use uuid::Uuid;
78

89
use crate::notification_listener::{NotificationListener, SafeChannelName};
9-
use graph::components::store::SubscriptionManager as SubscriptionManagerTrait;
10+
use graph::components::store::{SubscriptionManager as SubscriptionManagerTrait, UnitStream};
1011
use graph::prelude::serde_json;
11-
use graph::prelude::*;
12+
use graph::{prelude::*, tokio_stream};
1213

1314
pub struct StoreEventListener {
1415
notification_listener: NotificationListener,
@@ -100,6 +101,17 @@ impl EventSink for Sender<Arc<StoreEvent>> {
100101
}
101102
}
102103

104+
#[async_trait]
105+
impl EventSink for watch::Sender<()> {
106+
async fn send(&self, _event: Arc<StoreEvent>) -> Result<(), Error> {
107+
Ok(self.send(())?)
108+
}
109+
110+
fn is_closed(&self) -> bool {
111+
self.is_closed()
112+
}
113+
}
114+
103115
/// Manage subscriptions to the `StoreEvent` stream. Keep a list of
104116
/// currently active subscribers and forward new events to each of them
105117
pub struct SubscriptionManager {
@@ -204,4 +216,17 @@ impl SubscriptionManagerTrait for SubscriptionManager {
204216
StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat()))
205217
.filter_by_entities(entities)
206218
}
219+
220+
fn subscribe_no_payload(&self, entities: Vec<SubscriptionFilter>) -> UnitStream {
221+
let id = Uuid::new_v4().to_string();
222+
223+
let (sender, receiver) = watch::channel(());
224+
225+
self.subscriptions
226+
.write()
227+
.unwrap()
228+
.insert(id, (Arc::new(entities.clone()), Arc::new(sender)));
229+
230+
Box::new(tokio_stream::wrappers::WatchStream::new(receiver))
231+
}
207232
}

0 commit comments

Comments
 (0)