Skip to content

Commit 6ec3157

Browse files
JannisJannis Pohlmann
authored andcommitted
datasource, graph, mock, store: Use shared ChainHeadUpdateListener
This fixes #889 by using a single Postgres connection for the chain head update streams of all subgraph deployments.
1 parent f0afa91 commit 6ec3157

File tree

8 files changed

+130
-75
lines changed

8 files changed

+130
-75
lines changed

datasource/ethereum/src/block_stream.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use futures::prelude::*;
22
use futures::sync::mpsc::{channel, Receiver, Sender};
3-
use std;
43
use std::cmp;
54
use std::collections::HashSet;
65
use std::env;
@@ -1059,14 +1058,13 @@ where
10591058
// Create a chain head update stream whose lifetime is tied to the
10601059
// liftetime of the block stream; we do this to immediately terminate
10611060
// the chain head update listener when the block stream is shut down
1062-
let mut chain_head_update_listener = self.chain_store.chain_head_updates();
10631061
let cancel_guard = CancelGuard::new();
1064-
let chain_head_update_stream = chain_head_update_listener
1065-
.take_event_stream()
1066-
.unwrap()
1067-
.cancelable(&cancel_guard, move || {
1068-
debug!(logger_for_stream, "Terminating chain head updates");
1069-
});
1062+
let chain_head_update_stream =
1063+
self.chain_store
1064+
.chain_head_updates()
1065+
.cancelable(&cancel_guard, move || {
1066+
debug!(logger_for_stream, "Terminating chain head updates");
1067+
});
10701068

10711069
// Create the actual subgraph-specific block stream
10721070
let block_stream = BlockStream::new(
@@ -1090,13 +1088,6 @@ where
10901088
.map(|_| ()),
10911089
);
10921090

1093-
// Start listening for chain head updates
1094-
chain_head_update_listener.start();
1095-
1096-
// Leak the chain update listener; we'll terminate it by closing the
1097-
// block stream's chain head update sink
1098-
std::mem::forget(chain_head_update_listener);
1099-
11001091
block_stream
11011092
}
11021093
}

graph/src/components/ethereum/listener.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1+
use futures::Stream;
12
use serde::de::{Deserialize, Deserializer, Error as DeserializerError};
23
use std::str::FromStr;
34
use web3::types::H256;
45

5-
use crate::components::EventProducer;
6-
76
/// Deserialize an H256 hash (with or without '0x' prefix).
87
fn deserialize_h256<'de, D>(deserializer: D) -> Result<H256, D::Error>
98
where
@@ -22,7 +21,9 @@ pub struct ChainHeadUpdate {
2221
pub head_block_number: u64,
2322
}
2423

