Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 30 additions & 24 deletions Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ struct ServerRPCExecutor: Sendable {
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
handler: @Sendable @escaping (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
handler:
@Sendable @escaping (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) async {
// Wait for the first request part from the transport.
let firstPart = await Self._waitForFirstRequestPart(inbound: stream.inbound)
Expand Down Expand Up @@ -75,10 +76,11 @@ struct ServerRPCExecutor: Sendable {
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
handler: @escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
handler:
@escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) async {
if let timeout = metadata.timeout {
await Self._processRPCWithTimeout(
Expand Down Expand Up @@ -116,10 +118,11 @@ struct ServerRPCExecutor: Sendable {
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
handler: @escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
handler:
@escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) async {
await withTaskGroup(of: Void.self) { group in
group.addTask {
Expand Down Expand Up @@ -156,10 +159,11 @@ struct ServerRPCExecutor: Sendable {
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
handler: @escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
handler:
@escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) async {
let messages = UncheckedAsyncIteratorSequence(inbound.wrappedValue).map { part in
switch part {
Expand Down Expand Up @@ -294,10 +298,11 @@ extension ServerRPCExecutor {
request: StreamingServerRequest<Input>,
context: ServerContext,
interceptors: [any ServerInterceptor],
finally: @escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
finally:
@escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) async throws -> StreamingServerResponse<Output> {
return try await self._intercept(
request: request,
Expand All @@ -312,10 +317,11 @@ extension ServerRPCExecutor {
request: StreamingServerRequest<Input>,
context: ServerContext,
iterator: Array<any ServerInterceptor>.Iterator,
finally: @escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
finally:
@escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) async throws -> StreamingServerResponse<Output> {
var iterator = iterator

Expand Down
18 changes: 10 additions & 8 deletions Sources/GRPCCore/Call/Server/RPCRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ public struct RPCRouter<Transport: ServerTransport>: Sendable {
method: MethodDescriptor,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
handler: @Sendable @escaping (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
handler:
@Sendable @escaping (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) {
self._fn = { stream, context, interceptors in
await ServerRPCExecutor.execute(
Expand Down Expand Up @@ -125,10 +126,11 @@ public struct RPCRouter<Transport: ServerTransport>: Sendable {
forMethod descriptor: MethodDescriptor,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
handler: @Sendable @escaping (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
handler:
@Sendable @escaping (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) {
let handler = RPCHandler(
method: descriptor,
Expand Down
9 changes: 5 additions & 4 deletions Sources/GRPCCore/Call/Server/ServerInterceptor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ public protocol ServerInterceptor: Sendable {
func intercept<Input: Sendable, Output: Sendable>(
request: StreamingServerRequest<Input>,
context: ServerContext,
next: @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
next:
@Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) async throws -> StreamingServerResponse<Output>
}
28 changes: 16 additions & 12 deletions Sources/GRPCCore/GRPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,10 @@ public final class GRPCClient<Transport: ClientTransport>: Sendable {
serializer: some MessageSerializer<Request>,
deserializer: some MessageDeserializer<Response>,
options: CallOptions,
onResponse handleResponse: @Sendable @escaping (
_ response: ClientResponse<Response>
) async throws -> ReturnValue
onResponse handleResponse:
@Sendable @escaping (
_ response: ClientResponse<Response>
) async throws -> ReturnValue
) async throws -> ReturnValue {
try await self.bidirectionalStreaming(
request: StreamingClientRequest(single: request),
Expand Down Expand Up @@ -294,9 +295,10 @@ public final class GRPCClient<Transport: ClientTransport>: Sendable {
serializer: some MessageSerializer<Request>,
deserializer: some MessageDeserializer<Response>,
options: CallOptions,
onResponse handleResponse: @Sendable @escaping (
_ response: ClientResponse<Response>
) async throws -> ReturnValue
onResponse handleResponse:
@Sendable @escaping (
_ response: ClientResponse<Response>
) async throws -> ReturnValue
) async throws -> ReturnValue {
try await self.bidirectionalStreaming(
request: request,
Expand Down Expand Up @@ -327,9 +329,10 @@ public final class GRPCClient<Transport: ClientTransport>: Sendable {
serializer: some MessageSerializer<Request>,
deserializer: some MessageDeserializer<Response>,
options: CallOptions,
onResponse handleResponse: @Sendable @escaping (
_ response: StreamingClientResponse<Response>
) async throws -> ReturnValue
onResponse handleResponse:
@Sendable @escaping (
_ response: StreamingClientResponse<Response>
) async throws -> ReturnValue
) async throws -> ReturnValue {
try await self.bidirectionalStreaming(
request: StreamingClientRequest(single: request),
Expand Down Expand Up @@ -361,9 +364,10 @@ public final class GRPCClient<Transport: ClientTransport>: Sendable {
serializer: some MessageSerializer<Request>,
deserializer: some MessageDeserializer<Response>,
options: CallOptions,
onResponse handleResponse: @Sendable @escaping (
_ response: StreamingClientResponse<Response>
) async throws -> ReturnValue
onResponse handleResponse:
@Sendable @escaping (
_ response: StreamingClientResponse<Response>
) async throws -> ReturnValue
) async throws -> ReturnValue {
let applicableInterceptors = try self.stateMachine.withLock {
try $0.checkExecutableAndGetApplicableInterceptors(for: descriptor)
Expand Down
9 changes: 5 additions & 4 deletions Sources/GRPCCore/Transport/ServerTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ public protocol ServerTransport<Bytes>: Sendable {
/// period after which any open streams may be cancelled. You can also cancel the task running
/// ``listen(streamHandler:)`` to abruptly close connections and streams.
func listen(
streamHandler: @escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
streamHandler:
@escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
) async throws

/// Indicates to the transport that no new streams should be accepted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ extension InProcessTransport {
}

public func listen(
streamHandler: @escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
streamHandler:
@escaping @Sendable (
_ stream: RPCStream<Inbound, Outbound>,
_ context: ServerContext
) async -> Void
) async throws {
await withDiscardingTaskGroup { group in
for await stream in self.newStreams {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ extension ClientRPCExecutorTestHarness {
) async throws -> Void

init(
_ handler: @escaping @Sendable (
RPCStream<
RPCAsyncSequence<RPCRequestPart<[UInt8]>, any Error>,
RPCWriter<RPCResponsePart<[UInt8]>>.Closable
>
) async throws -> Void
_ handler:
@escaping @Sendable (
RPCStream<
RPCAsyncSequence<RPCRequestPart<[UInt8]>, any Error>,
RPCWriter<RPCResponsePart<[UInt8]>>.Closable
>
) async throws -> Void
) {
self.handler = handler
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ struct ServerRPCExecutorTestHarness {
) async throws -> StreamingServerResponse<Output>

init(
_ fn: @escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
_ fn:
@escaping @Sendable (
_ request: StreamingServerRequest<Input>,
_ context: ServerContext
) async throws -> StreamingServerResponse<Output>
) {
self.fn = fn
}
Expand All @@ -57,16 +58,19 @@ struct ServerRPCExecutorTestHarness {
bytes: Bytes.Type = Bytes.self,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
handler: @escaping @Sendable (
StreamingServerRequest<Input>,
ServerContext
) async throws -> StreamingServerResponse<Output>,
producer: @escaping @Sendable (
RPCWriter<RPCRequestPart<Bytes>>.Closable
) async throws -> Void,
consumer: @escaping @Sendable (
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>
) async throws -> Void
handler:
@escaping @Sendable (
StreamingServerRequest<Input>,
ServerContext
) async throws -> StreamingServerResponse<Output>,
producer:
@escaping @Sendable (
RPCWriter<RPCRequestPart<Bytes>>.Closable
) async throws -> Void,
consumer:
@escaping @Sendable (
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>
) async throws -> Void
) async throws {
try await self.execute(
deserializer: deserializer,
Expand All @@ -81,12 +85,14 @@ struct ServerRPCExecutorTestHarness {
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
handler: ServerHandler<Input, Output>,
producer: @escaping @Sendable (
RPCWriter<RPCRequestPart<Bytes>>.Closable
) async throws -> Void,
consumer: @escaping @Sendable (
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>
) async throws -> Void
producer:
@escaping @Sendable (
RPCWriter<RPCRequestPart<Bytes>>.Closable
) async throws -> Void,
consumer:
@escaping @Sendable (
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>
) async throws -> Void
) async throws {
let input = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart<Bytes>.self)
let output = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart<Bytes>.self)
Expand Down Expand Up @@ -132,12 +138,14 @@ struct ServerRPCExecutorTestHarness {

func execute(
handler: ServerHandler<[UInt8], [UInt8]> = .echo,
producer: @escaping @Sendable (
RPCWriter<RPCRequestPart<[UInt8]>>.Closable
) async throws -> Void,
consumer: @escaping @Sendable (
RPCAsyncSequence<RPCResponsePart<[UInt8]>, any Error>
) async throws -> Void
producer:
@escaping @Sendable (
RPCWriter<RPCRequestPart<[UInt8]>>.Closable
) async throws -> Void,
consumer:
@escaping @Sendable (
RPCAsyncSequence<RPCResponsePart<[UInt8]>, any Error>
) async throws -> Void
) async throws {
try await self.execute(
deserializer: IdentityDeserializer(),
Expand Down
9 changes: 5 additions & 4 deletions Tests/GRPCCoreTests/Call/Server/RPCRouterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ struct NoServerTransport: ServerTransport {
typealias Bytes = [UInt8]

func listen(
streamHandler: @escaping @Sendable (
GRPCCore.RPCStream<Inbound, Outbound>,
GRPCCore.ServerContext
) async -> Void
streamHandler:
@escaping @Sendable (
GRPCCore.RPCStream<Inbound, Outbound>,
GRPCCore.ServerContext
) async -> Void
) async throws {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ struct RejectAllServerInterceptor: ServerInterceptor {
func intercept<Input: Sendable, Output: Sendable>(
request: StreamingServerRequest<Input>,
context: ServerContext,
next: @Sendable (
StreamingServerRequest<Input>,
ServerContext
) async throws -> StreamingServerResponse<Output>
next:
@Sendable (
StreamingServerRequest<Input>,
ServerContext
) async throws -> StreamingServerResponse<Output>
) async throws -> StreamingServerResponse<Output> {
switch self.mode {
case .throw(let error):
Expand Down Expand Up @@ -139,10 +140,11 @@ struct RequestCountingServerInterceptor: ServerInterceptor {
func intercept<Input: Sendable, Output: Sendable>(
request: StreamingServerRequest<Input>,
context: ServerContext,
next: @Sendable (
StreamingServerRequest<Input>,
ServerContext
) async throws -> StreamingServerResponse<Output>
next:
@Sendable (
StreamingServerRequest<Input>,
ServerContext
) async throws -> StreamingServerResponse<Output>
) async throws -> StreamingServerResponse<Output> {
self.counter.increment()
return try await next(request, context)
Expand Down
Loading
Loading