Skip to content

Commit 834f8ba

Browse files
authored
feat(torii-indexer): configurable chunked batch requests (#37)
* feat(torii-indexer): configurable chunked batch requests
* better error
* fmt
* c
1 parent 229101d commit 834f8ba

File tree

3 files changed

+47
-4
lines changed

3 files changed

+47
-4
lines changed

crates/cli/src/options.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub const DEFAULT_METRICS_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
1616
pub const DEFAULT_METRICS_PORT: u16 = 9200;
1717
pub const DEFAULT_EVENTS_CHUNK_SIZE: u64 = 1024;
1818
pub const DEFAULT_BLOCKS_CHUNK_SIZE: u64 = 10240;
19+
pub const DEFAULT_BATCH_CHUNK_SIZE: usize = 1024;
1920
pub const DEFAULT_POLLING_INTERVAL: u64 = 500;
2021
pub const DEFAULT_MAX_CONCURRENT_TASKS: usize = 100;
2122
pub const DEFAULT_RELAY_PORT: u16 = 9090;
@@ -189,13 +190,22 @@ pub struct IndexingOptions {
189190
help = "Whether or not to read models from the block number they were registered in."
190191
)]
191192
pub strict_model_reader: bool,
193+
194+
/// The chunk size to use for batch requests.
195+
#[arg(
196+
long = "indexing.batch_chunk_size",
197+
default_value_t = DEFAULT_BATCH_CHUNK_SIZE,
198+
help = "The chunk size to use for batch requests. This is used to split the requests into smaller chunks to avoid overwhelming the provider and potentially running into issues."
199+
)]
200+
pub batch_chunk_size: usize,
192201
}
193202

194203
impl Default for IndexingOptions {
195204
fn default() -> Self {
196205
Self {
197206
events_chunk_size: DEFAULT_EVENTS_CHUNK_SIZE,
198207
blocks_chunk_size: DEFAULT_BLOCKS_CHUNK_SIZE,
208+
batch_chunk_size: DEFAULT_BATCH_CHUNK_SIZE,
199209
pending: true,
200210
polling_interval: DEFAULT_POLLING_INTERVAL,
201211
max_concurrent_tasks: DEFAULT_MAX_CONCURRENT_TASKS,

crates/indexer/src/engine.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ use std::hash::Hash;
44
use std::sync::Arc;
55
use std::time::Duration;
66

7-
use anyhow::Result;
7+
use anyhow::{Context, Result};
88
use bitflags::bitflags;
99
use dojo_utils::provider as provider_utils;
1010
use dojo_world::contracts::world::WorldContractReader;
11+
use futures_util::future::try_join_all;
1112
use hashlink::LinkedHashMap;
1213
use starknet::core::types::requests::{
1314
GetBlockWithTxHashesRequest, GetEventsRequest, GetTransactionByHashRequest,
@@ -42,6 +43,7 @@ bitflags! {
4243
#[derive(Debug)]
4344
pub struct EngineConfig {
4445
pub polling_interval: Duration,
46+
pub batch_chunk_size: usize,
4547
pub blocks_chunk_size: u64,
4648
pub events_chunk_size: u64,
4749
pub max_concurrent_tasks: usize,
@@ -54,6 +56,7 @@ impl Default for EngineConfig {
5456
fn default() -> Self {
5557
Self {
5658
polling_interval: Duration::from_millis(500),
59+
batch_chunk_size: 1024,
5760
blocks_chunk_size: 10240,
5861
events_chunk_size: 1024,
5962
max_concurrent_tasks: 100,
@@ -349,7 +352,8 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
349352
}
350353
}
351354

352-
let transaction_results = self.provider.batch_requests(transaction_requests).await?;
355+
let transaction_results = self.chunked_batch_requests(&transaction_requests).await?;
356+
353357
for (block_number, result) in block_numbers.into_iter().zip(transaction_results) {
354358
match result {
355359
ProviderResponseData::GetTransactionByHash(transaction) => {
@@ -382,7 +386,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
382386

383387
// Execute timestamp requests in batch
384388
if !timestamp_requests.is_empty() {
385-
let timestamp_results = self.provider.batch_requests(timestamp_requests).await?;
389+
let timestamp_results = self.chunked_batch_requests(&timestamp_requests).await?;
386390

387391
// Process timestamp results
388392
for (block_number, result) in block_numbers.iter().zip(timestamp_results) {
@@ -426,7 +430,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
426430
.iter()
427431
.map(|(_, req)| req.clone())
428432
.collect();
429-
let batch_results = self.provider.batch_requests(batch_requests).await?;
433+
let batch_results = self.chunked_batch_requests(&batch_requests).await?;
430434

431435
// Process results and prepare next batch of requests if needed
432436
for ((contract_address, original_request), result) in
@@ -898,6 +902,34 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
898902

899903
Ok(())
900904
}
905+
906+
async fn chunked_batch_requests(
907+
&self,
908+
requests: &[ProviderRequestData],
909+
) -> Result<Vec<ProviderResponseData>> {
910+
if requests.is_empty() {
911+
return Ok(Vec::new());
912+
}
913+
914+
let mut futures = Vec::new();
915+
for chunk in requests.chunks(self.config.batch_chunk_size) {
916+
futures.push(async move { self.provider.batch_requests(chunk).await });
917+
}
918+
919+
let results_of_chunks: Vec<Vec<ProviderResponseData>> = try_join_all(futures)
920+
.await
921+
.with_context(|| {
922+
format!(
923+
"One or more batch requests failed during chunked execution. This could be due to the provider being overloaded. You can try reducing the batch chunk size. Total requests: {}. Batch chunk size: {}",
924+
requests.len(),
925+
self.config.batch_chunk_size
926+
)
927+
})?;
928+
929+
let flattened_results = results_of_chunks.into_iter().flatten().collect();
930+
931+
Ok(flattened_results)
932+
}
901933
}
902934

903935
// event_id format: block_number:transaction_hash:event_idx

crates/runner/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ impl Runner {
278278
max_concurrent_tasks: self.args.indexing.max_concurrent_tasks,
279279
blocks_chunk_size: self.args.indexing.blocks_chunk_size,
280280
events_chunk_size: self.args.indexing.events_chunk_size,
281+
batch_chunk_size: self.args.indexing.batch_chunk_size,
281282
polling_interval: Duration::from_millis(self.args.indexing.polling_interval),
282283
flags,
283284
event_processor_config: EventProcessorConfig {

0 commit comments

Comments (0)