Skip to content

Commit 0f8c911

Browse files
authored
fix connection leak (#17)
motivation: recent PR to address NIO atomic changes has caused a conection leak changes: * implement a state machine to cache and manage connection state * add connection count test * refactor tests * safer concurrency
1 parent 9525518 commit 0f8c911

File tree

3 files changed

+177
-188
lines changed

3 files changed

+177
-188
lines changed

Sources/StatsdClient/StatsdClient.swift

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import CoreMetrics
16+
import Dispatch
1617
import NIO
1718
import NIOConcurrencyHelpers
1819

@@ -245,6 +246,15 @@ private final class Client {
245246

246247
private let isShutdown = NIOAtomic<Bool>.makeAtomic(value: false)
247248

249+
private var state = State.disconnected
250+
private let lock = Lock()
251+
252+
private enum State {
253+
case disconnected
254+
case connecting(EventLoopFuture<Void>)
255+
case connected(Channel)
256+
}
257+
248258
init(eventLoopGroupProvider: StatsdClient.EventLoopGroupProvider, address: SocketAddress) {
249259
self.eventLoopGroupProvider = eventLoopGroupProvider
250260
switch self.eventLoopGroupProvider {
@@ -273,8 +283,37 @@ private final class Client {
273283
}
274284

275285
func emit(_ metric: Metric) -> EventLoopFuture<Void> {
276-
return self.connect().flatMap { channel in
277-
channel.writeAndFlush(metric)
286+
self.lock.lock()
287+
switch self.state {
288+
case .disconnected:
289+
let promise = self.eventLoopGroup.next().makePromise(of: Void.self)
290+
self.state = .connecting(promise.futureResult)
291+
self.lock.unlock()
292+
self.connect().flatMap { channel -> EventLoopFuture<Void> in
293+
self.lock.withLock {
294+
guard case .connecting = self.state else {
295+
preconditionFailure("invalid state \(self.state)")
296+
}
297+
self.state = .connected(channel)
298+
}
299+
return self.emit(metric)
300+
}.cascade(to: promise)
301+
return promise.futureResult
302+
case .connecting(let future):
303+
let future = future.flatMap {
304+
self.emit(metric)
305+
}
306+
self.state = .connecting(future)
307+
self.lock.unlock()
308+
return future
309+
case .connected(let channel):
310+
guard channel.isActive else {
311+
self.state = .disconnected
312+
self.lock.unlock()
313+
return self.emit(metric)
314+
}
315+
self.lock.unlock()
316+
return channel.writeAndFlush(metric)
278317
}
279318
}
280319

Tests/StatsdClientTests/StatsdClientTests+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ extension StatsdClientTests {
3636
("testRecorderInteger", testRecorderInteger),
3737
("testRecorderDouble", testRecorderDouble),
3838
("testCouncurrency", testCouncurrency),
39+
("testNumberOfConnections", testNumberOfConnections),
3940
]
4041
}
4142
}

0 commit comments

Comments
 (0)