Skip to content

Commit 1b321c6

Browse files
committed
perf: create allocations in parallel
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent f0c7d03 commit 1b321c6

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use alloy::primitives::U256;
77
use bigdecimal::num_bigint::ToBigInt;
88
use bigdecimal::ToPrimitive;
99

10+
use futures::{stream, StreamExt};
1011
use graphql_client::GraphQLQuery;
1112
use jsonrpsee::http_client::HttpClientBuilder;
1213
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
@@ -580,12 +581,14 @@ impl Actor for SenderAccount {
580581
config,
581582
};
582583

583-
for allocation_id in &allocation_ids {
584+
stream::iter(allocation_ids)
584585
// Create a sender allocation for each allocation
585-
state
586-
.create_sender_allocation(myself.clone(), *allocation_id)
587-
.await?;
588-
}
586+
.map(|allocation_id| state.create_sender_allocation(myself.clone(), allocation_id))
587+
.buffered(10) // Limit concurrency to 10 allocations at a time
588+
.collect::<Vec<anyhow::Result<()>>>()
589+
.await
590+
.into_iter()
591+
.collect::<anyhow::Result<Vec<()>>>()?;
589592

590593
tracing::info!(sender = %sender_id, "SenderAccount created!");
591594
Ok(state)

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use std::collections::HashSet;
5+
use std::sync::Arc;
56
use std::time::Duration;
67
use std::{collections::HashMap, str::FromStr};
78

@@ -12,6 +13,7 @@ use alloy::primitives::Address;
1213
use anyhow::Result;
1314
use anyhow::{anyhow, bail};
1415
use eventuals::{Eventual, EventualExt, PipeHandle};
16+
use futures::{stream, StreamExt};
1517
use indexer_common::escrow_accounts::EscrowAccounts;
1618
use indexer_common::prelude::{Allocation, SubgraphClient};
1719
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent};
@@ -160,12 +162,15 @@ impl Actor for SenderAccountsManager {
160162
}
161163
};
162164

163-
for (sender_id, allocation_ids) in sender_allocation {
164-
state.sender_ids.insert(sender_id);
165-
state
166-
.create_or_deny_sender(myself.get_cell(), sender_id, allocation_ids)
167-
.await;
168-
}
165+
state.sender_ids.extend(sender_allocation.keys());
166+
167+
stream::iter(sender_allocation)
168+
.map(|(sender_id, allocation_ids)| {
169+
state.create_or_deny_sender(myself.get_cell(), sender_id, allocation_ids)
170+
})
171+
.buffered(10) // Limit concurrency to 10 senders at a time
172+
.collect::<Vec<()>>()
173+
.await;
169174

170175
// Start the new_receipts_watcher task that will consume from the `pglistener`
171176
// after starting all senders

0 commit comments

Comments
 (0)