Skip to content

Commit 36e1e99

Browse files
glbrnttMrMage
authored andcommitted
Add automatic reconnect to the client connection. (#489)
* Add automatic reconnect to the client connection. Motivation: Clients could lose their connections to the server for a number of reasons, we offered no means to automatically reconnect. Modification: - `ClientConnection` now stores a future Channel and future multiplexer, as opposed to future `ClientConnection`s holding a channel and multiplexer. - This allows us to replace the future channel and future multiplexer when the client is closed (but not explicitly via `close()`). - The state can be monitored via a delegate or by registering callbacks on the next transition to a given state. - Reconnection uses the same backoff logic used for the initial connection creation. Results: Clients can automatically reconnect when their connection goes away. * Fixes
1 parent 959b51a commit 36e1e99

15 files changed

+674
-198
lines changed

Sources/Examples/Echo/main.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,7 @@ func makeEchoClient(address: String, port: Int, ssl: Bool) -> Echo_EchoServiceCl
8282
eventLoopGroup: eventLoopGroup,
8383
tlsConfiguration: tlsConfiguration)
8484

85-
return try ClientConnection.start(configuration)
86-
.map { Echo_EchoServiceClient(connection: $0) }
87-
.wait()
85+
return Echo_EchoServiceClient(connection: ClientConnection(configuration: configuration))
8886
} catch {
8987
print("Unable to create an EchoClient: \(error)")
9088
return nil

Sources/GRPC/ClientCalls/BaseClientCall.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,12 @@ open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
108108
/// Creates and configures an HTTP/2 stream channel. The `self.subchannel` future will hold the
109109
/// stream channel once it has been created.
110110
private func createStreamChannel() {
111-
self.connection.channel.eventLoop.execute {
112-
self.connection.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
111+
self.connection.multiplexer.whenFailure { error in
112+
self.streamPromise.fail(error)
113+
}
114+
115+
self.connection.multiplexer.whenSuccess { multiplexer in
116+
multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
113117
subchannel.pipeline.addHandlers(
114118
HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.connection.configuration.httpProtocol),
115119
HTTP1ToRawGRPCClientCodec(),

Sources/GRPC/ClientConnection.swift

Lines changed: 168 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -53,139 +53,209 @@ import NIOTLS
5353
/// delegate associated with this connection (see `DelegatingErrorHandler`).
5454
///
5555
/// See `BaseClientCall` for a description of the remainder of the client pipeline.
56-
open class ClientConnection {
57-
/// Makes and configures a `ClientBootstrap` using the provided configuration.
58-
///
59-
/// Enables `SO_REUSEADDR` and `TCP_NODELAY` and configures the `channelInitializer` to use the
60-
/// handlers detailed in the documentation for `ClientConnection`.
61-
///
62-
/// - Parameter configuration: The configuration to prepare the bootstrap with.
63-
public class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
64-
let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
65-
// Enable SO_REUSEADDR and TCP_NODELAY.
66-
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
67-
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
68-
.channelInitializer { channel in
69-
let tlsConfigured = configuration.tlsConfiguration.map { tlsConfiguration in
70-
channel.configureTLS(tlsConfiguration, errorDelegate: configuration.errorDelegate)
71-
}
56+
public class ClientConnection {
57+
/// The configuration this connection was created using.
58+
internal let configuration: ClientConnection.Configuration
7259

73-
return (tlsConfigured ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
74-
channel.configureHTTP2Pipeline(mode: .client)
75-
}.flatMap { _ in
76-
let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
77-
return channel.pipeline.addHandler(errorHandler)
78-
}
79-
}
60+
/// The channel which will handle gRPC calls.
61+
internal var channel: EventLoopFuture<Channel>
8062

81-
return bootstrap
82-
}
63+
/// HTTP multiplexer from the `channel` handling gRPC calls.
64+
internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
8365

84-
/// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
85-
///
86-
/// - Parameter channel: The channel to verify successful TLS setup on.
87-
public class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
88-
return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
89-
$0.verification
66+
/// A monitor for the connectivity state.
67+
public let connectivity: ConnectivityStateMonitor
68+
69+
/// Creates a new connection from the given configuration.
70+
public init(configuration: ClientConnection.Configuration) {
71+
let monitor = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)
72+
let channel = ClientConnection.makeChannel(
73+
configuration: configuration,
74+
connectivityMonitor: monitor
75+
)
76+
77+
self.channel = channel
78+
self.multiplexer = channel.flatMap {
79+
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
80+
}
81+
self.connectivity = monitor
82+
self.configuration = configuration
83+
84+
self.channel.whenSuccess { _ in
85+
self.connectivity.state = .ready
9086
}
87+
self.replaceChannelAndMultiplexerOnClose(channel: channel)
9188
}
9289

93-
/// Makes a `ClientConnection` from the given channel and configuration.
94-
///
95-
/// - Parameter channel: The channel to use for the connection.
96-
/// - Parameter configuration: The configuration used to create the channel.
97-
public class func makeClientConnection(
98-
channel: Channel,
99-
configuration: Configuration
100-
) -> EventLoopFuture<ClientConnection> {
101-
return channel.pipeline.handler(type: HTTP2StreamMultiplexer.self).map { multiplexer in
102-
ClientConnection(channel: channel, multiplexer: multiplexer, configuration: configuration)
90+
/// Registers a callback on the `closeFuture` of the given channel to replace this class's
91+
/// channel and multiplexer.
92+
private func replaceChannelAndMultiplexerOnClose(channel: EventLoopFuture<Channel>) {
93+
channel.always { result in
94+
// If we failed to get a channel then we've exhausted our backoff; we should `.shutdown`.
95+
if case .failure = result {
96+
self.connectivity.state = .shutdown
97+
}
98+
}.flatMap {
99+
$0.closeFuture
100+
}.whenComplete { _ in
101+
// `.shutdown` is terminal so don't attempt a reconnection.
102+
guard self.connectivity.state != .shutdown else {
103+
return
104+
}
105+
106+
let newChannel = ClientConnection.makeChannel(
107+
configuration: self.configuration,
108+
connectivityMonitor: self.connectivity
109+
)
110+
111+
self.channel = newChannel
112+
self.multiplexer = newChannel.flatMap {
113+
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
114+
}
115+
116+
// Change the state if the connection was successful.
117+
newChannel.whenSuccess { _ in
118+
self.connectivity.state = .ready
119+
}
120+
self.replaceChannelAndMultiplexerOnClose(channel: newChannel)
103121
}
104122
}
105123

106-
/// Starts a client connection using the given configuration.
107-
///
108-
/// This involves: creating a `ClientBootstrap`, connecting to a target, verifying that the TLS
109-
/// handshake was successful (if TLS was configured) and creating the `ClientConnection`.
110-
/// See the individual functions for more information:
111-
/// - `makeBootstrap(configuration:)`,
112-
/// - `verifyTLS(channel:)`, and
113-
/// - `makeClientConnection(channel:configuration:)`.
114-
///
115-
/// - Parameter configuration: The configuration to start the connection with.
116-
public class func start(_ configuration: Configuration) -> EventLoopFuture<ClientConnection> {
117-
return start(configuration, backoffIterator: configuration.connectionBackoff?.makeIterator())
124+
/// The `EventLoop` this connection is using.
125+
public var eventLoop: EventLoop {
126+
return self.channel.eventLoop
118127
}
119128

120-
/// Starts a client connection using the given configuration and backoff.
129+
/// Closes the connection to the server.
130+
public func close() -> EventLoopFuture<Void> {
131+
if self.connectivity.state == .shutdown {
132+
// We're already shutdown or in the process of shutting down.
133+
return channel.flatMap { $0.closeFuture }
134+
} else {
135+
self.connectivity.state = .shutdown
136+
return channel.flatMap { $0.close() }
137+
}
138+
}
139+
}
140+
141+
extension ClientConnection {
142+
/// Creates a `Channel` using the given configuration.
121143
///
122-
/// In addition to the steps taken in `start(configuration:)`, we _may_ additionally set a
123-
/// connection timeout and schedule a retry attempt (should the connection fail) if a
144+
/// This involves: creating a `ClientBootstrap`, connecting to a target and verifying that the TLS
145+
/// handshake was successful (if TLS was configured). We _may_ additiionally set a connection
146+
/// timeout and schedule a retry attempt (should the connection fail) if a
124147
/// `ConnectionBackoff.Iterator` is provided.
125148
///
149+
/// See the individual functions for more information:
150+
/// - `makeBootstrap(configuration:)`, and
151+
/// - `verifyTLS(channel:)`.
152+
///
126153
/// - Parameter configuration: The configuration to start the connection with.
127-
/// - Parameter backoffIterator: A `ConnectionBackoff` iterator which generates connection
128-
/// timeouts and backoffs to use when attempting to retry the connection.
129-
internal class func start(
130-
_ configuration: Configuration,
154+
/// - Parameter connectivityMonitor: A connectivity state monitor.
155+
/// - Parameter backoffIterator: An `Iterator` for `ConnectionBackoff` providing a sequence of
156+
/// connection timeouts and backoff to use when attempting to create a connection.
157+
private class func makeChannel(
158+
configuration: ClientConnection.Configuration,
159+
connectivityMonitor: ConnectivityStateMonitor,
131160
backoffIterator: ConnectionBackoff.Iterator?
132-
) -> EventLoopFuture<ClientConnection> {
161+
) -> EventLoopFuture<Channel> {
162+
connectivityMonitor.state = .connecting
133163
let timeoutAndBackoff = backoffIterator?.next()
164+
var bootstrap = ClientConnection.makeBootstrap(configuration: configuration)
134165

135-
var bootstrap = makeBootstrap(configuration: configuration)
136166
// Set a timeout, if we have one.
137167
if let timeout = timeoutAndBackoff?.timeout {
138168
bootstrap = bootstrap.connectTimeout(.seconds(timeInterval: timeout))
139169
}
140170

141-
let connection = bootstrap.connect(to: configuration.target)
142-
.flatMap { channel -> EventLoopFuture<ClientConnection> in
143-
let tlsVerified: EventLoopFuture<Void>?
144-
if configuration.tlsConfiguration != nil {
145-
tlsVerified = verifyTLS(channel: channel)
146-
} else {
147-
tlsVerified = nil
148-
}
149-
150-
return (tlsVerified ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
151-
makeClientConnection(channel: channel, configuration: configuration)
152-
}
171+
let channel = bootstrap.connect(to: configuration.target).flatMap { channel -> EventLoopFuture<Channel> in
172+
if configuration.tlsConfiguration != nil {
173+
return ClientConnection.verifyTLS(channel: channel).map { channel }
174+
} else {
175+
return channel.eventLoop.makeSucceededFuture(channel)
153176
}
177+
}.always { result in
178+
switch result {
179+
case .success:
180+
// Update the state once the channel has been assigned, when it may be used for making
181+
// RPCs.
182+
break
183+
184+
case .failure:
185+
// We might try again in a moment.
186+
connectivityMonitor.state = timeoutAndBackoff == nil ? .shutdown : .transientFailure
187+
}
188+
}
154189

155190
guard let backoff = timeoutAndBackoff?.backoff else {
156-
return connection
191+
return channel
157192
}
158193

159194
// If we're in error then schedule our next attempt.
160-
return connection.flatMapError { error in
195+
return channel.flatMapError { error in
161196
// The `futureResult` of the scheduled task is of type
162197
// `EventLoopFuture<EventLoopFuture<ClientConnection>>`, so we need to `flatMap` it to
163198
// remove a level of indirection.
164-
return connection.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
165-
return start(configuration, backoffIterator: backoffIterator)
199+
return channel.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
200+
return makeChannel(
201+
configuration: configuration,
202+
connectivityMonitor: connectivityMonitor,
203+
backoffIterator: backoffIterator
204+
)
166205
}.futureResult.flatMap { nextConnection in
167206
return nextConnection
168207
}
169208
}
170209
}
171210

172-
public let channel: Channel
173-
public let multiplexer: HTTP2StreamMultiplexer
174-
public let configuration: Configuration
175-
176-
init(channel: Channel, multiplexer: HTTP2StreamMultiplexer, configuration: Configuration) {
177-
self.channel = channel
178-
self.multiplexer = multiplexer
179-
self.configuration = configuration
211+
/// Creates a `Channel` using the given configuration amd state connectivity monitor.
212+
///
213+
/// See `makeChannel(configuration:connectivityMonitor:backoffIterator:)`.
214+
private class func makeChannel(
215+
configuration: ClientConnection.Configuration,
216+
connectivityMonitor: ConnectivityStateMonitor
217+
) -> EventLoopFuture<Channel> {
218+
return makeChannel(
219+
configuration: configuration,
220+
connectivityMonitor: connectivityMonitor,
221+
backoffIterator: configuration.connectionBackoff?.makeIterator()
222+
)
180223
}
181224

182-
/// Fired when the client shuts down.
183-
public var onClose: EventLoopFuture<Void> {
184-
return channel.closeFuture
225+
/// Makes and configures a `ClientBootstrap` using the provided configuration.
226+
///
227+
/// Enables `SO_REUSEADDR` and `TCP_NODELAY` and configures the `channelInitializer` to use the
228+
/// handlers detailed in the documentation for `ClientConnection`.
229+
///
230+
/// - Parameter configuration: The configuration to prepare the bootstrap with.
231+
private class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
232+
let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
233+
// Enable SO_REUSEADDR and TCP_NODELAY.
234+
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
235+
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
236+
.channelInitializer { channel in
237+
let tlsConfigured = configuration.tlsConfiguration.map { tlsConfiguration in
238+
channel.configureTLS(tlsConfiguration, errorDelegate: configuration.errorDelegate)
239+
}
240+
241+
return (tlsConfigured ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
242+
channel.configureHTTP2Pipeline(mode: .client)
243+
}.flatMap { _ in
244+
let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
245+
return channel.pipeline.addHandler(errorHandler)
246+
}
247+
}
248+
249+
return bootstrap
185250
}
186251

187-
public func close() -> EventLoopFuture<Void> {
188-
return channel.close(mode: .all)
252+
/// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
253+
///
254+
/// - Parameter channel: The channel to verify successful TLS setup on.
255+
private class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
256+
return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
257+
$0.verification
258+
}
189259
}
190260
}
191261

@@ -222,6 +292,9 @@ extension ClientConnection {
222292
/// cycle.
223293
public var errorDelegate: ClientErrorDelegate?
224294

295+
/// A delegate which is called when the connectivity state is changed.
296+
public var connectivityStateDelegate: ConnectivityStateDelegate?
297+
225298
/// TLS configuration for this connection. `nil` if TLS is not desired.
226299
public var tlsConfiguration: TLSConfiguration?
227300

@@ -240,19 +313,22 @@ extension ClientConnection {
240313
/// - Parameter eventLoopGroup: The event loop group to run the connection on.
241314
/// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
242315
/// on debug builds.
316+
/// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
243317
/// - Parameter tlsConfiguration: TLS configuration, defaulting to `nil`.
244318
/// - Parameter connectionBackoff: The connection backoff configuration to use, defaulting
245319
/// to `nil`.
246320
public init(
247321
target: ConnectionTarget,
248322
eventLoopGroup: EventLoopGroup,
249323
errorDelegate: ClientErrorDelegate? = DebugOnlyLoggingClientErrorDelegate.shared,
324+
connectivityStateDelegate: ConnectivityStateDelegate? = nil,
250325
tlsConfiguration: TLSConfiguration? = nil,
251326
connectionBackoff: ConnectionBackoff? = nil
252327
) {
253328
self.target = target
254329
self.eventLoopGroup = eventLoopGroup
255330
self.errorDelegate = errorDelegate
331+
self.connectivityStateDelegate = connectivityStateDelegate
256332
self.tlsConfiguration = tlsConfiguration
257333
self.connectionBackoff = connectionBackoff
258334
}
@@ -309,8 +385,7 @@ fileprivate extension Channel {
309385
context: configuration.sslContext,
310386
serverHostname: configuration.hostnameOverride)
311387

312-
let verificationHandler = TLSVerificationHandler(errorDelegate: errorDelegate)
313-
return self.pipeline.addHandlers(sslClientHandler, verificationHandler)
388+
return self.pipeline.addHandlers(sslClientHandler, TLSVerificationHandler())
314389
} catch {
315390
return self.eventLoop.makeFailedFuture(error)
316391
}

0 commit comments

Comments
 (0)