Skip to content

Commit 93d0536

Browse files
authored
Make transport generic over its bag-of-bytes type (#2155)
Motivation: The transport protocols deal in request/response part types. The bag-of-bytes message type used in `[UInt8]`. This means that transports might have to copy to and from the bag-of-bytes they use which is inefficient. Modifications: - Add a `GRPCContiguousBytes` protocol defining a basic bag-of-bytes type. - Make the transport protocols have an associated `Bytes` type which conforms to `GRPCContiguousBytes`. - Propagate this requirement throughout the codebase; this affects the generated code. - Update the code generator to generate the appropriate code. - Update tests Result: - Transports can use a bag-of-bytes type of their choosing.
1 parent eb7ed6f commit 93d0536

File tree

43 files changed

+360
-219
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+360
-219
lines changed

Sources/GRPCCodeGen/Internal/StructuredSwift+Client.swift

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -645,10 +645,10 @@ extension FunctionDescription {
645645

646646
extension StructDescription {
647647
/// ```
648-
/// struct <Name>: <ClientProtocol> {
649-
/// private let client: GRPCCore.GRPCClient
648+
/// struct <Name><Transport>: <ClientProtocol> where Transport: GRPCCore.ClientTransport {
649+
/// private let client: GRPCCore.GRPCClient<Transport>
650650
///
651-
/// init(wrapping client: GRPCCore.GRPCClient) {
651+
/// init(wrapping client: GRPCCore.GRPCClient<Transport>) {
652652
/// self.client = client
653653
/// }
654654
///
@@ -665,9 +665,18 @@ extension StructDescription {
665665
StructDescription(
666666
accessModifier: accessLevel,
667667
name: name,
668+
generics: [.member("Transport")],
668669
conformances: [clientProtocol],
670+
whereClause: WhereClause(
671+
requirements: [.conformance("Transport", "GRPCCore.ClientTransport")]
672+
),
669673
members: [
670-
.variable(accessModifier: .private, kind: .let, left: "client", type: .grpcClient),
674+
.variable(
675+
accessModifier: .private,
676+
kind: .let,
677+
left: "client",
678+
type: .grpcClient(genericOver: "Transport")
679+
),
671680
.commentable(
672681
.preFormatted(
673682
"""
@@ -681,7 +690,13 @@ extension StructDescription {
681690
accessModifier: accessLevel,
682691
kind: .initializer,
683692
parameters: [
684-
ParameterDescription(label: "wrapping", name: "client", type: .grpcClient)
693+
ParameterDescription(
694+
label: "wrapping",
695+
name: "client",
696+
type: .grpcClient(
697+
genericOver: "Transport"
698+
)
699+
)
685700
],
686701
whereClause: nil,
687702
body: [

Sources/GRPCCodeGen/Internal/StructuredSwift+Server.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,14 +311,20 @@ extension FunctionDescription {
311311
return FunctionDescription(
312312
accessModifier: accessLevel,
313313
kind: .function(name: "registerMethods"),
314+
generics: [.member("Transport")],
314315
parameters: [
315316
ParameterDescription(
316317
label: "with",
317318
name: "router",
318-
type: .rpcRouter,
319+
type: .rpcRouter(genericOver: "Transport"),
319320
`inout`: true
320321
)
321322
],
323+
whereClause: WhereClause(
324+
requirements: [
325+
.conformance("Transport", "GRPCCore.ServerTransport")
326+
]
327+
),
322328
body: methods.map { method in
323329
.functionCall(
324330
.registerWithRouter(

Sources/GRPCCodeGen/Internal/StructuredSwift+Types.swift

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@ extension ExistingTypeDescription {
5454
}
5555

5656
package static let serverContext: Self = .grpcCore("ServerContext")
57-
package static let rpcRouter: Self = .grpcCore("RPCRouter")
57+
58+
package static func rpcRouter(genericOver type: String) -> Self {
59+
.generic(wrapper: .grpcCore("RPCRouter"), wrapped: .member(type))
60+
}
61+
5862
package static let serviceDescriptor: Self = .grpcCore("ServiceDescriptor")
5963
package static let methodDescriptor: Self = .grpcCore("MethodDescriptor")
6064

@@ -80,5 +84,8 @@ extension ExistingTypeDescription {
8084

8185
package static let callOptions: Self = .grpcCore("CallOptions")
8286
package static let metadata: Self = .grpcCore("Metadata")
83-
package static let grpcClient: Self = .grpcCore("GRPCClient")
87+
88+
package static func grpcClient(genericOver transport: String) -> Self {
89+
.generic(wrapper: .grpcCore("GRPCClient"), wrapped: [.member(transport)])
90+
}
8491
}

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,12 @@ extension ClientRPCExecutor.RetryExecutor {
194194
}
195195

196196
@inlinable
197-
func executeAttempt<R: Sendable>(
197+
func executeAttempt<R: Sendable, Bytes: GRPCContiguousBytes>(
198198
context: ClientContext,
199-
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
199+
stream: RPCStream<
200+
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
201+
RPCWriter<RPCRequestPart<Bytes>>.Closable
202+
>,
200203
metadata: Metadata,
201204
retryStream: BroadcastAsyncSequence<Input>,
202205
method: MethodDescriptor,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,18 @@ extension ClientRPCExecutor {
113113
/// - stream: The stream to excecute the RPC on.
114114
/// - Returns: The deserialized response.
115115
@inlinable // would be private
116-
static func _execute<Input: Sendable, Output: Sendable>(
116+
static func _execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
117117
in group: inout TaskGroup<Void>,
118118
context: ClientContext,
119119
request: StreamingClientRequest<Input>,
120120
attempt: Int,
121121
serializer: some MessageSerializer<Input>,
122122
deserializer: some MessageDeserializer<Output>,
123123
interceptors: [any ClientInterceptor],
124-
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
124+
stream: RPCStream<
125+
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
126+
RPCWriter<RPCRequestPart<Bytes>>.Closable
127+
>
125128
) async -> StreamingClientResponse<Output> {
126129

127130
if interceptors.isEmpty {

Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,17 @@ internal enum ClientStreamExecutor {
2828
/// - stream: The stream to excecute the RPC on.
2929
/// - Returns: A streamed response.
3030
@inlinable
31-
static func execute<Input: Sendable, Output: Sendable>(
31+
static func execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
3232
in group: inout TaskGroup<Void>,
3333
request: StreamingClientRequest<Input>,
3434
context: ClientContext,
3535
attempt: Int,
3636
serializer: some MessageSerializer<Input>,
3737
deserializer: some MessageDeserializer<Output>,
38-
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
38+
stream: RPCStream<
39+
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
40+
RPCWriter<RPCRequestPart<Bytes>>.Closable
41+
>
3942
) async -> StreamingClientResponse<Output> {
4043
// Let the server know this is a retry.
4144
var metadata = request.metadata
@@ -83,8 +86,8 @@ internal enum ClientStreamExecutor {
8386
}
8487

8588
@inlinable // would be private
86-
static func _processRequest<Outbound>(
87-
on stream: some ClosableRPCWriterProtocol<RPCRequestPart>,
89+
static func _processRequest<Outbound, Bytes: GRPCContiguousBytes>(
90+
on stream: some ClosableRPCWriterProtocol<RPCRequestPart<Bytes>>,
8891
request: StreamingClientRequest<Outbound>,
8992
serializer: some MessageSerializer<Outbound>
9093
) async {
@@ -104,16 +107,19 @@ internal enum ClientStreamExecutor {
104107
}
105108

106109
@usableFromInline
107-
enum OnFirstResponsePart: Sendable {
108-
case metadata(Metadata, UnsafeTransfer<ClientTransport.Inbound.AsyncIterator>)
110+
enum OnFirstResponsePart<Bytes: GRPCContiguousBytes>: Sendable {
111+
case metadata(
112+
Metadata,
113+
UnsafeTransfer<RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>.AsyncIterator>
114+
)
109115
case status(Status, Metadata)
110116
case failed(RPCError)
111117
}
112118

113119
@inlinable // would be private
114-
static func _waitForFirstResponsePart(
115-
on stream: ClientTransport.Inbound
116-
) async -> OnFirstResponsePart {
120+
static func _waitForFirstResponsePart<Bytes: GRPCContiguousBytes>(
121+
on stream: RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>
122+
) async -> OnFirstResponsePart<Bytes> {
117123
var iterator = stream.makeAsyncIterator()
118124
let result = await Result<OnFirstResponsePart, any Error> {
119125
switch try await iterator.next() {
@@ -165,7 +171,8 @@ internal enum ClientStreamExecutor {
165171

166172
@usableFromInline
167173
struct RawBodyPartToMessageSequence<
168-
Base: AsyncSequence<RPCResponsePart, Failure>,
174+
Base: AsyncSequence<RPCResponsePart<Bytes>, Failure>,
175+
Bytes: GRPCContiguousBytes,
169176
Message: Sendable,
170177
Deserializer: MessageDeserializer<Message>,
171178
Failure: Error

Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ struct ServerRPCExecutor {
2727
/// interceptors will be called in the order of the array.
2828
/// - handler: A handler which turns the request into a response.
2929
@inlinable
30-
static func execute<Input, Output>(
30+
static func execute<Input, Output, Bytes: GRPCContiguousBytes>(
3131
context: ServerContext,
3232
stream: RPCStream<
33-
RPCAsyncSequence<RPCRequestPart, any Error>,
34-
RPCWriter<RPCResponsePart>.Closable
33+
RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>,
34+
RPCWriter<RPCResponsePart<Bytes>>.Closable
3535
>,
3636
deserializer: some MessageDeserializer<Input>,
3737
serializer: some MessageSerializer<Output>,
@@ -66,11 +66,11 @@ struct ServerRPCExecutor {
6666
}
6767

6868
@inlinable
69-
static func _execute<Input, Output>(
69+
static func _execute<Input, Output, Bytes: GRPCContiguousBytes>(
7070
context: ServerContext,
7171
metadata: Metadata,
72-
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
73-
outbound: RPCWriter<RPCResponsePart>.Closable,
72+
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
73+
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
7474
deserializer: some MessageDeserializer<Input>,
7575
serializer: some MessageSerializer<Output>,
7676
interceptors: [any ServerInterceptor],
@@ -106,12 +106,12 @@ struct ServerRPCExecutor {
106106
}
107107

108108
@inlinable
109-
static func _processRPCWithTimeout<Input, Output>(
109+
static func _processRPCWithTimeout<Input, Output, Bytes: GRPCContiguousBytes>(
110110
timeout: Duration,
111111
context: ServerContext,
112112
metadata: Metadata,
113-
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
114-
outbound: RPCWriter<RPCResponsePart>.Closable,
113+
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
114+
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
115115
deserializer: some MessageDeserializer<Input>,
116116
serializer: some MessageSerializer<Output>,
117117
interceptors: [any ServerInterceptor],
@@ -147,11 +147,11 @@ struct ServerRPCExecutor {
147147
}
148148

149149
@inlinable
150-
static func _processRPC<Input, Output>(
150+
static func _processRPC<Input, Output, Bytes: GRPCContiguousBytes>(
151151
context: ServerContext,
152152
metadata: Metadata,
153-
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
154-
outbound: RPCWriter<RPCResponsePart>.Closable,
153+
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
154+
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
155155
deserializer: some MessageDeserializer<Input>,
156156
serializer: some MessageSerializer<Output>,
157157
interceptors: [any ServerInterceptor],
@@ -235,12 +235,12 @@ struct ServerRPCExecutor {
235235
}
236236

237237
@inlinable
238-
static func _waitForFirstRequestPart(
239-
inbound: RPCAsyncSequence<RPCRequestPart, any Error>
240-
) async -> OnFirstRequestPart {
238+
static func _waitForFirstRequestPart<Bytes: GRPCContiguousBytes>(
239+
inbound: RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>
240+
) async -> OnFirstRequestPart<Bytes> {
241241
var iterator = inbound.makeAsyncIterator()
242242
let part = await Result { try await iterator.next() }
243-
let onFirstRequestPart: OnFirstRequestPart
243+
let onFirstRequestPart: OnFirstRequestPart<Bytes>
244244

245245
switch part {
246246
case .success(.metadata(let metadata)):
@@ -275,10 +275,10 @@ struct ServerRPCExecutor {
275275
}
276276

277277
@usableFromInline
278-
enum OnFirstRequestPart {
278+
enum OnFirstRequestPart<Bytes: GRPCContiguousBytes> {
279279
case process(
280280
Metadata,
281-
UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>
281+
UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>
282282
)
283283
case reject(RPCError)
284284
}

Sources/GRPCCore/Call/Server/RPCRouter.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@
3434
/// 1. Remove individual methods by calling ``removeHandler(forMethod:)``, or
3535
/// 2. Implement ``RegistrableRPCService/registerMethods(with:)`` to register only the methods you
3636
/// want to be served.
37-
public struct RPCRouter: Sendable {
37+
public struct RPCRouter<Transport: ServerTransport>: Sendable {
3838
@usableFromInline
3939
struct RPCHandler: Sendable {
4040
@usableFromInline
4141
let _fn:
4242
@Sendable (
4343
_ stream: RPCStream<
44-
RPCAsyncSequence<RPCRequestPart, any Error>,
45-
RPCWriter<RPCResponsePart>.Closable
44+
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
45+
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
4646
>,
4747
_ context: ServerContext,
4848
_ interceptors: [any ServerInterceptor]
@@ -73,8 +73,8 @@ public struct RPCRouter: Sendable {
7373
@inlinable
7474
func handle(
7575
stream: RPCStream<
76-
RPCAsyncSequence<RPCRequestPart, any Error>,
77-
RPCWriter<RPCResponsePart>.Closable
76+
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
77+
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
7878
>,
7979
context: ServerContext,
8080
interceptors: [any ServerInterceptor]
@@ -171,8 +171,8 @@ public struct RPCRouter: Sendable {
171171
extension RPCRouter {
172172
internal func handle(
173173
stream: RPCStream<
174-
RPCAsyncSequence<RPCRequestPart, any Error>,
175-
RPCWriter<RPCResponsePart>.Closable
174+
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
175+
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
176176
>,
177177
context: ServerContext
178178
) async {

Sources/GRPCCore/Call/Server/RegistrableRPCService.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ public protocol RegistrableRPCService: Sendable {
2626
/// Registers methods to server with the provided ``RPCRouter``.
2727
///
2828
/// - Parameter router: The router to register methods with.
29-
func registerMethods(with router: inout RPCRouter)
29+
func registerMethods<Transport: ServerTransport>(with router: inout RPCRouter<Transport>)
3030
}

Sources/GRPCCore/Coding/Coding.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public protocol MessageSerializer<Message>: Sendable {
3030
///
3131
/// - Parameter message: The message to serialize.
3232
/// - Returns: The serialized bytes of a message.
33-
func serialize(_ message: Message) throws -> [UInt8]
33+
func serialize<Bytes: GRPCContiguousBytes>(_ message: Message) throws -> Bytes
3434
}
3535

3636
/// Deserializes a sequence of bytes into a message.
@@ -49,5 +49,5 @@ public protocol MessageDeserializer<Message>: Sendable {
4949
///
5050
/// - Parameter serializedMessageBytes: The bytes to deserialize.
5151
/// - Returns: The deserialized message.
52-
func deserialize(_ serializedMessageBytes: [UInt8]) throws -> Message
52+
func deserialize<Bytes: GRPCContiguousBytes>(_ serializedMessageBytes: Bytes) throws -> Message
5353
}

0 commit comments

Comments
 (0)