Skip to content

Commit 917f37d

Browse files
authored
Add server connection state machine (#1760)
Motivation: The server pipeline needs a channel handler which manages the lifecycle of TCP connections. This includes: policing keepalive pings sent by the client, managing graceful shutdown, shutting down idle connections, and shutting down connections which have lived for too long. Much of this logic can be abstracted into a state machine. This change add that state machine. Modifications: - Add a server connection handler state machine which tracks the graceful shutdown state and polices keepalive pings sent by the client. - Replace a few READMEs with empty Swift files to suppress a few build warnings. Result: State machine for managing server connections.
1 parent bba613b commit 917f37d

File tree

7 files changed

+639
-4
lines changed

7 files changed

+639
-4
lines changed
Lines changed: 365 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,365 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import NIOCore
18+
import NIOHTTP2
19+
20+
extension ServerConnectionHandler {
21+
/// Tracks the state of TCP connections at the server.
22+
///
23+
/// The state machine manages the state for the graceful shutdown procedure as well as policing
24+
/// client-side keep alive.
25+
struct StateMachine {
26+
/// Current state.
27+
private var state: State
28+
29+
/// Opaque data sent to the client in a PING frame after emitting the first GOAWAY frame
30+
/// as part of graceful shutdown.
31+
private let goAwayPingData: HTTP2PingData
32+
33+
/// Create a new state machine.
34+
///
35+
/// - Parameters:
36+
/// - allowKeepAliveWithoutCalls: Whether the client is permitted to send keep alive pings
37+
/// when there are no active calls.
38+
/// - minPingReceiveIntervalWithoutCalls: The minimum time interval required between keep
39+
/// alive pings when there are no active calls.
40+
/// - goAwayPingData: Opaque data sent to the client in a PING frame when the server
41+
/// initiates graceful shutdown.
42+
init(
43+
allowKeepAliveWithoutCalls: Bool,
44+
minPingReceiveIntervalWithoutCalls: TimeAmount,
45+
goAwayPingData: HTTP2PingData = HTTP2PingData(withInteger: .random(in: .min ... .max))
46+
) {
47+
let keepAlive = KeepAlive(
48+
allowWithoutCalls: allowKeepAliveWithoutCalls,
49+
minPingReceiveIntervalWithoutCalls: minPingReceiveIntervalWithoutCalls
50+
)
51+
52+
self.state = .active(State.Active(keepAlive: keepAlive))
53+
self.goAwayPingData = goAwayPingData
54+
}
55+
56+
/// Record that the stream with the given ID has been opened.
57+
mutating func streamOpened(_ id: HTTP2StreamID) {
58+
switch self.state {
59+
case .active(var state):
60+
state.lastStreamID = id
61+
let (inserted, _) = state.openStreams.insert(id)
62+
assert(inserted, "Can't open stream \(Int(id)), it's already open")
63+
self.state = .active(state)
64+
65+
case .closing(var state):
66+
state.lastStreamID = id
67+
let (inserted, _) = state.openStreams.insert(id)
68+
assert(inserted, "Can't open stream \(Int(id)), it's already open")
69+
self.state = .closing(state)
70+
71+
case .closed:
72+
()
73+
}
74+
}
75+
76+
enum OnStreamClosed: Equatable {
77+
/// Start the idle timer, after which the connection should be closed gracefully.
78+
case startIdleTimer
79+
/// Close the connection.
80+
case close
81+
/// Do nothing.
82+
case none
83+
}
84+
85+
/// Record that the stream with the given ID has been closed.
86+
mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed {
87+
let onStreamClosed: OnStreamClosed
88+
89+
switch self.state {
90+
case .active(var state):
91+
let removedID = state.openStreams.remove(id)
92+
assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
93+
onStreamClosed = state.openStreams.isEmpty ? .startIdleTimer : .none
94+
self.state = .active(state)
95+
96+
case .closing(var state):
97+
let removedID = state.openStreams.remove(id)
98+
assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open")
99+
// If the second GOAWAY hasn't been sent it isn't safe to close if there are no open
100+
// streams: the client may have opened a stream which the server doesn't know about yet.
101+
let canClose = state.sentSecondGoAway && state.openStreams.isEmpty
102+
onStreamClosed = canClose ? .close : .none
103+
self.state = .closing(state)
104+
105+
case .closed:
106+
onStreamClosed = .none
107+
}
108+
109+
return onStreamClosed
110+
}
111+
112+
enum OnPing: Equatable {
113+
/// Send a GOAWAY frame with the code "enhance your calm" and immediately close the connection.
114+
case enhanceYourCalmThenClose(HTTP2StreamID)
115+
/// Acknowledge the ping.
116+
case sendAck
117+
/// Ignore the ping.
118+
case none
119+
}
120+
121+
/// Received a ping with the given data.
122+
///
123+
/// - Parameters:
124+
/// - time: The time at which the ping was received.
125+
/// - data: The data sent with the ping.
126+
mutating func receivedPing(atTime time: NIODeadline, data: HTTP2PingData) -> OnPing {
127+
let onPing: OnPing
128+
129+
switch self.state {
130+
case .active(var state):
131+
let tooManyPings = state.keepAlive.receivedPing(
132+
atTime: time,
133+
hasOpenStreams: !state.openStreams.isEmpty
134+
)
135+
136+
if tooManyPings {
137+
onPing = .enhanceYourCalmThenClose(state.lastStreamID)
138+
self.state = .closed
139+
} else {
140+
onPing = .sendAck
141+
self.state = .active(state)
142+
}
143+
144+
case .closing(var state):
145+
let tooManyPings = state.keepAlive.receivedPing(
146+
atTime: time,
147+
hasOpenStreams: !state.openStreams.isEmpty
148+
)
149+
150+
if tooManyPings {
151+
onPing = .enhanceYourCalmThenClose(state.lastStreamID)
152+
self.state = .closed
153+
} else {
154+
onPing = .sendAck
155+
self.state = .closing(state)
156+
}
157+
158+
case .closed:
159+
onPing = .none
160+
}
161+
162+
return onPing
163+
}
164+
165+
enum OnPingAck: Equatable {
166+
/// Send a GOAWAY frame with no error and the given last stream ID, optionally closing the
167+
/// connection immediately afterwards.
168+
case sendGoAway(lastStreamID: HTTP2StreamID, close: Bool)
169+
/// Ignore the ack.
170+
case none
171+
}
172+
173+
/// Received a PING frame with the 'ack' flag set.
174+
mutating func receivedPingAck(data: HTTP2PingData) -> OnPingAck {
175+
let onPingAck: OnPingAck
176+
177+
switch self.state {
178+
case .closing(var state):
179+
// If only one GOAWAY has been sent and the data matches the data from the GOAWAY ping then
180+
// the server should send another GOAWAY ratcheting down the last stream ID. If no streams
181+
// are open then the server can close the connection immediately after, otherwise it must
182+
// wait until all streams are closed.
183+
if !state.sentSecondGoAway, data == self.goAwayPingData {
184+
state.sentSecondGoAway = true
185+
186+
if state.openStreams.isEmpty {
187+
self.state = .closed
188+
onPingAck = .sendGoAway(lastStreamID: state.lastStreamID, close: true)
189+
} else {
190+
self.state = .closing(state)
191+
onPingAck = .sendGoAway(lastStreamID: state.lastStreamID, close: false)
192+
}
193+
} else {
194+
onPingAck = .none
195+
}
196+
197+
case .active, .closed:
198+
onPingAck = .none
199+
}
200+
201+
return onPingAck
202+
}
203+
204+
enum OnStartGracefulShutdown: Equatable {
205+
/// Initiate graceful shutdown by sending a GOAWAY frame with the last stream ID set as the max
206+
/// stream ID and no error. Follow it immediately with a PING frame with the given data.
207+
case sendGoAwayAndPing(HTTP2PingData)
208+
/// Ignore the request to start graceful shutdown.
209+
case none
210+
}
211+
212+
/// Request that the connection begins graceful shutdown.
213+
mutating func startGracefulShutdown() -> OnStartGracefulShutdown {
214+
let onStartGracefulShutdown: OnStartGracefulShutdown
215+
216+
switch self.state {
217+
case .active(let state):
218+
self.state = .closing(State.Closing(from: state))
219+
onStartGracefulShutdown = .sendGoAwayAndPing(self.goAwayPingData)
220+
221+
case .closing, .closed:
222+
onStartGracefulShutdown = .none
223+
}
224+
225+
return onStartGracefulShutdown
226+
}
227+
228+
/// Reset the state of keep-alive policing.
229+
mutating func resetKeepAliveState() {
230+
switch self.state {
231+
case .active(var state):
232+
state.keepAlive.reset()
233+
self.state = .active(state)
234+
235+
case .closing(var state):
236+
state.keepAlive.reset()
237+
self.state = .closing(state)
238+
239+
case .closed:
240+
()
241+
}
242+
}
243+
244+
/// Marks the state as closed.
245+
mutating func markClosed() {
246+
self.state = .closed
247+
}
248+
}
249+
}
250+
251+
extension ServerConnectionHandler.StateMachine {
252+
fileprivate struct KeepAlive {
253+
/// Allow the client to send keep alive pings when there are no active calls.
254+
private let allowWithoutCalls: Bool
255+
256+
/// The minimum time interval which pings may be received at when there are no active calls.
257+
private let minPingReceiveIntervalWithoutCalls: TimeAmount
258+
259+
/// The maximum number of "bad" pings sent by the client the server tolerates before closing
260+
/// the connection.
261+
private let maxPingStrikes: Int
262+
263+
/// The number of "bad" pings sent by the client. This can be reset when the server sends
264+
/// DATA or HEADERS frames.
265+
///
266+
/// Ping strikes account for pings being occasionally being used for purposes other than keep
267+
/// alive (a low number of strikes is therefore expected and okay).
268+
private var pingStrikes: Int
269+
270+
/// The last time a valid ping happened. This may be in the distant past if there is no such
271+
/// time (for example the connection is new and there are no active calls).
272+
///
273+
/// Note: `distantPast` isn't used to indicate no previous valid ping as `NIODeadline` uses
274+
/// the monotonic clock on Linux which uses an undefined starting point and in some cases isn't
275+
/// always that distant.
276+
private var lastValidPingTime: NIODeadline?
277+
278+
init(allowWithoutCalls: Bool, minPingReceiveIntervalWithoutCalls: TimeAmount) {
279+
self.allowWithoutCalls = allowWithoutCalls
280+
self.minPingReceiveIntervalWithoutCalls = minPingReceiveIntervalWithoutCalls
281+
self.maxPingStrikes = 2
282+
self.pingStrikes = 0
283+
self.lastValidPingTime = nil
284+
}
285+
286+
/// Reset ping strikes and the time of the last valid ping.
287+
mutating func reset() {
288+
self.lastValidPingTime = nil
289+
self.pingStrikes = 0
290+
}
291+
292+
/// Returns whether the client has sent too many pings.
293+
mutating func receivedPing(atTime time: NIODeadline, hasOpenStreams: Bool) -> Bool {
294+
let interval: TimeAmount
295+
296+
if hasOpenStreams || self.allowWithoutCalls {
297+
interval = self.minPingReceiveIntervalWithoutCalls
298+
} else {
299+
// If there are no open streams and keep alive pings aren't allowed without calls then
300+
// use an interval of two hours.
301+
//
302+
// This comes from gRFC A8: https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
303+
interval = .hours(2)
304+
}
305+
306+
// If there's no last ping time then the first is acceptable.
307+
let isAcceptablePing = self.lastValidPingTime.map { $0 + interval <= time } ?? true
308+
let tooManyPings: Bool
309+
310+
if isAcceptablePing {
311+
self.lastValidPingTime = time
312+
tooManyPings = false
313+
} else {
314+
self.pingStrikes += 1
315+
tooManyPings = self.pingStrikes > self.maxPingStrikes
316+
}
317+
318+
return tooManyPings
319+
}
320+
}
321+
}
322+
323+
extension ServerConnectionHandler.StateMachine {
324+
fileprivate enum State {
325+
/// The connection is active.
326+
struct Active {
327+
/// The number of open streams.
328+
var openStreams: Set<HTTP2StreamID>
329+
/// The ID of the most recently opened stream (zero indicates no streams have been opened yet).
330+
var lastStreamID: HTTP2StreamID
331+
/// The state of keep alive.
332+
var keepAlive: KeepAlive
333+
334+
init(keepAlive: KeepAlive) {
335+
self.openStreams = []
336+
self.lastStreamID = .rootStream
337+
self.keepAlive = keepAlive
338+
}
339+
}
340+
341+
/// The connection is closing gracefully, an initial GOAWAY frame has been sent (with the
342+
/// last stream ID set to max).
343+
struct Closing {
344+
/// The number of open streams.
345+
var openStreams: Set<HTTP2StreamID>
346+
/// The ID of the most recently opened stream (zero indicates no streams have been opened yet).
347+
var lastStreamID: HTTP2StreamID
348+
/// The state of keep alive.
349+
var keepAlive: KeepAlive
350+
/// Whether the second GOAWAY frame has been sent with a lower stream ID.
351+
var sentSecondGoAway: Bool
352+
353+
init(from state: Active) {
354+
self.openStreams = state.openStreams
355+
self.lastStreamID = state.lastStreamID
356+
self.keepAlive = state.keepAlive
357+
self.sentSecondGoAway = false
358+
}
359+
}
360+
361+
case active(Active)
362+
case closing(Closing)
363+
case closed
364+
}
365+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// Temporary namespace. Will be replaced with a channel handler.
18+
enum ServerConnectionHandler {
19+
}

Tests/GRPCHTTP2CoreTests/GRPCHTTP2CoreTests.swift renamed to Sources/GRPCHTTP2TransportNIOPosix/Empty.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,3 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
17-
// Add tests to this package.

Sources/GRPCHTTP2TransportNIOPosix/README.md

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)