diff --git a/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift b/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift index 6bc8ff3c..c0a112bf 100644 --- a/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift +++ b/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift @@ -37,10 +37,11 @@ struct ServerRPCExecutor: Sendable { deserializer: some MessageDeserializer, serializer: some MessageSerializer, interceptors: [any ServerInterceptor], - handler: @Sendable @escaping ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + handler: + @Sendable @escaping ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) async { // Wait for the first request part from the transport. let firstPart = await Self._waitForFirstRequestPart(inbound: stream.inbound) @@ -75,10 +76,11 @@ struct ServerRPCExecutor: Sendable { deserializer: some MessageDeserializer, serializer: some MessageSerializer, interceptors: [any ServerInterceptor], - handler: @escaping @Sendable ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + handler: + @escaping @Sendable ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) async { if let timeout = metadata.timeout { await Self._processRPCWithTimeout( @@ -116,10 +118,11 @@ struct ServerRPCExecutor: Sendable { deserializer: some MessageDeserializer, serializer: some MessageSerializer, interceptors: [any ServerInterceptor], - handler: @escaping @Sendable ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + handler: + @escaping @Sendable ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) async { await withTaskGroup(of: Void.self) { group in group.addTask { @@ -156,10 +159,11 @@ struct ServerRPCExecutor: Sendable { deserializer: some MessageDeserializer, serializer: some MessageSerializer, interceptors: [any ServerInterceptor], - handler: @escaping @Sendable ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + handler: + @escaping @Sendable ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) async { let messages = UncheckedAsyncIteratorSequence(inbound.wrappedValue).map { part in switch part { @@ -294,10 +298,11 @@ extension ServerRPCExecutor { request: StreamingServerRequest, context: ServerContext, interceptors: [any ServerInterceptor], - finally: @escaping @Sendable ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + finally: + @escaping @Sendable ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) async throws -> StreamingServerResponse { return try await self._intercept( request: request, @@ -312,10 +317,11 @@ extension ServerRPCExecutor { request: StreamingServerRequest, context: ServerContext, iterator: Array.Iterator, - finally: @escaping @Sendable ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + finally: + @escaping @Sendable ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) async throws -> StreamingServerResponse { var iterator = iterator diff --git a/Sources/GRPCCore/Call/Server/RPCRouter.swift b/Sources/GRPCCore/Call/Server/RPCRouter.swift index 88c89c63..bf0b8d2c 100644 --- a/Sources/GRPCCore/Call/Server/RPCRouter.swift +++ b/Sources/GRPCCore/Call/Server/RPCRouter.swift @@ -54,10 +54,11 @@ public struct RPCRouter: Sendable { method: MethodDescriptor, deserializer: some MessageDeserializer, serializer: some MessageSerializer, - handler: @Sendable @escaping ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + handler: + @Sendable @escaping ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) { self._fn = { stream, context, interceptors in await ServerRPCExecutor.execute( @@ -125,10 +126,11 @@ public struct RPCRouter: Sendable { forMethod descriptor: MethodDescriptor, deserializer: some MessageDeserializer, serializer: some MessageSerializer, - handler: @Sendable @escaping ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + handler: + @Sendable @escaping ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) { let handler = RPCHandler( method: descriptor, diff --git a/Sources/GRPCCore/Call/Server/ServerInterceptor.swift b/Sources/GRPCCore/Call/Server/ServerInterceptor.swift index 8192fc21..bb29af7f 100644 --- a/Sources/GRPCCore/Call/Server/ServerInterceptor.swift +++ b/Sources/GRPCCore/Call/Server/ServerInterceptor.swift @@ -74,9 +74,10 @@ public protocol ServerInterceptor: Sendable { func intercept( request: StreamingServerRequest, context: ServerContext, - next: @Sendable ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + next: + @Sendable ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) async throws -> StreamingServerResponse } diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index a41c8b26..2315f4dd 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -261,9 +261,10 @@ public final class GRPCClient: Sendable { serializer: some MessageSerializer, deserializer: some MessageDeserializer, options: CallOptions, - onResponse handleResponse: @Sendable @escaping ( - _ response: ClientResponse - ) async throws -> ReturnValue + onResponse handleResponse: + @Sendable @escaping ( + _ response: ClientResponse + ) async throws -> ReturnValue ) async throws -> ReturnValue { try await self.bidirectionalStreaming( request: StreamingClientRequest(single: request), @@ -294,9 +295,10 @@ public final class GRPCClient: Sendable { serializer: some MessageSerializer, deserializer: some MessageDeserializer, options: CallOptions, - onResponse handleResponse: @Sendable @escaping ( - _ response: ClientResponse - ) async throws -> ReturnValue + onResponse handleResponse: + @Sendable @escaping ( + _ response: ClientResponse + ) async throws -> ReturnValue ) async throws -> ReturnValue { try await self.bidirectionalStreaming( request: request, @@ -327,9 +329,10 @@ public final class GRPCClient: Sendable { serializer: some MessageSerializer, deserializer: some MessageDeserializer, options: CallOptions, - onResponse handleResponse: @Sendable @escaping ( - _ response: StreamingClientResponse - ) async throws -> ReturnValue + onResponse handleResponse: + @Sendable @escaping ( + _ response: StreamingClientResponse + ) async throws -> ReturnValue ) async throws -> ReturnValue { try await self.bidirectionalStreaming( request: StreamingClientRequest(single: request), @@ -361,9 +364,10 @@ public final class GRPCClient: Sendable { serializer: some MessageSerializer, deserializer: some MessageDeserializer, options: CallOptions, - onResponse handleResponse: @Sendable @escaping ( - _ response: StreamingClientResponse - ) async throws -> ReturnValue + onResponse handleResponse: + @Sendable @escaping ( + _ response: StreamingClientResponse + ) async throws -> ReturnValue ) async throws -> ReturnValue { let applicableInterceptors = try self.stateMachine.withLock { try $0.checkExecutableAndGetApplicableInterceptors(for: descriptor) diff --git a/Sources/GRPCCore/Transport/ServerTransport.swift b/Sources/GRPCCore/Transport/ServerTransport.swift index d3891147..a01f6495 100644 --- a/Sources/GRPCCore/Transport/ServerTransport.swift +++ b/Sources/GRPCCore/Transport/ServerTransport.swift @@ -41,10 +41,11 @@ public protocol ServerTransport: Sendable { /// period after which any open streams may be cancelled. You can also cancel the task running /// ``listen(streamHandler:)`` to abruptly close connections and streams. func listen( - streamHandler: @escaping @Sendable ( - _ stream: RPCStream, - _ context: ServerContext - ) async -> Void + streamHandler: + @escaping @Sendable ( + _ stream: RPCStream, + _ context: ServerContext + ) async -> Void ) async throws /// Indicates to the transport that no new streams should be accepted. diff --git a/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift b/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift index cc01e204..fcdebb23 100644 --- a/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift +++ b/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift @@ -103,10 +103,11 @@ extension InProcessTransport { } public func listen( - streamHandler: @escaping @Sendable ( - _ stream: RPCStream, - _ context: ServerContext - ) async -> Void + streamHandler: + @escaping @Sendable ( + _ stream: RPCStream, + _ context: ServerContext + ) async -> Void ) async throws { await withDiscardingTaskGroup { group in for await stream in self.newStreams { diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+ServerBehavior.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+ServerBehavior.swift index 11d661dd..9523ac1d 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+ServerBehavior.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+ServerBehavior.swift @@ -30,12 +30,13 @@ extension ClientRPCExecutorTestHarness { ) async throws -> Void init( - _ handler: @escaping @Sendable ( - RPCStream< - RPCAsyncSequence, any Error>, - RPCWriter>.Closable - > - ) async throws -> Void + _ handler: + @escaping @Sendable ( + RPCStream< + RPCAsyncSequence, any Error>, + RPCWriter>.Closable + > + ) async throws -> Void ) { self.handler = handler } diff --git a/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift index f0f156c2..c942f067 100644 --- a/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift @@ -27,10 +27,11 @@ struct ServerRPCExecutorTestHarness { ) async throws -> StreamingServerResponse init( - _ fn: @escaping @Sendable ( - _ request: StreamingServerRequest, - _ context: ServerContext - ) async throws -> StreamingServerResponse + _ fn: + @escaping @Sendable ( + _ request: StreamingServerRequest, + _ context: ServerContext + ) async throws -> StreamingServerResponse ) { self.fn = fn } @@ -57,16 +58,19 @@ struct ServerRPCExecutorTestHarness { bytes: Bytes.Type = Bytes.self, deserializer: some MessageDeserializer, serializer: some MessageSerializer, - handler: @escaping @Sendable ( - StreamingServerRequest, - ServerContext - ) async throws -> StreamingServerResponse, - producer: @escaping @Sendable ( - RPCWriter>.Closable - ) async throws -> Void, - consumer: @escaping @Sendable ( - RPCAsyncSequence, any Error> - ) async throws -> Void + handler: + @escaping @Sendable ( + StreamingServerRequest, + ServerContext + ) async throws -> StreamingServerResponse, + producer: + @escaping @Sendable ( + RPCWriter>.Closable + ) async throws -> Void, + consumer: + @escaping @Sendable ( + RPCAsyncSequence, any Error> + ) async throws -> Void ) async throws { try await self.execute( deserializer: deserializer, @@ -81,12 +85,14 @@ struct ServerRPCExecutorTestHarness { deserializer: some MessageDeserializer, serializer: some MessageSerializer, handler: ServerHandler, - producer: @escaping @Sendable ( - RPCWriter>.Closable - ) async throws -> Void, - consumer: @escaping @Sendable ( - RPCAsyncSequence, any Error> - ) async throws -> Void + producer: + @escaping @Sendable ( + RPCWriter>.Closable + ) async throws -> Void, + consumer: + @escaping @Sendable ( + RPCAsyncSequence, any Error> + ) async throws -> Void ) async throws { let input = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self) let output = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) @@ -132,12 +138,14 @@ struct ServerRPCExecutorTestHarness { func execute( handler: ServerHandler<[UInt8], [UInt8]> = .echo, - producer: @escaping @Sendable ( - RPCWriter>.Closable - ) async throws -> Void, - consumer: @escaping @Sendable ( - RPCAsyncSequence, any Error> - ) async throws -> Void + producer: + @escaping @Sendable ( + RPCWriter>.Closable + ) async throws -> Void, + consumer: + @escaping @Sendable ( + RPCAsyncSequence, any Error> + ) async throws -> Void ) async throws { try await self.execute( deserializer: IdentityDeserializer(), diff --git a/Tests/GRPCCoreTests/Call/Server/RPCRouterTests.swift b/Tests/GRPCCoreTests/Call/Server/RPCRouterTests.swift index 5ec97ddb..c20d18f4 100644 --- a/Tests/GRPCCoreTests/Call/Server/RPCRouterTests.swift +++ b/Tests/GRPCCoreTests/Call/Server/RPCRouterTests.swift @@ -70,10 +70,11 @@ struct NoServerTransport: ServerTransport { typealias Bytes = [UInt8] func listen( - streamHandler: @escaping @Sendable ( - GRPCCore.RPCStream, - GRPCCore.ServerContext - ) async -> Void + streamHandler: + @escaping @Sendable ( + GRPCCore.RPCStream, + GRPCCore.ServerContext + ) async -> Void ) async throws { } diff --git a/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift b/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift index 1372c6d8..6388bb44 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift @@ -83,10 +83,11 @@ struct RejectAllServerInterceptor: ServerInterceptor { func intercept( request: StreamingServerRequest, context: ServerContext, - next: @Sendable ( - StreamingServerRequest, - ServerContext - ) async throws -> StreamingServerResponse + next: + @Sendable ( + StreamingServerRequest, + ServerContext + ) async throws -> StreamingServerResponse ) async throws -> StreamingServerResponse { switch self.mode { case .throw(let error): @@ -139,10 +140,11 @@ struct RequestCountingServerInterceptor: ServerInterceptor { func intercept( request: StreamingServerRequest, context: ServerContext, - next: @Sendable ( - StreamingServerRequest, - ServerContext - ) async throws -> StreamingServerResponse + next: + @Sendable ( + StreamingServerRequest, + ServerContext + ) async throws -> StreamingServerResponse ) async throws -> StreamingServerResponse { self.counter.increment() return try await next(request, context) diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift index 2e97f8f0..a542cc03 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift @@ -98,10 +98,11 @@ struct AnyServerTransport: ServerTransport, Sendable { } func listen( - streamHandler: @escaping @Sendable ( - _ stream: RPCStream, - _ context: ServerContext - ) async -> Void + streamHandler: + @escaping @Sendable ( + _ stream: RPCStream, + _ context: ServerContext + ) async -> Void ) async throws { try await self._listen(streamHandler) } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift index 5b1ef428..ab889910 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift @@ -93,10 +93,11 @@ struct StreamCountingServerTransport: ServerTransport, Sendable { } func listen( - streamHandler: @escaping @Sendable ( - _ stream: RPCStream, - _ context: ServerContext - ) async -> Void + streamHandler: + @escaping @Sendable ( + _ stream: RPCStream, + _ context: ServerContext + ) async -> Void ) async throws { try await self.transport.listen { stream, context in self._acceptedStreams.increment()