Skip to content

Commit 5598055

Browse files
committed
refactor: merge join and map watcher functions
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 8c90e52 commit 5598055

File tree

2 files changed

+26
-47
lines changed

2 files changed

+26
-47
lines changed

common/src/attestations/signers.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tracing::warn;
1010

1111
use crate::{
1212
prelude::{Allocation, AttestationSigner},
13-
watcher::{join_watcher, map_watcher},
13+
watcher::join_and_map_watcher,
1414
};
1515

1616
/// An always up-to-date list of attestation signers, one for each of the indexer's allocations.
@@ -24,18 +24,20 @@ pub fn attestation_signers(
2424
Box::leak(Box::new(Mutex::new(HashMap::new())));
2525
let indexer_mnemonic = Arc::new(indexer_mnemonic.to_string());
2626

27-
let joined_watcher = join_watcher(indexer_allocations_rx, dispute_manager_rx);
28-
29-
map_watcher(joined_watcher, move |(allocation, dispute)| {
30-
let indexer_mnemonic = indexer_mnemonic.clone();
31-
modify_sigers(
32-
&indexer_mnemonic,
33-
chain_id,
34-
attestation_signers_map,
35-
&allocation,
36-
&dispute,
37-
)
38-
})
27+
join_and_map_watcher(
28+
indexer_allocations_rx,
29+
dispute_manager_rx,
30+
move |(allocation, dispute)| {
31+
let indexer_mnemonic = indexer_mnemonic.clone();
32+
modify_sigers(
33+
&indexer_mnemonic,
34+
chain_id,
35+
attestation_signers_map,
36+
&allocation,
37+
&dispute,
38+
)
39+
},
40+
)
3941
}
4042
fn modify_sigers(
4143
indexer_mnemonic: &str,

common/src/watcher.rs

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
54
//! This is a module that reimplements eventuals using
65
//! tokio::watch module and fixing some problems that eventuals
76
//! usually carry like initializing things without initializing
@@ -32,14 +31,13 @@ where
3231
let (tx, rx) = watch::channel(initial_value);
3332

3433
tokio::spawn(async move {
35-
// Refresh indexer allocations every now and then
3634
let mut time_interval = time::interval(interval);
3735
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
3836
loop {
3937
time_interval.tick().await;
4038
let result = function().await;
4139
match result {
42-
Ok(allocations) => tx.send(allocations).expect("Failed to update channel"),
40+
Ok(value) => tx.send(value).expect("Failed to update channel"),
4341
Err(err) => {
4442
// TODO mark it as delayed
4543
warn!(error = %err, "There was an error while updating watcher");
@@ -52,17 +50,20 @@ where
5250
Ok(rx)
5351
}
5452

55-
5653
/// Join two watch::Receiver
57-
pub fn join_watcher<T1, T2>(
54+
pub fn join_and_map_watcher<T1, T2, T3, F>(
5855
mut receiver_1: watch::Receiver<T1>,
5956
mut receiver_2: watch::Receiver<T2>,
60-
) -> watch::Receiver<(T1, T2)>
57+
map_function: F,
58+
) -> watch::Receiver<T3>
6159
where
6260
T1: Clone + Send + Sync + 'static,
6361
T2: Clone + Send + Sync + 'static,
62+
T3: Send + Sync + 'static,
63+
F: Fn((T1, T2)) -> T3 + Send + 'static,
6464
{
65-
let (tx, rx) = watch::channel((receiver_1.borrow().clone(), receiver_2.borrow().clone()));
65+
let initial_value = map_function((receiver_1.borrow().clone(), receiver_2.borrow().clone()));
66+
let (tx, rx) = watch::channel(initial_value);
6667

6768
tokio::spawn(async move {
6869
loop {
@@ -74,34 +75,10 @@ where
7475
panic!("receiver_1 or receiver_2 was dropped");
7576
}
7677
}
77-
tx.send((receiver_1.borrow().clone(), receiver_2.borrow().clone()))
78-
.expect("Failed to update signers channel");
79-
}
80-
});
81-
rx
82-
}
8378

84-
/// Maps a watch::Receiver into another type
85-
pub fn map_watcher<T1, T2, F>(
86-
mut receiver: watch::Receiver<T1>,
87-
map_function: F,
88-
) -> watch::Receiver<T2>
89-
where
90-
F: Fn(T1) -> T2 + Send + 'static,
91-
T1: Default + Clone + Sync + Send + 'static,
92-
T2: Sync + Send + 'static,
93-
{
94-
let initial_value = map_function(receiver.borrow().clone());
95-
let (tx, rx) = watch::channel(initial_value);
96-
97-
tokio::spawn(async move {
98-
loop {
99-
match receiver.changed().await {
100-
Ok(_) => {}
101-
Err(_) => panic!("reciever was dropped"),
102-
}
103-
let current_val = receiver.borrow().clone();
104-
let mapped_value = map_function(current_val);
79+
let current_val_1 = receiver_1.borrow().clone();
80+
let current_val_2 = receiver_2.borrow().clone();
81+
let mapped_value = map_function((current_val_1, current_val_2));
10582
tx.send(mapped_value).expect("Failed to update channel");
10683
}
10784
});

0 commit comments

Comments
 (0)