Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 157 additions & 68 deletions Sources/GRPC/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,7 @@ public final class Server: @unchecked Sendable {
}

#if canImport(NIOSSL)
// Making a `NIOSSLContext` is expensive, we should only do it once per TLS configuration so
// we'll do it now, before accepting connections. Unfortunately our API isn't throwing so we'll
// only surface any error when initializing a child channel.
//
// 'nil' means we're not using TLS, or we're using the Network.framework TLS backend. If we're
// using the Network.framework TLS backend we'll apply the settings just below.
let sslContext: Result<NIOSSLContext, Error>?

if let tlsConfiguration = configuration.tlsConfiguration {
do {
sslContext = try tlsConfiguration.makeNIOSSLContext().map { .success($0) }
} catch {
sslContext = .failure(error)
}

} else {
// No TLS configuration, no SSL context.
sslContext = nil
}
let sslContext = Self.makeNIOSSLContext(configuration: configuration)
#endif // canImport(NIOSSL)

#if canImport(Network)
Expand Down Expand Up @@ -152,53 +134,10 @@ public final class Server: @unchecked Sendable {
)
// Set the handlers that are applied to the accepted Channels
.childChannelInitializer { channel in
var configuration = configuration
configuration.logger[metadataKey: MetadataKey.connectionID] = "\(UUID().uuidString)"
configuration.logger.addIPAddressMetadata(
local: channel.localAddress,
remote: channel.remoteAddress
)

do {
let sync = channel.pipeline.syncOperations
Self.configureAcceptedChannel(channel, configuration: configuration) { sync in
#if canImport(NIOSSL)
if let sslContext = try sslContext?.get() {
let sslHandler: NIOSSLServerHandler
if let verify = configuration.tlsConfiguration?.nioSSLCustomVerificationCallback {
sslHandler = NIOSSLServerHandler(
context: sslContext,
customVerificationCallback: verify
)
} else {
sslHandler = NIOSSLServerHandler(context: sslContext)
}

try sync.addHandler(sslHandler)
}
try Self.addNIOSSLHandler(sslContext, configuration: configuration, sync: sync)
#endif // canImport(NIOSSL)

// Configures the pipeline based on whether the connection uses TLS or not.
try sync.addHandler(GRPCServerPipelineConfigurator(configuration: configuration))

// Work around the zero length write issue, if needed.
let requiresZeroLengthWorkaround = PlatformSupport.requiresZeroLengthWriteWorkaround(
group: configuration.eventLoopGroup,
hasTLS: configuration.tlsConfiguration != nil
)
if requiresZeroLengthWorkaround,
#available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
{
try sync.addHandler(NIOFilterEmptyWritesHandler())
}
} catch {
return channel.eventLoop.makeFailedFuture(error)
}

// Run the debug initializer, if there is one.
if let debugAcceptedChannelInitializer = configuration.debugChannelInitializer {
return debugAcceptedChannelInitializer(channel)
} else {
return channel.eventLoop.makeSucceededVoidFuture()
}
}

Expand All @@ -210,11 +149,108 @@ public final class Server: @unchecked Sendable {
)
}

#if canImport(NIOSSL)
private static func makeNIOSSLContext(
configuration: Configuration
) -> Result<NIOSSLContext, Error>? {
// Making a `NIOSSLContext` is expensive, we should only do it once per TLS configuration so
// we'll do it now, before accepting connections. Unfortunately our API isn't throwing so we'll
// only surface any error when initializing a child channel.
//
// 'nil' means we're not using TLS, or we're using the Network.framework TLS backend. If we're
// using the Network.framework TLS backend we'll apply the settings just below.
let sslContext: Result<NIOSSLContext, Error>?

if let tlsConfiguration = configuration.tlsConfiguration {
do {
sslContext = try tlsConfiguration.makeNIOSSLContext().map { .success($0) }
} catch {
sslContext = .failure(error)
}

} else {
// No TLS configuration, no SSL context.
sslContext = nil
}

return sslContext
}

private static func addNIOSSLHandler(
_ sslContext: Result<NIOSSLContext, Error>?,
configuration: Configuration,
sync: ChannelPipeline.SynchronousOperations
) throws {
if let sslContext = try sslContext?.get() {
let sslHandler: NIOSSLServerHandler
if let verify = configuration.tlsConfiguration?.nioSSLCustomVerificationCallback {
sslHandler = NIOSSLServerHandler(
context: sslContext,
customVerificationCallback: verify
)
} else {
sslHandler = NIOSSLServerHandler(context: sslContext)
}

try sync.addHandler(sslHandler)
}
}
#endif // canImport(NIOSSL)

private static func configureAcceptedChannel(
_ channel: Channel,
configuration: Configuration,
addNIOSSLIfNecessary: (ChannelPipeline.SynchronousOperations) throws -> Void
) -> EventLoopFuture<Void> {
var configuration = configuration
configuration.logger[metadataKey: MetadataKey.connectionID] = "\(UUID().uuidString)"
configuration.logger.addIPAddressMetadata(
local: channel.localAddress,
remote: channel.remoteAddress
)

do {
let sync = channel.pipeline.syncOperations
try addNIOSSLIfNecessary(sync)

// Configures the pipeline based on whether the connection uses TLS or not.
try sync.addHandler(GRPCServerPipelineConfigurator(configuration: configuration))

// Work around the zero length write issue, if needed.
let requiresZeroLengthWorkaround = PlatformSupport.requiresZeroLengthWriteWorkaround(
group: configuration.eventLoopGroup,
hasTLS: configuration.tlsConfiguration != nil
)
if requiresZeroLengthWorkaround,
#available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *)
{
try sync.addHandler(NIOFilterEmptyWritesHandler())
}
} catch {
return channel.eventLoop.makeFailedFuture(error)
}

// Run the debug initializer, if there is one.
if let debugAcceptedChannelInitializer = configuration.debugChannelInitializer {
return debugAcceptedChannelInitializer(channel)
} else {
return channel.eventLoop.makeSucceededVoidFuture()
}
}

