Skip to content

Commit 1fb1626

Browse files
authored
Add more properties to ClientContext and have the ClientTransport provide it (#2158)
The `ServerTransport` provides the `ServerContext`, as it contains information that only the transport knows about (such as the remote peer's address). For consistency and to allow the `ClientContext` to also hold some additional information (such as remote and local peer descriptions), this PR changes the `ClientTransport` protocol so that implementations also provide the corresponding `ClientContext`. This PR also adds additional information to the context (which will be used by the tracing interceptor but can be useful for users in general): remote and local peer addresses, server hostname, and network transport.
1 parent 76073a2 commit 1fb1626

17 files changed

+111
-56
lines changed

Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ extension VariableDescription {
4545
/// static let descriptor = GRPCCore.MethodDescriptor(
4646
/// service: GRPCCore.ServiceDescriptor(fullyQualifiedServiceName: "<literalFullyQualifiedService>"),
4747
/// method: "<literalMethodName>"
48+
/// )
4849
/// ```
4950
package static func methodDescriptor(
5051
accessModifier: AccessModifier? = nil,

Sources/GRPCCore/Call/Client/ClientContext.swift

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,40 @@ public struct ClientContext: Sendable {
1919
/// A description of the method being called.
2020
public var descriptor: MethodDescriptor
2121

22+
/// A description of the remote peer.
23+
///
24+
/// The format of the description should follow the pattern "<transport>:<address>" where
25+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
26+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
27+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
28+
///
29+
/// Some examples include:
30+
/// - "ipv4:127.0.0.1:31415",
31+
/// - "ipv6:[::1]:443",
32+
/// - "in-process:27182".
33+
public var remotePeer: String
34+
35+
/// A description of the local peer.
36+
///
37+
/// The format of the description should follow the pattern "<transport>:<address>" where
38+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
39+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
40+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
41+
///
42+
/// Some examples include:
43+
/// - "ipv4:127.0.0.1:31415",
44+
/// - "ipv6:[::1]:443",
45+
/// - "in-process:27182".
46+
public var localPeer: String
47+
2248
/// Create a new client interceptor context.
23-
public init(descriptor: MethodDescriptor) {
49+
public init(
50+
descriptor: MethodDescriptor,
51+
remotePeer: String,
52+
localPeer: String
53+
) {
2454
self.descriptor = descriptor
55+
self.remotePeer = remotePeer
56+
self.localPeer = localPeer
2557
}
2658
}

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ extension ClientRPCExecutor.HedgingExecutor {
322322
return try await self.transport.withStream(
323323
descriptor: method,
324324
options: options
325-
) { stream -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
325+
) { stream, context -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
326326
return await withTaskGroup(of: _HedgingAttemptTaskResult<R, Output>.self) { group in
327327
group.addTask {
328328
do {
@@ -348,8 +348,8 @@ extension ClientRPCExecutor.HedgingExecutor {
348348

349349
let response = await ClientRPCExecutor._execute(
350350
in: &group,
351+
context: context,
351352
request: request,
352-
method: method,
353353
attempt: attempt,
354354
serializer: self.serializer,
355355
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,14 @@ extension ClientRPCExecutor.OneShotExecutor {
9898
) async -> Result<R, any Error> {
9999
return await withTaskGroup(of: Void.self, returning: Result<R, any Error>.self) { group in
100100
do {
101-
return try await self.transport.withStream(descriptor: method, options: options) { stream in
101+
return try await self.transport.withStream(
102+
descriptor: method,
103+
options: options
104+
) { stream, context in
102105
let response = await ClientRPCExecutor._execute(
103106
in: &group,
107+
context: context,
104108
request: request,
105-
method: method,
106109
attempt: 1,
107110
serializer: self.serializer,
108111
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ extension ClientRPCExecutor.RetryExecutor {
118118
let attemptResult = try await self.transport.withStream(
119119
descriptor: method,
120120
options: options
121-
) { stream in
121+
) { stream, context in
122122
group.addTask {
123123
var metadata = request.metadata
124124
// Work out the timeout from the deadline.
@@ -127,6 +127,7 @@ extension ClientRPCExecutor.RetryExecutor {
127127
}
128128

129129
return await self.executeAttempt(
130+
context: context,
130131
stream: stream,
131132
metadata: metadata,
132133
retryStream: retry.stream,
@@ -194,6 +195,7 @@ extension ClientRPCExecutor.RetryExecutor {
194195

195196
@inlinable
196197
func executeAttempt<R: Sendable>(
198+
context: ClientContext,
197199
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
198200
metadata: Metadata,
199201
retryStream: BroadcastAsyncSequence<Input>,
@@ -211,8 +213,8 @@ extension ClientRPCExecutor.RetryExecutor {
211213

212214
let response = await ClientRPCExecutor._execute(
213215
in: &group,
216+
context: context,
214217
request: request,
215-
method: method,
216218
attempt: attempt,
217219
serializer: self.serializer,
218220
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,25 +104,25 @@ extension ClientRPCExecutor {
104104
///
105105
/// - Parameters:
106106
/// - request: The request to execute.
107-
/// - method: A description of the method to execute the request against.
107+
/// - context: The ``ClientContext`` related to this request.
108108
/// - attempt: The attempt number of the request.
109109
/// - serializer: A serializer to convert input messages to bytes.
110110
/// - deserializer: A deserializer to convert bytes to output messages.
111111
/// - interceptors: An array of interceptors which the request and response pass through. The
112112
/// interceptors will be called in the order of the array.
113+
/// - stream: The stream to excecute the RPC on.
113114
/// - Returns: The deserialized response.
114115
@inlinable // would be private
115116
static func _execute<Input: Sendable, Output: Sendable>(
116117
in group: inout TaskGroup<Void>,
118+
context: ClientContext,
117119
request: StreamingClientRequest<Input>,
118-
method: MethodDescriptor,
119120
attempt: Int,
120121
serializer: some MessageSerializer<Input>,
121122
deserializer: some MessageDeserializer<Output>,
122123
interceptors: [any ClientInterceptor],
123124
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
124125
) async -> StreamingClientResponse<Output> {
125-
let context = ClientContext(descriptor: method)
126126

127127
if interceptors.isEmpty {
128128
return await ClientStreamExecutor.execute(

Sources/GRPCCore/Transport/ClientTransport.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public protocol ClientTransport: Sendable {
4747
/// running ``connect()``.
4848
func beginGracefulShutdown()
4949

50-
/// Opens a stream using the transport, and uses it as input into a user-provided closure.
50+
/// Opens a stream using the transport, and uses it as input into a user-provided closure alongisde the given context.
5151
///
5252
/// - Important: The opened stream is closed after the closure is finished.
5353
///
@@ -59,12 +59,12 @@ public protocol ClientTransport: Sendable {
5959
/// - Parameters:
6060
/// - descriptor: A description of the method to open a stream for.
6161
/// - options: Options specific to the stream.
62-
/// - closure: A closure that takes the opened stream as parameter.
62+
/// - closure: A closure that takes the opened stream and the client context as its parameters.
6363
/// - Returns: Whatever value was returned from `closure`.
6464
func withStream<T: Sendable>(
6565
descriptor: MethodDescriptor,
6666
options: CallOptions,
67-
_ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
67+
_ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
6868
) async throws -> T
6969

7070
/// Returns the configuration for a given method.

Sources/GRPCInProcessTransport/InProcessTransport+Client.swift

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,19 +103,23 @@ extension InProcessTransport {
103103

104104
private let methodConfig: MethodConfigs
105105
private let state: Mutex<State>
106+
private let peer: String
106107

107108
/// Creates a new in-process client transport.
108109
///
109110
/// - Parameters:
110111
/// - server: The in-process server transport to connect to.
111112
/// - serviceConfig: Service configuration.
113+
/// - peer: The system's PID for the running client and server.
112114
package init(
113115
server: InProcessTransport.Server,
114-
serviceConfig: ServiceConfig = ServiceConfig()
116+
serviceConfig: ServiceConfig = ServiceConfig(),
117+
peer: String
115118
) {
116119
self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
117120
self.methodConfig = MethodConfigs(serviceConfig: serviceConfig)
118121
self.state = Mutex(.unconnected(.init(serverTransport: server)))
122+
self.peer = peer
119123
}
120124

121125
/// Establish and maintain a connection to the remote destination.
@@ -225,12 +229,12 @@ extension InProcessTransport {
225229
/// - Parameters:
226230
/// - descriptor: A description of the method to open a stream for.
227231
/// - options: Options specific to the stream.
228-
/// - closure: A closure that takes the opened stream as parameter.
232+
/// - closure: A closure that takes the opened stream and the client context as its parameters.
229233
/// - Returns: Whatever value was returned from `closure`.
230234
public func withStream<T>(
231235
descriptor: MethodDescriptor,
232236
options: CallOptions,
233-
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
237+
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
234238
) async throws -> T {
235239
let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self)
236240
let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self)
@@ -297,11 +301,17 @@ extension InProcessTransport {
297301
}
298302
}
299303

304+
let clientContext = ClientContext(
305+
descriptor: descriptor,
306+
remotePeer: self.peer,
307+
localPeer: self.peer
308+
)
309+
300310
switch acceptStream {
301311
case .success(let streamID):
302312
let streamHandlingResult: Result<T, any Error>
303313
do {
304-
let result = try await closure(clientStream)
314+
let result = try await closure(clientStream, clientContext)
305315
streamHandlingResult = .success(result)
306316
} catch {
307317
streamHandlingResult = .failure(error)

Sources/GRPCInProcessTransport/InProcessTransport+Server.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ extension InProcessTransport {
3434

3535
private let newStreams: AsyncStream<RPCStream<Inbound, Outbound>>
3636
private let newStreamsContinuation: AsyncStream<RPCStream<Inbound, Outbound>>.Continuation
37-
private let peer: String
37+
package let peer: String
3838

3939
private struct State: Sendable {
4040
private var _nextID: UInt64
@@ -74,6 +74,9 @@ extension InProcessTransport {
7474
private let handles: Mutex<State>
7575

7676
/// Creates a new instance of ``Server``.
77+
///
78+
/// - Parameters:
79+
/// - peer: The system's PID for the running client and server.
7780
package init(peer: String) {
7881
(self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream()
7982
self.handles = Mutex(State())

Sources/GRPCInProcessTransport/InProcessTransport.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ public struct InProcessTransport: Sendable {
2727
public init(serviceConfig: ServiceConfig = ServiceConfig()) {
2828
let peer = "in-process:\(System.pid())"
2929
self.server = Self.Server(peer: peer)
30-
self.client = Self.Client(server: self.server, serviceConfig: serviceConfig)
30+
self.client = Self.Client(server: self.server, serviceConfig: serviceConfig, peer: peer)
3131
}
3232
}

0 commit comments

Comments
 (0)