Skip to content

Commit c7ed19d

Browse files
committed
add support for buffered bytes from websocket channel
1 parent f15aa1e commit c7ed19d

File tree

6 files changed

+58
-124
lines changed

6 files changed

+58
-124
lines changed

Package.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@ let package = Package(
77
.macOS(.v10_15),
88
.iOS(.v13),
99
.watchOS(.v6),
10-
.tvOS(.v13),
10+
.tvOS(.v13)
1111
],
1212
products: [
13-
.library(name: "WebSocketKit", targets: ["WebSocketKit"]),
13+
.library(name: "WebSocketKit", targets: ["WebSocketKit"])
1414
],
1515
dependencies: [
16-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.53.0"),
16+
.package(url: "https://github.com/johnnzhou/swift-nio.git", branch: "main"),
1717
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.16.0"),
1818
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.24.0"),
1919
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.16.0"),
20-
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.1.0"),
20+
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.1.0")
2121
],
2222
targets: [
2323
.target(name: "WebSocketKit", dependencies: [
@@ -32,7 +32,7 @@ let package = Package(
3232
.product(name: "Atomics", package: "swift-atomics")
3333
]),
3434
.testTarget(name: "WebSocketKitTests", dependencies: [
35-
.target(name: "WebSocketKit"),
36-
]),
35+
.target(name: "WebSocketKit")
36+
])
3737
]
3838
)

Sources/WebSocketKit/BufferWritableMonitorDelegate.swift

Lines changed: 0 additions & 12 deletions
This file was deleted.

Sources/WebSocketKit/BufferWritableMonitorHandler.swift

Lines changed: 0 additions & 51 deletions
This file was deleted.

