Skip to content

Commit 881d078

Browse files
committed
chain head listener: workaround watcher deadlock
1 parent f473895 commit 881d078

File tree

1 file changed

+22
-7
lines changed

1 file changed

+22
-7
lines changed

store/postgres/src/chain_head_listener.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use graph::{
77
prometheus::{CounterVec, GaugeVec},
88
util::timed_rw_lock::TimedRwLock,
99
};
10-
use std::str::FromStr;
11-
use std::sync::Arc;
10+
use std::sync::{atomic::AtomicBool, Arc};
1211
use std::{collections::BTreeMap, time::Duration};
12+
use std::{str::FromStr, sync::atomic};
1313

1414
use lazy_static::lazy_static;
1515

@@ -38,16 +38,20 @@ lazy_static! {
3838
}
3939

4040
struct Watcher {
41-
sender: watch::Sender<()>,
41+
sender: Arc<watch::Sender<()>>,
4242
receiver: watch::Receiver<()>,
4343
}
4444

4545
impl Watcher {
4646
fn new() -> Self {
4747
let (sender, receiver) = watch::channel(());
48-
Watcher { sender, receiver }
48+
Watcher {
49+
sender: Arc::new(sender),
50+
receiver,
51+
}
4952
}
5053

54+
#[allow(dead_code)]
5155
fn send(&self) {
5256
// Unwrap: `self` holds a receiver.
5357
self.sender.send(()).unwrap()
@@ -147,6 +151,7 @@ impl ChainHeadUpdateListener {
147151
) {
148152
// Process chain head updates in a dedicated task
149153
graph::spawn(async move {
154+
let sending_to_watcher = Arc::new(AtomicBool::new(false));
150155
while let Some(notification) = receiver.recv().await {
151156
// Create ChainHeadUpdate from JSON
152157
let update: ChainHeadUpdate =
@@ -173,9 +178,19 @@ impl ChainHeadUpdateListener {
173178

174179
// If there are subscriptions for this network, notify them.
175180
if let Some(watcher) = watchers.read(&logger).get(&update.network_name) {
176-
debug!(logger, "sending chain head update"; "network" => &update.network_name);
177-
watcher.send();
178-
debug!(logger, "chain head update sent"; "network" => &update.network_name);
181+
// Due to a tokio bug, we must assume that the watcher can deadlock, see
182+
// https://github.com/tokio-rs/tokio/issues/4246.
183+
if !sending_to_watcher.load(atomic::Ordering::SeqCst) {
184+
let sending_to_watcher = sending_to_watcher.cheap_clone();
185+
let sender = watcher.sender.cheap_clone();
186+
tokio::task::spawn_blocking(move || {
187+
sending_to_watcher.store(true, atomic::Ordering::SeqCst);
188+
sender.send(()).unwrap();
189+
sending_to_watcher.store(false, atomic::Ordering::SeqCst);
190+
});
191+
} else {
192+
debug!(logger, "skipping chain head update, watcher is deadlocked"; "network" => &update.network_name);
193+
}
179194
}
180195
}
181196
});

0 commit comments

Comments
 (0)