diff --git a/packages/common/chirp-workflow/core/src/worker.rs b/packages/common/chirp-workflow/core/src/worker.rs index 8b2b2520d0..0779323840 100644 --- a/packages/common/chirp-workflow/core/src/worker.rs +++ b/packages/common/chirp-workflow/core/src/worker.rs @@ -83,6 +83,18 @@ impl Worker { loop { tokio::select! { _ = tick_interval.tick() => {}, + res = wake_sub.next() => { + if res.is_none() { + // Cancel background tasks + gc_handle.abort(); + metrics_handle.abort(); + + return Err(WorkflowError::SubscriptionUnsubscribed.into()); + } + + tick_interval.reset(); + }, + res = &mut gc_handle => { tracing::error!(?res, "metrics task unexpectedly stopped"); break; @@ -91,18 +103,17 @@ impl Worker { tracing::error!(?res, "metrics task unexpectedly stopped"); break; }, - res = wake_sub.next() => { - if res.is_none() { - return Err(WorkflowError::SubscriptionUnsubscribed.into()); - } - - tick_interval.reset(); - }, _ = ctrl_c() => break, _ = sigterm.recv() => break, } - self.tick(&shared_client, &config, &pools, &cache).await?; + if let Err(err) = self.tick(&shared_client, &config, &pools, &cache).await { + // Cancel background tasks + gc_handle.abort(); + metrics_handle.abort(); + + return Err(err); + } } self.shutdown(sigterm).await; @@ -181,6 +192,7 @@ impl Worker { tracing::info!("shutdown complete"); + // This will stop all background tasks because the tokio runtime will stop rivet_runtime::shutdown().await; }