Skip to content

Commit dc4510e

Browse files
committed
Added ConnectionPool for redis. Fixed bug with hostName.
1 parent 8447b61 commit dc4510e

File tree

9 files changed

+136
-90
lines changed

9 files changed

+136
-90
lines changed

Sources/Configuration.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public struct Configuration {
2323
/// Used in order to specify a custom queue to consume from
2424
let queue: String
2525
/// Used for consumer specific processing queues.
26-
/// If not provided Host.current().name will be used
26+
/// If not provided the servers hostname will be used.
2727
let consumerName: String?
2828

2929
let middleware: [Middleware]
@@ -59,16 +59,19 @@ public struct RedisConfig {
5959
let port: UInt16
6060

6161
let password: String?
62+
/// Max number of connections that will be created by the connection pool.
63+
let connections: Int
6264

6365
public static var development: RedisConfig {
64-
return .init(redisDB: nil, hostname: "127.0.0.1", port: 6379, password: nil)
66+
return .init(redisDB: nil, hostname: "127.0.0.1", port: 6379, password: nil, connections: 4)
6567
}
6668

67-
public init(redisDB: Int?, hostname: String, port: UInt16, password: String?) {
69+
public init(redisDB: Int?, hostname: String, port: UInt16, password: String?, connections: Int) {
6870
self.redisDB = redisDB
6971
self.hostname = hostname
7072
self.port = port
7173
self.password = password
74+
self.connections = connections
7275
}
7376

7477
}

Sources/ConnectionPool.swift

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//
2+
// ConnectionPool.swift
3+
// SwiftQ
4+
//
5+
// Created by John Connolly on 2017-08-09.
6+
//
7+
//
8+
9+
import Foundation
10+
import Dispatch
11+
12+
final class ConnectionPool<T> {
13+
14+
var connections = [T]()
15+
16+
private let connectionFactory: () throws -> T
17+
18+
private let max: Int
19+
20+
private let semaphore: DispatchSemaphore
21+
/// Semaphore seems to perform much better that a serial queue
22+
/// Probably related to the extra heap allocation of the sync { } closure.
23+
private let lock = DispatchSemaphore(value: 1)
24+
25+
26+
init(max: Int, factory: @escaping () throws -> T) throws {
27+
self.max = max
28+
self.connectionFactory = factory
29+
self.semaphore = DispatchSemaphore(value: max)
30+
31+
try (0...max).forEach { _ in
32+
connections.append(try factory())
33+
}
34+
}
35+
36+
/// Borrows a connection from the pool. Any borrowed connections
37+
/// have to be returned. If all connections are borrowed the semaphore
38+
/// will block until one is returned.
39+
func borrow() -> T {
40+
semaphore.wait()
41+
defer {
42+
lock.signal()
43+
}
44+
lock.wait()
45+
return connections.removeFirst()
46+
47+
}
48+
49+
/// Returns the connection to the pool.
50+
func takeBack(connection: T) {
51+
semaphore.signal()
52+
defer {
53+
lock.signal()
54+
}
55+
lock.wait()
56+
connections.append(connection)
57+
}
58+
59+
}

Sources/Decoder.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ struct Decoder {
1212

1313
let types: [Task.Type]
1414

15-
private var zippedTasks: [(String , Task.Type)] {
15+
private let zippedTasks: [(String , Task.Type)]
16+
17+
init(types: [Task.Type]) {
18+
self.types = types
1619
let taskNames = types.map(String.init(describing:))
17-
return zip(taskNames, types).array
20+
self.zippedTasks = zip(taskNames, types).array
1821
}
19-
2022

2123
/// Returns the correct task type based on the zipped tasks
2224
func decode(data: Foundation.Data) throws -> DecoderResult {

Sources/Host.swift

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
//
2+
// Host.swift
3+
// SwiftQ
4+
//
5+
// Created by John Connolly on 2017-08-11.
6+
//
7+
//
8+
9+
import Foundation
10+
#if os(Linux)
11+
import Glibc
12+
#endif
13+
14+
struct Host {
15+
16+
let name: String
17+
18+
19+
init() {
20+
self.name = Host.currentHostName()
21+
}
22+
23+
/// https://github.com/apple/swift-corelibs-foundation/blob/master/Foundation/Host.swift
24+
private static func currentHostName() -> String {
25+
let hname = UnsafeMutablePointer<Int8>.allocate(capacity: Int(NI_MAXHOST))
26+
defer {
27+
hname.deinitialize()
28+
hname.deallocate(capacity: Int(NI_MAXHOST))
29+
}
30+
let r = gethostname(hname, Int(NI_MAXHOST))
31+
if r < 0 || hname[0] == 0 {
32+
return "localhost"
33+
}
34+
return String(cString: hname)
35+
}
36+
37+
}

Sources/IPAddress.swift

Lines changed: 0 additions & 54 deletions
This file was deleted.

Sources/RedisAdaptor.swift

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,41 +8,43 @@
88

99
import Foundation
1010
import Redis
11-
import Dispatch
1211

1312
final class RedisAdaptor: Adaptor {
14-
15-
private let client: Redis.TCPClient
1613

17-
private let dispatchQueue = DispatchQueue(label: "com.swiftq.redis")
14+
private let client: Redis.TCPClient
1815

16+
let pool: ConnectionPool<Redis.TCPClient>
1917

2018
init(config: RedisConfig) throws {
19+
self.pool = try ConnectionPool(max: config.connections) {
20+
return try Redis.TCPClient(hostname: config.hostname, port: config.port, password: config.password)
21+
}
2122
self.client = try Redis.TCPClient(hostname: config.hostname, port: config.port, password: config.password)
2223
if let database = config.redisDB {
23-
try execute(Command(command: "select", args: [.string(database.description)]))
24+
try execute(Command(command: "SELECT", args: [.string(database.description)]))
2425
}
2526
}
2627

27-
2828
@discardableResult
2929
func execute(_ command: Command) throws -> RedisResponseRepresentable {
30-
return try dispatchQueue.sync {
31-
VaporRedisResponse(response: try client.command(Redis.Command(command.command), command.bytes))
32-
}
30+
let client = pool.borrow()
31+
defer { pool.takeBack(connection: client) }
32+
return VaporRedisResponse(response: try client.command(Redis.Command(command.command), command.bytes))
3333
}
3434

3535

3636
@discardableResult
3737
func pipeline(_ commands: () -> ([Command])) throws -> [RedisResponseRepresentable] {
38-
return try dispatchQueue.sync {
39-
let arguments = commands()
40-
let pipeline = client.makePipeline()
41-
try arguments.forEach { argument in
42-
try pipeline.enqueue(Redis.Command(argument.command), argument.bytes)
43-
}
44-
return try pipeline.execute().map(VaporRedisResponse.init)
38+
let client = pool.borrow()
39+
defer {
40+
pool.takeBack(connection: client)
41+
}
42+
let arguments = commands()
43+
let pipeline = client.makePipeline()
44+
try arguments.forEach { argument in
45+
try pipeline.enqueue(Redis.Command(argument.command), argument.bytes)
4546
}
47+
return try pipeline.execute().map(VaporRedisResponse.init)
4648
}
4749

4850
}

Sources/RedisResponse.swift

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99
import Foundation
1010
import Redis
1111

12-
1312
struct VaporRedisResponse: RedisResponseRepresentable {
1413

1514
let response: Redis.Data?
1615

1716
var data: Foundation.Data? {
18-
return response?.string?.data(using: .utf8)
17+
return response?.bytes.map(Foundation.Data.init(bytes:))
1918
}
2019

2120
var string: String? {
@@ -27,9 +26,9 @@ struct VaporRedisResponse: RedisResponseRepresentable {
2726
}
2827

2928
var array: [Foundation.Data]? {
30-
return response?
31-
.array?
32-
.flatMap { $0?.string?.data(using: .utf8) }
29+
return response?.array?.flatMap { data -> Foundation.Data? in
30+
return data?.bytes.map(Foundation.Data.init(bytes:))
31+
}
3332
}
3433

3534
}

Sources/ReliableQueue.swift

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ final class ReliableQueue {
1313

1414
private let redisAdaptor: Adaptor
1515

16-
private let blockingRedisAdaptor: Adaptor // Maybe have a pool here instead
17-
18-
private let ipAddress = IPAddress().address
16+
private let hostName = Host().name
1917

2018
private let consumer: String?
2119

@@ -26,7 +24,6 @@ final class ReliableQueue {
2624
self.queue = queue
2725
self.consumer = consumer
2826
self.redisAdaptor = try RedisAdaptor(config: config)
29-
self.blockingRedisAdaptor = try RedisAdaptor(config: config)
3027
}
3128

3229

@@ -35,7 +32,7 @@ final class ReliableQueue {
3532
}
3633

3734
var consumerName: String {
38-
return consumer ?? ipAddress
35+
return consumer ?? hostName
3936
}
4037

4138

@@ -112,11 +109,11 @@ final class ReliableQueue {
112109
.string(RedisKey.workQ(queue).name),
113110
.string(processingQKey),
114111
.string("0")])
115-
return try blockingRedisAdaptor.execute(dequeueCommand).string
112+
return try redisAdaptor.execute(dequeueCommand).string
116113
.map { id in
117114
return Command(command: "GET", args:[.string(id)])
118115
}.flatMap { command in
119-
return try blockingRedisAdaptor.execute(command).data
116+
return try redisAdaptor.execute(command).data
120117
}
121118
}
122119

@@ -126,9 +123,11 @@ final class ReliableQueue {
126123
let incrKey = success ? RedisKey.success(consumerName).name : RedisKey.failure(consumerName).name
127124
try redisAdaptor.pipeline {
128125
let commands = [
126+
Command(command: "MULTI"),
129127
Command(command: "LREM", args: [.string(processingQKey), .string("0"), .string(item.uuid)]),
130128
Command(command: "INCR", args: [.string(incrKey) ]),
131-
Command(command: "DEL", args: [.string(item.uuid)])
129+
Command(command: "DEL", args: [.string(item.uuid)]),
130+
Command(command: "EXEC")
132131
]
133132
return commands
134133
}

Sources/Worker.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ final class Worker {
4646
do {
4747
guard let data = try self.reliableQueue.dequeue() else { return }
4848
try self.decode(data)
49-
5049
} catch {
5150
Logger.log(("Decoding Failure", error))
5251
}

0 commit comments

Comments
 (0)