Skip to content

Commit 5b86e0a

Browse files
intilizing channel with immidiate values
1 parent 35a1776 commit 5b86e0a

File tree

1 file changed

+17
-22
lines changed

1 file changed

+17
-22
lines changed

common/src/attestations/signers.rs

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,27 @@ pub async fn attestation_signers(
2525
) -> Receiver<HashMap<Address, AttestationSigner>> {
2626
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
2727
Box::leak(Box::new(Mutex::new(HashMap::new())));
28+
let (allocations_tx, mut allocations_rx) =
29+
watch::channel(indexer_allocations.value_immediate().unwrap_or_default());
30+
let starter_signers_map = modify_sigers(
31+
Arc::new(indexer_mnemonic.clone()),
32+
chain_id,
33+
attestation_signers_map,
34+
allocations_rx.clone(),
35+
dispute_manager_rx.clone(),
36+
)
37+
.await;
2838

2939
// Whenever the indexer's active or recently closed allocations change, make sure
3040
// we have attestation signers for all of them.
31-
let (signers_tx, signers_rx) = watch::channel(HashMap::new());
41+
let (signers_tx, signers_rx) = watch::channel(starter_signers_map);
3242

3343
tokio::spawn(async move {
34-
// Listening to the allocation eventual and converting them to reciever.
35-
// Using pipe for updation.
36-
// For temporary pupose only.
37-
let (allocations_tx, mut allocations_rx) =
38-
watch::channel(indexer_allocations.value().await.unwrap());
44+
// Actively listening to indexer_allocations to update allocations channel
45+
// Temporary fix until the indexer_allocations is migrated to tokio watch
3946
let _p1 = indexer_allocations.pipe(move |allocatons| {
40-
let _ = allocations_tx.send(allocatons);
47+
allocations_tx.send(allocatons).unwrap();
4148
});
42-
modify_sigers(
43-
Arc::new(indexer_mnemonic.clone()),
44-
chain_id,
45-
attestation_signers_map,
46-
allocations_rx.clone(),
47-
dispute_manager_rx.clone(),
48-
)
49-
.await;
5049
loop {
5150
let updated_signers = select! {
5251
Ok(())= allocations_rx.changed() =>{
@@ -124,26 +123,22 @@ mod tests {
124123
#[tokio::test]
125124
async fn test_attestation_signers_update_with_allocations() {
126125
let (mut allocations_writer, allocations) = Eventual::<HashMap<Address, Allocation>>::new();
127-
let (dispute_manager_writer, dispute_manager) = watch::channel(None);
128-
129-
dispute_manager_writer
126+
let (dispute_manager_tx, dispute_manager_rx) = watch::channel(None);
127+
dispute_manager_tx
130128
.send(Some(*DISPUTE_MANAGER_ADDRESS))
131129
.unwrap();
132-
133130
let mut signers = attestation_signers(
134131
allocations,
135132
(*INDEXER_OPERATOR_MNEMONIC).to_string(),
136133
1,
137-
dispute_manager,
134+
dispute_manager_rx,
138135
)
139136
.await;
140-
141137
// Test that an empty set of allocations leads to an empty set of signers
142138
allocations_writer.write(HashMap::new());
143139
signers.changed().await.unwrap();
144140
let latest_signers = signers.borrow().clone();
145141
assert_eq!(latest_signers, HashMap::new());
146-
147142
// Test that writing our set of test allocations results in corresponding signers for all of them
148143
allocations_writer.write((*INDEXER_ALLOCATIONS).clone());
149144
signers.changed().await.unwrap();

0 commit comments

Comments
 (0)