Sources/WebSocketKit/WebSocket.swift

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,24 @@ public final class WebSocket: Sendable {
2222
public var closeCode: WebSocketErrorCode? {
2323
_closeCode.withLockedValue { $0 }
2424
}
25-
25+
2626
private let _closeCode: NIOLockedValueBox<WebSocketErrorCode?>
2727

2828
public var onClose: EventLoopFuture<Void> {
2929
self.channel.closeFuture
3030
}
3131

32+
public var allocator: ByteBufferAllocator {
33+
self.channel.allocator
34+
}
35+
3236
@usableFromInline
3337
/* private but @usableFromInline */
3438
internal let channel: Channel
35-
private let onTextCallback: NIOLoopBoundBox<@Sendable (WebSocket, String) -> ()>
36-
private let onBinaryCallback: NIOLoopBoundBox<@Sendable (WebSocket, ByteBuffer) -> ()>
37-
private let onPongCallback: NIOLoopBoundBox<@Sendable (WebSocket, ByteBuffer) -> ()>
38-
private let onPingCallback: NIOLoopBoundBox<@Sendable (WebSocket, ByteBuffer) -> ()>
39+
private let onTextCallback: NIOLoopBoundBox<@Sendable (WebSocket, String) -> Void>
40+
private let onBinaryCallback: NIOLoopBoundBox<@Sendable (WebSocket, ByteBuffer) -> Void>
41+
private let onPongCallback: NIOLoopBoundBox<@Sendable (WebSocket, ByteBuffer) -> Void>
42+
private let onPingCallback: NIOLoopBoundBox<@Sendable (WebSocket, ByteBuffer) -> Void>
3943
private let type: PeerType
4044
private let waitingForPong: NIOLockedValueBox<Bool>
4145
private let waitingForClose: NIOLockedValueBox<Bool>
@@ -60,29 +64,29 @@ public final class WebSocket: Sendable {
6064
self._bufferedBytes = .init(0)
6165
}
6266

63-
@preconcurrency public func onText(_ callback: @Sendable @escaping (WebSocket, String) -> ()) {
67+
@preconcurrency public func onText(_ callback: @Sendable @escaping (WebSocket, String) -> Void) {
6468
self.onTextCallback.value = callback
6569
}
6670

67-
@preconcurrency public func onBinary(_ callback: @Sendable @escaping (WebSocket, ByteBuffer) -> ()) {
71+
@preconcurrency public func onBinary(_ callback: @Sendable @escaping (WebSocket, ByteBuffer) -> Void) {
6872
self.onBinaryCallback.value = callback
6973
}
70-
71-
public func onPong(_ callback: @Sendable @escaping (WebSocket, ByteBuffer) -> ()) {
74+
75+
public func onPong(_ callback: @Sendable @escaping (WebSocket, ByteBuffer) -> Void) {
7276
self.onPongCallback.value = callback
7377
}
74-
78+
7579
@available(*, deprecated, message: "Please use `onPong { socket, data in /* … */ }` with the additional `data` parameter.")
76-
@preconcurrency public func onPong(_ callback: @Sendable @escaping (WebSocket) -> ()) {
80+
@preconcurrency public func onPong(_ callback: @Sendable @escaping (WebSocket) -> Void) {
7781
self.onPongCallback.value = { ws, _ in callback(ws) }
7882
}
7983

80-
public func onPing(_ callback: @Sendable @escaping (WebSocket, ByteBuffer) -> ()) {
84+
public func onPing(_ callback: @Sendable @escaping (WebSocket, ByteBuffer) -> Void) {
8185
self.onPingCallback.value = callback
8286
}
83-
87+
8488
@available(*, deprecated, message: "Please use `onPing { socket, data in /* … */ }` with the additional `data` parameter.")
85-
@preconcurrency public func onPing(_ callback: @Sendable @escaping (WebSocket) -> ()) {
89+
@preconcurrency public func onPing(_ callback: @Sendable @escaping (WebSocket) -> Void) {
8690
self.onPingCallback.value = { ws, _ in callback(ws) }
8791
}
8892

@@ -111,8 +115,7 @@ public final class WebSocket: Sendable {
111115

112116
@inlinable
113117
public func send<S>(_ text: S, promise: EventLoopPromise<Void>? = nil)
114-
where S: Collection, S.Element == Character
115-
{
118+
where S: Collection, S.Element == Character {
116119
let string = String(text)
117120
let buffer = channel.allocator.buffer(string: string)
118121
self.send(buffer, opcode: .text, fin: true, promise: promise)
@@ -143,8 +146,7 @@ public final class WebSocket: Sendable {
143146
fin: Bool = true,
144147
promise: EventLoopPromise<Void>? = nil
145148
)
146-
where Data: DataProtocol
147-
{
149+
where Data: DataProtocol {
148150
if let byteBufferView = data as? ByteBufferView {
149151
// optimisation: converting from `ByteBufferView` to `ByteBuffer` doesn't allocate or copy any data
150152
send(ByteBuffer(byteBufferView), opcode: opcode, fin: fin, promise: promise)
@@ -342,13 +344,9 @@ public final class WebSocket: Sendable {
342344
}
343345
}
344346

345-
extension WebSocket: BufferWritableMonitorDelegate {
346-
func onBufferWritableChanged(amountQueued: Int) {
347-
self._bufferedBytes.withLockedValue { $0 = amountQueued }
348-
}
349-
350-
public var bufferedBytes: Int {
351-
self._bufferedBytes.withLockedValue { $0 }
347+
extension WebSocket {
348+
public func getBufferedBytes() -> EventLoopFuture<Int> {
349+
return channel.getOption(.bufferedWritableBytes)
352350
}
353351
}
354352

Sources/WebSocketKit/WebSocketClient.swift

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public final class WebSocketClient: Sendable {
2323

2424
public struct Configuration: Sendable {
2525
public var tlsConfiguration: TLSConfiguration?
26-
public var maxFrameSize: Int
26+
public var maxFrameSize: Int = 1 << 24
2727

2828
/// Defends against small payloads in frame aggregation.
2929
/// See `NIOWebSocketFrameAggregator` for details.
@@ -37,7 +37,7 @@ public final class WebSocketClient: Sendable {
3737

3838
public init(
3939
tlsConfiguration: TLSConfiguration? = nil,
40-
maxFrameSize: Int = 1 << 14
40+
maxFrameSize: Int = 1 << 24
4141
) {
4242
self.tlsConfiguration = tlsConfiguration
4343
self.maxFrameSize = maxFrameSize
@@ -71,11 +71,11 @@ public final class WebSocketClient: Sendable {
7171
path: String = "/",
7272
query: String? = nil,
7373
headers: HTTPHeaders = [:],
74-
onUpgrade: @Sendable @escaping (WebSocket) -> ()
74+
onUpgrade: @Sendable @escaping (WebSocket) -> Void
7575
) -> EventLoopFuture<Void> {
7676
self.connect(scheme: scheme, host: host, port: port, path: path, query: query, headers: headers, proxy: nil, onUpgrade: onUpgrade)
7777
}
78-
78+
7979
@preconcurrency
8080
public func connect(
8181
scheme: String,
@@ -85,11 +85,11 @@ public final class WebSocketClient: Sendable {
8585
query: String? = nil,
8686
headers: HTTPHeaders = [:],
8787
maxQueueSize: Int? = nil,
88-
onUpgrade: @Sendable @escaping (WebSocket) -> ()
88+
onUpgrade: @Sendable @escaping (WebSocket) -> Void
8989
) -> EventLoopFuture<Void> {
9090
self.connect(scheme: scheme, host: host, port: port, path: path, query: query, maxQueueSize: maxQueueSize, headers: headers, proxy: nil, onUpgrade: onUpgrade)
9191
}
92-
92+
9393
/// Establish a WebSocket connection via a proxy server.
9494
///
9595
/// - Parameters:
@@ -118,7 +118,7 @@ public final class WebSocketClient: Sendable {
118118
proxyPort: Int? = nil,
119119
proxyHeaders: HTTPHeaders = [:],
120120
proxyConnectDeadline: NIODeadline = NIODeadline.distantFuture,
121-
onUpgrade: @Sendable @escaping (WebSocket) -> ()
121+
onUpgrade: @Sendable @escaping (WebSocket) -> Void
122122
) -> EventLoopFuture<Void> {
123123
assert(["ws", "wss"].contains(scheme))
124124
let upgradePromise = self.group.any().makePromise(of: Void.self)
@@ -152,14 +152,14 @@ public final class WebSocketClient: Sendable {
152152
let websocketUpgrader = NIOWebSocketClientUpgrader(
153153
maxFrameSize: self.configuration.maxFrameSize,
154154
automaticErrorHandling: true,
155-
upgradePipelineHandler: { channel, req in
155+
upgradePipelineHandler: { channel, _ in
156156
return WebSocket.client(on: channel, maxQueueSize: maxQueueSize, config: .init(clientConfig: self.configuration), onUpgrade: onUpgrade)
157157
}
158158
)
159159

160160
let config: NIOHTTPClientUpgradeConfiguration = (
161161
upgraders: [websocketUpgrader],
162-
completionHandler: { context in
162+
completionHandler: { _ in
163163
upgradePromise.succeed(())
164164
channel.pipeline.removeHandler(httpUpgradeRequestHandlerBox.value, promise: nil)
165165
}
@@ -254,16 +254,24 @@ public final class WebSocketClient: Sendable {
254254

255255
let connect = bootstrap.connect(host: proxy ?? host, port: proxyPort ?? port)
256256
connect.cascadeFailure(to: upgradePromise)
257-
return connect.flatMap { channel in
257+
return connect.flatMap { _ in
258258
return upgradePromise.futureResult
259259
}
260260
}
261261

262262
@Sendable
263263
private func makeTLSHandler(tlsConfiguration: TLSConfiguration?, host: String) throws -> NIOSSLClientHandler {
264+
var tlsConfig: TLSConfiguration
265+
if self.configuration.tlsConfiguration == nil {
266+
tlsConfig = .makeClientConfiguration()
267+
tlsConfig.certificateVerification = .none
268+
} else {
269+
tlsConfig = self.configuration.tlsConfiguration!
270+
}
264271
let context = try NIOSSLContext(
265-
configuration: self.configuration.tlsConfiguration ?? .makeClientConfiguration()
272+
configuration: tlsConfig
266273
)
274+
267275
let tlsHandler: NIOSSLClientHandler
268276
do {
269277
tlsHandler = try NIOSSLClientHandler(context: context, serverHostname: host)
@@ -289,7 +297,7 @@ public final class WebSocketClient: Sendable {
289297
}
290298
}
291299
}
292-
300+
293301
private static func makeBootstrap(on eventLoop: EventLoopGroup) -> NIOClientTCPBootstrapProtocol {
294302
#if canImport(Network)
295303
if let tsBootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoop) {

Sources/WebSocketKit/WebSocketHandler.swift

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ extension WebSocket {
3636
@preconcurrency
3737
public static func client(
3838
on channel: Channel,
39-
onUpgrade: @Sendable @escaping (WebSocket) -> ()
39+
onUpgrade: @Sendable @escaping (WebSocket) -> Void
4040
) -> EventLoopFuture<Void> {
4141
return self.configure(on: channel, as: .client, with: Configuration(), onUpgrade: onUpgrade)
4242
}
@@ -52,7 +52,7 @@ extension WebSocket {
5252
on channel: Channel,
5353
maxQueueSize: Int? = nil,
5454
config: Configuration,
55-
onUpgrade: @Sendable @escaping (WebSocket) -> ()
55+
onUpgrade: @Sendable @escaping (WebSocket) -> Void
5656
) -> EventLoopFuture<Void> {
5757
return self.configure(on: channel, as: .client, maxQueueSize: maxQueueSize, with: config, onUpgrade: onUpgrade)
5858
}
@@ -65,7 +65,7 @@ extension WebSocket {
6565
@preconcurrency
6666
public static func server(
6767
on channel: Channel,
68-
onUpgrade: @Sendable @escaping (WebSocket) -> ()
68+
onUpgrade: @Sendable @escaping (WebSocket) -> Void
6969
) -> EventLoopFuture<Void> {
7070
return self.configure(on: channel, as: .server, with: Configuration(), onUpgrade: onUpgrade)
7171
}
@@ -80,7 +80,7 @@ extension WebSocket {
8080
public static func server(
8181
on channel: Channel,
8282
config: Configuration,
83-
onUpgrade: @Sendable @escaping (WebSocket) -> ()
83+
onUpgrade: @Sendable @escaping (WebSocket) -> Void
8484
) -> EventLoopFuture<Void> {
8585
return self.configure(on: channel, as: .server, with: config, onUpgrade: onUpgrade)
8686
}
@@ -90,7 +90,7 @@ extension WebSocket {
9090
as type: PeerType,
9191
maxQueueSize: Int? = nil,
9292
with config: Configuration,
93-
onUpgrade: @Sendable @escaping (WebSocket) -> ()
93+
onUpgrade: @Sendable @escaping (WebSocket) -> Void
9494
) -> EventLoopFuture<Void> {
9595
let webSocket = WebSocket(channel: channel, type: type)
9696

@@ -102,19 +102,10 @@ extension WebSocket {
102102
),
103103
WebSocketHandler(webSocket: webSocket)
104104
]).map { _ in
105-
if let maxQueueSize = maxQueueSize {
106-
channel.pipeline.addHandler(BufferWritableMonitorHandler(capacity: maxQueueSize, delegate: webSocket), position: .first).whenSuccess { _ in
107-
#if DEBUG
108-
print(channel.pipeline.debugDescription)
109-
#endif
110-
onUpgrade(webSocket)
111-
}
112-
} else {
113-
#if DEBUG
114-
print(channel.pipeline.debugDescription)
115-
#endif
116-
onUpgrade(webSocket)
117-
}
105+
#if DEBUG
106+
print(channel.pipeline.debugDescription)
107+
#endif
108+
onUpgrade(webSocket)
118109
}.flatMapError { error in
119110
return channel.eventLoop.makeFailedFuture(error)
120111
}

0 commit comments

Comments
 (0)