Skip to content

Commit 38dea81

Browse files
committed
Merge branch 'composeability' into rev-2
* composeability: Update unit test references for new type names and to cover more test cases Update `DispatchRedis` module with new classes Remove deprecated classes Update command extensions Add `RedisDriver` to eventually replace `NIORedis` that works more deeply with NIO Add `RedisPipeline` to eventually replace `NIORedisPipeline` that works more deeply with NIO Add `RedisConnection` to eventually replace `NIORedisConnection` that works more deeply with NIO Add `RedisCommandHandler` to eventually replace `RedisMessenger` that works more deeply with NIO
2 parents 19cbadc + bd4cf78 commit 38dea81

16 files changed

+325
-324
lines changed

Sources/DispatchRedis/Redis.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import NIORedis
44

55
/// A factory that handles all necessary details for creating `RedisConnection` instances.
66
public final class Redis {
7-
private let driver: NIORedis
7+
private let driver: RedisDriver
88

99
deinit { try? driver.terminate() }
1010

1111
public init(threadCount: Int = 1) {
12-
self.driver = NIORedis(executionModel: .spawnThreads(threadCount))
12+
self.driver = RedisDriver(executionModel: .spawnThreads(threadCount))
1313
}
1414

1515
public func makeConnection(

Sources/DispatchRedis/RedisConnection.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,20 @@ import Foundation
22
import NIORedis
33

44
public final class RedisConnection {
5-
let _driverConnection: NIORedisConnection
5+
let _driverConnection: NIORedis.RedisConnection
66

77
private let queue: DispatchQueue
88

99
deinit { _driverConnection.close() }
1010

11-
init(driver: NIORedisConnection, callbackQueue: DispatchQueue) {
11+
init(driver: NIORedis.RedisConnection, callbackQueue: DispatchQueue) {
1212
self._driverConnection = driver
1313
self.queue = callbackQueue
1414
}
1515

1616
/// Creates a `RedisPipeline` for executing a batch of commands.
1717
public func makePipeline(callbackQueue: DispatchQueue? = nil) -> RedisPipeline {
18-
return .init(using: self, callbackQueue: callbackQueue ?? queue)
18+
return .init(connection: self, callbackQueue: callbackQueue ?? queue)
1919
}
2020

2121
public func get(

Sources/DispatchRedis/RedisPipeline.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ import NIORedis
1414
/// - Important: The larger the pipeline queue, the more memory both the Redis driver and Redis server will use.
1515
/// See https://redis.io/topics/pipelining#redis-pipelining
1616
public final class RedisPipeline {
17-
private let _driverPipeline: NIORedisPipeline
17+
private let _driverPipeline: NIORedis.RedisPipeline
1818
private let queue: DispatchQueue
1919

2020
/// Creates a new pipeline queue using the provided `RedisConnection`, executing callbacks on the provided `DispatchQueue`.
2121
/// - Parameters:
2222
/// - using: The connection to execute the commands on.
2323
/// - callbackQueue: The queue to execute all callbacks on.
24-
public init(using connection: RedisConnection, callbackQueue: DispatchQueue) {
25-
self._driverPipeline = NIORedisPipeline(using: connection._driverConnection)
24+
public init(connection: RedisConnection, callbackQueue: DispatchQueue) {
25+
self._driverPipeline = NIORedis.RedisPipeline(channel: connection._driverConnection.channel)
2626
self.queue = callbackQueue
2727
}
2828

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import Foundation
2+
import NIO
3+
4+
/// A context for `RedisCommandHandler` to operate within.
5+
public struct RedisCommandContext {
6+
/// A full command keyword and arguments stored as a single `RESPValue`.
7+
public let command: RESPValue
8+
/// A promise expected to be fulfilled with the `RESPValue` response to the command from Redis.
9+
public let promise: EventLoopPromise<RESPValue>
10+
}
11+
12+
/// A `ChannelDuplexHandler` that works with `RedisCommandContext`s to send commands and forward responses.
13+
open class RedisCommandHandler {
14+
/// Queue of promises waiting to receive a response value from a sent command.
15+
private var commandResponseQueue: [EventLoopPromise<RESPValue>]
16+
17+
public init() {
18+
self.commandResponseQueue = []
19+
}
20+
}
21+
22+
// MARK: ChannelInboundHandler
23+
24+
extension RedisCommandHandler: ChannelInboundHandler {
25+
/// See `ChannelInboundHandler.InboundIn`
26+
public typealias InboundIn = RESPValue
27+
28+
/// Invoked by NIO when an error has been thrown. The command response promise at the front of the queue will be
29+
/// failed with the error.
30+
///
31+
/// See `ChannelInboundHandler.errorCaught(ctx:error:)`
32+
public func errorCaught(ctx: ChannelHandlerContext, error: Error) {
33+
guard let leadPromise = commandResponseQueue.last else {
34+
return assertionFailure("Received unexpected error while idle: \(error.localizedDescription)")
35+
}
36+
leadPromise.fail(error: error)
37+
}
38+
39+
/// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
40+
/// `RESPValue` to the promise awaiting a response at the front of the queue.
41+
///
42+
/// See `ChannelInboundHandler.channelRead(ctx:data:)`
43+
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
44+
let value = unwrapInboundIn(data)
45+
46+
guard let leadPromise = commandResponseQueue.last else {
47+
return assertionFailure("Read triggered with an empty promise queue! Ignoring: \(value)")
48+
}
49+
50+
let popped = commandResponseQueue.popLast()
51+
assert(popped != nil)
52+
53+
switch value {
54+
case .error(let e): leadPromise.fail(error: e)
55+
default: leadPromise.succeed(result: value)
56+
}
57+
}
58+
}
59+
60+
// MARK: ChannelOutboundHandler
61+
62+
extension RedisCommandHandler: ChannelOutboundHandler {
63+
/// See `ChannelOutboundHandler.OutboundIn`
64+
public typealias OutboundIn = RedisCommandContext
65+
/// See `ChannelOutboundHandler.OutboundOut`
66+
public typealias OutboundOut = RESPValue
67+
68+
/// Invoked by NIO when a `write` has been requested on the `Channel`.
69+
/// This unwraps a `RedisCommandContext`, retaining a callback to forward a response to later, and forwards
70+
/// the underlying command data further into the pipeline.
71+
///
72+
/// See `ChannelOutboundHandler.write(ctx:data:promise:)`
73+
public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
74+
let context = unwrapOutboundIn(data)
75+
commandResponseQueue.insert(context.promise, at: 0)
76+
ctx.write(wrapOutboundOut(context.command), promise: promise)
77+
}
78+
}

Sources/NIORedis/ChannelHandlers/RedisMessenger.swift

Lines changed: 0 additions & 102 deletions
This file was deleted.

Sources/NIORedis/Commands/BasicCommands.swift

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
import Foundation
22
import NIO
33

4-
extension NIORedisConnection {
4+
extension RedisConnection {
55
/// Select the Redis logical database having the specified zero-based numeric index.
66
/// New connections always use the database 0.
77
///
88
/// https://redis.io/commands/select
99
public func select(_ id: Int) -> EventLoopFuture<Void> {
10-
return command("SELECT", [RESPValue(bulk: id.description)])
10+
return command("SELECT", arguments: [RESPValue(bulk: id.description)])
1111
.map { _ in return () }
1212
}
1313

1414
/// Request for authentication in a password-protected Redis server.
1515
///
1616
/// https://redis.io/commands/auth
1717
public func authorize(with password: String) -> EventLoopFuture<Void> {
18-
return command("AUTH", [RESPValue(bulk: password)]).map { _ in return () }
18+
return command("AUTH", arguments: [RESPValue(bulk: password)])
19+
.map { _ in return () }
1920
}
2021

2122
/// Removes the specified keys. A key is ignored if it does not exist.
@@ -24,7 +25,7 @@ extension NIORedisConnection {
2425
/// - Returns: A future number of keys that were removed.
2526
public func delete(_ keys: String...) -> EventLoopFuture<Int> {
2627
let keyArgs = keys.map { RESPValue(bulk: $0) }
27-
return command("DEL", keyArgs)
28+
return command("DEL", arguments: keyArgs)
2829
.thenThrowing { res in
2930
guard let count = res.int else {
3031
throw RedisError(identifier: "delete", reason: "Unexpected response: \(res)")
@@ -41,7 +42,7 @@ extension NIORedisConnection {
4142
/// - after: The lifetime (in seconds) the key will expirate at.
4243
/// - Returns: A future bool indicating if the expiration was set or not.
4344
public func expire(_ key: String, after deadline: Int) -> EventLoopFuture<Bool> {
44-
return command("EXPIRE", [RESPValue(bulk: key), RESPValue(bulk: deadline.description)])
45+
return command("EXPIRE", arguments: [RESPValue(bulk: key), RESPValue(bulk: deadline.description)])
4546
.thenThrowing { res in
4647
guard let value = res.int else {
4748
throw RedisError(identifier: "expire", reason: "Unexpected response: \(res)")
@@ -56,7 +57,7 @@ extension NIORedisConnection {
5657
///
5758
/// https://redis.io/commands/get
5859
public func get(_ key: String) -> EventLoopFuture<String?> {
59-
return command("GET", [RESPValue(bulk: key)])
60+
return command("GET", arguments: [RESPValue(bulk: key)])
6061
.map { return $0.string }
6162
}
6263

@@ -66,7 +67,7 @@ extension NIORedisConnection {
6667
///
6768
/// https://redis.io/commands/set
6869
public func set(_ key: String, to value: String) -> EventLoopFuture<Void> {
69-
return command("SET", [RESPValue(bulk: key), RESPValue(bulk: value)])
70+
return command("SET", arguments: [RESPValue(bulk: key), RESPValue(bulk: value)])
7071
.map { _ in return () }
7172
}
7273
}

Sources/NIORedis/NIORedisConnection.swift

Lines changed: 0 additions & 60 deletions
This file was deleted.

0 commit comments

Comments
 (0)