25-
pub trait ChainHeadUpdateListener: EventProducer<ChainHeadUpdate> {
26-
/// Begin processing notifications coming in from Postgres.
27-
fn start(&mut self);
24+
pub type ChainHeadUpdateStream = Box<Stream<Item = ChainHeadUpdate, Error = ()> + Send>;
25+
26+
pub trait ChainHeadUpdateListener {
27+
// Subscribe to chain head updates.
28+
fn subscribe(&self) -> ChainHeadUpdateStream;
2829
}

graph/src/components/ethereum/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub use self::adapter::{
88
EthereumContractState, EthereumContractStateError, EthereumContractStateRequest,
99
EthereumLogFilter, EthereumNetworkIdentifier,
1010
};
11-
pub use self::listener::{ChainHeadUpdate, ChainHeadUpdateListener};
11+
pub use self::listener::{ChainHeadUpdate, ChainHeadUpdateListener, ChainHeadUpdateStream};
1212
pub use self::stream::{BlockStream, BlockStreamBuilder};
1313
pub use self::types::{
1414
EthereumBlock, EthereumBlockData, EthereumBlockPointer, EthereumEventData,

graph/src/components/store.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use failure::Error;
22
use futures::stream::poll_fn;
3-
use futures::Future;
4-
use futures::Stream;
5-
use futures::{Async, Poll};
3+
use futures::{Async, Future, Poll, Stream};
64
use std::collections::HashSet;
75
use std::env;
86
use std::fmt;
@@ -1017,7 +1015,7 @@ pub trait ChainStore: Send + Sync + 'static {
10171015
fn attempt_chain_head_update(&self, ancestor_count: u64) -> Result<Vec<H256>, Error>;
10181016

10191017
/// Subscribe to chain head updates.
1020-
fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener;
1018+
fn chain_head_updates(&self) -> ChainHeadUpdateStream;
10211019

10221020
/// Get the current head block pointer for this chain.
10231021
/// Any changes to the head block pointer will be to a block with a larger block number, never

graph/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ pub mod prelude {
6060
pub use tokio::prelude::*;
6161

6262
pub use crate::components::ethereum::{
63-
BlockStream, BlockStreamBuilder, ChainHeadUpdate, ChainHeadUpdateListener, EthereumAdapter,
64-
EthereumAdapterError, EthereumBlock, EthereumBlockData, EthereumBlockPointer,
65-
EthereumContractCall, EthereumContractCallError, EthereumEventData, EthereumLogFilter,
66-
EthereumNetworkIdentifier, EthereumTransactionData,
63+
BlockStream, BlockStreamBuilder, ChainHeadUpdate, ChainHeadUpdateListener,
64+
ChainHeadUpdateStream, EthereumAdapter, EthereumAdapterError, EthereumBlock,
65+
EthereumBlockData, EthereumBlockPointer, EthereumContractCall, EthereumContractCallError,
66+
EthereumEventData, EthereumLogFilter, EthereumNetworkIdentifier, EthereumTransactionData,
6767
};
6868
pub use crate::components::graphql::{
6969
GraphQlRunner, QueryResultFuture, SubscriptionResultFuture,

mock/src/store.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,7 @@ use graph_graphql::prelude::api_schema;
1515
pub struct MockChainHeadUpdateListener {}
1616

1717
impl ChainHeadUpdateListener for MockChainHeadUpdateListener {
18-
fn start(&mut self) {}
19-
}
20-
21-
impl EventProducer<ChainHeadUpdate> for MockChainHeadUpdateListener {
22-
fn take_event_stream(
23-
&mut self,
24-
) -> Option<Box<Stream<Item = ChainHeadUpdate, Error = ()> + Send>> {
18+
fn subscribe(&self) -> ChainHeadUpdateStream {
2519
unimplemented!();
2620
}
2721
}
@@ -385,7 +379,7 @@ impl ChainStore for MockStore {
385379
unimplemented!();
386380
}
387381

388-
fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener {
382+
fn chain_head_updates(&self) -> ChainHeadUpdateStream {
389383
unimplemented!();
390384
}
391385

@@ -504,7 +498,7 @@ impl ChainStore for FakeStore {
504498
unimplemented!();
505499
}
506500

507-
fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener {
501+
fn chain_head_updates(&self) -> ChainHeadUpdateStream {
508502
unimplemented!();
509503
}
510504

Lines changed: 99 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,60 @@
1-
use crate::notification_listener::{NotificationListener, SafeChannelName};
1+
use futures::sync::mpsc::{channel, Sender};
2+
use std::collections::HashMap;
3+
use std::sync::{Arc, RwLock};
4+
use uuid::Uuid;
5+
26
use graph::prelude::{ChainHeadUpdateListener as ChainHeadUpdateListenerTrait, *};
37
use graph::serde_json;
48

9+
use crate::notification_listener::{NotificationListener, SafeChannelName};
10+
11+
type ChainHeadUpdateSubscribers = Arc<RwLock<HashMap<String, Sender<ChainHeadUpdate>>>>;
12+
513
pub struct ChainHeadUpdateListener {
6-
notification_listener: NotificationListener,
7-
network_name: String,
14+
logger: Logger,
15+
subscribers: ChainHeadUpdateSubscribers,
16+
_listener: NotificationListener,
817
}
918

1019
impl ChainHeadUpdateListener {
1120
pub fn new(logger: &Logger, postgres_url: String, network_name: String) -> Self {
21+
let logger = logger.new(o!("component" => "ChainHeadUpdateListener"));
22+
let subscribers = Arc::new(RwLock::new(HashMap::new()));
23+
24+
// Create a Postgres notification listener for chain head updates
25+
let mut listener = NotificationListener::new(
26+
&logger,
27+
postgres_url,
28+
SafeChannelName::i_promise_this_is_safe("chain_head_updates"),
29+
);
30+
31+
Self::listen(&logger, &mut listener, network_name, subscribers.clone());
32+
1233
ChainHeadUpdateListener {
13-
notification_listener: NotificationListener::new(
14-
logger,
15-
postgres_url,
16-
SafeChannelName::i_promise_this_is_safe("chain_head_updates"),
17-
),
18-
network_name,
19-
}
20-
}
21-
}
34+
logger,
35+
subscribers,
2236

23-
impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
24-
fn start(&mut self) {
25-
self.notification_listener.start()
37+
// We keep the listener around to tie its stream's lifetime to
38+
// that of the chain head update listener and prevent it from
39+
// terminating early
40+
_listener: listener,
41+
}
2642
}
27-
}
2843

29-
impl EventProducer<ChainHeadUpdate> for ChainHeadUpdateListener {
30-
fn take_event_stream(
31-
&mut self,
32-
) -> Option<Box<Stream<Item = ChainHeadUpdate, Error = ()> + Send>> {
33-
let network_name = self.network_name.clone();
44+
fn listen(
45+
logger: &Logger,
46+
listener: &mut NotificationListener,
47+
network_name: String,
48+
subscribers: ChainHeadUpdateSubscribers,
49+
) {
50+
let logger = logger.clone();
3451

35-
self.notification_listener.take_event_stream().map(
36-
move |stream| -> Box<Stream<Item = _, Error = _> + Send> {
37-
Box::new(stream.filter_map(move |notification| {
52+
// Process chain head updates in a dedicated task
53+
tokio::spawn(
54+
listener
55+
.take_event_stream()
56+
.unwrap()
57+
.filter_map(move |notification| {
3858
// Create ChainHeadUpdate from JSON
3959
let update: ChainHeadUpdate =
4060
serde_json::from_value(notification.payload.clone()).unwrap_or_else(|_| {
@@ -44,14 +64,65 @@ impl EventProducer<ChainHeadUpdate> for ChainHeadUpdateListener {
4464
)
4565
});
4666

47-
// Only include update if about the right network
67+
// Only include update if it is for the network we're interested in
4868
if update.network_name == network_name {
4969
Some(update)
5070
} else {
5171
None
5272
}
53-
}))
54-
},
55-
)
73+
})
74+
.for_each(move |update| {
75+
let logger = logger.clone();
76+
let senders = subscribers.read().unwrap().clone();
77+
let subscribers = subscribers.clone();
78+
79+
debug!(
80+
logger,
81+
"Received chain head update";
82+
"network" => &update.network_name,
83+
"head_block_hash" => format!("{}", update.head_block_hash),
84+
"head_block_number" => &update.head_block_number,
85+
);
86+
87+
// Forward update to all susbcribers
88+
stream::iter_ok::<_, ()>(senders).for_each(move |(id, sender)| {
89+
let logger = logger.clone();
90+
let subscribers = subscribers.clone();
91+
92+
sender.send(update.clone()).then(move |result| {
93+
if result.is_err() {
94+
// If sending to a subscriber fails, we'll assume that
95+
// the receiving end has been dropped. In this case we
96+
// remove the subscriber
97+
debug!(logger, "Unsubscribe"; "id" => &id);
98+
subscribers.write().unwrap().remove(&id);
99+
}
100+
101+
// Move on to the next subscriber
102+
Ok(())
103+
})
104+
})
105+
}),
106+
);
107+
108+
// We're ready, start listening to chain head updaates
109+
listener.start();
110+
}
111+
}
112+
113+
impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
114+
fn subscribe(&self) -> ChainHeadUpdateStream {
115+
// Generate a new (unique) UUID; we're looping just to be sure we avoid collisions
116+
let mut id = Uuid::new_v4().to_string();
117+
while self.subscribers.read().unwrap().contains_key(&id) {
118+
id = Uuid::new_v4().to_string();
119+
}
120+
121+
debug!(self.logger, "Subscribe"; "id" => &id);
122+
123+
// Create a subscriber and return the receiving end
124+
let (sender, receiver) = channel(100);
125+
self.subscribers.write().unwrap().insert(id, sender);
126+
Box::new(receiver)
56127
}
57128
}

store/postgres/src/store.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use uuid::Uuid;
1212
use crate::notification_listener::JsonNotification;
1313
use graph::components::store::Store as StoreTrait;
1414
use graph::data::subgraph::schema::*;
15-
use graph::prelude::*;
15+
use graph::prelude::{ChainHeadUpdateListener as _, *};
1616
use graph::serde_json;
1717
use graph::web3::types::H256;
1818
use graph::{tokio, tokio::timer::Interval};
@@ -61,7 +61,7 @@ pub struct Store {
6161
subscriptions: Arc<RwLock<HashMap<String, Sender<StoreEvent>>>>,
6262
// listen to StoreEvents emitted by emit_store_events
6363
listener: StoreEventListener,
64-
postgres_url: String,
64+
chain_head_update_listener: ChainHeadUpdateListener,
6565
network_name: String,
6666
genesis_block_ptr: EthereumBlockPointer,
6767
conn: Pool<ConnectionManager<PgConnection>>,
@@ -108,7 +108,11 @@ impl Store {
108108
logger: logger.clone(),
109109
subscriptions: Arc::new(RwLock::new(HashMap::new())),
110110
listener,
111-
postgres_url: config.postgres_url.clone(),
111+
chain_head_update_listener: ChainHeadUpdateListener::new(
112+
&logger,
113+
config.postgres_url,
114+
config.network_name.clone(),
115+
),
112116
network_name: config.network_name.clone(),
113117
genesis_block_ptr: (net_identifiers.genesis_block_hash, 0u64).into(),
114118
conn: pool,
@@ -981,12 +985,8 @@ impl ChainStore for Store {
981985
.and_then(|r| r.map_err(Error::from))
982986
}
983987

984-
fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener {
985-
Self::ChainHeadUpdateListener::new(
986-
&self.logger,
987-
self.postgres_url.clone(),
988-
self.network_name.clone(),
989-
)
988+
fn chain_head_updates(&self) -> ChainHeadUpdateStream {
989+
self.chain_head_update_listener.subscribe()
990990
}
991991

992992
fn chain_head_ptr(&self) -> Result<Option<EthereumBlockPointer>, Error> {

0 commit comments

Comments
 (0)