Skip to content

Commit 063c356

Browse files
authored
Merge pull request #158 from vapor/error-handling
Add error handlers to TCPSocketSink
2 parents 50632dc + d7bcab1 commit 063c356

File tree

3 files changed

+33
-9
lines changed

3 files changed

+33
-9
lines changed

Sources/TCP/Streams/TCPSocketSink.swift

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ private let maxExcessSignalCount: Int = 2
77

88
/// Data stream wrapper for a dispatch socket.
99
public final class TCPSocketSink: Async.InputStream {
10+
/// Able to handle errors that are thrown to the Sink
11+
public typealias ErrorHandler = (TCPSocketSink, Error) -> ()
12+
1013
/// See InputStream.Input
1114
public typealias Input = ByteBuffer
1215

@@ -34,15 +37,19 @@ public final class TCPSocketSink: Async.InputStream {
3437
/// The current number of signals received while downstream was not ready
3538
/// since it was last ready
3639
private var excessSignalCount: Int
40+
41+
/// This closure will be called with an error thrown from upstream
42+
private let onError: ErrorHandler
3743

3844
/// Creates a new `SocketSink`
39-
internal init(socket: TCPSocket, on worker: Worker) {
45+
internal init(socket: TCPSocket, on worker: Worker, onError: @escaping ErrorHandler) {
4046
self.socket = socket
4147
self.eventLoop = worker.eventLoop
4248
self.inputBuffer = nil
4349
self.isClosed = false
4450
self.sourceIsSuspended = true
4551
self.excessSignalCount = 0
52+
self.onError = onError
4653
let writeSource = self.eventLoop.onWritable(descriptor: socket.descriptor, writeSourceSignal)
4754
self.writeSource = writeSource
4855
}
@@ -64,8 +71,7 @@ public final class TCPSocketSink: Async.InputStream {
6471
case .close:
6572
close()
6673
case .error(let e):
67-
close()
68-
fatalError("\(e)")
74+
onError(self, e)
6975
}
7076
}
7177

@@ -163,8 +169,15 @@ public final class TCPSocketSink: Async.InputStream {
163169

164170
extension TCPSocket {
165171
/// Creates a data stream for this socket on the supplied event loop.
172+
public func sink(on eventLoop: Worker, onError: @escaping TCPSocketSink.ErrorHandler) -> TCPSocketSink {
173+
return .init(socket: self, on: eventLoop, onError: onError)
174+
}
175+
176+
/// Creates a data stream for this socket on the supplied event loop.
177+
@available(*, deprecated)
166178
public func sink(on eventLoop: Worker) -> TCPSocketSink {
167-
return .init(socket: self, on: eventLoop)
179+
return .init(socket: self, on: eventLoop) { _, error in
180+
fatalError("Uncaught error in TCPSocketSink: \(error).")
181+
}
168182
}
169183
}
170-

Sources/TCP/Streams/TCPSocketStream.swift

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ public final class TCPSocketStream: ByteStream {
1616
internal let sink: TCPSocketSink
1717

1818
/// Internal stream init. Use socket convenience method.
19-
internal init(socket: TCPSocket, bufferSize: Int, on worker: Worker) {
19+
internal init(socket: TCPSocket, bufferSize: Int, on worker: Worker, onError: @escaping TCPSocketSink.ErrorHandler) {
2020
self.source = socket.source(on: worker, bufferSize: bufferSize)
21-
self.sink = socket.sink(on: worker)
21+
self.sink = socket.sink(on: worker, onError: onError)
2222
}
2323

2424
/// See `InputStream.input(_:)`
@@ -34,7 +34,15 @@ public final class TCPSocketStream: ByteStream {
3434

3535
extension TCPSocket {
3636
/// Create a `TCPSocketStream` for this socket.
37+
public func stream(bufferSize: Int = 4096, on worker: Worker, onError: @escaping TCPSocketSink.ErrorHandler) -> TCPSocketStream {
38+
return TCPSocketStream(socket: self, bufferSize: bufferSize, on: worker, onError: onError)
39+
}
40+
41+
/// Create a `TCPSocketStream` for this socket.
42+
@available(*, deprecated)
3743
public func stream(bufferSize: Int = 4096, on worker: Worker) -> TCPSocketStream {
38-
return TCPSocketStream(socket: self, bufferSize: bufferSize, on: worker)
44+
return TCPSocketStream(socket: self, bufferSize: bufferSize, on: worker) { _, error in
45+
fatalError("Uncaught error in TCPSocketStream: \(error).")
46+
}
3947
}
4048
}

Tests/TCPTests/SocketsTests.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ class SocketsTests: XCTestCase {
2525
/// set up the server stream
2626
serverStream.drain { client in
2727
let clientSource = client.socket.source(on: workerLoop)
28-
let clientSink = client.socket.sink(on: workerLoop)
28+
let clientSink = client.socket.sink(on: workerLoop) { sink, error in
29+
XCTFail("\(error)")
30+
sink.close()
31+
}
2932
clientSource.output(to: clientSink)
3033
}.catch { err in
3134
XCTFail("\(err)")

0 commit comments

Comments
 (0)