@@ -23,23 +23,25 @@ private enum ShutdownError: Error {
23
23
/// `channelAdded` method in the same event loop tick as the `Channel` is actually created.
24
24
private final class ChannelCollector {
25
25
enum LifecycleState {
26
- case upAndRunning
27
- case shuttingDown
26
+ case upAndRunning(
27
+ openChannels: [ ObjectIdentifier : Channel ] ,
28
+ serverChannel: Channel
29
+ )
30
+ case shuttingDown(
31
+ openChannels: [ ObjectIdentifier : Channel ] ,
32
+ fullyShutdownPromise: EventLoopPromise < Void >
33
+ )
28
34
case shutdownCompleted
29
35
}
30
36
31
- private var openChannels : [ ObjectIdentifier : Channel ] = [ : ]
32
- private let serverChannel : Channel
33
- private var fullyShutdownPromise : EventLoopPromise < Void > ? = nil
34
- private var lifecycleState = LifecycleState . upAndRunning
37
+ private var lifecycleState : LifecycleState
35
38
36
- private var eventLoop : EventLoop {
37
- return self . serverChannel. eventLoop
38
- }
39
+ private let eventLoop : EventLoop
39
40
40
41
/// Initializes a `ChannelCollector` for `Channel`s accepted by `serverChannel`.
41
42
init ( serverChannel: Channel ) {
42
- self . serverChannel = serverChannel
43
+ self . eventLoop = serverChannel. eventLoop
44
+ self . lifecycleState = . upAndRunning( openChannels: [ : ] , serverChannel: serverChannel)
43
45
}
44
46
45
47
/// Add a channel to the `ChannelCollector`.
@@ -51,30 +53,64 @@ private final class ChannelCollector {
51
53
func channelAdded( _ channel: Channel ) throws {
52
54
self . eventLoop. assertInEventLoop ( )
53
55
54
- guard self . lifecycleState != . shutdownCompleted else {
56
+ switch self . lifecycleState {
57
+ case . upAndRunning( var openChannels, let serverChannel) :
58
+ openChannels [ ObjectIdentifier ( channel) ] = channel
59
+ self . lifecycleState = . upAndRunning( openChannels: openChannels, serverChannel: serverChannel)
60
+
61
+ case . shuttingDown( var openChannels, let fullyShutdownPromise) :
62
+ openChannels [ ObjectIdentifier ( channel) ] = channel
63
+ channel. eventLoop. execute {
64
+ channel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
65
+ }
66
+ self . lifecycleState = . shuttingDown( openChannels: openChannels, fullyShutdownPromise: fullyShutdownPromise)
67
+
68
+ case . shutdownCompleted:
55
69
channel. close ( promise: nil )
56
70
throw ShutdownError . alreadyShutdown
57
71
}
58
-
59
- self . openChannels [ ObjectIdentifier ( channel) ] = channel
60
72
}
61
73
62
74
private func shutdownCompleted( ) {
63
75
self . eventLoop. assertInEventLoop ( )
64
- assert ( self . lifecycleState == . shuttingDown)
65
76
66
- self . lifecycleState = . shutdownCompleted
67
- self . fullyShutdownPromise? . succeed ( ( ) )
77
+ switch self . lifecycleState {
78
+ case . upAndRunning:
79
+ preconditionFailure ( " This can never happen because we transition to shuttingDown first " )
80
+
81
+ case . shuttingDown( _, let fullyShutdownPromise) :
82
+ self . lifecycleState = . shutdownCompleted
83
+ fullyShutdownPromise. succeed ( ( ) )
84
+
85
+ case . shutdownCompleted:
86
+ preconditionFailure ( " We should only complete the shutdown once " )
87
+ }
68
88
}
69
89
70
90
private func channelRemoved0( _ channel: Channel ) {
71
91
self . eventLoop. assertInEventLoop ( )
72
- precondition ( self . openChannels. keys. contains ( ObjectIdentifier ( channel) ) ,
73
- " channel \( channel) not in ChannelCollector \( self . openChannels) " )
74
92
75
- self . openChannels. removeValue ( forKey: ObjectIdentifier ( channel) )
76
- if self . lifecycleState != . upAndRunning && self . openChannels. isEmpty {
77
- shutdownCompleted ( )
93
+ switch self . lifecycleState {
94
+ case . upAndRunning( var openChannels, let serverChannel) :
95
+ let removedChannel = openChannels. removeValue ( forKey: ObjectIdentifier ( channel) )
96
+
97
+ precondition ( removedChannel != nil , " channel \( channel) not in ChannelCollector \( openChannels) " )
98
+
99
+ self . lifecycleState = . upAndRunning( openChannels: openChannels, serverChannel: serverChannel)
100
+
101
+ case . shuttingDown( var openChannels, let fullyShutdownPromise) :
102
+ let removedChannel = openChannels. removeValue ( forKey: ObjectIdentifier ( channel) )
103
+
104
+ precondition ( removedChannel != nil , " channel \( channel) not in ChannelCollector \( openChannels) " )
105
+
106
+ if openChannels. isEmpty {
107
+ self . shutdownCompleted ( )
108
+ } else {
109
+ self . lifecycleState = . shuttingDown( openChannels: openChannels, fullyShutdownPromise: fullyShutdownPromise)
110
+ }
111
+
112
+ case . shutdownCompleted:
113
+ preconditionFailure ( " We should not have channels removed after transitioned to completed " )
78
114
}
79
115
}
80
116
@@ -96,44 +132,39 @@ private final class ChannelCollector {
96
132
97
133
private func initiateShutdown0( promise: EventLoopPromise < Void > ? ) {
98
134
self . eventLoop. assertInEventLoop ( )
99
- precondition ( self . lifecycleState == . upAndRunning)
100
135
101
- self . lifecycleState = . shuttingDown
136
+ switch self . lifecycleState {
137
+ case . upAndRunning( let openChannels, let serverChannel) :
138
+ let fullyShutdownPromise = promise ?? serverChannel. eventLoop. makePromise ( of: Void . self)
102
139
103
- if let promise = promise {
104
- if let alreadyExistingPromise = self . fullyShutdownPromise {
105
- alreadyExistingPromise. futureResult. cascade ( to: promise)
106
- } else {
107
- self . fullyShutdownPromise = promise
108
- }
109
- }
140
+ self . lifecycleState = . shuttingDown( openChannels: openChannels, fullyShutdownPromise: fullyShutdownPromise)
110
141
111
- self . serverChannel. close ( ) . cascadeFailure ( to: self . fullyShutdownPromise)
142
+ serverChannel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
143
+ serverChannel. close ( ) . cascadeFailure ( to: fullyShutdownPromise)
112
144
113
- for channel in self . openChannels. values {
114
- channel. eventLoop. execute {
115
- channel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
145
+ for channel in openChannels. values {
146
+ channel. eventLoop. execute {
147
+ channel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
148
+ }
116
149
}
117
- }
118
150
119
- if self . openChannels. isEmpty {
120
- shutdownCompleted ( )
151
+ if openChannels. isEmpty {
152
+ self . shutdownCompleted ( )
153
+ }
154
+
155
+ case . shuttingDown( _, let fullyShutdownPromise) :
156
+ fullyShutdownPromise. futureResult. cascade ( to: promise)
157
+
158
+ case . shutdownCompleted:
159
+ promise? . succeed ( ( ) )
121
160
}
122
161
}
123
162
124
163
/// Initiate the shutdown fulfilling `promise` when all the previously registered `Channel`s have been closed.
125
164
///
126
165
/// - parameters:
127
- /// - promise: The `EventLoopPromise` to fulfill when the shutdown of all previously registered `Channel`s has been completed.
166
+ /// - promise: The `EventLoopPromise` to fulfil when the shutdown of all previously registered `Channel`s has been completed.
128
167
func initiateShutdown( promise: EventLoopPromise < Void > ? ) {
129
- if self . serverChannel. eventLoop. inEventLoop {
130
- self . serverChannel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
131
- } else {
132
- self . eventLoop. execute {
133
- self . serverChannel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
134
- }
135
- }
136
-
137
168
if self . eventLoop. inEventLoop {
138
169
self . initiateShutdown0 ( promise: promise)
139
170
} else {
@@ -144,7 +175,6 @@ private final class ChannelCollector {
144
175
}
145
176
}
146
177
147
-
148
178
extension ChannelCollector : @unchecked Sendable { }
149
179
150
180
/// A `ChannelHandler` that adds all channels that it receives through the `ChannelPipeline` to a `ChannelCollector`.
@@ -173,7 +203,7 @@ private final class CollectAcceptedChannelsHandler: ChannelInboundHandler {
173
203
do {
174
204
try self . channelCollector. channelAdded ( channel)
175
205
let closeFuture = channel. closeFuture
176
- closeFuture. whenComplete { ( _: Result < ( ) , Error > ) in
206
+ closeFuture. whenComplete { ( _: Result < Void , Error > ) in
177
207
self . channelCollector. channelRemoved ( channel)
178
208
}
179
209
context. fireChannelRead ( data)
@@ -231,7 +261,7 @@ public final class ServerQuiescingHelper {
231
261
deinit {
232
262
self . channelCollectorPromise. fail ( UnusedQuiescingHelperError ( ) )
233
263
}
234
-
264
+
235
265
/// Create the `ChannelHandler` for the server `channel` to collect all accepted child `Channel`s.
236
266
///
237
267
/// - parameters:
@@ -262,6 +292,4 @@ public final class ServerQuiescingHelper {
262
292
}
263
293
}
264
294
265
- extension ServerQuiescingHelper : Sendable {
266
-
267
- }
295
+ extension ServerQuiescingHelper : Sendable { }
0 commit comments