/// Starts a server with the given configuration. See `Server.Configuration` for the options
/// available to configure the server.
public static func start(configuration: Configuration) -> EventLoopFuture<Server> {
let quiescingHelper = ServerQuiescingHelper(group: configuration.eventLoopGroup)
switch configuration.target.wrapped {
case .connectedSocket(let handle) where configuration.connectedSocketTargetIsAcceptedConnection:
return Self.startServerFromAcceptedConnection(handle: handle, configuration: configuration)
case .connectedSocket, .hostAndPort, .unixDomainSocket, .socketAddress, .vsockAddress:
return Self.startServer(configuration: configuration)
}
}

private static func startServer(configuration: Configuration) -> EventLoopFuture<Server> {
let quiescingHelper = ServerQuiescingHelper(group: configuration.eventLoopGroup)
return self.makeBootstrap(configuration: configuration)
.serverChannelInitializer { channel in
channel.pipeline.addHandler(quiescingHelper.makeServerChannelHandler(channel: channel))
Expand All @@ -229,13 +265,53 @@ public final class Server: @unchecked Sendable {
}
}

private static func startServerFromAcceptedConnection(
handle: NIOBSDSocket.Handle,
configuration: Configuration
) -> EventLoopFuture<Server> {
guard let bootstrap = ClientBootstrap(validatingGroup: configuration.eventLoopGroup) else {
let status = GRPCStatus(
code: .unimplemented,
message: """
You must use a NIOPosix EventLoopGroup to create a server from an already accepted \
socket.
"""
)
return configuration.eventLoopGroup.any().makeFailedFuture(status)
}

#if canImport(NIOSSL)
let sslContext = Self.makeNIOSSLContext(configuration: configuration)
#endif // canImport(NIOSSL)

return bootstrap.channelInitializer { channel in
Self.configureAcceptedChannel(channel, configuration: configuration) { sync in
#if canImport(NIOSSL)
try Self.addNIOSSLHandler(sslContext, configuration: configuration, sync: sync)
#endif // canImport(NIOSSL)
}
}.withConnectedSocket(handle).map { channel in
Server(
channel: channel,
quiescingHelper: nil,
errorDelegate: configuration.errorDelegate
)
}
}

/// The listening server channel.
///
/// If the server was created from an already accepted connection then this channel will
/// be for the accepted connection.
public let channel: Channel
private let quiescingHelper: ServerQuiescingHelper

/// Quiescing helper. `nil` if `channel` is for an accepted connection.
private let quiescingHelper: ServerQuiescingHelper?
private var errorDelegate: ServerErrorDelegate?

private init(
channel: Channel,
quiescingHelper: ServerQuiescingHelper,
quiescingHelper: ServerQuiescingHelper?,
errorDelegate: ServerErrorDelegate?
) {
self.channel = channel
Expand Down Expand Up @@ -264,7 +340,13 @@ public final class Server: @unchecked Sendable {
/// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
/// connections will be rejected.
public func initiateGracefulShutdown(promise: EventLoopPromise<Void>?) {
self.quiescingHelper.initiateShutdown(promise: promise)
if let quiescingHelper = self.quiescingHelper {
quiescingHelper.initiateShutdown(promise: promise)
} else {
// No quiescing helper: the channel must be for an already accepted connection.
self.channel.closeFuture.cascade(to: promise)
self.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
}
}

/// Initiates a graceful shutdown. Existing RPCs may run to completion, any new RPCs or
Expand Down Expand Up @@ -436,6 +518,13 @@ extension Server {
/// CORS configuration for gRPC-Web support.
public var webCORS = Configuration.CORS()

/// Indicates whether a `connectedSocket` ``target`` is treated as an accepted connection.
///
/// If ``target`` is a `connectedSocket` then this flag indicates whether that socket is for
/// an already accepted connection. If the value is `false` then the socket is treated as a
/// listener. This value is ignored if ``target`` is any value other than `connectedSocket`.
public var connectedSocketTargetIsAcceptedConnection: Bool = false

#if canImport(NIOSSL)
/// Create a `Configuration` with some pre-defined defaults.
///
Expand Down
18 changes: 18 additions & 0 deletions Sources/GRPC/ServerBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,24 @@ extension Server {
self.configuration.tlsConfiguration = self.maybeTLS
return Server.start(configuration: self.configuration)
}

/// Create a gRPC server from the file descriptor of an already accepted TCP connection.
///
/// - Parameter handle: The handle to the accepted socket.
/// - Important: This is only supported with `NIOPosix` (i.e. when using a
/// `MultiThreadedEventLoopGroup` or one of its loops and TLS configured via `NIOSSL`).
/// - Warning: By calling this function you hand responsibility of the socket to gRPC.
/// Crucially you must **not** close the socket directly after calling this function, gRPC
/// will do it for you.
/// - Returns: A configured gRPC server.
public func fromAcceptedConnection(
takingOwnershipOf handle: NIOBSDSocket.Handle
) -> EventLoopFuture<Server> {
self.configuration.target = .connectedSocket(handle)
self.configuration.connectedSocketTargetIsAcceptedConnection = true
self.configuration.tlsConfiguration = self.maybeTLS
return Server.start(configuration: self.configuration)
}
}
}

Expand Down
Loading