Skip to content

Commit 4a54cdc

Browse files
authored
feat(torii-indexer): parallelize models & event messages (#2912)
* feat(torii-indexer): natural priority in parallelized tasks for model processing
* btreemap of priorities
* more verbose error handling & get rid of bit masking task id
* store all raw events regardless of erc or world
* fmt
* parallelize event emitted
* need to use event data
1 parent 9928558 commit 4a54cdc

File tree

5 files changed

+148
-73
lines changed

5 files changed

+148
-73
lines changed

crates/torii/indexer/src/engine.rs

Lines changed: 106 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::time::Duration;
66

77
use anyhow::Result;
88
use bitflags::bitflags;
9+
use cainome::cairo_serde::CairoSerde;
910
use dojo_utils::provider as provider_utils;
1011
use dojo_world::contracts::world::WorldContractReader;
1112
use futures_util::future::{join_all, try_join_all};
@@ -17,7 +18,7 @@ use starknet::core::types::{
1718
};
1819
use starknet::core::utils::get_selector_from_name;
1920
use starknet::providers::Provider;
20-
use starknet_crypto::Felt;
21+
use starknet_crypto::{poseidon_hash_many, Felt};
2122
use tokio::sync::broadcast::Sender;
2223
use tokio::sync::mpsc::Sender as BoundedSender;
2324
use tokio::sync::Semaphore;
@@ -207,6 +208,9 @@ pub struct ParallelizedEvent {
207208
pub event: Event,
208209
}
209210

211+
type TaskPriority = usize;
212+
type TaskId = u64;
213+
210214
#[allow(missing_debug_implementations)]
211215
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
212216
world: Arc<WorldContractReader<P>>,
@@ -216,7 +220,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
216220
config: EngineConfig,
217221
shutdown_tx: Sender<()>,
218222
block_tx: Option<BoundedSender<u64>>,
219-
tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
223+
tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<(ContractType, ParallelizedEvent)>>>,
220224
contracts: Arc<HashMap<Felt, ContractType>>,
221225
}
222226

@@ -250,7 +254,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
250254
shutdown_tx,
251255
block_tx,
252256
contracts,
253-
tasks: HashMap::new(),
257+
tasks: BTreeMap::new(),
254258
}
255259
}
256260

@@ -596,44 +600,72 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
596600
}
597601

598602
async fn process_tasks(&mut self) -> Result<()> {
599-
// We use a semaphore to limit the number of concurrent tasks
600603
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));
601604

602-
// Run all tasks concurrently
603-
let mut handles = Vec::new();
604-
for (task_id, events) in self.tasks.drain() {
605-
let db = self.db.clone();
606-
let world = self.world.clone();
607-
let semaphore = semaphore.clone();
608-
let processors = self.processors.clone();
609-
610-
let event_processor_config = self.config.event_processor_config.clone();
611-
handles.push(tokio::spawn(async move {
612-
let _permit = semaphore.acquire().await?;
613-
let mut local_db = db.clone();
614-
for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events {
615-
let contract_processors = processors.get_event_processor(contract_type);
616-
if let Some(processors) = contract_processors.get(&event.keys[0]) {
617-
618-
let processor = processors.iter().find(|p| p.validate(&event)).expect("Must find atleast one processor for the event");
619-
620-
debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event.");
621-
622-
if let Err(e) = processor
623-
.process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event, &event_processor_config)
624-
.await
625-
{
626-
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event.");
605+
// Process each priority level sequentially
606+
for (priority, task_group) in std::mem::take(&mut self.tasks) {
607+
let mut handles = Vec::new();
608+
609+
// Process all tasks within this priority level concurrently
610+
for (task_id, events) in task_group {
611+
let db = self.db.clone();
612+
let world = self.world.clone();
613+
let semaphore = semaphore.clone();
614+
let processors = self.processors.clone();
615+
let event_processor_config = self.config.event_processor_config.clone();
616+
617+
handles.push(tokio::spawn(async move {
618+
let _permit = semaphore.acquire().await?;
619+
let mut local_db = db.clone();
620+
621+
// Process all events for this task sequentially
622+
for (contract_type, event) in events {
623+
let contract_processors = processors.get_event_processor(contract_type);
624+
if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
625+
let processor = processors
626+
.iter()
627+
.find(|p| p.validate(&event.event))
628+
.expect("Must find at least one processor for the event");
629+
630+
debug!(
631+
target: LOG_TARGET,
632+
event_name = processor.event_key(),
633+
task_id = %task_id,
634+
priority = %priority,
635+
"Processing parallelized event."
636+
);
637+
638+
if let Err(e) = processor
639+
.process(
640+
&world,
641+
&mut local_db,
642+
event.block_number,
643+
event.block_timestamp,
644+
&event.event_id,
645+
&event.event,
646+
&event_processor_config,
647+
)
648+
.await
649+
{
650+
error!(
651+
target: LOG_TARGET,
652+
event_name = processor.event_key(),
653+
error = %e,
654+
task_id = %task_id,
655+
priority = %priority,
656+
"Processing parallelized event."
657+
);
658+
}
627659
}
628660
}
629-
}
630661

631-
Ok::<_, anyhow::Error>(())
632-
}));
633-
}
662+
Ok::<_, anyhow::Error>(())
663+
}));
664+
}
634665

635-
// Join all tasks
636-
try_join_all(handles).await?;
666+
// Wait for all tasks in this priority level to complete before moving to next priority
667+
try_join_all(handles).await?;
668+
}
637669

