Skip to content

Commit cc3add5

Browse files
committed
68 -- Catch situations where the remote connection is closed
1 parent c4ff3fc commit cc3add5

File tree

2 files changed

+95
-14
lines changed

2 files changed

+95
-14
lines changed

Sources/RediStack/ChannelHandlers/RedisCommandHandler.swift

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2019 RediStack project authors
5+
// Copyright (c) 2019-2020 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -37,6 +37,7 @@ public struct RedisCommand {
3737
public final class RedisCommandHandler {
3838
/// FIFO queue of promises waiting to receive a response value from a sent command.
3939
private var commandResponseQueue: CircularBuffer<EventLoopPromise<RESPValue>>
40+
private var state: State = .default
4041

4142
deinit {
4243
if !self.commandResponseQueue.isEmpty {
@@ -50,31 +51,45 @@ public final class RedisCommandHandler {
5051
public init(initialQueueCapacity: Int = 3) {
5152
self.commandResponseQueue = CircularBuffer(initialCapacity: initialQueueCapacity)
5253
}
54+
55+
private enum State {
56+
case `default`, error(Error)
57+
}
5358
}
5459

5560
// MARK: ChannelInboundHandler
5661

5762
extension RedisCommandHandler: ChannelInboundHandler {
58-
/// See `NIO.ChannelInboundHandler.InboundIn`
5963
public typealias InboundIn = RESPValue
6064

61-
/// Invoked by SwiftNIO when an error has been thrown. The command queue will be drained, with each promise in the queue being failed with the error thrown.
65+
/// Invoked by SwiftNIO when an error has been thrown. The command queue will be drained
66+
/// with each promise in the queue being failed with the error thrown.
6267
///
6368
/// See `NIO.ChannelInboundHandler.errorCaught(context:error:)`
6469
/// - Important: This will also close the socket connection to Redis.
65-
/// - Note:`RedisMetrics.commandFailureCount` is **not** incremented from this error.
66-
///
67-
/// A `Logging.LogLevel.critical` message will be written with the caught error.
70+
/// - Note:`RedisMetrics.commandFailureCount` is **not** incremented from this method.
6871
public func errorCaught(context: ChannelHandlerContext, error: Error) {
72+
self._failCommandQueue(because: error)
73+
context.close(promise: nil)
74+
}
75+
76+
/// Invoked by SwiftNIO when the channel's active state has changed, such as when it is closed. The command queue will be drained
77+
/// with each promise in the queue being failed from a connection closed error.
78+
///
79+
/// See `NIO.ChannelInboundHandler.channelInactive(context:)`
80+
/// - Note: `RedisMetrics.commandFailureCount` is **not** incremented from this method.
81+
public func channelInactive(context: ChannelHandlerContext) {
82+
self._failCommandQueue(because: RedisClientError.connectionClosed)
83+
}
84+
85+
private func _failCommandQueue(because error: Error) {
6986
let queue = self.commandResponseQueue
70-
7187
self.commandResponseQueue.removeAll()
7288
queue.forEach { $0.fail(error) }
73-
74-
context.close(promise: nil)
7589
}
7690

7791
/// Invoked by SwiftNIO when a read has been fired from earlier in the response chain.
92+
///
7893
/// This forwards the decoded `RESPValue` response message to the promise waiting to be fulfilled at the front of the command queue.
7994
/// - Note: `RedisMetrics.commandFailureCount` and `RedisMetrics.commandSuccessCount` are incremented from this method.
8095
///
@@ -94,6 +109,11 @@ extension RedisCommandHandler: ChannelInboundHandler {
94109
RedisMetrics.commandSuccessCount.increment()
95110
}
96111
}
112+
113+
// public func channelUnregistered(context: ChannelHandlerContext) {
114+
// self._drainCommandQueue(because: RedisClientError.connectionClosed)
115+
// }
116+
97117
}
98118

99119
// MARK: ChannelOutboundHandler
@@ -105,16 +125,23 @@ extension RedisCommandHandler: ChannelOutboundHandler {
105125
public typealias OutboundOut = RESPValue
106126

107127
/// Invoked by SwiftNIO when a `write` has been requested on the `Channel`.
128+
///
108129
/// This unwraps a `RedisCommand`, storing the `NIO.EventLoopPromise` in a command queue,
109130
/// to fulfill later with the response to the command that is about to be sent through the `NIO.Channel`.
110131
///
111132
/// See `NIO.ChannelOutboundHandler.write(context:data:promise:)`
112133
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
113134
let commandContext = self.unwrapOutboundIn(data)
114-
self.commandResponseQueue.append(commandContext.responsePromise)
115-
context.write(
116-
self.wrapOutboundOut(commandContext.message),
117-
promise: promise
118-
)
135+
136+
switch self.state {
137+
case let .error(e): commandContext.responsePromise.fail(e)
138+
139+
case .default:
140+
self.commandResponseQueue.append(commandContext.responsePromise)
141+
context.write(
142+
self.wrapOutboundOut(commandContext.message),
143+
promise: promise
144+
)
145+
}
119146
}
120147
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the RediStack open source project
4+
//
5+
// Copyright (c) 2019-2020 RediStack project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of RediStack project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
@testable import RediStack
17+
import XCTest
18+
19+
final class RedisCommandHandlerTests: XCTestCase {
20+
func test_whenRemoteConnectionCloses_handlerFailsCommandQueue() throws {
21+
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
22+
defer { try? group.syncShutdownGracefully() }
23+
let socketAddress = try SocketAddress.makeAddressResolvingHost("localhost", port: 8080)
24+
25+
let server = try ServerBootstrap(group: group)
26+
.serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
27+
.childChannelInitializer { $0.pipeline.addHandler(RemoteCloseHandler()) }
28+
.bind(to: socketAddress)
29+
.wait()
30+
defer { try? server.close().wait() }
31+
32+
let connection = try RedisConnection.connect(
33+
to: socketAddress,
34+
on: group.next()
35+
).wait()
36+
defer { try? connection.close().wait() }
37+
38+
XCTAssertThrowsError(try connection.ping().wait()) {
39+
guard let error = $0 as? RedisClientError else {
40+
XCTFail("Wrong error type thrown")
41+
return
42+
}
43+
XCTAssertEqual(error, .connectionClosed)
44+
}
45+
}
46+
}
47+
48+
private final class RemoteCloseHandler: ChannelInboundHandler {
49+
typealias InboundIn = ByteBuffer
50+
51+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
52+
context.close(promise: nil)
53+
}
54+
}

0 commit comments

Comments
 (0)