@@ -83,6 +83,18 @@ impl Worker {
83
83
loop {
84
84
tokio:: select! {
85
85
_ = tick_interval. tick( ) => { } ,
86
+ res = wake_sub. next( ) => {
87
+ if res. is_none( ) {
88
+ // Cancel background tasks
89
+ gc_handle. abort( ) ;
90
+ metrics_handle. abort( ) ;
91
+
92
+ return Err ( WorkflowError :: SubscriptionUnsubscribed . into( ) ) ;
93
+ }
94
+
95
+ tick_interval. reset( ) ;
96
+ } ,
97
+
86
98
res = & mut gc_handle => {
87
99
tracing:: error!( ?res, "metrics task unexpectedly stopped" ) ;
88
100
break ;
@@ -91,18 +103,17 @@ impl Worker {
91
103
tracing:: error!( ?res, "metrics task unexpectedly stopped" ) ;
92
104
break ;
93
105
} ,
94
- res = wake_sub. next( ) => {
95
- if res. is_none( ) {
96
- return Err ( WorkflowError :: SubscriptionUnsubscribed . into( ) ) ;
97
- }
98
-
99
- tick_interval. reset( ) ;
100
- } ,
101
106
_ = ctrl_c( ) => break ,
102
107
_ = sigterm. recv( ) => break ,
103
108
}
104
109
105
- self . tick ( & shared_client, & config, & pools, & cache) . await ?;
110
+ if let Err ( err) = self . tick ( & shared_client, & config, & pools, & cache) . await {
111
+ // Cancel background tasks
112
+ gc_handle. abort ( ) ;
113
+ metrics_handle. abort ( ) ;
114
+
115
+ return Err ( err) ;
116
+ }
106
117
}
107
118
108
119
self . shutdown ( sigterm) . await ;
@@ -181,6 +192,7 @@ impl Worker {
181
192
182
193
tracing:: info!( "shutdown complete" ) ;
183
194
195
+ // This will stop all background tasks because the tokio runtime will stop
184
196
rivet_runtime:: shutdown ( ) . await ;
185
197
}
186
198
0 commit comments