Skip to content

Commit 1a8c3ef

Browse files
committed
chain head listener: Used timed rwlock
1 parent f2f0520 commit 1a8c3ef

File tree

2 files changed

+37
-26
lines changed

2 files changed

+37
-26
lines changed

graph/src/util/timed_rw_lock.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,29 +32,35 @@ impl<T> TimedRwLock<T> {
3232
}
3333

3434
pub fn write(&self, logger: &Logger) -> parking_lot::RwLockWriteGuard<T> {
35-
let start = Instant::now();
36-
let guard = self.lock.write();
37-
let elapsed = start.elapsed();
38-
if elapsed > self.log_threshold {
39-
warn!(logger, "Write lock took a long time to acquire";
40-
"id" => &self.id,
41-
"wait_ms" => elapsed.as_millis(),
42-
);
35+
loop {
36+
let mut elapsed = Duration::from_secs(0);
37+
match self.lock.try_write_for(self.log_threshold) {
38+
Some(guard) => break guard,
39+
None => {
40+
elapsed += self.log_threshold;
41+
warn!(logger, "Write lock taking a long time to acquire";
42+
"id" => &self.id,
43+
"wait_ms" => elapsed.as_millis(),
44+
);
45+
}
46+
}
4347
}
44-
guard
4548
}
4649

4750
pub fn read(&self, logger: &Logger) -> parking_lot::RwLockReadGuard<T> {
48-
let start = Instant::now();
49-
let guard = self.lock.read();
50-
let elapsed = start.elapsed();
51-
if elapsed > self.log_threshold {
52-
warn!(logger, "Read lock took a long time to acquire";
53-
"id" => &self.id,
54-
"wait_ms" => elapsed.as_millis(),
55-
);
51+
loop {
52+
let mut elapsed = Duration::from_secs(0);
53+
match self.lock.try_read_for(self.log_threshold) {
54+
Some(guard) => break guard,
55+
None => {
56+
elapsed += self.log_threshold;
57+
warn!(logger, "Read lock taking a long time to acquire";
58+
"id" => &self.id,
59+
"wait_ms" => elapsed.as_millis(),
60+
);
61+
}
62+
}
5663
}
57-
guard
5864
}
5965
}
6066

store/postgres/src/chain_head_listener.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use graph::{
22
blockchain::ChainHeadUpdateStream,
3-
parking_lot::RwLock,
43
prelude::{
54
futures03::{self, FutureExt},
65
tokio, StoreError,
76
},
87
prometheus::{CounterVec, GaugeVec},
8+
util::timed_rw_lock::TimedRwLock,
99
};
1010
use std::str::FromStr;
1111
use std::sync::Arc;
@@ -87,7 +87,7 @@ struct ChainHeadUpdate {
8787

8888
pub struct ChainHeadUpdateListener {
8989
/// Update watchers keyed by network.
90-
watchers: Arc<RwLock<BTreeMap<String, Watcher>>>,
90+
watchers: Arc<TimedRwLock<BTreeMap<String, Watcher>>>,
9191
_listener: NotificationListener,
9292
}
9393

@@ -113,7 +113,10 @@ impl ChainHeadUpdateListener {
113113
// Create a Postgres notification listener for chain head updates
114114
let (mut listener, receiver) =
115115
NotificationListener::new(&logger, postgres_url, CHANNEL_NAME.clone());
116-
let watchers = Arc::new(RwLock::new(BTreeMap::new()));
116+
let watchers = Arc::new(TimedRwLock::new(
117+
BTreeMap::new(),
118+
"chain_head_listener_watchers",
119+
));
117120

118121
Self::listen(
119122
logger,
@@ -139,7 +142,7 @@ impl ChainHeadUpdateListener {
139142
metrics: Arc<BlockIngestorMetrics>,
140143
listener: &mut NotificationListener,
141144
mut receiver: Receiver<JsonNotification>,
142-
watchers: Arc<RwLock<BTreeMap<String, Watcher>>>,
145+
watchers: Arc<TimedRwLock<BTreeMap<String, Watcher>>>,
143146
counter: CounterVec,
144147
) {
145148
// Process chain head updates in a dedicated task
@@ -169,8 +172,10 @@ impl ChainHeadUpdateListener {
169172
.set_chain_head_number(&update.network_name, *&update.head_block_number as i64);
170173

171174
// If there are subscriptions for this network, notify them.
172-
if let Some(watcher) = watchers.read().get(&update.network_name) {
173-
watcher.send()
175+
if let Some(watcher) = watchers.read(&logger).get(&update.network_name) {
176+
debug!(logger, "sending chain head update"; "network" => &update.network_name);
177+
watcher.send();
178+
debug!(logger, "chain head update sent"; "network" => &update.network_name);
174179
}
175180
}
176181
});
@@ -186,7 +191,7 @@ impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
186191

187192
let update_receiver = {
188193
let existing = {
189-
let watchers = self.watchers.read();
194+
let watchers = self.watchers.read(&logger);
190195
watchers.get(&network_name).map(|w| w.receiver.clone())
191196
};
192197

@@ -199,7 +204,7 @@ impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
199204
// Race condition: Another task could have simoultaneously entered this branch and
200205
// inserted a writer, so we should check the entry again after acquiring the lock.
201206
self.watchers
202-
.write()
207+
.write(&logger)
203208
.entry(network_name)
204209
.or_insert_with(|| Watcher::new())
205210
.receiver

0 commit comments

Comments
 (0)