Skip to content

Commit d29d739

Browse files
committed
fix: clean up worker threads in the event of an error
1 parent 2d84554 commit d29d739

File tree

1 file changed

+20
-8
lines changed
  • packages/common/chirp-workflow/core/src

1 file changed

+20
-8
lines changed

packages/common/chirp-workflow/core/src/worker.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ impl Worker {
8383
loop {
8484
tokio::select! {
8585
_ = tick_interval.tick() => {},
86+
res = wake_sub.next() => {
87+
if res.is_none() {
88+
// Cancel background tasks
89+
gc_handle.abort();
90+
metrics_handle.abort();
91+
92+
return Err(WorkflowError::SubscriptionUnsubscribed.into());
93+
}
94+
95+
tick_interval.reset();
96+
},
97+
8698
res = &mut gc_handle => {
8799
tracing::error!(?res, "metrics task unexpectedly stopped");
88100
break;
@@ -91,18 +103,17 @@ impl Worker {
91103
tracing::error!(?res, "metrics task unexpectedly stopped");
92104
break;
93105
},
94-
res = wake_sub.next() => {
95-
if res.is_none() {
96-
return Err(WorkflowError::SubscriptionUnsubscribed.into());
97-
}
98-
99-
tick_interval.reset();
100-
},
101106
_ = ctrl_c() => break,
102107
_ = sigterm.recv() => break,
103108
}
104109

105-
self.tick(&shared_client, &config, &pools, &cache).await?;
110+
if let Err(err) = self.tick(&shared_client, &config, &pools, &cache).await {
111+
// Cancel background tasks
112+
gc_handle.abort();
113+
metrics_handle.abort();
114+
115+
return Err(err);
116+
}
106117
}
107118

108119
self.shutdown(sigterm).await;
@@ -181,6 +192,7 @@ impl Worker {
181192

182193
tracing::info!("shutdown complete");
183194

195+
// This will stop all background tasks because the tokio runtime will stop
184196
rivet_runtime::shutdown().await;
185197
}
186198

0 commit comments

Comments
 (0)