Skip to content

Commit 8b58f68

Browse files
committed
feat(erc): use dedicated runtime for nft metadatas
1 parent 15a3a3d commit 8b58f68

File tree

7 files changed

+38
-12
lines changed

7 files changed

+38
-12
lines changed

crates/indexer/engine/src/engine.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use starknet::core::types::{Event, TransactionContent};
88
use starknet::macros::selector;
99
use starknet::providers::Provider;
1010
use starknet_crypto::Felt;
11+
use tokio::runtime::Runtime;
1112
use std::sync::LazyLock;
1213
use tokio::sync::broadcast::Sender;
1314
use tokio::sync::Semaphore;
@@ -64,6 +65,7 @@ pub struct Engine<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static>
6465
controllers: Option<Arc<ControllersSync>>,
6566
fetcher: Fetcher<P>,
6667
nft_metadata_semaphore: Arc<Semaphore>,
68+
nft_metadata_runtime: Arc<Runtime>,
6769
// The last fetch result & cursors, in case the processing fails, but not fetching.
6870
// Thus we can retry the processing with the same data instead of fetching again.
6971
cached_fetch: Option<(Box<FetchResult>, HashMap<Felt, ContractType>)>,
@@ -96,6 +98,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
9698
processors: Arc<Processors<P>>,
9799
config: EngineConfig,
98100
shutdown_tx: Sender<()>,
101+
nft_metadata_runtime: Arc<Runtime>
99102
) -> Self {
100103
Self::new_with_controllers(
101104
storage,
@@ -105,6 +108,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
105108
config,
106109
shutdown_tx,
107110
None,
111+
nft_metadata_runtime
108112
)
109113
}
110114