638670
Ok(())
639671
}
@@ -802,14 +834,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
802834
contract_type: ContractType,
803835
) -> Result<()> {
804836
if self.config.flags.contains(IndexingFlags::RAW_EVENTS) {
805-
match contract_type {
806-
ContractType::WORLD => {
807-
self.db.store_event(event_id, event, transaction_hash, block_timestamp)?;
808-
}
809-
// ERC events needs to be processed inside there respective processor
810-
// we store transfer events for ERC contracts regardless of this flag
811-
ContractType::ERC20 | ContractType::ERC721 => {}
812-
}
837+
self.db.store_event(event_id, event, transaction_hash, block_timestamp)?;
813838
}
814839

815840
let event_key = event.keys[0];
@@ -856,30 +881,53 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
856881
.find(|p| p.validate(event))
857882
.expect("Must find atleast one processor for the event");
858883

859-
let task_identifier = match processor.event_key().as_str() {
884+
let (task_priority, task_identifier) = match processor.event_key().as_str() {
885+
"ModelRegistered" | "EventRegistered" => {
886+
let mut hasher = DefaultHasher::new();
887+
event.keys.iter().for_each(|k| k.hash(&mut hasher));
888+
let hash = hasher.finish();
889+
(0usize, hash) // Priority 0 (highest) for model/event registration
890+
}
860891
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
861892
let mut hasher = DefaultHasher::new();
862-
// model selector
863893
event.keys[1].hash(&mut hasher);
864-
// entity id
865894
event.keys[2].hash(&mut hasher);
866-
hasher.finish()
895+
let hash = hasher.finish();
896+
(2usize, hash) // Priority 2 (lower) for store operations
897+
}
898+
"EventEmitted" => {
899+
let mut hasher = DefaultHasher::new();
900+
901+
let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
902+
panic!("Expected EventEmitted keys to be well formed: {:?}", e);
903+
});
904+
905+
// selector
906+
event.keys[1].hash(&mut hasher);
907+
// entity id
908+
let entity_id = poseidon_hash_many(&keys);
909+
entity_id.hash(&mut hasher);
910+
911+
let hash = hasher.finish();
912+
(2usize, hash) // Priority 2 for event messages
867913
}
868-
_ => 0,
914+
_ => (0, 0), // No parallelization for other events
869915
};
870916

