Skip to content

Commit 4dfa7e6

Browse files
committed
subscriptions: Share watcher among subscriptions of same filter
1 parent 528fd19 commit 4dfa7e6

File tree

8 files changed

+128
-74
lines changed

8 files changed

+128
-74
lines changed

core/src/subgraph/registrar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ where
136136
let logger = self.logger.clone();
137137

138138
self.subscription_manager
139-
.subscribe(vec![SubscriptionFilter::Assignment])
139+
.subscribe(FromIterator::from_iter([SubscriptionFilter::Assignment]))
140140
.map_err(|()| anyhow!("Entity change stream failed"))
141141
.map(|event| {
142142
// We're only interested in the SubgraphDeploymentAssignment change; we

graph/src/components/store.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ impl StoreEvent {
588588
self
589589
}
590590

591-
pub fn matches(&self, filters: &Vec<SubscriptionFilter>) -> bool {
591+
pub fn matches(&self, filters: &BTreeSet<SubscriptionFilter>) -> bool {
592592
self.changes
593593
.iter()
594594
.any(|change| filters.iter().any(|filter| filter.matches(change)))
@@ -649,7 +649,7 @@ where
649649
/// Filter a `StoreEventStream` by subgraph and entity. Only events that have
650650
/// at least one change to one of the given (subgraph, entity) combinations
651651
/// will be delivered by the filtered stream.
652-
pub fn filter_by_entities(self, filters: Vec<SubscriptionFilter>) -> StoreEventStreamBox {
652+
pub fn filter_by_entities(self, filters: BTreeSet<SubscriptionFilter>) -> StoreEventStreamBox {
653653
let source = self.source.filter(move |event| event.matches(&filters));
654654

655655
StoreEventStream::new(Box::new(source))
@@ -852,10 +852,10 @@ pub trait SubscriptionManager: Send + Sync + 'static {
852852
/// Subscribe to changes for specific subgraphs and entities.
853853
///
854854
/// Returns a stream of store events that match the input arguments.
855-
fn subscribe(&self, entities: Vec<SubscriptionFilter>) -> StoreEventStreamBox;
855+
fn subscribe(&self, entities: BTreeSet<SubscriptionFilter>) -> StoreEventStreamBox;
856856

857857
/// If the payload is not required, use for a more efficient subscription mechanism backed by a watcher.
858-
fn subscribe_no_payload(&self, entities: Vec<SubscriptionFilter>) -> UnitStream;
858+
fn subscribe_no_payload(&self, entities: BTreeSet<SubscriptionFilter>) -> UnitStream;
859859
}
860860

861861
/// An internal identifer for the specific instance of a deployment. The

graph/src/data/store/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub mod scalar;
2626
pub mod ethereum;
2727

2828
/// Filter subscriptions
29-
#[derive(Clone)]
29+
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
3030
pub enum SubscriptionFilter {
3131
/// Receive updates about all entities from the given deployment of the
3232
/// given type

graphql/src/store/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
1+
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
22
use std::mem::discriminant;
33

44
use graph::prelude::*;
@@ -295,7 +295,7 @@ pub fn collect_entities_from_query_field(
295295
schema: &s::Document,
296296
object_type: &s::ObjectType,
297297
field: &q::Field,
298-
) -> Vec<SubscriptionFilter> {
298+
) -> BTreeSet<SubscriptionFilter> {
299299
// Output entities
300300
let mut entities = HashSet::new();
301301

node/src/manager/commands/listen.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use std::io::Write;
1+
use std::iter::FromIterator;
22
use std::sync::Arc;
3+
use std::{collections::BTreeSet, io::Write};
34

45
use futures::compat::Future01CompatExt;
56
//use futures::future;
@@ -13,7 +14,7 @@ use crate::manager::deployment;
1314

1415
async fn listen(
1516
mgr: Arc<SubscriptionManager>,
16-
filter: Vec<SubscriptionFilter>,
17+
filter: BTreeSet<SubscriptionFilter>,
1718
) -> Result<(), Error> {
1819
let events = mgr.subscribe(filter);
1920
println!("press ctrl-c to stop");
@@ -41,7 +42,11 @@ async fn listen(
4142

4243
pub async fn assignments(mgr: Arc<SubscriptionManager>) -> Result<(), Error> {
4344
println!("waiting for assignment events");
44-
listen(mgr, vec![SubscriptionFilter::Assignment]).await?;
45+
listen(
46+
mgr,
47+
FromIterator::from_iter([SubscriptionFilter::Assignment]),
48+
)
49+
.await?;
4550

4651
Ok(())
4752
}
@@ -52,7 +57,7 @@ pub async fn entities(
5257
entity_types: Vec<String>,
5358
) -> Result<(), Error> {
5459
let deployment = deployment::as_hash(deployment)?;
55-
let filter: Vec<_> = entity_types
60+
let filter = entity_types
5661
.into_iter()
5762
.map(|et| SubscriptionFilter::Entities(deployment.clone(), EntityType::new(et)))
5863
.collect();

node/src/manager/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::BTreeSet;
2+
13
use graph::{
24
components::store::{SubscriptionManager, UnitStream},
35
prelude::{StoreEventStreamBox, SubscriptionFilter},
@@ -12,11 +14,11 @@ mod display;
1214
pub struct PanicSubscriptionManager;
1315

1416
impl SubscriptionManager for PanicSubscriptionManager {
15-
fn subscribe(&self, _: Vec<SubscriptionFilter>) -> StoreEventStreamBox {
17+
fn subscribe(&self, _: BTreeSet<SubscriptionFilter>) -> StoreEventStreamBox {
1618
panic!("we were never meant to call `subscribe`");
1719
}
1820

19-
fn subscribe_no_payload(&self, _: Vec<SubscriptionFilter>) -> UnitStream {
21+
fn subscribe_no_payload(&self, _: BTreeSet<SubscriptionFilter>) -> UnitStream {
2022
panic!("we were never meant to call `subscribe_no_payload`");
2123
}
2224
}

store/postgres/src/store_events.rs

Lines changed: 102 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use futures03::TryStreamExt;
2+
use graph::parking_lot::Mutex;
23
use graph::tokio_stream::wrappers::ReceiverStream;
4+
use std::collections::BTreeSet;
35
use std::sync::{atomic::Ordering, Arc, RwLock};
46
use std::{collections::HashMap, sync::atomic::AtomicUsize};
57
use tokio::sync::mpsc::{channel, Sender};
@@ -84,38 +86,46 @@ impl StoreEventListener {
8486
}
8587
}
8688

87-
#[async_trait]
88-
trait EventSink: Send + Sync {
89-
async fn send(&self, event: Arc<StoreEvent>) -> Result<(), Error>;
90-
fn is_closed(&self) -> bool;
89+
struct Watcher<T> {
90+
sender: Arc<watch::Sender<T>>,
91+
receiver: watch::Receiver<T>,
9192
}
9293

93-
#[async_trait]
94-
impl EventSink for Sender<Arc<StoreEvent>> {
95-
async fn send(&self, event: Arc<StoreEvent>) -> Result<(), Error> {
96-
Ok(self.send(event).await?)
94+
impl<T: Clone + Debug + Send + Sync + 'static> Watcher<T> {
95+
fn new(init: T) -> Self {
96+
let (sender, receiver) = watch::channel(init);
97+
Watcher {
98+
sender: Arc::new(sender),
99+
receiver,
100+
}
97101
}
98102

99-
fn is_closed(&self) -> bool {
100-
self.is_closed()
103+
fn send(&self, v: T) {
104+
// Unwrap: `self` holds a receiver.
105+
self.sender.send(v).unwrap()
101106
}
102-
}
103107

104-
#[async_trait]
105-
impl EventSink for watch::Sender<()> {
106-
async fn send(&self, _event: Arc<StoreEvent>) -> Result<(), Error> {
107-
Ok(self.send(())?)
108+
fn stream(&self) -> Box<dyn futures03::Stream<Item = T> + Unpin + Send + Sync> {
109+
Box::new(tokio_stream::wrappers::WatchStream::new(
110+
self.receiver.clone(),
111+
))
108112
}
109113

110-
fn is_closed(&self) -> bool {
111-
self.is_closed()
114+
/// Outstanding receivers returned from `Self::stream`.
115+
fn receiver_count(&self) -> usize {
116+
// Do not count the internal receiver.
117+
self.sender.receiver_count() - 1
112118
}
113119
}
114120

115121
/// Manage subscriptions to the `StoreEvent` stream. Keep a list of
116122
/// currently active subscribers and forward new events to each of them
117123
pub struct SubscriptionManager {
118-
subscriptions: Arc<RwLock<HashMap<String, (Arc<Vec<SubscriptionFilter>>, Arc<dyn EventSink>)>>>,
124+
// These are more efficient since only one entry is stored per filter.
125+
subscriptions_no_payload: Arc<Mutex<HashMap<BTreeSet<SubscriptionFilter>, Watcher<()>>>>,
126+
127+
subscriptions:
128+
Arc<RwLock<HashMap<String, (Arc<BTreeSet<SubscriptionFilter>>, Sender<Arc<StoreEvent>>)>>>,
119129

120130
/// Keep the notification listener alive
121131
listener: StoreEventListener,
@@ -126,6 +136,7 @@ impl SubscriptionManager {
126136
let (listener, store_events) = StoreEventListener::new(logger, postgres_url, registry);
127137

128138
let mut manager = SubscriptionManager {
139+
subscriptions_no_payload: Arc::new(Mutex::new(HashMap::new())),
129140
subscriptions: Arc::new(RwLock::new(HashMap::new())),
130141
listener,
131142
};
@@ -146,61 +157,101 @@ impl SubscriptionManager {
146157
&self,
147158
store_events: Box<dyn Stream<Item = StoreEvent, Error = ()> + Send>,
148159
) {
149-
let subscriptions = self.subscriptions.clone();
160+
let subscriptions = self.subscriptions.cheap_clone();
161+
let subscriptions_no_payload = self.subscriptions_no_payload.cheap_clone();
150162
let mut store_events = store_events.compat();
151163

152164
// This channel is constantly receiving things and there are locks involved,
153165
// so it's best to use a blocking task.
154166
graph::spawn_blocking(async move {
155167
while let Some(Ok(event)) = store_events.next().await {
156-
let senders = subscriptions.read().unwrap().clone();
157168
let event = Arc::new(event);
158169

159-
// Write change to all matching subscription streams; remove subscriptions
160-
// whose receiving end has been dropped
161-
for (id, (_, sender)) in senders
162-
.iter()
163-
.filter(|(_, (filter, _))| event.matches(filter))
170+
// Send to `subscriptions`.
164171
{
165-
if sender.send(event.cheap_clone()).await.is_err() {
166-
// Receiver was dropped
167-
subscriptions.write().unwrap().remove(id);
172+
let senders = subscriptions.read().unwrap().clone();
173+
174+
// Write change to all matching subscription streams; remove subscriptions
175+
// whose receiving end has been dropped
176+
for (id, (_, sender)) in senders
177+
.iter()
178+
.filter(|(_, (filter, _))| event.matches(filter))
179+
{
180+
if sender.send(event.cheap_clone()).await.is_err() {
181+
// Receiver was dropped
182+
subscriptions.write().unwrap().remove(id);
183+
}
184+
}
185+
}
186+
187+
// Send to `subscriptions_no_payload`.
188+
{
189+
let watchers = subscriptions_no_payload.lock();
190+
191+
// Write change to all matching subscription streams
192+
for (_, watcher) in watchers.iter().filter(|(filter, _)| event.matches(filter))
193+
{
194+
watcher.send(());
168195
}
169196
}
170197
}
171198
});
172199
}
173200

174201
fn periodically_clean_up_stale_subscriptions(&self) {
175-
let subscriptions = self.subscriptions.clone();
202+
let subscriptions = self.subscriptions.cheap_clone();
203+
let subscriptions_no_payload = self.subscriptions_no_payload.cheap_clone();
176204

177205
// Clean up stale subscriptions every 5s
178206
graph::spawn(async move {
179207
let mut interval = tokio::time::interval(Duration::from_secs(5));
180208
loop {
181209
interval.tick().await;
182-
let mut subscriptions = subscriptions.write().unwrap();
183-
184-
// Obtain IDs of subscriptions whose receiving end has gone
185-
let stale_ids = subscriptions
186-
.iter_mut()
187-
.filter_map(|(id, (_, sender))| match sender.is_closed() {
188-
true => Some(id.clone()),
189-
false => None,
190-
})
191-
.collect::<Vec<_>>();
192-
193-
// Remove all stale subscriptions
194-
for id in stale_ids {
195-
subscriptions.remove(&id);
210+
211+
// Cleanup `subscriptions`.
212+
{
213+
let mut subscriptions = subscriptions.write().unwrap();
214+
215+
// Obtain IDs of subscriptions whose receiving end has gone
216+
let stale_ids = subscriptions
217+
.iter_mut()
218+
.filter_map(|(id, (_, sender))| match sender.is_closed() {
219+
true => Some(id.clone()),
220+
false => None,
221+
})
222+
.collect::<Vec<_>>();
223+
224+
// Remove all stale subscriptions
225+
for id in stale_ids {
226+
subscriptions.remove(&id);
227+
}
228+
}
229+
230+
// Cleanup `subscriptions_no_payload`.
231+
{
232+
let mut subscriptions = subscriptions_no_payload.lock();
233+
234+
// Obtain IDs of subscriptions whose receiving end has gone
235+
let stale_ids = subscriptions
236+
.iter_mut()
237+
.filter_map(|(id, watcher)| match watcher.receiver_count() == 0 {
238+
true => Some(id.clone()),
239+
false => None,
240+
})
241+
.collect::<Vec<_>>();
242+
243+
// Remove all stale subscriptions
244+
for id in stale_ids {
245+
subscriptions.remove(&id);
246+
}
196247
}
197248
}
198249
});
199250
}
200251
}
201252

202253
impl SubscriptionManagerTrait for SubscriptionManager {
203-
fn subscribe(&self, entities: Vec<SubscriptionFilter>) -> StoreEventStreamBox {
254+
fn subscribe(&self, entities: BTreeSet<SubscriptionFilter>) -> StoreEventStreamBox {
204255
let id = Uuid::new_v4().to_string();
205256

206257
// Prepare the new subscription by creating a channel and a subscription object
@@ -210,23 +261,18 @@ impl SubscriptionManagerTrait for SubscriptionManager {
210261
self.subscriptions
211262
.write()
212263
.unwrap()
213-
.insert(id, (Arc::new(entities.clone()), Arc::new(sender)));
264+
.insert(id, (Arc::new(entities.clone()), sender));
214265

215266
// Return the subscription ID and entity change stream
216267
StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat()))
217268
.filter_by_entities(entities)
218269
}
219270

220-
fn subscribe_no_payload(&self, entities: Vec<SubscriptionFilter>) -> UnitStream {
221-
let id = Uuid::new_v4().to_string();
222-
223-
let (sender, receiver) = watch::channel(());
224-
225-
self.subscriptions
226-
.write()
227-
.unwrap()
228-
.insert(id, (Arc::new(entities.clone()), Arc::new(sender)));
229-
230-
Box::new(tokio_stream::wrappers::WatchStream::new(receiver))
271+
fn subscribe_no_payload(&self, entities: BTreeSet<SubscriptionFilter>) -> UnitStream {
272+
self.subscriptions_no_payload
273+
.lock()
274+
.entry(entities)
275+
.or_insert_with(|| Watcher::new(()))
276+
.stream()
231277
}
232278
}

store/postgres/tests/store.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -975,10 +975,11 @@ fn subscribe(
975975
subgraph: &DeploymentHash,
976976
entity_type: &str,
977977
) -> StoreEventStream<impl Stream<Item = Arc<StoreEvent>, Error = ()> + Send> {
978-
let subscription = SUBSCRIPTION_MANAGER.subscribe(vec![SubscriptionFilter::Entities(
979-
subgraph.clone(),
980-
EntityType::new(entity_type.to_owned()),
981-
)]);
978+
let subscription =
979+
SUBSCRIPTION_MANAGER.subscribe(FromIterator::from_iter([SubscriptionFilter::Entities(
980+
subgraph.clone(),
981+
EntityType::new(entity_type.to_owned()),
982+
)]));
982983

983984
StoreEventStream::new(subscription)
984985
}

0 commit comments

Comments
 (0)