Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions Sources/NIOCore/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,26 @@ extension ChannelCore {
data.forceAs()
}

/// Unwraps the given `NIOAny` as a specific concrete type.
///
/// This method is intended for use when writing custom `ChannelCore` implementations.
/// This can safely be called in methods like `write0` to extract data from the `NIOAny`
/// provided in those cases.
///
/// Note that if the unwrap fails, this will cause a runtime trap. `ChannelCore`
/// implementations should be concrete about what types they support writing. If multiple
/// types are supported, consider using a tagged union to store the type information like
/// NIO's own `IOData`, which will minimise the amount of runtime type checking.
///
/// - Parameters:
/// - data: The `NIOAny` to unwrap.
/// - as: The type to extract from the `NIOAny`.
/// - Returns: The content of the `NIOAny`.
@inlinable
public static func unwrapData<T>(_ data: NIOAny, as: T.Type = T.self) -> T {
data.forceAs()
}

/// Attempts to unwrap the given `NIOAny` as a specific concrete type.
///
/// This method is intended for use when writing custom `ChannelCore` implementations.
Expand All @@ -326,6 +346,28 @@ extension ChannelCore {
data.tryAs()
}

/// Attempts to unwrap the given `NIOAny` as a specific concrete type.
///
/// This method is intended for use when writing custom `ChannelCore` implementations.
/// This can safely be called in methods like `write0` to extract data from the `NIOAny`
/// provided in those cases.
///
/// If the unwrap fails, this will return `nil`. `ChannelCore` implementations should almost
/// always support only one runtime type, so in general they should avoid using this and prefer
/// using `unwrapData` instead. This method exists for rare use-cases where tolerating type
/// mismatches is acceptable.
///
/// - Parameters:
/// - data: The `NIOAny` to unwrap.
/// - as: The type to extract from the `NIOAny`.
/// - Returns: The content of the `NIOAny`, or `nil` if the type is incorrect.
/// - warning: If you are implementing a `ChannelCore`, you should use `unwrapData` unless you
/// are doing something _extremely_ unusual.
@inlinable
public static func tryUnwrapData<T>(_ data: NIOAny, as: T.Type = T.self) -> T? {
data.tryAs()
}

/// Removes the `ChannelHandler`s from the `ChannelPipeline` belonging to `channel`, and
/// closes that `ChannelPipeline`.
///
Expand Down
17 changes: 12 additions & 5 deletions Sources/NIOPosix/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private struct SocketChannelLifecycleManager {
/// For this reason, `BaseSocketChannel` exists to provide a common core implementation of
/// the `SelectableChannel` protocol. It uses a number of private functions to provide hooks
/// for subclasses to implement the specific logic to handle their writes and reads.
class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, ChannelCore, @unchecked Sendable {
class BaseSocketChannel<SocketType: BaseSocketProtocol, WriteType>: SelectableChannel, ChannelCore, @unchecked Sendable {
typealias SelectableType = SocketType.SelectableType

struct AddressCache {
Expand Down Expand Up @@ -472,7 +472,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
}

/// Buffer a write in preparation for a flush.
func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
func bufferPendingWrite(data: WriteType, promise: EventLoopPromise<Void>?) {
fatalError("this must be overridden by sub class")
}

Expand Down Expand Up @@ -732,6 +732,10 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
}
}

internal func unwrapAsWriteType(_ data: NIOAny) -> WriteType {
return Self.unwrapData(data, as: WriteType.self)
}

public final func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()

Expand All @@ -741,7 +745,8 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
return
}

bufferPendingWrite(data: data, promise: promise)
let data = self.unwrapAsWriteType(data)
self.bufferPendingWrite(data: data, promise: promise)
}

private func registerForWritable() {
Expand Down Expand Up @@ -1401,12 +1406,14 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
}

