diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift index ee761680a6..38797233eb 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift @@ -79,12 +79,11 @@ public struct NIOAsyncChannel: Sendable { /// The stream of inbound messages. /// /// - Important: The `inbound` stream is a unicast `AsyncSequence` and only one iterator can be created. - @available(*, deprecated, message: "Use the executeThenClose scoped method instead.") public var inbound: NIOAsyncChannelInboundStream { self._inbound } + /// The writer for writing outbound messages. - @available(*, deprecated, message: "Use the executeThenClose scoped method instead.") public var outbound: NIOAsyncChannelOutboundWriter { self._outbound } diff --git a/Sources/NIOPosix/Bootstrap.swift b/Sources/NIOPosix/Bootstrap.swift index a79148ee6d..4acbdc8a9f 100644 --- a/Sources/NIOPosix/Bootstrap.swift +++ b/Sources/NIOPosix/Bootstrap.swift @@ -521,6 +521,209 @@ public final class ServerBootstrap { // MARK: Async bind methods extension ServerBootstrap { + + /// Represents a target address or socket for binding a server socket. + /// + /// `BindTarget` provides a type-safe way to specify different types of binding targets + /// for server bootstraps. It supports various address types including network addresses, + /// Unix domain sockets, VSOCK addresses, and existing socket handles. + @_spi(StructuredConcurrencyNIOAsyncChannel) + public struct BindTarget: Sendable { + + enum Base { + case hostAndPort(host: String, port: Int) + case socketAddress(SocketAddress) + case unixDomainSocketPath(String) + case vsockAddress(VsockAddress) + case socket(NIOBSDSocket.Handle) + } + + var base: Base + + /// Creates a binding target for a hostname and port. + /// + /// This method creates a target that will resolve the hostname and bind to the + /// specified port. The hostname resolution follows standard system behavior + /// and may resolve to both IPv4 and IPv6 addresses depending on system configuration. + /// + /// - Parameters: + /// - host: The hostname or IP address to bind to. Can be a domain name like + /// "localhost" or "example.com", or an IP address like "127.0.0.1" or "::1" + /// - port: The port number to bind to (0-65535). Use 0 to let the system + /// choose an available port + public static func hostAndPort(_ host: String, _ port: Int) -> BindTarget { + BindTarget(base: .hostAndPort(host: host, port: port)) + } + + /// Creates a binding target for a specific socket address. + /// + /// Use this method when you have a pre-constructed ``SocketAddress`` that + /// specifies the exact binding location, including IPv4, IPv6, or Unix domain addresses. + /// + /// - Parameter address: The socket address to bind to + public static func socketAddress(_ address: SocketAddress) -> BindTarget { + BindTarget(base: .socketAddress(address)) + } + + /// Creates a binding target for a Unix domain socket. + /// + /// Unix domain sockets provide high-performance inter-process communication + /// on the same machine using filesystem paths. The socket file will be created + /// at the specified path when binding occurs. + /// + /// - Parameter path: The filesystem path for the Unix domain socket. + /// Must be a valid filesystem path and should not exist. + /// - Warning: The path must not exist. + public static func unixDomainSocketPath(_ path: String) -> BindTarget { + BindTarget(base: .unixDomainSocketPath(path)) + } + + /// Creates a binding target for a VSOCK address. + /// + /// VSOCK (Virtual Socket) provides communication between virtual machines and their hosts, + /// or between different virtual machines on the same host. This is commonly used + /// in virtualized environments for guest-host communication. + /// + /// - Parameter vsockAddress: The VSOCK address to bind to, containing both + /// context ID (CID) and port number + /// - Note: VSOCK support depends on the underlying platform and virtualization technology + public static func vsockAddress(_ vsockAddress: VsockAddress) -> BindTarget { + BindTarget(base: .vsockAddress(vsockAddress)) + } + + /// Creates a binding target for an existing socket handle. + /// + /// This method allows you to use a pre-existing socket that has already been + /// created and optionally configured. This is useful for advanced scenarios where you + /// need custom socket setup before binding, or when integrating with external libraries. + /// + /// - Parameters: + /// - handle: The existing socket handle to use. Must be a valid, open socket + /// that is compatible with the intended server bootstrap type + /// - Note: The bootstrap will take ownership of the socket handle and will close + /// it when the server shuts down + public static func socket(_ handle: NIOBSDSocket.Handle) -> BindTarget { + BindTarget(base: .socket(handle)) + } + } + + /// Bind the `ServerSocketChannel` to the ``BindTarget``. This method will returns once all connections that + /// were spawned have been closed. + /// + /// # Supporting graceful shutdown + /// + /// To support a graceful server shutdown we recommend using the `ServerQuiescingHelper` from the + /// SwiftNIO extras package. The `ServerQuiescingHelper` can be installed using the + /// ``ServerBootstrap/serverChannelInitializer`` callback. + /// + /// Below you can find the code to setup a simple TCP echo server that supports graceful server closure. + /// + /// ```swift + /// let quiesce = ServerQuiescingHelper(group: group) + /// let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: signalQueue) + /// signalSource.setEventHandler { + /// signalSource.cancel() + /// print("received signal, initiating shutdown which should complete after the last request finished.") + /// + /// quiesce.initiateShutdown(promise: fullyShutdownPromise) + /// } + /// try await ServerBootstrap(group: self.eventLoopGroup) + /// .serverChannelInitializer { channel in + /// channel.eventLoop.makeCompletedFuture { + /// try channel.pipeline.syncOperations.addHandler(quiesce.makeServerChannelHandler(channel: channel)) + /// } + /// } + /// .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) + /// .bind( + /// target: .hostAndPort(self.host, self.port), + /// childChannelInitializer: { channel in + /// channel.eventLoop.makeCompletedFuture { + /// try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(NewlineDelimiterCoder())) + /// try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder())) + /// + /// return try NIOAsyncChannel( + /// wrappingChannelSynchronously: channel, + /// configuration: NIOAsyncChannel.Configuration( + /// inboundType: String.self, + /// outboundType: String.self + /// ) + /// ) + /// } + /// } + /// ) { channel in + /// print("Handling new connection") + /// await self.handleConnection(channel: channel) + /// print("Done handling connection") + /// } + /// + /// ``` + /// + /// - Parameters: + /// - target: The ``BindTarget`` to use. + /// - serverBackPressureStrategy: The back pressure strategy used by the server socket channel. + /// - childChannelInitializer: A closure to initialize the channel. The return value of this closure is used in the + /// `handleChildChannel` closure. + /// - handleChildChannel: A closure to handle the connection. Use the channel's `inbound` property to read from + /// the connection and channel's `outbound` to write to the connection. + /// - handleServerChannel: A closure that will be called once the server has been started. Use this to get access to + /// the serverChannel, if you used port `0` in the ``BindTarget``. You can also use it to + /// send events on the server channel pipeline. + /// - Note: The bind method respects task cancellation which will force close the server. If you want to gracefully + /// shut-down use the quiescing helper approach as outlined above. + @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) + @_spi(StructuredConcurrencyNIOAsyncChannel) + public func bind( + target: BindTarget, + serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture>, + handleChildChannel: + @escaping @Sendable ( + _ channel: NIOAsyncChannel + ) async -> Void, + handleServerChannel: @Sendable @escaping (Channel) async -> Void = { _ in } + ) async throws { + let channel = try await self.makeConnectedChannel( + target: target, + serverBackPressureStrategy: serverBackPressureStrategy, + childChannelInitializer: childChannelInitializer + ) + + try await withTaskCancellationHandler { + try await channel.executeThenClose { inbound, outbound in + // we need to dance the result dance here, since we can't throw from the + // withDiscardingTaskGroup closure. + let result = await withDiscardingTaskGroup { group -> Result in + + group.addTask { + await handleServerChannel(channel.channel) + } + + do { + try await channel.executeThenClose { inbound in + for try await connectionChannel in inbound { + group.addTask { + do { + try await connectionChannel.executeThenClose { _, _ in + await handleChildChannel(connectionChannel) + } + } catch { + // ignore single connection failures + } + } + } + } + return .success(()) + } catch { + return .failure(error) + } + } + try result.get() + } + } onCancel: { + channel.channel.close(promise: nil) + } + } + /// Bind the `ServerSocketChannel` to the `host` and `port` parameters. /// /// - Parameters: @@ -622,6 +825,86 @@ extension ServerBootstrap { to vsockAddress: VsockAddress, serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture + ) async throws -> NIOAsyncChannel { + try await self._bind( + to: vsockAddress, + serverBackPressureStrategy: serverBackPressureStrategy, + childChannelInitializer: childChannelInitializer + ) + } + + /// Use the existing bound socket file descriptor. + /// + /// - Parameters: + /// - socket: The _Unix file descriptor_ representing the bound stream socket. + /// - cleanupExistingSocketFile: Unused. + /// - serverBackPressureStrategy: The back pressure strategy used by the server socket channel. + /// - childChannelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect` + /// method. + /// - Returns: The result of the channel initializer. + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + public func bind( + _ socket: NIOBSDSocket.Handle, + cleanupExistingSocketFile: Bool = false, + serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture + ) async throws -> NIOAsyncChannel { + try await self._bind( + socket, + serverBackPressureStrategy: serverBackPressureStrategy, + childChannelInitializer: childChannelInitializer + ) + } + + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + private func makeConnectedChannel( + target: BindTarget, + serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture> + ) async throws -> NIOAsyncChannel, Never> { + switch target.base { + case .hostAndPort(let host, let port): + try await self.bind( + to: try SocketAddress.makeAddressResolvingHost(host, port: port), + serverBackPressureStrategy: serverBackPressureStrategy, + childChannelInitializer: childChannelInitializer + ) + + case .unixDomainSocketPath(let unixDomainSocketPath): + try await self.bind( + to: try SocketAddress(unixDomainSocketPath: unixDomainSocketPath), + serverBackPressureStrategy: serverBackPressureStrategy, + childChannelInitializer: childChannelInitializer + ) + + case .socketAddress(let address): + try await self.bind( + to: address, + serverBackPressureStrategy: serverBackPressureStrategy, + childChannelInitializer: childChannelInitializer + ) + + case .vsockAddress(let vsockAddress): + try await self._bind( + to: vsockAddress, + serverBackPressureStrategy: serverBackPressureStrategy, + childChannelInitializer: childChannelInitializer + ) + + case .socket(let handle): + try await self._bind( + handle, + serverBackPressureStrategy: serverBackPressureStrategy, + childChannelInitializer: childChannelInitializer + ) + } + } + + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + private func _bind( + to vsockAddress: VsockAddress, + serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture ) async throws -> NIOAsyncChannel { func makeChannel( _ eventLoop: SelectableEventLoop, @@ -652,19 +935,9 @@ extension ServerBootstrap { }.get() } - /// Use the existing bound socket file descriptor. - /// - /// - Parameters: - /// - socket: The _Unix file descriptor_ representing the bound stream socket. - /// - cleanupExistingSocketFile: Unused. - /// - serverBackPressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect` - /// method. - /// - Returns: The result of the channel initializer. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) - public func bind( + private func _bind( _ socket: NIOBSDSocket.Handle, - cleanupExistingSocketFile: Bool = false, serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture ) async throws -> NIOAsyncChannel { @@ -1270,6 +1543,132 @@ public final class ClientBootstrap: NIOClientTCPBootstrapProtocol { // MARK: Async connect methods extension ClientBootstrap { + + /// Represents a target address or socket for creating a client socket. + /// + /// `ConnectTarget` provides a type-safe way to specify different types of connecting targets + /// for client bootstraps. It supports various address types including network addresses, + /// Unix domain sockets, VSOCK addresses, and existing socket handles. + @_spi(StructuredConcurrencyNIOAsyncChannel) + public struct ConnectTarget: Sendable { + + enum Base { + case hostAndPort(host: String, port: Int) + case socketAddress(SocketAddress) + case unixDomainSocketPath(String) + case vsockAddress(VsockAddress) + case socket(NIOBSDSocket.Handle) + } + + var base: Base + + /// Creates a connect target for a hostname and port. + /// + /// This method creates a target that will resolve the hostname and connect to the + /// specified port. The hostname resolution follows standard system behavior + /// and may resolve to both IPv4 and IPv6 addresses depending on system configuration. + /// + /// - Parameters: + /// - host: The hostname or IP address to bind to. Can be a domain name like + /// "localhost" or "example.com", or an IP address like "189.201.14.13" or "::1" + /// - port: The port number to connect to (0-65535). + public static func hostAndPort(_ host: String, _ port: Int) -> ConnectTarget { + ConnectTarget(base: .hostAndPort(host: host, port: port)) + } + + /// Creates a connect target for a specific socket address. + /// + /// Use this method when you have a pre-constructed ``SocketAddress`` that + /// specifies the exact connect location, including IPv4, IPv6, or Unix domain addresses. + /// + /// - Parameter address: The socket address to connect to + public static func socketAddress(_ address: SocketAddress) -> ConnectTarget { + ConnectTarget(base: .socketAddress(address)) + } + + /// Creates a connect target for a Unix domain socket. + /// + /// Unix domain sockets provide high-performance inter-process communication + /// on the same machine using filesystem paths. The socket file needs to exist in + /// order to connect to it. + /// + /// - Parameter path: The filesystem path for the Unix domain socket. + /// Must be a valid filesystem path and should exist. + /// - Warning: The path must exist. + public static func unixDomainSocketPath(_ path: String) -> ConnectTarget { + ConnectTarget(base: .unixDomainSocketPath(path)) + } + + /// Creates a connect target for a VSOCK address. + /// + /// VSOCK (Virtual Socket) provides communication between virtual machines and their hosts, + /// or between different virtual machines on the same host. This is commonly used + /// in virtualized environments for guest-host communication. + /// + /// - Parameter vsockAddress: The VSOCK address to connect to, containing both + /// context ID (CID) and port number + /// - Note: VSOCK support depends on the underlying platform and virtualization technology + public static func vsockAddress(_ vsockAddress: VsockAddress) -> ConnectTarget { + ConnectTarget(base: .vsockAddress(vsockAddress)) + } + + /// Creates a connect target for an existing socket handle. + /// + /// This method allows you to use a pre-existing socket that has already been + /// created and optionally configured. This is useful for advanced scenarios where you + /// need custom socket setup before binding, or when integrating with external libraries. + /// + /// - Parameters: + /// - handle: The existing socket handle to use. Must be a valid, open socket + /// that is compatible with the intended server bootstrap type + /// - Note: The bootstrap will take ownership of the socket handle and will close + /// it when the server shuts down + public static func socket(_ handle: NIOBSDSocket.Handle) -> ConnectTarget { + ConnectTarget(base: .socket(handle)) + } + } + + /// Create a client connection to the ``ConnectTarget``. The connection will be closed once the scope of the + /// `handleChannel` closure is exited. + /// + /// - Parameters: + /// - target: The ``ConnectTarget`` to use. + /// - backPressureStrategy: The back pressure strategy used by the channel. + /// - childChannelInitializer: A closure to initialize the channel. The return value of this closure is used in handleChannel + /// closure. + /// - handleChannel: A closure to handle the client connection. Use the channel's `inbound` property to read from + /// the connection and channel's `outbound` to write to the connection. + @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) + @_spi(StructuredConcurrencyNIOAsyncChannel) + public func connect( + target: ConnectTarget, + backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture>, + handleChannel: (_ channel: NIOAsyncChannel) async -> sending Result + ) async throws -> sending Result { + let channel: NIOAsyncChannel + switch target.base { + case .socketAddress(let socketAddress): + channel = try await self.connect(to: socketAddress, channelInitializer: channelInitializer) + + case .hostAndPort(let host, let port): + channel = try await self.connect(host: host, port: port, channelInitializer: channelInitializer) + + case .unixDomainSocketPath(let path): + channel = try await self.connect(unixDomainSocketPath: path, channelInitializer: channelInitializer) + + case .vsockAddress(let vsockAddress): + channel = try await self.connect(to: vsockAddress, channelInitializer: channelInitializer) + + case .socket(let handle): + channel = try await self.withConnectedSocket(handle, channelInitializer: channelInitializer) + } + + return try await channel.executeThenClose { _, _ in + await handleChannel(channel) + } + } + /// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established. /// /// - Parameters: diff --git a/Sources/NIOTCPEchoClient/Client.swift b/Sources/NIOTCPEchoClient/Client.swift index 67f917e9da..4e04624dfd 100644 --- a/Sources/NIOTCPEchoClient/Client.swift +++ b/Sources/NIOTCPEchoClient/Client.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// import NIOCore -import NIOPosix +@_spi(StructuredConcurrencyNIOAsyncChannel) import NIOPosix @available(macOS 14, iOS 17, tvOS 17, watchOS 10, *) @main @@ -48,11 +48,10 @@ struct Client { } private func sendRequest(number: Int) async throws { - let channel = try await ClientBootstrap(group: self.eventLoopGroup) + try await ClientBootstrap(group: self.eventLoopGroup) .connect( - host: self.host, - port: self.port - ) { channel in + target: ClientBootstrap.ConnectTarget.hostAndPort(self.host, self.port) + ) { channel -> EventLoopFuture> in channel.eventLoop.makeCompletedFuture { // We are using two simple handlers here to frame our messages with "\n" try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(NewlineDelimiterCoder())) @@ -66,21 +65,23 @@ struct Client { ) ) } + } handleChannel: { channel in + print("Connection(\(number)): Writing request") + do { + try await channel.outbound.write("Hello on connection \(number)") + + for try await inboundData in channel.inbound { + print("Connection(\(number)): Received response (\(inboundData))") + + // We only expect a single response so we can exit here. + // Once, we exit out of this loop and the references to the `NIOAsyncChannel` are dropped + // the connection is going to close itself. + break + } + } catch { + print("Unexpected error occured: \(error)") + } } - - try await channel.executeThenClose { inbound, outbound in - print("Connection(\(number)): Writing request") - try await outbound.write("Hello on connection \(number)") - - for try await inboundData in inbound { - print("Connection(\(number)): Received response (\(inboundData))") - - // We only expect a single response so we can exit here. - // Once, we exit out of this loop and the references to the `NIOAsyncChannel` are dropped - // the connection is going to close itself. - break - } - } } } diff --git a/Sources/NIOTCPEchoServer/Server.swift b/Sources/NIOTCPEchoServer/Server.swift index 6f0d98c1b8..2d3ea8da6d 100644 --- a/Sources/NIOTCPEchoServer/Server.swift +++ b/Sources/NIOTCPEchoServer/Server.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// import NIOCore -import NIOPosix +@_spi(StructuredConcurrencyNIOAsyncChannel) import NIOPosix @available(macOS 14, iOS 17, tvOS 17, watchOS 10, *) @main @@ -36,14 +36,12 @@ struct Server { /// This method starts the server and handles incoming connections. func run() async throws { - let channel = try await ServerBootstrap(group: self.eventLoopGroup) + try await ServerBootstrap(group: self.eventLoopGroup) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .bind( - host: self.host, - port: self.port + target: .hostAndPort(self.host, self.port) ) { channel in channel.eventLoop.makeCompletedFuture { - // We are using two simple handlers here to frame our messages with "\n" try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(NewlineDelimiterCoder())) try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder())) @@ -55,25 +53,14 @@ struct Server { ) ) } + } handleChildChannel: { channel in + print("Handling new connection") + await self.handleConnection(channel: channel) + print("Done handling connection") + } handleServerChannel: { serverChannel in + // you can access the server channel here. You must not use call + // `inbound` or `outbound` on it. } - - // We are handling each incoming connection in a separate child task. It is important - // to use a discarding task group here which automatically discards finished child tasks. - // A normal task group retains all child tasks and their outputs in memory until they are - // consumed by iterating the group or by exiting the group. Since, we are never consuming - // the results of the group we need the group to automatically discard them; otherwise, this - // would result in a memory leak over time. - try await withThrowingDiscardingTaskGroup { group in - try await channel.executeThenClose { inbound in - for try await connectionChannel in inbound { - group.addTask { - print("Handling new connection") - await self.handleConnection(channel: connectionChannel) - print("Done handling connection") - } - } - } - } } /// This method handles a single connection by echoing back all inbound data. @@ -82,11 +69,9 @@ struct Server { // We do this since we don't want to tear down the whole server when a single connection // encounters an error. do { - try await channel.executeThenClose { inbound, outbound in - for try await inboundData in inbound { - print("Received request (\(inboundData))") - try await outbound.write(inboundData) - } + for try await inboundData in channel.inbound { + print("Received request (\(inboundData))") + try await channel.outbound.write(inboundData) } } catch { print("Hit error: \(error)")