@@ -117,6 +121,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
117121
config: EngineConfig,
118122
shutdown_tx: Sender<()>,
119123
controllers: Option<Arc<ControllersSync>>,
124+
nft_metadata_runtime: Arc<Runtime>,
120125
) -> Self {
121126
let max_concurrent_tasks = config.max_concurrent_tasks;
122127
let event_processor_config = config.event_processor_config.clone();
@@ -138,11 +143,13 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
138143
processors,
139144
max_concurrent_tasks,
140145
event_processor_config,
146+
nft_metadata_runtime.clone()
141147
),
142148
contract_class_cache: Arc::new(ContractClassCache::new(provider.clone())),
143149
controllers,
144150
fetcher: Fetcher::new(provider.clone(), fetcher_config),
145151
nft_metadata_semaphore,
152+
nft_metadata_runtime,
146153
cached_fetch: None,
147154
}
148155
}
@@ -567,6 +574,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
567574
event: event.clone(),
568575
nft_metadata_semaphore: self.nft_metadata_semaphore.clone(),
569576
is_at_head,
577+
nft_metadata_runtime: self.nft_metadata_runtime.clone()
570578
};
571579
if self.processors.catch_all_event.validate(event) {
572580
if let Err(e) = self.processors.catch_all_event.process(&ctx).await {

crates/processors/src/erc.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use starknet::{
1111
providers::{Provider, ProviderError, ProviderRequestData, ProviderResponseData},
1212
};
1313
use starknet_crypto::Felt;
14-
use tokio::sync::Semaphore;
14+
use tokio::{runtime::Runtime, sync::Semaphore};
1515
use torii_cache::Cache;
1616
use torii_storage::Storage;
1717
use tracing::{debug, warn};
@@ -409,6 +409,7 @@ pub async fn try_register_nft_token_metadata<P: Provider + Sync>(
409409
cache: Arc<dyn Cache + Send + Sync>,
410410
storage: Arc<dyn Storage>,
411411
nft_metadata_semaphore: Arc<Semaphore>,
412+
runtime: Arc<Runtime>,
412413
) -> Result<(), Error> {
413414
let _lock = match cache.get_token_registration_lock(id.clone()).await {
414415
Some(lock) => lock,
@@ -419,18 +420,22 @@ pub async fn try_register_nft_token_metadata<P: Provider + Sync>(
419420
return Ok(());
420421
}
421422

422-
let _permit = nft_metadata_semaphore
423-
.acquire()
424-
.await
425-
.map_err(|e| Error::TokenMetadataError(TokenMetadataError::AcquireError(e)))?;
426-
let metadata = fetch_token_metadata(contract_address, actual_token_id, provider).await?;
423+
runtime.spawn(async move {
424+
let _permit = nft_metadata_semaphore
425+
.acquire()
426+
.await
427+
.map_err(|e| Error::TokenMetadataError(TokenMetadataError::AcquireError(e)))?;
428+
let metadata = fetch_token_metadata(contract_address, actual_token_id, provider).await?;
427429

428-
storage
429-
.register_nft_token(contract_address, actual_token_id, metadata)
430-
.await?;
430+
storage
431+
.register_nft_token(contract_address, actual_token_id, metadata)
432+
.await?;
431433

432-
cache.mark_token_registered(id).await;
434+
cache.mark_token_registered(id).await;
433435

436+
Result::<(), Error>::Ok(())
437+
});
438+
434439
// For ERC-1155, we need to track unique token count at contract level
435440
// This is called when a new token is being registered, so we increment by 1
436441
// We can't distinguish ERC-721 vs ERC-1155 here, but ERC-721 will also increment by 1

crates/processors/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::sync::Arc;
44
use async_trait::async_trait;
55
use starknet::core::types::{Event, Felt, TransactionContent};
66
use starknet::providers::Provider;
7+
use tokio::runtime::Runtime;
78
use tokio::sync::Semaphore;
89
use torii_cache::{Cache, ContractClassCache};
910
use torii_storage::Storage;
@@ -34,6 +35,7 @@ pub struct EventProcessorContext<P: Provider + Sync + Send + 'static> {
3435
pub event: Event,
3536
pub config: EventProcessorConfig,
3637
pub nft_metadata_semaphore: Arc<Semaphore>,
38+
pub nft_metadata_runtime: Arc<Runtime>,
3739
pub is_at_head: bool,
3840
}
3941

crates/processors/src/processors/erc4906_batch_metadata_update.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,12 @@ where
9393
while token_id <= to_token_id {
9494
let storage = ctx.storage.clone();
9595
let nft_metadata_semaphore = ctx.nft_metadata_semaphore.clone();
96+
let runtime = ctx.nft_metadata_runtime.clone();
9697
let provider = ctx.provider.clone();
9798
let token_address_clone = token_address;
9899
let current_token_id = token_id;
99100

100-
tasks.push(tokio::spawn(async move {
101+
tasks.push(runtime.spawn(async move {
101102
let _permit = nft_metadata_semaphore
102103
.acquire()
103104
.await

crates/processors/src/processors/erc4906_metadata_update.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ where
8787
let storage = ctx.storage.clone();
8888
let provider = ctx.provider.clone();
8989
let semaphore = ctx.nft_metadata_semaphore.clone();
90+
let runtime = ctx.nft_metadata_runtime.clone();
9091

91-
tokio::spawn(async move {
92+
runtime.spawn(async move {
9293
let _permit = match semaphore.acquire().await {
9394
Ok(permit) => permit,
9495
Err(e) => {

crates/processors/src/task_manager.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use hashlink::LinkedHashMap;
44
use starknet::core::types::Event;
55
use starknet::providers::Provider;
66
use starknet_crypto::Felt;
7+
use tokio::runtime::Runtime;
78
use tokio::sync::Semaphore;
89
use torii_cache::Cache;
910
use torii_proto::ContractType;
@@ -48,6 +49,7 @@ pub struct TaskManager<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'st
4849
processors: Arc<Processors<P>>,
4950
event_processor_config: EventProcessorConfig,
5051
nft_metadata_semaphore: Arc<Semaphore>,
52+
nft_metadata_runtime: Arc<Runtime>,
5153
}
5254

5355
impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<P> {
@@ -58,6 +60,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
5860
processors: Arc<Processors<P>>,
5961
max_concurrent_tasks: usize,
6062
event_processor_config: EventProcessorConfig,
63+
nft_metadata_runtime: Arc<Runtime>,
6164
) -> Self {
6265
Self {
6366
storage,
@@ -69,6 +72,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
6972
event_processor_config.max_metadata_tasks,
7073
)),
7174
event_processor_config,
75+
nft_metadata_runtime,
7276
}
7377
}
7478

@@ -144,6 +148,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
144148
let event_processor_config = self.event_processor_config.clone();
145149
let cache = self.cache.clone();
146150
let nft_metadata_semaphore = self.nft_metadata_semaphore.clone();
151+
let nft_metadata_runtime = self.nft_metadata_runtime.clone();
147152

148153
self.task_network
149154
.process_tasks(move |task_id, task_data| {
@@ -153,6 +158,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
153158
let event_processor_config = event_processor_config.clone();
154159
let cache = cache.clone();
155160
let nft_metadata_semaphore = nft_metadata_semaphore.clone();
161+
let nft_metadata_runtime = nft_metadata_runtime.clone();
156162

157163
async move {
158164
// Process all events for this task sequentially
@@ -209,6 +215,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> TaskManager<
209215
config: event_processor_config.clone(),
210216
nft_metadata_semaphore: nft_metadata_semaphore.clone(),
211217
is_at_head: *is_at_head,
218+
nft_metadata_runtime: nft_metadata_runtime.clone(),
212219
};
213220

214221
// Record processor timing and success/error metrics

crates/runner/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,7 @@ impl Runner {
634634
"Runtime allocation calculated"
635635
);
636636

637+
let nft_metadata_runtime = Arc::new(Runtime::new()?);
637638
let mut engine: Engine<Arc<JsonRpcClient<HttpTransport>>> = Engine::new_with_controllers(
638639
storage.clone(),
639640
cache.clone(),
@@ -686,6 +687,7 @@ impl Runner {
686687
},
687688
shutdown_tx.clone(),
688689
controllers,
690+
nft_metadata_runtime,
689691
);
690692

691693
let shutdown_rx = shutdown_tx.subscribe();

0 commit comments

Comments
 (0)