Skip to content

Commit 781e272

Browse files
committed
fix various autobahn issues for websocket client and server
1 parent 4d5e912 commit 781e272

File tree

9 files changed

+202
-271
lines changed

9 files changed

+202
-271
lines changed

Sources/AutobahnClient/AutobahnClient.swift

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct TestWebSocketClient {
2626

2727
static let serverAddress = "127.0.0.1"
2828
static let serverPort = 9001
29+
static let agentName = "LCLWebSocketClient"
2930

3031
public static func main() throws {
3132

@@ -55,16 +56,20 @@ struct TestWebSocketClient {
5556
ws.send(binary, opcode: .binary)
5657
}
5758
try client.connect(
58-
to: "ws://\(Self.serverAddress):\(Self.serverPort)/runCase?case=\(i)&agent={LCLWebSocketClient}",
59+
to: "ws://\(Self.serverAddress):\(Self.serverPort)/runCase?case=\(i)&agent=\(Self.agentName)",
5960
configuration: Self.config
6061
).wait()
6162
}
6263

6364
let closeClient = LCLWebSocket.client()
64-
try closeClient.connect(
65-
to: "ws://\(Self.serverAddress):\(Self.serverPort)/updateReports?agent={LCLWebSocketClient}",
66-
configuration: Self.config
67-
).wait()
65+
do {
66+
try closeClient.connect(
67+
to: "ws://\(Self.serverAddress):\(Self.serverPort)/updateReports?agent=\(Self.agentName)",
68+
configuration: Self.config
69+
).wait()
70+
} catch {
71+
print("Error closing client: \(error)")
72+
}
6873

6974
}
7075
}

Sources/AutobahnServer/AutohahnServer.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ struct AutohahnServer {
3737
let portIndex = portCmdIndex + 1
3838
port = Int(args[portIndex]) ?? port
3939
}
40-
41-
var server = LCLWebSocket.server()
40+
41+
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 4)
42+
var server = LCLWebSocket.server(on: elg)
4243