extension BaseSocketChannel {
typealias BaseSocketChannelType = BaseSocketChannel

public struct SynchronousOptions: NIOSynchronousChannelOptions {
@usableFromInline // should be private
internal let _channel: BaseSocketChannel<SocketType>
internal let _channel: BaseSocketChannelType

@inlinable // should be fileprivate
internal init(_channel channel: BaseSocketChannel<SocketType>) {
internal init(_channel channel: BaseSocketChannelType) {
self._channel = channel
}

Expand Down
6 changes: 2 additions & 4 deletions Sources/NIOPosix/BaseStreamSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//
import NIOCore

class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>, @unchecked Sendable {
class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket, IOData>, @unchecked Sendable {
internal var connectTimeoutScheduled: Optional<Scheduled<Void>>
private var allowRemoteHalfClosure: Bool = false
private var inputShutdown: Bool = false
Expand Down Expand Up @@ -288,14 +288,12 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
super.read0()
}

final override func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
final override func bufferPendingWrite(data: IOData, promise: EventLoopPromise<Void>?) {
if self.outputShutdown {
promise?.fail(ChannelError._outputClosed)
return
}

let data = self.unwrapData(data, as: IOData.self)

if !self.pendingWrites.add(data: data, promise: promise) {
self.pipeline.syncOperations.fireChannelWritabilityChanged()
}
Expand Down
36 changes: 25 additions & 11 deletions Sources/NIOPosix/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ final class SocketChannel: BaseStreamSocketChannel<Socket>, @unchecked Sendable
/// A `Channel` for a server socket.
///
/// - Note: All operations on `ServerSocketChannel` are thread-safe.
final class ServerSocketChannel: BaseSocketChannel<ServerSocket>, @unchecked Sendable {
final class ServerSocketChannel: BaseSocketChannel<ServerSocket, Void>, @unchecked Sendable {

private var backlog: Int32 = 128
private let group: EventLoopGroup
Expand Down Expand Up @@ -422,8 +422,8 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket>, @unchecked Sen
false
}

override func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
promise?.fail(ChannelError._operationUnsupported)
override func bufferPendingWrite(data: Void, promise: EventLoopPromise<Void>?) {
promise?.fail(ChannelError.operationUnsupported)
}

override func markFlushPoint() {
Expand Down Expand Up @@ -459,12 +459,19 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket>, @unchecked Sen
promise?.fail(ChannelError._operationUnsupported)
}
}

override func unwrapAsWriteType(_ data: NIOAny) -> () {}
}

enum DatagramWriteType {
case addressed(AddressedEnvelope<ByteBuffer>)
case unaddressed(ByteBuffer)
}

/// A channel used with datagram sockets.
///
/// Currently, it does not support connected mode which is well worth adding.
final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
final class DatagramChannel: BaseSocketChannel<Socket, DatagramWriteType>, @unchecked Sendable {
private var reportExplicitCongestionNotifications = false
private var receivePacketInfo = false

Expand Down Expand Up @@ -894,6 +901,14 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
}
}

override func unwrapAsWriteType(_ data: NIOAny) -> DatagramWriteType {
if let envelope = self.tryUnwrapData(data, as: AddressedEnvelope<ByteBuffer>.self) {
return .addressed(envelope)
} else {
return .unaddressed(Self.unwrapData(data, as: ByteBuffer.self))
}
}

/// Buffer a write in preparation for a flush.
///
/// When the channel is unconnected, `data` _must_ be of type `AddressedEnvelope<ByteBuffer>`.
Expand All @@ -902,14 +917,13 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
/// `AddressedEnvelope<ByteBuffer>` to allow users to provide protocol control messages via
/// `AddressedEnvelope.metadata`. In this case, `AddressedEnvelope.remoteAddress` _must_ match
/// the address of the connected peer.
override func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
if let envelope = self.tryUnwrapData(data, as: AddressedEnvelope<ByteBuffer>.self) {
return bufferPendingAddressedWrite(envelope: envelope, promise: promise)
override func bufferPendingWrite(data: DatagramWriteType, promise: EventLoopPromise<Void>?) {
switch data {
case .addressed(let addressedBytes):
return self.bufferPendingAddressedWrite(envelope: addressedBytes, promise: promise)
case .unaddressed(let bytes):
return self.bufferPendingUnaddressedWrite(data: bytes, promise: promise)
}
// If it's not an `AddressedEnvelope` then it must be a `ByteBuffer` so we let the common
// `unwrapData(_:as:)` throw the fatal error if it's some other type.
let data = self.unwrapData(data, as: ByteBuffer.self)
return bufferPendingUnaddressedWrite(data: data, promise: promise)
}

/// Buffer a write in preparation for a flush.
Expand Down
Loading