@@ -234,6 +234,7 @@ public actor ServiceGroup: Sendable, Service {
234
234
case gracefulShutdownFinished
235
235
case gracefulShutdownTimedOut
236
236
case cancellationCaught
237
+ case newServiceAdded( ServiceGroupConfiguration . ServiceConfiguration )
237
238
}
238
239
239
240
private func _run(
@@ -333,55 +334,48 @@ public actor ServiceGroup: Sendable, Service {
333
334
" We did not create a graceful shutdown manager per service "
334
335
)
335
336
336
- var _unownedTaskGroupHandledCarefully = group
337
- group. addTask {
338
- // This is the task that listens to added services and starts them while the group is running
339
-
340
- await withTaskCancellationHandler {
341
- // Channel will be finished in `shutdownGracefully`, we must not add services after graceful shutdown has started
342
- for await serviceConfiguration in addedServiceChannel {
343
- self . logger. debug (
344
- " Starting added service " ,
345
- metadata: [
346
- self . loggingConfiguration. keys. serviceKey: " \( serviceConfiguration. service) "
347
- ]
348
- )
349
-
350
- let gracefulShutdownManager = GracefulShutdownManager ( )
351
- gracefulShutdownManagers. append ( gracefulShutdownManager)
352
- services. append ( serviceConfiguration)
353
-
354
- precondition (
355
- services. count == gracefulShutdownManagers. count,
356
- " Mismatch between services and graceful shutdown managers "
357
- )
358
-
359
- _unownedTaskGroupHandledCarefully. addServiceTask (
360
- serviceConfiguration,
361
- gracefulShutdownManager: gracefulShutdownManager,
362
- index: services. count - 1
363
- )
364
- }
365
- } onCancel: {
366
- addedServiceChannel. finish ( )
367
- }
368
-
369
- return . gracefulShutdownFinished
370
- }
371
-
372
337
group. addTask {
373
338
// This child task is waiting forever until the group gets cancelled.
374
339
let ( stream, _) = AsyncStream . makeStream ( of: Void . self)
375
340
await stream. first { _ in true }
376
341
return . cancellationCaught
377
342
}
378
343
344
+ // Adds a task that listens to added services and funnels them into the task group
345
+ group. addAddedServiceListenerTask ( addedServiceChannel)
346
+
379
347
// We are going to wait for any of the services to finish or
380
348
// the signal sequence to throw an error.
381
349
while !group. isEmpty {
382
- let result : ChildTaskResult ? = try await group. next ( )
350
+ let nextEvent = try await group. next ( )
351
+
352
+ switch nextEvent {
353
+ case . newServiceAdded( let serviceConfiguration) :
354
+ self . logger. debug (
355
+ " Starting added service " ,
356
+ metadata: [
357
+ self . loggingConfiguration. keys. serviceKey: " \( serviceConfiguration. service) "
358
+ ]
359
+ )
360
+
361
+ let gracefulShutdownManager = GracefulShutdownManager ( )
362
+ gracefulShutdownManagers. append ( gracefulShutdownManager)
363
+ services. append ( serviceConfiguration)
364
+
365
+ precondition (
366
+ services. count == gracefulShutdownManagers. count,
367
+ " Mismatch between services and graceful shutdown managers "
368
+ )
369
+
370
+ group. addServiceTask (
371
+ serviceConfiguration,
372
+ gracefulShutdownManager: gracefulShutdownManager,
373
+ index: services. count - 1
374
+ )
375
+
376
+ // Each listener task can only handle a single added service, so we must add a new listener
377
+ group. addAddedServiceListenerTask ( addedServiceChannel)
383
378
384
- switch result {
385
379
case . serviceFinished( let service, let index) :
386
380
if group. isCancelled {
387
381
// The group is cancelled and we expect all services to finish
@@ -786,6 +780,11 @@ public actor ServiceGroup: Sendable, Service {
786
780
// We are going to continue the result loop since we have to wait for our service
787
781
// to finish.
788
782
break
783
+
784
+ case . newServiceAdded:
785
+ // TBD: How do we treat added services during graceful shutdown?
786
+ // Currently, we ignore them - but we make sure that `run` is never called
787
+ break
789
788
}
790
789
}
791
790
}
@@ -866,9 +865,23 @@ extension ThrowingTaskGroup where Failure == Error, ChildTaskResult == ServiceGr
866
865
}
867
866
}
868
867
}
869
-
870
868
}
871
869
870
+ mutating func addAddedServiceListenerTask( _ channel: AsyncChannel < ServiceGroupConfiguration . ServiceConfiguration > ) {
871
+ self . addTask {
872
+ return await withTaskCancellationHandler {
873
+ var iterator = channel. makeAsyncIterator ( )
874
+ if let addedService = await iterator. next ( ) {
875
+ return . newServiceAdded( addedService)
876
+ }
877
+
878
+ return . gracefulShutdownFinished
879
+ } onCancel: {
880
+ // Without this we can get stuck in `addService` if the group
881
+ channel. finish ( )
882
+ }
883
+ }
884
+ }
872
885
}
873
886
874
887
// This should be removed once we support Swift 5.9+
0 commit comments