871-
// if we have a task identifier, we queue the event to be parallelized
872917
if task_identifier != 0 {
873-
self.tasks.entry(task_identifier).or_default().push((
874-
contract_type,
875-
ParallelizedEvent {
876-
event_id: event_id.to_string(),
877-
event: event.clone(),
878-
block_number,
879-
block_timestamp,
880-
},
881-
));
918+
self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push(
919+
(
920+
contract_type,
921+
ParallelizedEvent {
922+
event_id: event_id.to_string(),
923+
event: event.clone(),
924+
block_number,
925+
block_timestamp,
926+
},
927+
),
928+
);
882929
} else {
930+
// Process non-parallelized events immediately
883931
// if we dont have a task identifier, we process the event immediately
884932
if processor.validate(event) {
885933
if let Err(e) = processor

crates/torii/indexer/src/processors/store_del_record.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ where
3535
block_timestamp: u64,
3636
event_id: &str,
3737
event: &Event,
38-
_config: &EventProcessorConfig,
38+
config: &EventProcessorConfig,
3939
) -> Result<(), Error> {
4040
// Torii version is coupled to the world version, so we can expect the event to be well
4141
// formed.
@@ -55,15 +55,21 @@ where
5555
// This can happen if only specific namespaces are indexed.
5656
let model = match db.model(event.selector).await {
5757
Ok(m) => m,
58-
Err(e) if e.to_string().contains("no rows") => {
58+
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
5959
debug!(
6060
target: LOG_TARGET,
6161
selector = %event.selector,
6262
"Model does not exist, skipping."
6363
);
6464
return Ok(());
6565
}
66-
Err(e) => return Err(e),
66+
Err(e) => {
67+
return Err(anyhow::anyhow!(
68+
"Failed to retrieve model with selector {:#x}: {}",
69+
event.selector,
70+
e
71+
));
72+
}
6773
};
6874

6975
info!(

crates/torii/indexer/src/processors/store_set_record.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ where
3636
block_timestamp: u64,
3737
event_id: &str,
3838
event: &Event,
39-
_config: &EventProcessorConfig,
39+
config: &EventProcessorConfig,
4040
) -> Result<(), Error> {
4141
// Torii version is coupled to the world version, so we can expect the event to be well
4242
// formed.
@@ -56,15 +56,21 @@ where
5656
// This can happen if only specific namespaces are indexed.
5757
let model = match db.model(event.selector).await {
5858
Ok(m) => m,
59-
Err(e) if e.to_string().contains("no rows") => {
59+
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
6060
debug!(
6161
target: LOG_TARGET,
6262
selector = %event.selector,
6363
"Model does not exist, skipping."
6464
);
6565
return Ok(());
6666
}
67-
Err(e) => return Err(e),
67+
Err(e) => {
68+
return Err(anyhow::anyhow!(
69+
"Failed to retrieve model with selector {:#x}: {}",
70+
event.selector,
71+
e
72+
));
73+
}
6874
};
6975

7076
info!(

crates/torii/indexer/src/processors/store_update_member.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use starknet::core::types::Event;
77
use starknet::core::utils::get_selector_from_name;
88
use starknet::providers::Provider;
99
use torii_sqlite::Sql;
10-
use tracing::info;
10+
use tracing::{debug, info};
1111

1212
use super::{EventProcessor, EventProcessorConfig};
1313

@@ -37,7 +37,7 @@ where
3737
block_timestamp: u64,
3838
event_id: &str,
3939
event: &Event,
40-
_config: &EventProcessorConfig,
40+
config: &EventProcessorConfig,
4141
) -> Result<(), Error> {
4242
// Torii version is coupled to the world version, so we can expect the event to be well
4343
// formed.
@@ -61,11 +61,20 @@ where
6161
// This can happen if only specific namespaces are indexed.
6262
let model = match db.model(model_selector).await {
6363
Ok(m) => m,
64+
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
65+
debug!(
66+
target: LOG_TARGET,
67+
selector = %model_selector,
68+
"Model does not exist, skipping."
69+
);
70+
return Ok(());
71+
}
6472
Err(e) => {
65-
if e.to_string().contains("no rows") {
66-
return Ok(());
67-
}
68-
return Err(e);
73+
return Err(anyhow::anyhow!(
74+
"Failed to retrieve model with selector {:#x}: {}",
75+
event.selector,
76+
e
77+
));
6978
}
7079
};
7180

crates/torii/indexer/src/processors/store_update_record.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ where
3636
block_timestamp: u64,
3737
event_id: &str,
3838
event: &Event,
39-
_config: &EventProcessorConfig,
39+
config: &EventProcessorConfig,
4040
) -> Result<(), Error> {
4141
// Torii version is coupled to the world version, so we can expect the event to be well
4242
// formed.
@@ -59,15 +59,21 @@ where
5959
// This can happen if only specific namespaces are indexed.
6060
let model = match db.model(event.selector).await {
6161
Ok(m) => m,
62-
Err(e) if e.to_string().contains("no rows") => {
62+
Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
6363
debug!(
6464
target: LOG_TARGET,
6565
selector = %event.selector,
6666
"Model does not exist, skipping."
6767
);
6868
return Ok(());
6969
}
70-
Err(e) => return Err(e),
70+
Err(e) => {
71+
return Err(anyhow::anyhow!(
72+
"Failed to retrieve model with selector {:#x}: {}",
73+
event.selector,
74+
e
75+
));
76+
}
7177
};
7278

7379
info!(

0 commit comments

Comments (0)