Skip to content
22 changes: 15 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ tower = { git = "https://github.com/tower-rs/tower.git", features = ["full"] }
thiserror = { workspace = true }
cid = "0.11.1"
anyhow = "1.0"
num_cpus = "1.17.0"

[dev-dependencies]
tower-test = { git = "https://github.com/tower-rs/tower.git" }
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ mod subgraph;

pub use crate::subgraph::{
SubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar, SubgraphRunner,
SubgraphTriggerProcessor,
SubgraphTriggerProcessor, TriggerProcessorConfig,
};
35 changes: 32 additions & 3 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use graph_runtime_wasm::RuntimeHostBuilder;
use tokio::task;

use super::context::OffchainMonitor;
use super::SubgraphTriggerProcessor;
use crate::subgraph::runner::SubgraphRunnerError;

#[derive(Clone)]
Expand All @@ -41,6 +40,7 @@ pub struct SubgraphInstanceManager<S: SubgraphStore> {
arweave_service: ArweaveService,
static_filters: bool,
env_vars: Arc<EnvVars>,
trigger_processor: Arc<super::trigger_processor::SubgraphTriggerProcessor>,

/// By design, there should be only one subgraph runner process per subgraph, but the current
/// implementation does not completely prevent multiple runners from being active at the same
Expand Down Expand Up @@ -87,7 +87,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
Box::new((*self.trigger_processor).clone()),
deployment_status_metric,
)
.await?;
Expand All @@ -102,7 +102,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
Box::new((*self.trigger_processor).clone()),
deployment_status_metric,
)
.await?;
Expand Down Expand Up @@ -184,6 +184,34 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
let logger = logger_factory.component_logger("SubgraphInstanceManager", None);
let logger_factory = logger_factory.with_parent(logger.clone());

// Configure trigger processor with backward compatibility
// Only enable sharding if explicitly configured with more than 1 shard
let enable_sharding = env_vars.subgraph_runtime_processing_shards > 1;

let processor_config = super::trigger_processor::TriggerProcessorConfig {
enable_sharding,
num_shards: env_vars.subgraph_runtime_processing_shards,
workers_per_shard: env_vars.subgraph_runtime_workers_per_shard,
max_queue_per_subgraph: env_vars.subgraph_max_queue_per_subgraph,
fairness_window_ms: 100, // 100ms fairness window
};

if enable_sharding {
info!(&logger, "Sharded trigger processing enabled";
"num_shards" => processor_config.num_shards,
"workers_per_shard" => processor_config.workers_per_shard,
"total_workers" => processor_config.num_shards * processor_config.workers_per_shard
);
} else {
info!(&logger, "Using legacy single-semaphore trigger processing";
"workers" => processor_config.workers_per_shard
);
}

let trigger_processor = Arc::new(super::trigger_processor::SubgraphTriggerProcessor::new(
processor_config,
));

SubgraphInstanceManager {
logger_factory,
subgraph_store,
Expand All @@ -195,6 +223,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
static_filters,
env_vars,
arweave_service,
trigger_processor,
subgraph_start_counter: Arc::new(AtomicU64::new(0)),
}
}
Expand Down
Loading
Loading