Skip to content

Commit d069bf1

Browse files
authored
Merge branch 'main' into v2/bag-o-bytes
2 parents 34fccb6 + 1fb1626 commit d069bf1

17 files changed

+111
-56
lines changed

Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ extension VariableDescription {
4545
/// static let descriptor = GRPCCore.MethodDescriptor(
4646
/// service: GRPCCore.ServiceDescriptor(fullyQualifiedServiceName: "<literalFullyQualifiedService>"),
4747
/// method: "<literalMethodName>"
48+
/// )
4849
/// ```
4950
package static func methodDescriptor(
5051
accessModifier: AccessModifier? = nil,

Sources/GRPCCore/Call/Client/ClientContext.swift

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,40 @@ public struct ClientContext: Sendable {
1919
/// A description of the method being called.
2020
public var descriptor: MethodDescriptor
2121

22+
/// A description of the remote peer.
23+
///
24+
/// The format of the description should follow the pattern "<transport>:<address>" where
25+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
26+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
27+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
28+
///
29+
/// Some examples include:
30+
/// - "ipv4:127.0.0.1:31415",
31+
/// - "ipv6:[::1]:443",
32+
/// - "in-process:27182".
33+
public var remotePeer: String
34+
35+
/// A description of the local peer.
36+
///
37+
/// The format of the description should follow the pattern "<transport>:<address>" where
38+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
39+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
40+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
41+
///
42+
/// Some examples include:
43+
/// - "ipv4:127.0.0.1:31415",
44+
/// - "ipv6:[::1]:443",
45+
/// - "in-process:27182".
46+
public var localPeer: String
47+
2248
/// Create a new client interceptor context.
23-
public init(descriptor: MethodDescriptor) {
49+
public init(
50+
descriptor: MethodDescriptor,
51+
remotePeer: String,
52+
localPeer: String
53+
) {
2454
self.descriptor = descriptor
55+
self.remotePeer = remotePeer
56+
self.localPeer = localPeer
2557
}
2658
}

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ extension ClientRPCExecutor.HedgingExecutor {
322322
return try await self.transport.withStream(
323323
descriptor: method,
324324
options: options
325-
) { stream -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
325+
) { stream, context -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
326326
return await withTaskGroup(of: _HedgingAttemptTaskResult<R, Output>.self) { group in
327327
group.addTask {
328328
do {
@@ -348,8 +348,8 @@ extension ClientRPCExecutor.HedgingExecutor {
348348

349349
let response = await ClientRPCExecutor._execute(
350350
in: &group,
351+
context: context,
351352
request: request,
352-
method: method,
353353
attempt: attempt,
354354
serializer: self.serializer,
355355
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,14 @@ extension ClientRPCExecutor.OneShotExecutor {
9898
) async -> Result<R, any Error> {
9999
return await withTaskGroup(of: Void.self, returning: Result<R, any Error>.self) { group in
100100
do {
101-
return try await self.transport.withStream(descriptor: method, options: options) { stream in
101+
return try await self.transport.withStream(
102+
descriptor: method,
103+
options: options
104+
) { stream, context in
102105
let response = await ClientRPCExecutor._execute(
103106
in: &group,
107+
context: context,
104108
request: request,
105-
method: method,
106109
attempt: 1,
107110
serializer: self.serializer,
108111
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ extension ClientRPCExecutor.RetryExecutor {
118118
let attemptResult = try await self.transport.withStream(
119119
descriptor: method,
120120
options: options
121-
) { stream in
121+
) { stream, context in
122122
group.addTask {
123123
var metadata = request.metadata
124124
// Work out the timeout from the deadline.
@@ -127,6 +127,7 @@ extension ClientRPCExecutor.RetryExecutor {
127127
}
128128

129129
return await self.executeAttempt(
130+
context: context,
130131
stream: stream,
131132
metadata: metadata,
132133
retryStream: retry.stream,
@@ -194,6 +195,7 @@ extension ClientRPCExecutor.RetryExecutor {
194195

195196
@inlinable
196197
func executeAttempt<R: Sendable, Bytes: GRPCContiguousBytes>(
198+
context: ClientContext,
197199
stream: RPCStream<
198200
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
199201
RPCWriter<RPCRequestPart<Bytes>>.Closable
@@ -214,8 +216,8 @@ extension ClientRPCExecutor.RetryExecutor {
214216

215217
let response = await ClientRPCExecutor._execute(
216218
in: &group,
219+
context: context,
217220
request: request,
218-
method: method,
219221
attempt: attempt,
220222
serializer: self.serializer,
221223
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,19 @@ extension ClientRPCExecutor {
104104
///
105105
/// - Parameters:
106106
/// - request: The request to execute.
107-
/// - method: A description of the method to execute the request against.
107+
/// - context: The ``ClientContext`` related to this request.
108108
/// - attempt: The attempt number of the request.
109109
/// - serializer: A serializer to convert input messages to bytes.
110110
/// - deserializer: A deserializer to convert bytes to output messages.
111111
/// - interceptors: An array of interceptors which the request and response pass through. The
112112
/// interceptors will be called in the order of the array.
113+
/// - stream: The stream to excecute the RPC on.
113114
/// - Returns: The deserialized response.
114115
@inlinable // would be private
115116
static func _execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
116117
in group: inout TaskGroup<Void>,
118+
context: ClientContext,
117119
request: StreamingClientRequest<Input>,
118-
method: MethodDescriptor,
119120
attempt: Int,
120121
serializer: some MessageSerializer<Input>,
121122
deserializer: some MessageDeserializer<Output>,
@@ -125,7 +126,6 @@ extension ClientRPCExecutor {
125126
RPCWriter<RPCRequestPart<Bytes>>.Closable
126127
>
127128
) async -> StreamingClientResponse<Output> {
128-
let context = ClientContext(descriptor: method)
129129

130130
if interceptors.isEmpty {
131131
return await ClientStreamExecutor.execute(

Sources/GRPCCore/Transport/ClientTransport.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public protocol ClientTransport<Bytes>: Sendable {
5050
/// running ``connect()``.
5151
func beginGracefulShutdown()
5252

53-
/// Opens a stream using the transport, and uses it as input into a user-provided closure.
53+
/// Opens a stream using the transport, and uses it as input into a user-provided closure alongisde the given context.
5454
///
5555
/// - Important: The opened stream is closed after the closure is finished.
5656
///
@@ -62,12 +62,12 @@ public protocol ClientTransport<Bytes>: Sendable {
6262
/// - Parameters:
6363
/// - descriptor: A description of the method to open a stream for.
6464
/// - options: Options specific to the stream.
65-
/// - closure: A closure that takes the opened stream as parameter.
65+
/// - closure: A closure that takes the opened stream and the client context as its parameters.
6666
/// - Returns: Whatever value was returned from `closure`.
6767
func withStream<T: Sendable>(
6868
descriptor: MethodDescriptor,
6969
options: CallOptions,
70-
_ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
70+
_ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
7171
) async throws -> T
7272

7373
/// Returns the configuration for a given method.

Sources/GRPCInProcessTransport/InProcessTransport+Client.swift

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,19 +104,23 @@ extension InProcessTransport {
104104

105105
private let methodConfig: MethodConfigs
106106
private let state: Mutex<State>
107+
private let peer: String
107108

108109
/// Creates a new in-process client transport.
109110
///
110111
/// - Parameters:
111112
/// - server: The in-process server transport to connect to.
112113
/// - serviceConfig: Service configuration.
114+
/// - peer: The system's PID for the running client and server.
113115
package init(
114116
server: InProcessTransport.Server,
115-
serviceConfig: ServiceConfig = ServiceConfig()
117+
serviceConfig: ServiceConfig = ServiceConfig(),
118+
peer: String
116119
) {
117120
self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
118121
self.methodConfig = MethodConfigs(serviceConfig: serviceConfig)
119122
self.state = Mutex(.unconnected(.init(serverTransport: server)))
123+
self.peer = peer
120124
}
121125

122126
/// Establish and maintain a connection to the remote destination.
@@ -226,12 +230,12 @@ extension InProcessTransport {
226230
/// - Parameters:
227231
/// - descriptor: A description of the method to open a stream for.
228232
/// - options: Options specific to the stream.
229-
/// - closure: A closure that takes the opened stream as parameter.
233+
/// - closure: A closure that takes the opened stream and the client context as its parameters.
230234
/// - Returns: Whatever value was returned from `closure`.
231235
public func withStream<T>(
232236
descriptor: MethodDescriptor,
233237
options: CallOptions,
234-
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
238+
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
235239
) async throws -> T {
236240
let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart<Bytes>.self)
237241
let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart<Bytes>.self)
@@ -298,11 +302,17 @@ extension InProcessTransport {
298302
}
299303
}
300304

305+
let clientContext = ClientContext(
306+
descriptor: descriptor,
307+
remotePeer: self.peer,
308+
localPeer: self.peer
309+
)
310+
301311
switch acceptStream {
302312
case .success(let streamID):
303313
let streamHandlingResult: Result<T, any Error>
304314
do {
305-
let result = try await closure(clientStream)
315+
let result = try await closure(clientStream, clientContext)
306316
streamHandlingResult = .success(result)
307317
} catch {
308318
streamHandlingResult = .failure(error)

Sources/GRPCInProcessTransport/InProcessTransport+Server.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ extension InProcessTransport {
3636

3737
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
3838
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
39-
private let peer: String
39+
package let peer: String
4040

4141
private struct State: Sendable {
4242
private var _nextID: UInt64
@@ -76,6 +76,9 @@ extension InProcessTransport {
7676
private let handles: Mutex<State>
7777

7878
/// Creates a new instance of ``Server``.
79+
///
80+
/// - Parameters:
81+
/// - peer: The system's PID for the running client and server.
7982
package init(peer: String) {
8083
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
8184
self.handles = Mutex(State())

Sources/GRPCInProcessTransport/InProcessTransport.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ public struct InProcessTransport: Sendable {
2727
public init(serviceConfig: ServiceConfig = ServiceConfig()) {
2828
let peer = "in-process:\(System.pid())"
2929
self.server = Self.Server(peer: peer)
30-
self.client = Self.Client(server: self.server, serviceConfig: serviceConfig)
30+
self.client = Self.Client(server: self.server, serviceConfig: serviceConfig, peer: peer)
3131
}
3232
}

0 commit comments

Comments
 (0)