Skip to content

Commit a0ed7b2

Browse files
authored
perf: create allocations in parallel (#431)
1 parent f0c7d03 commit a0ed7b2

File tree

3 files changed

+19
-12
lines changed

3 files changed

+19
-12
lines changed

tap-agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-pr
4141
ractor = { version = "0.9", features = [
4242
"async-trait",
4343
], default-features = false }
44+
futures = { version = "0.3.30", default-features = false }
4445

4546
[dev-dependencies]
4647
tempfile = "3.8.0"
4748
wiremock = "0.6.1"
48-
futures = { version = "0.3.30", default-features = false }

tap-agent/src/agent/sender_account.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use alloy::primitives::U256;
77
use bigdecimal::num_bigint::ToBigInt;
88
use bigdecimal::ToPrimitive;
99

10+
use futures::{stream, StreamExt};
1011
use graphql_client::GraphQLQuery;
1112
use jsonrpsee::http_client::HttpClientBuilder;
1213
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
@@ -580,12 +581,14 @@ impl Actor for SenderAccount {
580581
config,
581582
};
582583

583-
for allocation_id in &allocation_ids {
584+
stream::iter(allocation_ids)
584585
// Create a sender allocation for each allocation
585-
state
586-
.create_sender_allocation(myself.clone(), *allocation_id)
587-
.await?;
588-
}
586+
.map(|allocation_id| state.create_sender_allocation(myself.clone(), allocation_id))
587+
.buffered(10) // Limit concurrency to 10 allocations at a time
588+
.collect::<Vec<anyhow::Result<()>>>()
589+
.await
590+
.into_iter()
591+
.collect::<anyhow::Result<Vec<()>>>()?;
589592

590593
tracing::info!(sender = %sender_id, "SenderAccount created!");
591594
Ok(state)

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use alloy::primitives::Address;
1212
use anyhow::Result;
1313
use anyhow::{anyhow, bail};
1414
use eventuals::{Eventual, EventualExt, PipeHandle};
15+
use futures::{stream, StreamExt};
1516
use indexer_common::escrow_accounts::EscrowAccounts;
1617
use indexer_common::prelude::{Allocation, SubgraphClient};
1718
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent};
@@ -160,12 +161,15 @@ impl Actor for SenderAccountsManager {
160161
}
161162
};
162163

163-
for (sender_id, allocation_ids) in sender_allocation {
164-
state.sender_ids.insert(sender_id);
165-
state
166-
.create_or_deny_sender(myself.get_cell(), sender_id, allocation_ids)
167-
.await;
168-
}
164+
state.sender_ids.extend(sender_allocation.keys());
165+
166+
stream::iter(sender_allocation)
167+
.map(|(sender_id, allocation_ids)| {
168+
state.create_or_deny_sender(myself.get_cell(), sender_id, allocation_ids)
169+
})
170+
.buffered(10) // Limit concurrency to 10 senders at a time
171+
.collect::<Vec<()>>()
172+
.await;
169173

170174
// Start the new_receipts_watcher task that will consume from the `pglistener`
171175
// after starting all senders

0 commit comments

Comments
 (0)