diff --git a/tap-agent/Cargo.toml b/tap-agent/Cargo.toml index 44ea98f83..1fa51edf8 100644 --- a/tap-agent/Cargo.toml +++ b/tap-agent/Cargo.toml @@ -41,8 +41,8 @@ tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-pr ractor = { version = "0.9", features = [ "async-trait", ], default-features = false } +futures = { version = "0.3.30", default-features = false } [dev-dependencies] tempfile = "3.8.0" wiremock = "0.6.1" -futures = { version = "0.3.30", default-features = false } diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 3115eb222..d050a4cdd 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -7,6 +7,7 @@ use alloy::primitives::U256; use bigdecimal::num_bigint::ToBigInt; use bigdecimal::ToPrimitive; +use futures::{stream, StreamExt}; use graphql_client::GraphQLQuery; use jsonrpsee::http_client::HttpClientBuilder; use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; @@ -580,12 +581,14 @@ impl Actor for SenderAccount { config, }; - for allocation_id in &allocation_ids { + stream::iter(allocation_ids) // Create a sender allocation for each allocation - state - .create_sender_allocation(myself.clone(), *allocation_id) - .await?; - } + .map(|allocation_id| state.create_sender_allocation(myself.clone(), allocation_id)) + .buffered(10) // Limit concurrency to 10 allocations at a time + .collect::>>() + .await + .into_iter() + .collect::>>()?; tracing::info!(sender = %sender_id, "SenderAccount created!"); Ok(state) diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index 7bb4bafe0..693430850 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -12,6 +12,7 @@ use alloy::primitives::Address; use anyhow::Result; use anyhow::{anyhow, bail}; use eventuals::{Eventual, EventualExt, PipeHandle}; +use futures::{stream, StreamExt}; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; @@ -160,12 +161,15 @@ impl Actor for SenderAccountsManager { } }; - for (sender_id, allocation_ids) in sender_allocation { - state.sender_ids.insert(sender_id); - state - .create_or_deny_sender(myself.get_cell(), sender_id, allocation_ids) - .await; - } + state.sender_ids.extend(sender_allocation.keys()); + + stream::iter(sender_allocation) + .map(|(sender_id, allocation_ids)| { + state.create_or_deny_sender(myself.get_cell(), sender_id, allocation_ids) + }) + .buffered(10) // Limit concurrency to 10 senders at a time + .collect::>() + .await; // Start the new_receipts_watcher task that will consume from the `pglistener` // after starting all senders