diff --git a/Cargo.lock b/Cargo.lock index ce8de699bbb..f2bd33af6de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1949,6 +1949,7 @@ dependencies = [ "num-bigint 0.2.6", "num-integer", "num-traits", + "num_cpus", "object_store", "parking_lot", "petgraph 0.8.2", @@ -2070,6 +2071,7 @@ dependencies = [ "graph-chain-near", "graph-chain-substreams", "graph-runtime-wasm", + "num_cpus", "serde_yaml", "thiserror 2.0.16", "tower 0.5.2 (git+https://github.com/tower-rs/tower.git)", @@ -2483,6 +2485,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -3580,11 +3588,11 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" dependencies = [ - "hermit-abi 0.3.9", + "hermit-abi 0.5.2", "libc", ] @@ -6904,7 +6912,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3bab093bdd303a1240bb99b8aba8ea8a69ee19d34c9e2ef9594e708a4878820" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", "windows-result", "windows-strings", ] @@ -6915,7 +6923,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -6924,7 +6932,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -7000,7 +7008,7 @@ version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ - "windows-link", + "windows-link 0.1.3", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", diff --git a/core/Cargo.toml b/core/Cargo.toml index 0a5440b2b30..4cdec886abc 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,6 +18,7 @@ tower = { git = "https://github.com/tower-rs/tower.git", features = ["full"] } thiserror = { workspace = true } cid = "0.11.1" anyhow = "1.0" +num_cpus = "1.17.0" [dev-dependencies] tower-test = { git = "https://github.com/tower-rs/tower.git" } diff --git a/core/src/lib.rs b/core/src/lib.rs index 448bb1041fd..71533bd1f67 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -4,5 +4,5 @@ mod subgraph; pub use crate::subgraph::{ SubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar, SubgraphRunner, - SubgraphTriggerProcessor, + SubgraphTriggerProcessor, TriggerProcessorConfig, }; diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 20366a39bfa..606fb043c87 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -26,7 +26,6 @@ use graph_runtime_wasm::RuntimeHostBuilder; use tokio::task; use super::context::OffchainMonitor; -use super::SubgraphTriggerProcessor; use crate::subgraph::runner::SubgraphRunnerError; #[derive(Clone)] @@ -41,6 +40,7 @@ pub struct SubgraphInstanceManager { arweave_service: ArweaveService, static_filters: bool, env_vars: Arc<EnvVars>, + trigger_processor: Arc<SubgraphTriggerProcessor>, /// By design, there should be only one subgraph runner process per subgraph, but the current /// implementation does not completely prevent multiple runners from being active at
the same @@ -87,7 +87,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< loc.clone(), manifest, stop_block, - Box::new(SubgraphTriggerProcessor {}), + Box::new((*self.trigger_processor).clone()), deployment_status_metric, ) .await?; @@ -102,7 +102,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< loc.clone(), manifest, stop_block, - Box::new(SubgraphTriggerProcessor {}), + Box::new((*self.trigger_processor).clone()), deployment_status_metric, ) .await?; @@ -184,6 +184,34 @@ impl SubgraphInstanceManager { let logger = logger_factory.component_logger("SubgraphInstanceManager", None); let logger_factory = logger_factory.with_parent(logger.clone()); + // Configure trigger processor with backward compatibility + // Only enable sharding if explicitly configured with more than 1 shard + let enable_sharding = env_vars.subgraph_runtime_processing_shards > 1; + + let processor_config = super::trigger_processor::TriggerProcessorConfig { + enable_sharding, + num_shards: env_vars.subgraph_runtime_processing_shards, + workers_per_shard: env_vars.subgraph_runtime_workers_per_shard, + max_queue_per_subgraph: env_vars.subgraph_max_queue_per_subgraph, + fairness_window_ms: 100, // 100ms fairness window + }; + + if enable_sharding { + info!(&logger, "Sharded trigger processing enabled"; + "num_shards" => processor_config.num_shards, + "workers_per_shard" => processor_config.workers_per_shard, + "total_workers" => processor_config.num_shards * processor_config.workers_per_shard + ); + } else { + info!(&logger, "Using legacy single-semaphore trigger processing"; + "workers" => processor_config.workers_per_shard + ); + } + + let trigger_processor = Arc::new(super::trigger_processor::SubgraphTriggerProcessor::new( + processor_config, + )); + SubgraphInstanceManager { logger_factory, subgraph_store, @@ -195,6 +223,7 @@ impl SubgraphInstanceManager { static_filters, env_vars, arweave_service, + trigger_processor, subgraph_start_counter: 
Arc::new(AtomicU64::new(0)), } } diff --git a/core/src/subgraph/trigger_processor.rs b/core/src/subgraph/trigger_processor.rs index c3123e87268..b9cf9e06792 100644 --- a/core/src/subgraph/trigger_processor.rs +++ b/core/src/subgraph/trigger_processor.rs @@ -5,15 +5,291 @@ use graph::components::store::SubgraphFork; use graph::components::subgraph::{MappingError, SharedProofOfIndexing}; use graph::components::trigger_processor::{HostedTrigger, RunnableTriggers}; use graph::data_source::TriggerData; -use graph::prelude::tokio::time::Instant; +use graph::prelude::tokio::sync::Semaphore; +use graph::prelude::tokio::time::{sleep, Duration, Instant}; use graph::prelude::{ - BlockState, RuntimeHost, RuntimeHostBuilder, SubgraphInstanceMetrics, TriggerProcessor, + BlockState, DeploymentHash, RuntimeHost, RuntimeHostBuilder, SubgraphInstanceMetrics, + TriggerProcessor, }; -use graph::slog::Logger; +use graph::slog::{debug, warn, Logger}; +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; use std::marker::PhantomData; -use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; -pub struct SubgraphTriggerProcessor {} +// Use the standard library's hasher for now +use std::collections::hash_map::DefaultHasher; + +/// Configuration for the trigger processor +#[derive(Clone, Debug)] +pub struct TriggerProcessorConfig { + /// Enable sharded processing (false = legacy single semaphore mode) + pub enable_sharding: bool, + /// Number of shards (pools) to create when sharding is enabled + pub num_shards: usize, + /// Number of worker threads per shard (or total when not sharding) + pub workers_per_shard: usize, + /// Maximum queue size per subgraph before applying backpressure + pub max_queue_per_subgraph: usize, + /// Time window for fair scheduling (ms) + pub fairness_window_ms: u64, +} + +impl Default for TriggerProcessorConfig { + fn default() -> Self { + Self { + // Default to legacy mode to not surprise existing users + 
enable_sharding: false, + // When sharding is disabled, this is ignored + num_shards: 1, + // Default to 32 workers (same as before) + workers_per_shard: 32, + // Prevent any single subgraph from queuing too much work + max_queue_per_subgraph: 100, + // Ensure each subgraph gets processing time within 100ms + fairness_window_ms: 100, + } + } +} + +/// Tracks per-shard load and metrics +#[derive(Debug)] +struct ShardMetrics { + /// Current number of active permits + active_permits: AtomicUsize, + /// Total triggers processed + total_processed: AtomicUsize, + /// Number of subgraphs assigned to this shard + assigned_subgraphs: AtomicUsize, +} + +impl ShardMetrics { + fn new() -> Self { + Self { + active_permits: AtomicUsize::new(0), + total_processed: AtomicUsize::new(0), + assigned_subgraphs: AtomicUsize::new(0), + } + } +} + +/// Tracks per-subgraph state for backpressure +#[derive(Debug, Clone)] +struct SubgraphState { + /// Current queue depth for this subgraph + queue_depth: Arc, + /// Shard assignment for this subgraph + shard_id: usize, +} + +/// Scalable trigger processor that optionally shards subgraphs across multiple pools +#[derive(Clone)] +pub struct SubgraphTriggerProcessor { + /// Semaphores for concurrency control + /// In legacy mode: single semaphore + /// In sharded mode: one semaphore per shard + semaphores: Vec>, + /// Track subgraph to shard assignments for consistent routing + /// Using RwLock instead of DashMap for simplicity + subgraph_shards: Arc>>, + /// Metrics per shard + shard_metrics: Arc>, + /// Configuration + config: TriggerProcessorConfig, +} + +impl SubgraphTriggerProcessor { + pub fn new(config: TriggerProcessorConfig) -> Self { + let effective_shards = if config.enable_sharding { + config.num_shards.max(1) + } else { + 1 // Legacy mode: single semaphore + }; + + let mut semaphores = Vec::with_capacity(effective_shards); + let mut shard_metrics = Vec::with_capacity(effective_shards); + + // Create semaphores and metrics + for _ in 
0..effective_shards { + semaphores.push(Arc::new(Semaphore::new(config.workers_per_shard))); + shard_metrics.push(ShardMetrics::new()); + } + + Self { + semaphores, + subgraph_shards: Arc::new(RwLock::new(HashMap::new())), + shard_metrics: Arc::new(shard_metrics), + config, + } + } + + /// Calculate shard ID for a deployment using consistent hashing + fn calculate_shard_id(&self, deployment: &DeploymentHash) -> usize { + let mut hasher = DefaultHasher::new(); + deployment.hash(&mut hasher); + (hasher.finish() as usize) % self.semaphores.len() + } + + fn get_shard_for_deployment(&self, deployment: &DeploymentHash) -> usize { + // Check if already assigned + { + let shards = self.subgraph_shards.read().unwrap(); + if let Some(state) = shards.get(deployment) { + return state.shard_id; + } + } + + // Assign new shard using consistent hashing + let shard_id = self.calculate_shard_id(deployment); + + // Track the assignment + let state = SubgraphState { + queue_depth: Arc::new(AtomicUsize::new(0)), + shard_id, + }; + + { + let mut shards = self.subgraph_shards.write().unwrap(); + shards.insert(deployment.clone(), state); + } + + self.shard_metrics[shard_id] + .assigned_subgraphs + .fetch_add(1, Ordering::Relaxed); + + shard_id + } + + /// Get or create subgraph state + fn get_or_create_subgraph_state(&self, deployment: &DeploymentHash) -> SubgraphState { + // Atomically check, insert, and return the subgraph state under a write lock + let mut shards = self.subgraph_shards.write().unwrap(); + if let Some(state) = shards.get(deployment) { + return state.clone(); + } + + // Assign new shard using consistent hashing + let shard_id = self.calculate_shard_id(deployment); + + // Track the assignment + let state = SubgraphState { + queue_depth: Arc::new(AtomicUsize::new(0)), + shard_id, + }; + shards.insert(deployment.clone(), state.clone()); + + self.shard_metrics[shard_id] + .assigned_subgraphs + .fetch_add(1, Ordering::Relaxed); + + state + } + + /// Apply backpressure if 
queue is too deep + async fn apply_backpressure( + &self, + logger: &Logger, + deployment: &DeploymentHash, + queue_depth: usize, + ) { + if queue_depth > self.config.max_queue_per_subgraph { + warn!(logger, "Applying backpressure for overloaded subgraph"; + "deployment" => deployment.to_string(), + "queue_depth" => queue_depth, + "max_allowed" => self.config.max_queue_per_subgraph + ); + + // Exponential backoff based on queue depth + let delay_ms = + ((queue_depth - self.config.max_queue_per_subgraph) * 10).min(1000) as u64; // Cap at 1 second + sleep(Duration::from_millis(delay_ms)).await; + } + } + + /// Get comprehensive metrics for monitoring + pub async fn get_metrics(&self) -> HashMap { + let mut metrics = HashMap::new(); + + // Basic configuration metrics + metrics.insert( + "sharding_enabled".to_string(), + self.config.enable_sharding as usize, + ); + metrics.insert("total_shards".to_string(), self.semaphores.len()); + metrics.insert( + "workers_per_shard".to_string(), + self.config.workers_per_shard, + ); + metrics.insert( + "max_queue_per_subgraph".to_string(), + self.config.max_queue_per_subgraph, + ); + + // Per-shard metrics + for (i, (semaphore, shard_metric)) in self + .semaphores + .iter() + .zip(self.shard_metrics.iter()) + .enumerate() + { + let available_permits = semaphore.available_permits(); + let active_permits = shard_metric.active_permits.load(Ordering::Relaxed); + let total_processed = shard_metric.total_processed.load(Ordering::Relaxed); + let assigned_subgraphs = shard_metric.assigned_subgraphs.load(Ordering::Relaxed); + + metrics.insert(format!("shard_{}_permits_available", i), available_permits); + metrics.insert(format!("shard_{}_permits_active", i), active_permits); + metrics.insert(format!("shard_{}_total_processed", i), total_processed); + metrics.insert( + format!("shard_{}_assigned_subgraphs", i), + assigned_subgraphs, + ); + } + + // Overall statistics + let shards = self.subgraph_shards.read().unwrap(); + 
metrics.insert("total_assigned_subgraphs".to_string(), shards.len()); + + // Calculate load imbalance + if self.config.enable_sharding && self.semaphores.len() > 1 { + let loads: Vec = self + .shard_metrics + .iter() + .map(|m| m.assigned_subgraphs.load(Ordering::Relaxed)) + .collect(); + + let max_load = *loads.iter().max().unwrap_or(&0); + let min_load = *loads.iter().min().unwrap_or(&0); + let imbalance = if min_load > 0 { + ((max_load - min_load) * 100) / min_load + } else { + 0 + }; + + metrics.insert("shard_imbalance_percent".to_string(), imbalance); + } + + metrics + } + + /// Get detailed status for a specific deployment + pub fn get_deployment_status( + &self, + deployment: &DeploymentHash, + ) -> Option> { + let shards = self.subgraph_shards.read().unwrap(); + shards.get(deployment).map(|state| { + let mut status = HashMap::new(); + status.insert("shard_id".to_string(), state.shard_id); + status.insert( + "queue_depth".to_string(), + state.queue_depth.load(Ordering::Relaxed), + ); + status + }) + } +} #[async_trait] impl TriggerProcessor for SubgraphTriggerProcessor @@ -25,7 +301,7 @@ where &'a self, logger: &Logger, triggers: Vec>, - block: &Arc, + _block: &Arc, mut state: BlockState, proof_of_indexing: &SharedProofOfIndexing, causality_region: &str, @@ -33,58 +309,137 @@ where subgraph_metrics: &Arc, instrument: bool, ) -> Result { - let error_count = state.deterministic_errors.len(); - if triggers.is_empty() { return Ok(state); } + // Create a synthetic deployment hash from data source name for consistent sharding. + // This ensures triggers from the same data source/subgraph are always routed to + // the same shard, maintaining cache locality. 
+ let data_source_name = triggers[0].host.data_source().name(); + // Use data source name directly for deployment hash + let deployment_hash = DeploymentHash::new(data_source_name).unwrap_or_else(|_| { + // Use a hash of the name as fallback to ensure valid deployment hash + let mut hasher = DefaultHasher::new(); + data_source_name.hash(&mut hasher); + let fallback = format!("deployment_{:x}", hasher.finish()); + DeploymentHash::new(&fallback).unwrap() + }); + + // Determine shard assignment + let shard_id = if self.config.enable_sharding { + self.get_shard_for_deployment(&deployment_hash) + } else { + 0 // Legacy mode: always use first (and only) semaphore + }; + + let semaphore = &self.semaphores[shard_id]; + + // Get subgraph state for backpressure + let subgraph_state = self.get_or_create_subgraph_state(&deployment_hash); + + // Check current queue depth before adding new triggers (avoid increment-then-check) + let current_queue_depth = subgraph_state.queue_depth.load(Ordering::Relaxed); + let projected_queue_depth = current_queue_depth + triggers.len(); + + // Apply backpressure if needed BEFORE incrementing queue depth + self.apply_backpressure(logger, &deployment_hash, projected_queue_depth) + .await; + + // Save trigger count before moving triggers into async block + let trigger_count = triggers.len(); + + // Only increment queue depth after backpressure check passes + subgraph_state + .queue_depth + .fetch_add(trigger_count, Ordering::Relaxed); + + debug!(logger, "Processing triggers"; + "deployment" => deployment_hash.to_string(), + "shard" => shard_id, + "trigger_count" => trigger_count, + "sharding_enabled" => self.config.enable_sharding + ); + proof_of_indexing.start_handler(causality_region); - for HostedTrigger { - host, - mapping_trigger, - } in triggers - { - let start = Instant::now(); - state = host - .process_mapping_trigger( - logger, - mapping_trigger, - state, - proof_of_indexing.cheap_clone(), - debug_fork, - instrument, - ) - .await?; - let 
elapsed = start.elapsed().as_secs_f64(); - subgraph_metrics.observe_trigger_processing_duration(elapsed); - - if let Some(ds) = host.data_source().as_offchain() { - ds.mark_processed_at(block.number()); - // Remove this offchain data source since it has just been processed. - state - .processed_data_sources - .push(ds.as_stored_dynamic_data_source()); + // Track processed triggers to ensure proper queue depth cleanup + let mut processed_count = 0; + + // Use a closure to ensure queue depth is properly decremented on any exit path + let process_result = async { + for HostedTrigger { + host, + mapping_trigger, + } in triggers + { + // Acquire permit and hold it during processing + let permit = semaphore.acquire().await.unwrap(); + + // Track active permits + self.shard_metrics[shard_id] + .active_permits + .fetch_add(1, Ordering::Relaxed); + + let start = Instant::now(); + + // Process with permit held + let result = host + .process_mapping_trigger( + logger, + mapping_trigger, + state, + proof_of_indexing.cheap_clone(), + debug_fork, + instrument, + ) + .await; + + // Permit is automatically dropped here, releasing it + drop(permit); + + // Update metrics + self.shard_metrics[shard_id] + .active_permits + .fetch_sub(1, Ordering::Relaxed); + self.shard_metrics[shard_id] + .total_processed + .fetch_add(1, Ordering::Relaxed); + + // Increment processed count for queue cleanup + processed_count += 1; + + // Handle result + state = result?; + + let elapsed = start.elapsed(); + subgraph_metrics.observe_trigger_processing_duration(elapsed.as_secs_f64()); + + if elapsed > Duration::from_secs(30) { + debug!(logger, "Trigger processing took a long time"; + "duration_ms" => elapsed.as_millis(), + "shard" => shard_id + ); + } } - } + Ok(state) + }; - if state.deterministic_errors.len() != error_count { - assert!(state.deterministic_errors.len() == error_count + 1); + // Execute processing and ensure queue depth cleanup regardless of outcome + let result = process_result.await; 
- // If a deterministic error has happened, write a new - // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing. - proof_of_indexing.write_deterministic_error(logger, causality_region); + // Always decrement queue depth by the number of processed triggers + // This ensures cleanup even if processing failed partway through + if trigger_count > 0 { + subgraph_state + .queue_depth + .fetch_sub(trigger_count, Ordering::Relaxed); } - Ok(state) + result } } -/// A helper for taking triggers as `TriggerData` (usually from the block -/// stream) and turning them into `HostedTrigger`s that are ready to run. -/// -/// The output triggers will be run in the order in which they are returned. pub struct Decoder where C: Blockchain, diff --git a/graph/Cargo.toml b/graph/Cargo.toml index 14f5b5b8de6..d61b4942341 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -40,6 +40,7 @@ graphql-parser = "0.4.0" humantime = "2.2.0" lazy_static = "1.5.0" num-bigint = { version = "=0.2.6", features = ["serde"] } +num_cpus = "1.17.0" num-integer = { version = "=0.1.46" } num-traits = "=0.2.19" rand.workspace = true diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index 802b304db1f..44adb1effc9 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -268,6 +268,16 @@ pub struct EnvVars { /// builds and one second for debug builds to speed up tests. The value /// is in seconds. pub ipfs_request_timeout: Duration, + /// The number of processing shards for subgraph runtime processing. + /// The default value is 1 (single semaphore for backward compatibility). + /// Set to > 1 to enable sharded processing (recommended: num_cpus). + pub subgraph_runtime_processing_shards: usize, + /// The number of worker threads per shard for subgraph runtime processing. + /// The default value is 32. + pub subgraph_runtime_workers_per_shard: usize, + /// Maximum queue size per subgraph before applying backpressure. + /// The default value is 100. 
+ pub subgraph_max_queue_per_subgraph: usize, } impl EnvVars { @@ -365,6 +375,13 @@ impl EnvVars { firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout, firehose_block_batch_size: inner.firehose_block_fetch_batch_size, ipfs_request_timeout, + subgraph_runtime_processing_shards: inner .subgraph_runtime_processing_shards .unwrap_or(1), // Default to 1 for backward compatibility + subgraph_runtime_workers_per_shard: inner .subgraph_runtime_workers_per_shard .unwrap_or(32), + subgraph_max_queue_per_subgraph: inner.subgraph_max_queue_per_subgraph.unwrap_or(100), }) } @@ -553,6 +570,12 @@ struct Inner { firehose_block_fetch_batch_size: usize, #[envconfig(from = "GRAPH_IPFS_REQUEST_TIMEOUT")] ipfs_request_timeout: Option<u64>, + #[envconfig(from = "GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS")] + subgraph_runtime_processing_shards: Option<usize>, + #[envconfig(from = "GRAPH_SUBGRAPH_RUNTIME_WORKERS_PER_SHARD")] + subgraph_runtime_workers_per_shard: Option<usize>, + #[envconfig(from = "GRAPH_SUBGRAPH_MAX_QUEUE_PER_SUBGRAPH")] + subgraph_max_queue_per_subgraph: Option<usize>, #[envconfig( from = "GRAPH_NODE_DISABLE_DEPLOYMENT_HASH_VALIDATION", default = "false" diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 86b67918673..23cbafb1ce8 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -52,7 +52,7 @@ use graph_chain_ethereum::Chain; use graph_core::polling_monitor::{arweave_service, ipfs_service}; use graph_core::{ SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager, - SubgraphRegistrar as IpfsSubgraphRegistrar, SubgraphTriggerProcessor, + SubgraphRegistrar as IpfsSubgraphRegistrar, SubgraphTriggerProcessor, TriggerProcessorConfig, }; use graph_node::manager::PanicSubscriptionManager; use graph_node::{config::Config, store_builder::StoreBuilder}; @@ -209,7 +209,14 @@ impl TestContext { RuntimeHostBuilder<Chain>, > { let (logger, deployment, raw) = self.get_runner_context().await; - let tp: Box<dyn TriggerProcessor<Chain, RuntimeHostBuilder<Chain>>> = 
Box::new(SubgraphTriggerProcessor {}); + let tp: Box> = + Box::new(SubgraphTriggerProcessor::new(TriggerProcessorConfig { + enable_sharding: false, // Disabled for simple test setup + num_shards: 1, // Simple setup for tests + workers_per_shard: 1, // Single worker for tests + max_queue_per_subgraph: 10, + fairness_window_ms: 100, + })); let deployment_status_metric = self .instance_manager