4344
server.onBinary { ws, buffer in
4445
ws.send(buffer, opcode: .binary)

Sources/LCLWebSocket/Client/WebSocketClient.swift

Lines changed: 29 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,10 @@ extension WebSocketClient {
290290

291291
func makeClientBootstrap() -> EventLoopFuture<Channel> {
292292
ClientBootstrap(group: self.eventloopGroup)
293+
.channelOption(.socketOption(.so_reuseaddr), value: SocketOptionValue(configuration.socketReuseAddress ? 1 : 0))
294+
.channelOption(.tcpOption(.tcp_nodelay), value: SocketOptionValue(configuration.socketTcpNoDelay ? 1 : 0))
295+
.channelOption(.socketOption(.so_sndbuf), value: configuration.socketSendBufferSize)
296+
.channelOption(.socketOption(.so_rcvbuf), value: configuration.socketReceiveBufferSize)
293297
.connectTimeout(configuration.connectionTimeout)
294298
.channelInitializer(channelInitializer)
295299
.connect(to: resolvedAddress)
@@ -303,6 +307,7 @@ extension WebSocketClient {
303307

304308
return NIOTSConnectionBootstrap(group: self.eventloopGroup)
305309
.tcpOptions(tcpOptions)
310+
.channelOption(.socketOption(.so_reuseaddr), value: SocketOptionValue(configuration.socketReuseAddress ? 1 : 0))
306311
.channelInitializer(channelInitializer)
307312
.connect(to: resolvedAddress)
308313
}
@@ -327,81 +332,34 @@ extension WebSocketClient {
327332
) -> ChannelInitializer {
328333
@Sendable
329334
func makeChannelInitializer(_ channel: Channel) -> EventLoopFuture<Void> {
330-
if self.isMultiThreadedEventloop {
331-
if configuration.socketReuseAddress,
332-
let syncOptions = channel.syncOptions
333-
{
334-
do {
335-
try syncOptions.setOption(.socketOption(.so_reuseaddr), value: 1)
336-
} catch {
337-
return channel.eventLoop.makeFailedFuture(error)
338-
}
339-
}
340-
341-
if configuration.socketTcpNoDelay,
342-
let syncOptions = channel.syncOptions
343-
{
344-
do {
345-
try syncOptions.setOption(.socketOption(.tcp_nodelay), value: 1)
346-
} catch {
347-
return channel.eventLoop.makeFailedFuture(error)
348-
}
349-
}
350-
}
351-
352-
if let socketSendBufferSize = configuration.socketSendBufferSize,
353-
let syncOptions = channel.syncOptions
354-
{
355-
do {
356-
try syncOptions.setOption(.socketOption(.so_sndbuf), value: socketSendBufferSize)
357-
} catch {
358-
return channel.eventLoop.makeFailedFuture(error)
359-
}
360-
}
361-
362-
if let socketReceiveBuffer = configuration.socketReceiveBufferSize,
363-
let syncOptions = channel.syncOptions
364-
{
365-
do {
366-
try syncOptions.setOption(.socketOption(.so_rcvbuf), value: socketReceiveBuffer)
367-
} catch {
368-
return channel.eventLoop.makeFailedFuture(error)
369-
}
370-
}
371-
372-
// bind to selected device, if any
373335
if let deviceName = configuration.deviceName,
374-
let device = findDevice(with: deviceName, protocol: resolvedAddress.protocol)
375-
{
376-
do {
377-
try bindTo(device: device, on: channel)
378-
} catch {
379-
return channel.eventLoop.makeFailedFuture(error)
380-
}
381-
}
382-
383-
// enable TLS
384-
if scheme.enableTLS {
385-
let tlsConfig = configuration.tlsConfiguration ?? scheme.defaultTLSConfig!
386-
guard let sslContext = try? NIOSSLContext(configuration: tlsConfig) else {
387-
return channel.eventLoop.makeFailedFuture(LCLWebSocketError.tlsInitializationFailed)
388-
}
389-
390-
do {
391-
let sslClientHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: host)
392-
try channel.pipeline.syncOperations.addHandlers(sslClientHandler)
393-
} catch let error as NIOSSLExtraError where error == .invalidSNIHostname {
394-
do {
395-
let sslClientHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: nil)
396-
try channel.pipeline.syncOperations.addHandlers(sslClientHandler)
397-
} catch {
398-
return channel.eventLoop.makeFailedFuture(error)
336+
let device = findDevice(with: deviceName, protocol: resolvedAddress.protocol) {
337+
// bind to selected device, if any
338+
return bindTo(device: device, on: channel).flatMap { () -> EventLoopFuture<Void> in
339+
if scheme.enableTLS {
340+
// enale TLS
341+
let tlsConfig = configuration.tlsConfiguration ?? scheme.defaultTLSConfig!
342+
guard let sslContext = try? NIOSSLContext(configuration: tlsConfig) else {
343+
return channel.eventLoop.makeFailedFuture(LCLWebSocketError.tlsInitializationFailed)
344+
}
345+
346+
do {
347+
let sslClientHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: host)
348+
try channel.pipeline.syncOperations.addHandlers(sslClientHandler)
349+
} catch let error as NIOSSLExtraError where error == .invalidSNIHostname {
350+
do {
351+
let sslClientHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: nil)
352+
try channel.pipeline.syncOperations.addHandlers(sslClientHandler)
353+
} catch {
354+
return channel.eventLoop.makeFailedFuture(error)
355+
}
356+
} catch {
357+
return channel.eventLoop.makeFailedFuture(error)
358+
}
399359
}
400-
} catch {
401-
return channel.eventLoop.makeFailedFuture(error)
360+
return channel.eventLoop.makeSucceededVoidFuture()
402361
}
403362
}
404-
405363
return channel.eventLoop.makeSucceededVoidFuture()
406364
}
407365

Sources/LCLWebSocket/LCLWebSocket+ChannelInitializer.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,18 @@ typealias ChannelInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
2020
/// - Parameters:
2121
/// - device: the device to bind to
2222
/// - on: the channel that will be bound to the device
23-
internal func bindTo(device: NIONetworkDevice, on channel: Channel) throws {
23+
internal func bindTo(device: NIONetworkDevice, on channel: Channel) -> EventLoopFuture<Void> {
2424
#if canImport(Darwin)
2525
switch device.address {
2626
case .v4:
27-
try channel.syncOptions?.setOption(.ipOption(.ip_bound_if), value: CInt(device.interfaceIndex))
27+
return channel.setOption(.ipOption(.ip_bound_if), value: CInt(device.interfaceIndex))
2828
case .v6:
29-
try channel.syncOptions?.setOption(.ipv6Option(.ipv6_bound_if), value: CInt(device.interfaceIndex))
29+
return channel.setOption(.ipv6Option(.ipv6_bound_if), value: CInt(device.interfaceIndex))
3030
default:
31-
throw LCLWebSocketError.invalidDevice
31+
return channel.eventLoop.makeFailedFuture(LCLWebSocketError.invalidDevice)
3232
}
3333
#elseif canImport(Glibc) || canImport(Musl)
34-
try (channel as! SocketOptionProvider).setBindToDevice(device.name).wait()
34+
return (channel as! SocketOptionProvider).setBindToDevice(device.name)
3535
#endif
3636
}
3737

Sources/LCLWebSocket/LCLWebSocketListenable.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ protocol LCLWebSocketListenable {
3434
mutating func onBinary(_ onBinary: (@Sendable (WebSocket, ByteBuffer) -> Void)?)
3535

3636
/// Invoked when both peers have indicated that no more messages will be transmitted and
37-
/// the connection has been successfully released. No further calls to this listener will be made.
3837
mutating func onClosing(_ onBinary: (@Sendable (WebSocketErrorCode?, String?) -> Void)?)
3938

4039
/// Invoked when the remote peer has indicated that no more incoming messages will be transmitted.

Sources/LCLWebSocket/Server/WebSocketServer.swift

Lines changed: 10 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -233,20 +233,6 @@ public struct WebSocketServer: Sendable, LCLWebSocketListenable {
233233
extension WebSocketServer {
234234

235235
private func initializeChildChannel(using configuration: LCLWebSocket.Configuration, on channel: Channel) throws {
236-
if self.eventloopGroup is MultiThreadedEventLoopGroup {
237-
if configuration.socketReuseAddress,
238-
let syncOptions = channel.syncOptions
239-
{
240-
try syncOptions.setOption(.socketOption(.so_reuseaddr), value: 1)
241-
}
242-
243-
if configuration.socketTcpNoDelay,
244-
let syncOptions = channel.syncOptions
245-
{
246-
try syncOptions.setOption(.socketOption(.tcp_nodelay), value: 1)
247-
}
248-
}
249-
250236
// enable tls if configuration is provided
251237
if let tlsConfiguration = configuration.tlsConfiguration {
252238
guard let sslContext = try? NIOSSLContext(configuration: tlsConfiguration) else {
@@ -265,57 +251,20 @@ extension WebSocketServer {
265251

266252
func makeServerBootstrap() -> EventLoopFuture<Channel> {
267253
ServerBootstrap(group: self.eventloopGroup)
254+
.serverChannelOption(.socketOption(.so_reuseaddr), value: SocketOptionValue(configuration.socketReuseAddress ? 1 : 0))
255+
.serverChannelOption(.tcpOption(.tcp_nodelay), value: SocketOptionValue(configuration.socketTcpNoDelay ? 1 : 0))
256+
.childChannelOption(.socketOption(.so_reuseaddr), value: SocketOptionValue(configuration.socketReuseAddress ? 1 : 0))
257+
.childChannelOption(.tcpOption(.tcp_nodelay), value: SocketOptionValue(configuration.socketTcpNoDelay ? 1 : 0))
258+
.childChannelOption(.socketOption(.so_sndbuf), value: configuration.socketSendBufferSize)
259+
.childChannelOption(.socketOption(.so_rcvbuf), value: configuration.socketReceiveBufferSize)
268260
.serverChannelInitializer { channel in
269261
logger.info("Server is listening on \(resolvedAddress)")
270-
if configuration.socketReuseAddress,
271-
let syncOptions = channel.syncOptions
272-
{
273-
do {
274-
try syncOptions.setOption(.socketOption(.so_reuseaddr), value: 1)
275-
} catch {
276-
return channel.eventLoop.makeFailedFuture(error)
277-
}
278-
}
279-
280-
if configuration.socketTcpNoDelay,
281-
let syncOptions = channel.syncOptions
282-
{
283-
do {
284-
try syncOptions.setOption(.socketOption(.tcp_nodelay), value: 1)
285-
} catch {
286-
return channel.eventLoop.makeFailedFuture(error)
287-
}
288-
}
289-
290-
if let socketSendBufferSize = configuration.socketSendBufferSize,
291-
let syncOptions = channel.syncOptions
292-
{
293-
do {
294-
try syncOptions.setOption(.socketOption(.so_sndbuf), value: socketSendBufferSize)
295-
} catch {
296-
return channel.eventLoop.makeFailedFuture(error)
297-
}
298-
}
299-
300-
if let socketReceiveBuffer = configuration.socketReceiveBufferSize,
301-
let syncOptions = channel.syncOptions
302-
{
303-
do {
304-
try syncOptions.setOption(.socketOption(.so_rcvbuf), value: socketReceiveBuffer)
305-
} catch {
306-
return channel.eventLoop.makeFailedFuture(error)
307-
}
308-
}
309262

310263
// bind to selected device, if any
311264
if let deviceName = configuration.deviceName,
312265
let device = findDevice(with: deviceName, protocol: resolvedAddress.protocol)
313266
{
314-
do {
315-
try bindTo(device: device, on: channel)
316-
} catch {
317-
return channel.eventLoop.makeFailedFuture(error)
318-
}
267+
return bindTo(device: device, on: channel)
319268
}
320269

321270
return channel.eventLoop.makeSucceededVoidFuture()
@@ -333,37 +282,16 @@ extension WebSocketServer {
333282

334283
return NIOTSListenerBootstrap(group: self.eventloopGroup)
335284
.tcpOptions(tcpOptions)
285+
.serverChannelOption(.socketOption(.so_reuseaddr), value: SocketOptionValue(configuration.socketReuseAddress ? 1 : 0))
286+
.childChannelOption(.socketOption(.so_reuseaddr), value: SocketOptionValue(configuration.socketReuseAddress ? 1 : 0))
336287
.serverChannelInitializer { channel in
337288
logger.info("Server is listening on \(resolvedAddress)")
338-
if let socketSendBufferSize = configuration.socketSendBufferSize,
339-
let syncOptions = channel.syncOptions
340-
{
341-
do {
342-
try syncOptions.setOption(.socketOption(.so_sndbuf), value: socketSendBufferSize)
343-
} catch {
344-
return channel.eventLoop.makeFailedFuture(error)
345-
}
346-
}
347-
348-
if let socketReceiveBuffer = configuration.socketReceiveBufferSize,
349-
let syncOptions = channel.syncOptions
350-
{
351-
do {
352-
try syncOptions.setOption(.socketOption(.so_rcvbuf), value: socketReceiveBuffer)
353-
} catch {
354-
return channel.eventLoop.makeFailedFuture(error)
355-
}
356-
}
357289

358290
// bind to selected device, if any
359291
if let deviceName = configuration.deviceName,
360292
let device = findDevice(with: deviceName, protocol: resolvedAddress.protocol)
361293
{
362-
do {
363-
try bindTo(device: device, on: channel)
364-
} catch {
365-
return channel.eventLoop.makeFailedFuture(error)
366-
}
294+
return bindTo(device: device, on: channel)
367295
}
368296

369297
return channel.eventLoop.makeSucceededVoidFuture()

0 commit comments

Comments
 (0)