Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import Dispatch
import NIOCore
import NIOFileSystem
import _NIOFileSystem

#if canImport(Darwin)
import Darwin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,11 @@ package final class CommonHTTP2ServerTransport<
}

package func listen(
streamHandler: @escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
streamHandler:
@escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
) async throws {
defer {
switch self.listeningAddressState.withLock({ $0.close() }) {
Expand Down Expand Up @@ -205,10 +206,11 @@ package final class CommonHTTP2ServerTransport<
private func handleConnection(
_ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
streamHandler: @escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
streamHandler:
@escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
) async throws {
let remotePeer = connection.channel.remoteAddressInfo
let localPeer = connection.channel.localAddressInfo
Expand Down Expand Up @@ -246,10 +248,11 @@ package final class CommonHTTP2ServerTransport<
private func handleStream(
_ stream: NIOAsyncChannel<RPCRequestPart<Bytes>, RPCResponsePart<Bytes>>,
_ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
handler streamHandler: @escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void,
handler streamHandler:
@escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void,
descriptor: EventLoopFuture<MethodDescriptor>,
remotePeer: String,
localPeer: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,11 @@ extension HTTP2ServerTransport {
}

public func listen(
streamHandler: @escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
streamHandler:
@escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
) async throws {
try await self.underlyingTransport.listen(streamHandler: streamHandler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,11 @@ extension HTTP2ServerTransport {
}

public func listen(
streamHandler: @escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
streamHandler:
@escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
) async throws {
try await self.underlyingTransport.listen(streamHandler: streamHandler)
}
Expand Down
14 changes: 8 additions & 6 deletions Tests/GRPCNIOTransportHTTP2Tests/ControlClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ internal struct ControlClient<Transport> where Transport: ClientTransport {
internal func waitForCancellation<R>(
request: GRPCCore.ClientRequest<CancellationKind>,
options: GRPCCore.CallOptions = .defaults,
_ body: @Sendable @escaping (
_ response: GRPCCore.StreamingClientResponse<CancellationKind>
) async throws -> R
_ body:
@Sendable @escaping (
_ response: GRPCCore.StreamingClientResponse<CancellationKind>
) async throws -> R
) async throws -> R where R: Sendable {
try await self.client.serverStreaming(
request: request,
Expand All @@ -109,9 +110,10 @@ internal struct ControlClient<Transport> where Transport: ClientTransport {

internal func peerInfo<R>(
options: GRPCCore.CallOptions = .defaults,
_ body: @Sendable @escaping (
_ response: GRPCCore.ClientResponse<ControlService.PeerInfoResponse>
) async throws -> R = { try $0.message }
_ body:
@Sendable @escaping (
_ response: GRPCCore.ClientResponse<ControlService.PeerInfoResponse>
) async throws -> R = { try $0.message }
) async throws -> R where R: Sendable {
try await self.client.unary(
request: ClientRequest(message: ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ struct HelloWorldService: HelloWorld.SimpleServiceProtocol {
) async throws -> HelloResponse

init(
implementation: @Sendable @escaping (
_: HelloRequest,
_: ServerContext
) async throws -> HelloResponse
implementation:
@Sendable @escaping (
_: HelloRequest,
_: ServerContext
) async throws -> HelloResponse
) {
self.implementation = implementation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ enum NIOServerTransport: ServerTransport, ListeningServerTransport {
}

func listen(
streamHandler: @escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
streamHandler:
@escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
) async throws {
switch self {
case .posix(let transport):
Expand Down
Loading