Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ torii-sqlite = { path = "crates/sqlite/sqlite" }
torii-sqlite-types = { path = "crates/sqlite/types" }
torii-typed-data = { path = "crates/typed-data" }
torii-mcp = { path = "crates/mcp" }
torii-processors = { path = "crates/processors" }
torii-grpc-client = { path = "crates/grpc/client" }
torii-grpc-server = { path = "crates/grpc/server" }
torii-adigraphmap = { path = "crates/adigraphmap" }
Expand Down
1 change: 1 addition & 0 deletions crates/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tokio-util.workspace = true
tracing.workspace = true
torii-sqlite.workspace = true
lazy_static.workspace = true
torii-processors.workspace = true

[dev-dependencies]
dojo-test-utils.workspace = true
Expand Down
131 changes: 5 additions & 126 deletions crates/indexer/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,141 +17,20 @@ use starknet::core::types::{
MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, PendingBlockWithReceipts,
ResultPageRequest, Transaction, TransactionReceipt, TransactionWithReceipt,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::{Provider, ProviderRequestData, ProviderResponseData};
use starknet_crypto::Felt;
use tokio::sync::broadcast::Sender;
use tokio::time::{sleep, Instant};
use torii_processors::processors::Processors;
use torii_processors::EventProcessorConfig;
use torii_sqlite::cache::ContractClassCache;
use torii_sqlite::types::{Contract, ContractType};
use torii_sqlite::{Cursors, Sql};
use tracing::{debug, error, info, trace, warn};

use crate::constants::LOG_TARGET;
use crate::processors::controller::ControllerProcessor;
use crate::processors::erc1155_transfer_batch::Erc1155TransferBatchProcessor;
use crate::processors::erc1155_transfer_single::Erc1155TransferSingleProcessor;
use crate::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor;
use crate::processors::erc20_transfer::Erc20TransferProcessor;
use crate::processors::erc4906_batch_metadata_update::Erc4906BatchMetadataUpdateProcessor;
use crate::processors::erc4906_metadata_update::Erc4906MetadataUpdateProcessor;
use crate::processors::erc721_legacy_transfer::Erc721LegacyTransferProcessor;
use crate::processors::erc721_transfer::Erc721TransferProcessor;
use crate::processors::event_message::EventMessageProcessor;
use crate::processors::metadata_update::MetadataUpdateProcessor;
use crate::processors::raw_event::RawEventProcessor;
use crate::processors::register_event::RegisterEventProcessor;
use crate::processors::register_model::RegisterModelProcessor;
use crate::processors::store_del_record::StoreDelRecordProcessor;
use crate::processors::store_set_record::StoreSetRecordProcessor;
use crate::processors::store_transaction::StoreTransactionProcessor;
use crate::processors::store_update_member::StoreUpdateMemberProcessor;
use crate::processors::store_update_record::StoreUpdateRecordProcessor;
use crate::processors::upgrade_event::UpgradeEventProcessor;
use crate::processors::upgrade_model::UpgradeModelProcessor;
use crate::processors::{
BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor,
};
use crate::task_manager::{self, ParallelizedEvent, TaskManager};

type EventProcessorMap<P> = HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>>;
use torii_processors::task_manager::{ParallelizedEvent, TaskManager};

#[allow(missing_debug_implementations)]
pub struct Processors<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
pub block: Vec<Box<dyn BlockProcessor<P>>>,
pub transaction: Vec<Box<dyn TransactionProcessor<P>>>,
pub catch_all_event: Box<dyn EventProcessor<P>>,
pub event_processors: HashMap<ContractType, EventProcessorMap<P>>,
}

impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Default for Processors<P> {
fn default() -> Self {
Self {
block: vec![],
transaction: vec![Box::new(StoreTransactionProcessor)],
// We shouldn't have a catch all for now since the world doesn't forward raw events
// anymore.
catch_all_event: Box::new(RawEventProcessor) as Box<dyn EventProcessor<P>>,
event_processors: Self::initialize_event_processors(),
}
}
}

impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Processors<P> {
pub fn initialize_event_processors() -> HashMap<ContractType, EventProcessorMap<P>> {
let mut event_processors_map = HashMap::<ContractType, EventProcessorMap<P>>::new();

let event_processors = vec![
(
ContractType::WORLD,
vec![
Box::new(RegisterModelProcessor) as Box<dyn EventProcessor<P>>,
Box::new(RegisterEventProcessor) as Box<dyn EventProcessor<P>>,
Box::new(UpgradeModelProcessor) as Box<dyn EventProcessor<P>>,
Box::new(UpgradeEventProcessor) as Box<dyn EventProcessor<P>>,
Box::new(StoreSetRecordProcessor),
Box::new(StoreDelRecordProcessor),
Box::new(StoreUpdateRecordProcessor),
Box::new(StoreUpdateMemberProcessor),
Box::new(MetadataUpdateProcessor),
Box::new(EventMessageProcessor),
],
),
(
ContractType::ERC20,
vec![
Box::new(Erc20TransferProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc20LegacyTransferProcessor) as Box<dyn EventProcessor<P>>,
],
),
(
ContractType::ERC721,
vec![
Box::new(Erc721TransferProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc721LegacyTransferProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc4906MetadataUpdateProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc4906BatchMetadataUpdateProcessor) as Box<dyn EventProcessor<P>>,
],
),
(
ContractType::ERC1155,
vec![
Box::new(Erc1155TransferBatchProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc1155TransferSingleProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc4906MetadataUpdateProcessor) as Box<dyn EventProcessor<P>>,
Box::new(Erc4906BatchMetadataUpdateProcessor) as Box<dyn EventProcessor<P>>,
],
),
(
ContractType::UDC,
vec![Box::new(ControllerProcessor) as Box<dyn EventProcessor<P>>],
),
];

for (contract_type, processors) in event_processors {
for processor in processors {
let key = get_selector_from_name(processor.event_key().as_str())
.expect("Event key is ASCII so this should never fail");
// event_processors_map.entry(contract_type).or_default().insert(key, processor);
event_processors_map
.entry(contract_type)
.or_default()
.entry(key)
.or_default()
.push(processor);
}
}

event_processors_map
}

pub fn get_event_processor(
&self,
contract_type: ContractType,
) -> &HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>> {
self.event_processors.get(&contract_type).unwrap()
}
}

