Skip to content

Commit ddc46f1

Browse files
fix race condition
Co-authored-by: Copilot <[email protected]>
1 parent ea8aebc commit ddc46f1

File tree

1 file changed

+20
-15
lines changed

1 file changed

+20
-15
lines changed

core/src/subgraph/trigger_processor.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -159,24 +159,29 @@ impl SubgraphTriggerProcessor {
159159

160160
/// Get or create subgraph state
161161
fn get_or_create_subgraph_state(&self, deployment: &DeploymentHash) -> SubgraphState {
162-
{
163-
let shards = self.subgraph_shards.read().unwrap();
164-
if let Some(state) = shards.get(deployment) {
165-
return state.clone();
166-
}
162+
// Atomically check, insert, and return the subgraph state under a write lock
163+
let mut shards = self.subgraph_shards.write().unwrap();
164+
if let Some(state) = shards.get(deployment) {
165+
return state.clone();
167166
}
168167

169-
// Need to create new state
170-
let shard_id = self.get_shard_for_deployment(deployment);
168+
// Assign new shard using DefaultHasher
169+
let mut hasher = DefaultHasher::new();
170+
deployment.hash(&mut hasher);
171+
let shard_id = (hasher.finish() as usize) % self.semaphores.len();
171172

172-
let shards = self.subgraph_shards.read().unwrap();
173-
shards
174-
.get(deployment)
175-
.cloned()
176-
.unwrap_or_else(|| SubgraphState {
177-
queue_depth: Arc::new(AtomicUsize::new(0)),
178-
shard_id,
179-
})
173+
// Track the assignment
174+
let state = SubgraphState {
175+
queue_depth: Arc::new(AtomicUsize::new(0)),
176+
shard_id,
177+
};
178+
shards.insert(deployment.clone(), state.clone());
179+
180+
self.shard_metrics[shard_id]
181+
.assigned_subgraphs
182+
.fetch_add(1, Ordering::Relaxed);
183+
184+
state
180185
}
181186

182187
/// Apply backpressure if queue is too deep

0 commit comments

Comments
 (0)