Skip to content

Commit b1084d7

Browse files
committed
basic implementation of a mechanism that adds services to a running group
1 parent e2b442c commit b1084d7

File tree

4 files changed

+586
-93
lines changed

4 files changed

+586
-93
lines changed

Sources/ServiceLifecycle/ServiceGroup.swift

Lines changed: 118 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import Logging
1616
import UnixSignals
17+
import AsyncAlgorithms
1718

1819
/// A ``ServiceGroup`` is responsible for running a number of services, setting up signal handling and signalling graceful shutdown to the services.
1920
public actor ServiceGroup: Sendable, Service {
@@ -23,7 +24,8 @@ public actor ServiceGroup: Sendable, Service {
2324
case initial(services: [ServiceGroupConfiguration.ServiceConfiguration])
2425
/// The state once ``ServiceGroup/run()`` has been called.
2526
case running(
26-
gracefulShutdownStreamContinuation: AsyncStream<Void>.Continuation
27+
gracefulShutdownStreamContinuation: AsyncStream<Void>.Continuation,
28+
addedServiceChannel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
2729
)
2830
/// The state once ``ServiceGroup/run()`` has finished.
2931
case finished
@@ -106,6 +108,37 @@ public actor ServiceGroup: Sendable, Service {
106108
self.maximumCancellationDuration = configuration._maximumCancellationDuration
107109
}
108110

111+
/// Adds a service to the group.
112+
///
113+
/// If the group is currently running, the added service will be started immediately.
114+
/// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
115+
/// - Parameters:
116+
/// - serviceConfiguration: The service configuration to add.
117+
public func addService(_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration) async {
118+
switch self.state {
119+
case var .initial(services: services):
120+
self.state = .initial(services: [])
121+
services.append(serviceConfiguration)
122+
self.state = .initial(services: services)
123+
124+
case .running(_, let addedServiceChannel):
125+
await addedServiceChannel.send(serviceConfiguration)
126+
127+
case .finished:
128+
return
129+
}
130+
}
131+
132+
/// Adds a service to the group.
133+
///
134+
/// If the group is currently running, the added service will be started immediately.
135+
/// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
136+
/// - Parameters:
137+
/// - service: The service to add.
138+
public func addService(_ service: any Service) async {
139+
await self.addService(ServiceGroupConfiguration.ServiceConfiguration(service: service))
140+
}
141+
109142
/// Runs all the services by spinning up a child task per service.
110143
/// Furthermore, this method sets up the correct signal handlers
111144
/// for graceful shutdown.
@@ -128,16 +161,19 @@ public actor ServiceGroup: Sendable, Service {
128161
}
129162

130163
let (gracefulShutdownStream, gracefulShutdownContinuation) = AsyncStream.makeStream(of: Void.self)
164+
let addedServiceChannel = AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>()
131165

132166
self.state = .running(
133-
gracefulShutdownStreamContinuation: gracefulShutdownContinuation
167+
gracefulShutdownStreamContinuation: gracefulShutdownContinuation,
168+
addedServiceChannel: addedServiceChannel
134169
)
135170

136171
var potentialError: Error?
137172
do {
138173
try await self._run(
139174
services: &services,
140-
gracefulShutdownStream: gracefulShutdownStream
175+
gracefulShutdownStream: gracefulShutdownStream,
176+
addedServiceChannel: addedServiceChannel
141177
)
142178
} catch {
143179
potentialError = error
@@ -173,7 +209,7 @@ public actor ServiceGroup: Sendable, Service {
173209
self.state = .finished
174210
return
175211

176-
case .running(let gracefulShutdownStreamContinuation):
212+
case .running(let gracefulShutdownStreamContinuation, _):
177213
// We cannot transition to shuttingDown here since we are signalling over to the task
178214
// that runs `run`. This task is responsible for transitioning to shuttingDown since
179215
// there might be multiple signals racing to trigger it
@@ -189,7 +225,7 @@ public actor ServiceGroup: Sendable, Service {
189225
}
190226
}
191227

192-
private enum ChildTaskResult {
228+
fileprivate enum ChildTaskResult {
193229
case serviceFinished(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int)
194230
case serviceThrew(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int, error: any Error)
195231
case signalCaught(UnixSignal)
@@ -202,7 +238,8 @@ public actor ServiceGroup: Sendable, Service {
202238

203239
private func _run(
204240
services: inout [ServiceGroupConfiguration.ServiceConfiguration],
205-
gracefulShutdownStream: AsyncStream<Void>
241+
gracefulShutdownStream: AsyncStream<Void>,
242+
addedServiceChannel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
206243
) async throws {
207244
self.logger.debug(
208245
"Starting service lifecycle",
@@ -280,25 +317,11 @@ public actor ServiceGroup: Sendable, Service {
280317
let gracefulShutdownManager = GracefulShutdownManager()
281318
gracefulShutdownManagers.append(gracefulShutdownManager)
282319

283-
// This must be addTask and not addTaskUnlessCancelled
284-
// because we must run all the services for the below logic to work.
285-
group.addTask {
286-
return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) {
287-
do {
288-
try await serviceConfiguration.service.run()
289-
return .serviceFinished(service: serviceConfiguration, index: index)
290-
} catch {
291-
return .serviceThrew(service: serviceConfiguration, index: index, error: error)
292-
}
293-
}
294-
}
295-
}
296-
297-
group.addTask {
298-
// This child task is waiting forever until the group gets cancelled.
299-
let (stream, _) = AsyncStream.makeStream(of: Void.self)
300-
await stream.first { _ in true }
301-
return .cancellationCaught
320+
group.addServiceTask(
321+
serviceConfiguration,
322+
gracefulShutdownManager: gracefulShutdownManager,
323+
index: index
324+
)
302325
}
303326

304327
// We are storing the services in an optional array now. When a slot in the array is
@@ -310,6 +333,49 @@ public actor ServiceGroup: Sendable, Service {
310333
"We did not create a graceful shutdown manager per service"
311334
)
312335

336+
var taskGroupThatMustNotEscape = group
337+
group.addTask {
338+
// This is the task that listens to added services and starts them while the group is running
339+
340+
await withTaskCancellationHandler {
341+
// Channel will be finished in `shutdownGracefully`, we must not add services after graceful shutdown has started
342+
for await serviceConfiguration in addedServiceChannel {
343+
self.logger.debug(
344+
"Starting added service",
345+
metadata: [
346+
self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)"
347+
]
348+
)
349+
350+
let gracefulShutdownManager = GracefulShutdownManager()
351+
gracefulShutdownManagers.append(gracefulShutdownManager)
352+
services.append(serviceConfiguration)
353+
354+
precondition(
355+
services.count == gracefulShutdownManagers.count,
356+
"Mismatch between services and graceful shutdown managers"
357+
)
358+
359+
taskGroupThatMustNotEscape.addServiceTask(
360+
serviceConfiguration,
361+
gracefulShutdownManager: gracefulShutdownManager,
362+
index: services.count - 1
363+
)
364+
}
365+
} onCancel: {
366+
addedServiceChannel.finish()
367+
}
368+
369+
return .gracefulShutdownFinished
370+
}
371+
372+
group.addTask {
373+
// This child task is waiting forever until the group gets cancelled.
374+
let (stream, _) = AsyncStream.makeStream(of: Void.self)
375+
await stream.first { _ in true }
376+
return .cancellationCaught
377+
}
378+
313379
// We are going to wait for any of the services to finish or
314380
// the signal sequence to throw an error.
315381
while !group.isEmpty {
@@ -530,10 +596,13 @@ public actor ServiceGroup: Sendable, Service {
530596
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
531597
gracefulShutdownManagers: [GracefulShutdownManager]
532598
) async throws {
533-
guard case .running = self.state else {
599+
guard case let .running(_, addedServiceChannel) = self.state else {
534600
fatalError("Unexpected state")
535601
}
536602

603+
// Signal to stop adding new services (it is important that no new services are added after this point)
604+
addedServiceChannel.finish()
605+
537606
if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *),
538607
let maximumGracefulShutdownDuration = self.maximumGracefulShutdownDuration
539608
{
@@ -779,6 +848,29 @@ public actor ServiceGroup: Sendable, Service {
779848
}
780849
}
781850

851+
extension ThrowingTaskGroup where Failure == Error, ChildTaskResult == ServiceGroup.ChildTaskResult {
852+
mutating func addServiceTask(
853+
_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration,
854+
gracefulShutdownManager: GracefulShutdownManager,
855+
index: Int
856+
) {
857+
// This must be addTask and not addTaskUnlessCancelled
858+
// because we must run all the services for the shutdown logic to work.
859+
self.addTask {
860+
return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) {
861+
do {
862+
try await serviceConfiguration.service.run()
863+
return .serviceFinished(service: serviceConfiguration, index: index)
864+
} catch {
865+
return .serviceThrew(service: serviceConfiguration, index: index, error: error)
866+
}
867+
}
868+
}
869+
870+
}
871+
872+
}
873+
782874
// This should be removed once we support Swift 5.9+
783875
extension AsyncStream {
784876
fileprivate static func makeStream(
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import ServiceLifecycle
2+
3+
actor MockService: Service, CustomStringConvertible {
4+
enum Event {
5+
case run
6+
case runPing
7+
case runCancelled
8+
case shutdownGracefully
9+
}
10+
11+
let events: AsyncStream<Event>
12+
internal private(set) var hasRun: Bool = false
13+
14+
private let eventsContinuation: AsyncStream<Event>.Continuation
15+
16+
private var runContinuation: CheckedContinuation<Void, Error>?
17+
18+
nonisolated let description: String
19+
20+
private let pings: AsyncStream<Void>
21+
private nonisolated let pingContinuation: AsyncStream<Void>.Continuation
22+
23+
init(
24+
description: String
25+
) {
26+
var eventsContinuation: AsyncStream<Event>.Continuation!
27+
self.events = AsyncStream<Event> { eventsContinuation = $0 }
28+
self.eventsContinuation = eventsContinuation!
29+
30+
var pingContinuation: AsyncStream<Void>.Continuation!
31+
self.pings = AsyncStream<Void> { pingContinuation = $0 }
32+
self.pingContinuation = pingContinuation!
33+
34+
self.description = description
35+
}
36+
37+
func run() async throws {
38+
self.hasRun = true
39+
40+
try await withTaskCancellationHandler {
41+
try await withGracefulShutdownHandler {
42+
try await withThrowingTaskGroup(of: Void.self) { group in
43+
group.addTask {
44+
self.eventsContinuation.yield(.run)
45+
for await _ in self.pings {
46+
self.eventsContinuation.yield(.runPing)
47+
}
48+
}
49+
50+
try await withCheckedThrowingContinuation {
51+
self.runContinuation = $0
52+
}
53+
54+
group.cancelAll()
55+
}
56+
} onGracefulShutdown: {
57+
self.eventsContinuation.yield(.shutdownGracefully)
58+
}
59+
} onCancel: {
60+
self.eventsContinuation.yield(.runCancelled)
61+
}
62+
}
63+
64+
func resumeRunContinuation(with result: Result<Void, Error>) {
65+
self.runContinuation?.resume(with: result)
66+
}
67+
68+
nonisolated func sendPing() {
69+
self.pingContinuation.yield()
70+
}
71+
}

0 commit comments

Comments
 (0)