Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions Sources/GRPCCodeGen/Internal/StructuredSwift+Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,10 @@ extension FunctionDescription {

extension StructDescription {
/// ```
/// struct <Name>: <ClientProtocol> {
/// private let client: GRPCCore.GRPCClient
/// struct <Name><Transport>: <ClientProtocol> where Transport: GRPCCore.ClientTransport {
/// private let client: GRPCCore.GRPCClient<Transport>
///
/// init(wrapping client: GRPCCore.GRPCClient) {
/// init(wrapping client: GRPCCore.GRPCClient<Transport>) {
/// self.client = client
/// }
///
Expand All @@ -665,9 +665,18 @@ extension StructDescription {
StructDescription(
accessModifier: accessLevel,
name: name,
generics: [.member("Transport")],
conformances: [clientProtocol],
whereClause: WhereClause(
requirements: [.conformance("Transport", "GRPCCore.ClientTransport")]
),
members: [
.variable(accessModifier: .private, kind: .let, left: "client", type: .grpcClient),
.variable(
accessModifier: .private,
kind: .let,
left: "client",
type: .grpcClient(genericOver: "Transport")
),
.commentable(
.preFormatted(
"""
Expand All @@ -681,7 +690,13 @@ extension StructDescription {
accessModifier: accessLevel,
kind: .initializer,
parameters: [
ParameterDescription(label: "wrapping", name: "client", type: .grpcClient)
ParameterDescription(
label: "wrapping",
name: "client",
type: .grpcClient(
genericOver: "Transport"
)
)
],
whereClause: nil,
body: [
Expand Down
8 changes: 7 additions & 1 deletion Sources/GRPCCodeGen/Internal/StructuredSwift+Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,20 @@ extension FunctionDescription {
return FunctionDescription(
accessModifier: accessLevel,
kind: .function(name: "registerMethods"),
generics: [.member("Transport")],
parameters: [
ParameterDescription(
label: "with",
name: "router",
type: .rpcRouter,
type: .rpcRouter(genericOver: "Transport"),
`inout`: true
)
],
whereClause: WhereClause(
requirements: [
.conformance("Transport", "GRPCCore.ServerTransport")
]
),
body: methods.map { method in
.functionCall(
.registerWithRouter(
Expand Down
11 changes: 9 additions & 2 deletions Sources/GRPCCodeGen/Internal/StructuredSwift+Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ extension ExistingTypeDescription {
}

package static let serverContext: Self = .grpcCore("ServerContext")
package static let rpcRouter: Self = .grpcCore("RPCRouter")

package static func rpcRouter(genericOver type: String) -> Self {
.generic(wrapper: .grpcCore("RPCRouter"), wrapped: .member(type))
}

package static let serviceDescriptor: Self = .grpcCore("ServiceDescriptor")
package static let methodDescriptor: Self = .grpcCore("MethodDescriptor")

Expand All @@ -80,5 +84,8 @@ extension ExistingTypeDescription {

package static let callOptions: Self = .grpcCore("CallOptions")
package static let metadata: Self = .grpcCore("Metadata")
package static let grpcClient: Self = .grpcCore("GRPCClient")

package static func grpcClient(genericOver transport: String) -> Self {
.generic(wrapper: .grpcCore("GRPCClient"), wrapped: [.member(transport)])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,11 @@ extension ClientRPCExecutor.RetryExecutor {
}

@inlinable
func executeAttempt<R: Sendable>(
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
func executeAttempt<R: Sendable, Bytes: GRPCContiguousBytes>(
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>,
metadata: Metadata,
retryStream: BroadcastAsyncSequence<Input>,
method: MethodDescriptor,
Expand Down
7 changes: 5 additions & 2 deletions Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,18 @@ extension ClientRPCExecutor {
/// interceptors will be called in the order of the array.
/// - Returns: The deserialized response.
@inlinable // would be private
static func _execute<Input: Sendable, Output: Sendable>(
static func _execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
in group: inout TaskGroup<Void>,
request: StreamingClientRequest<Input>,
method: MethodDescriptor,
attempt: Int,
serializer: some MessageSerializer<Input>,
deserializer: some MessageDeserializer<Output>,
interceptors: [any ClientInterceptor],
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>
) async -> StreamingClientResponse<Output> {
let context = ClientContext(descriptor: method)

Expand Down
27 changes: 17 additions & 10 deletions Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ internal enum ClientStreamExecutor {
/// - stream: The stream to excecute the RPC on.
/// - Returns: A streamed response.
@inlinable
static func execute<Input: Sendable, Output: Sendable>(
static func execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
in group: inout TaskGroup<Void>,
request: StreamingClientRequest<Input>,
context: ClientContext,
attempt: Int,
serializer: some MessageSerializer<Input>,
deserializer: some MessageDeserializer<Output>,
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>
) async -> StreamingClientResponse<Output> {
// Let the server know this is a retry.
var metadata = request.metadata
Expand Down Expand Up @@ -83,8 +86,8 @@ internal enum ClientStreamExecutor {
}

@inlinable // would be private
static func _processRequest<Outbound>(
on stream: some ClosableRPCWriterProtocol<RPCRequestPart>,
static func _processRequest<Outbound, Bytes: GRPCContiguousBytes>(
on stream: some ClosableRPCWriterProtocol<RPCRequestPart<Bytes>>,
request: StreamingClientRequest<Outbound>,
serializer: some MessageSerializer<Outbound>
) async {
Expand All @@ -104,16 +107,19 @@ internal enum ClientStreamExecutor {
}

@usableFromInline
enum OnFirstResponsePart: Sendable {
case metadata(Metadata, UnsafeTransfer<ClientTransport.Inbound.AsyncIterator>)
enum OnFirstResponsePart<Bytes: GRPCContiguousBytes>: Sendable {
case metadata(
Metadata,
UnsafeTransfer<RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>.AsyncIterator>
)
case status(Status, Metadata)
case failed(RPCError)
}

@inlinable // would be private
static func _waitForFirstResponsePart(
on stream: ClientTransport.Inbound
) async -> OnFirstResponsePart {
static func _waitForFirstResponsePart<Bytes: GRPCContiguousBytes>(
on stream: RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>
) async -> OnFirstResponsePart<Bytes> {
var iterator = stream.makeAsyncIterator()
let result = await Result<OnFirstResponsePart, any Error> {
switch try await iterator.next() {
Expand Down Expand Up @@ -165,7 +171,8 @@ internal enum ClientStreamExecutor {

@usableFromInline
struct RawBodyPartToMessageSequence<
Base: AsyncSequence<RPCResponsePart, Failure>,
Base: AsyncSequence<RPCResponsePart<Bytes>, Failure>,
Bytes: GRPCContiguousBytes,
Message: Sendable,
Deserializer: MessageDeserializer<Message>,
Failure: Error
Expand Down
36 changes: 18 additions & 18 deletions Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ struct ServerRPCExecutor {
/// interceptors will be called in the order of the array.
/// - handler: A handler which turns the request into a response.
@inlinable
static func execute<Input, Output>(
static func execute<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>,
RPCWriter<RPCResponsePart<Bytes>>.Closable
>,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
Expand Down Expand Up @@ -66,11 +66,11 @@ struct ServerRPCExecutor {
}

@inlinable
static func _execute<Input, Output>(
static func _execute<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -106,12 +106,12 @@ struct ServerRPCExecutor {
}

@inlinable
static func _processRPCWithTimeout<Input, Output>(
static func _processRPCWithTimeout<Input, Output, Bytes: GRPCContiguousBytes>(
timeout: Duration,
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -147,11 +147,11 @@ struct ServerRPCExecutor {
}

@inlinable
static func _processRPC<Input, Output>(
static func _processRPC<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -235,12 +235,12 @@ struct ServerRPCExecutor {
}

@inlinable
static func _waitForFirstRequestPart(
inbound: RPCAsyncSequence<RPCRequestPart, any Error>
) async -> OnFirstRequestPart {
static func _waitForFirstRequestPart<Bytes: GRPCContiguousBytes>(
inbound: RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>
) async -> OnFirstRequestPart<Bytes> {
var iterator = inbound.makeAsyncIterator()
let part = await Result { try await iterator.next() }
let onFirstRequestPart: OnFirstRequestPart
let onFirstRequestPart: OnFirstRequestPart<Bytes>

switch part {
case .success(.metadata(let metadata)):
Expand Down Expand Up @@ -275,10 +275,10 @@ struct ServerRPCExecutor {
}

@usableFromInline
enum OnFirstRequestPart {
enum OnFirstRequestPart<Bytes: GRPCContiguousBytes> {
case process(
Metadata,
UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>
UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>
)
case reject(RPCError)
}
Expand Down
14 changes: 7 additions & 7 deletions Sources/GRPCCore/Call/Server/RPCRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@
/// 1. Remove individual methods by calling ``removeHandler(forMethod:)``, or
/// 2. Implement ``RegistrableRPCService/registerMethods(with:)`` to register only the methods you
/// want to be served.
public struct RPCRouter: Sendable {
public struct RPCRouter<Transport: ServerTransport>: Sendable {
@usableFromInline
struct RPCHandler: Sendable {
@usableFromInline
let _fn:
@Sendable (
_ stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
_ context: ServerContext,
_ interceptors: [any ServerInterceptor]
Expand Down Expand Up @@ -73,8 +73,8 @@ public struct RPCRouter: Sendable {
@inlinable
func handle(
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
context: ServerContext,
interceptors: [any ServerInterceptor]
Expand Down Expand Up @@ -170,8 +170,8 @@ public struct RPCRouter: Sendable {
extension RPCRouter {
internal func handle(
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
context: ServerContext
) async {
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPCCore/Call/Server/RegistrableRPCService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public protocol RegistrableRPCService: Sendable {
/// Registers methods to server with the provided ``RPCRouter``.
///
/// - Parameter router: The router to register methods with.
func registerMethods(with router: inout RPCRouter)
func registerMethods<Transport: ServerTransport>(with router: inout RPCRouter<Transport>)
}
4 changes: 2 additions & 2 deletions Sources/GRPCCore/Coding/Coding.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public protocol MessageSerializer<Message>: Sendable {
///
/// - Parameter message: The message to serialize.
/// - Returns: The serialized bytes of a message.
func serialize(_ message: Message) throws -> [UInt8]
func serialize<Bytes: GRPCContiguousBytes>(_ message: Message) throws -> Bytes
}

/// Deserializes a sequence of bytes into a message.
Expand All @@ -49,5 +49,5 @@ public protocol MessageDeserializer<Message>: Sendable {
///
/// - Parameter serializedMessageBytes: The bytes to deserialize.
/// - Returns: The deserialized message.
func deserialize(_ serializedMessageBytes: [UInt8]) throws -> Message
func deserialize<Bytes: GRPCContiguousBytes>(_ serializedMessageBytes: Bytes) throws -> Message
}
Loading
Loading