bitflags! {
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -938,7 +817,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

let event_key = event.keys[0];

let processors = self.processors.get_event_processor(contract_type);
let processors = self.processors.get_event_processors(contract_type);
let Some(processors) = processors.get(&event_key) else {
// if we dont have a processor for this event, we try the catch all processor
if self.processors.catch_all_event.validate(event) {
Expand Down Expand Up @@ -984,7 +863,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
(processor.task_priority(), processor.task_identifier(event));

// if our event can be parallelized, we add it to the task manager
if task_identifier != task_manager::TASK_ID_SEQUENTIAL {
if task_identifier != torii_processors::task_manager::TASK_ID_SEQUENTIAL {
self.task_manager.add_parallelized_event(
task_priority,
task_identifier,
Expand Down
3 changes: 0 additions & 3 deletions crates/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,4 @@ mod constants;
mod test;

pub mod engine;
pub mod processors;
mod task_manager;

pub use engine::Engine;
3 changes: 2 additions & 1 deletion crates/indexer/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use torii_sqlite::types::{Contract, ContractType, Token};
use torii_sqlite::utils::u256_to_sql_string;
use torii_sqlite::Sql;

use crate::engine::{Engine, EngineConfig, Processors};
use crate::engine::{Engine, EngineConfig};
use torii_processors::processors::Processors;

pub async fn bootstrap_engine<P>(
world: WorldContractReader<P>,
Expand Down
24 changes: 24 additions & 0 deletions crates/processors/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "torii-processors"
edition.workspace = true
license.workspace = true
license-file.workspace = true
repository.workspace = true
version.workspace = true

[dependencies]
torii-sqlite.workspace = true
dojo-world.workspace = true
dojo-types.workspace = true
starknet.workspace = true
anyhow.workspace = true
async-trait.workspace = true
tokio.workspace = true
tracing.workspace = true
cainome.workspace = true
futures-util.workspace = true
starknet-crypto.workspace = true
lazy_static.workspace = true
base64.workspace = true
serde_json.workspace = true
cainome-cairo-serde.workspace = true
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,10 @@ use starknet::providers::Provider;
use torii_sqlite::cache::ContractClassCache;
use torii_sqlite::Sql;

use crate::task_manager::{TaskId, TaskPriority};
pub mod task_manager;
pub mod processors;

pub mod controller;
pub mod erc1155_transfer_batch;
pub mod erc1155_transfer_single;
pub mod erc20_legacy_transfer;
pub mod erc20_transfer;
pub mod erc4906_batch_metadata_update;
pub mod erc4906_metadata_update;
pub mod erc721_legacy_transfer;
pub mod erc721_transfer;
pub mod event_message;
pub mod metadata_update;
pub mod raw_event;
pub mod register_event;
pub mod register_model;
pub mod store_del_record;
pub mod store_set_record;
pub mod store_transaction;
pub mod store_update_member;
pub mod store_update_record;
pub mod upgrade_event;
pub mod upgrade_model;
use crate::task_manager::{TaskId, TaskPriority};

#[derive(Clone, Debug, Default)]
pub struct EventProcessorConfig {
Expand Down Expand Up @@ -105,3 +86,4 @@ pub trait TransactionProcessor<P: Provider + Sync + std::fmt::Debug>: Send + Syn
contract_class_cache: &ContractClassCache<P>,
) -> Result<(), Error>;
}

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use starknet_crypto::Felt;
use torii_sqlite::Sql;
use tracing::info;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::controller";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{self, TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::erc1155_transfer_batch";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{self, TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::erc1155_transfer_single";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::erc20_legacy_transfer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::erc20_transfer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{self, TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::erc4906_metadata_update_batch";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{self, TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::erc4906_metadata_update";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::erc721_legacy_transfer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::erc721_transfer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use starknet_crypto::poseidon_hash_many;
use torii_sqlite::Sql;
use tracing::info;

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::event_message";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use torii_sqlite::utils::fetch_content_from_ipfs;
use torii_sqlite::Sql;
use tracing::{error, info};

use super::{EventProcessor, EventProcessorConfig};
use crate::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii::indexer::processors::metadata_update";
Expand Down
Loading
Loading