Skip to content

Commit 4501709

Browse files
committed
Add more properties to ClientContext and have the ClientTransport provide it
1 parent 76073a2 commit 4501709

13 files changed

+96
-43
lines changed

Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ extension VariableDescription {
4545
/// static let descriptor = GRPCCore.MethodDescriptor(
4646
/// service: GRPCCore.ServiceDescriptor(fullyQualifiedServiceName: "<literalFullyQualifiedService>"),
4747
/// method: "<literalMethodName>"
48+
/// )
4849
/// ```
4950
package static func methodDescriptor(
5051
accessModifier: AccessModifier? = nil,

Sources/GRPCCore/Call/Client/ClientContext.swift

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,50 @@ public struct ClientContext: Sendable {
1919
/// A description of the method being called.
2020
public var descriptor: MethodDescriptor
2121

22+
/// A description of the remote peer.
23+
///
24+
/// The format of the description should follow the pattern "<transport>:<address>" where
25+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
26+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
27+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
28+
///
29+
/// Some examples include:
30+
/// - "ipv4:127.0.0.1:31415",
31+
/// - "ipv6:[::1]:443",
32+
/// - "in-process:27182".
33+
public var remotePeer: String
34+
35+
/// The hostname of the RPC server.
36+
public var serverHostname: String
37+
38+
/// A description of the local peer.
39+
///
40+
/// The format of the description should follow the pattern "<transport>:<address>" where
41+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
42+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
43+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
44+
///
45+
/// Some examples include:
46+
/// - "ipv4:127.0.0.1:31415",
47+
/// - "ipv6:[::1]:443",
48+
/// - "in-process:27182".
49+
public var localPeer: String
50+
51+
/// The transport in use (e.g. "tcp", "udp").
52+
public var networkTransportMethod: String
53+
2254
/// Create a new client interceptor context.
23-
public init(descriptor: MethodDescriptor) {
55+
public init(
56+
descriptor: MethodDescriptor,
57+
remotePeer: String,
58+
localPeer: String,
59+
serverHostname: String,
60+
networkTransportMethod: String
61+
) {
2462
self.descriptor = descriptor
63+
self.remotePeer = remotePeer
64+
self.localPeer = localPeer
65+
self.serverHostname = serverHostname
66+
self.networkTransportMethod = networkTransportMethod
2567
}
2668
}

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ extension ClientRPCExecutor.HedgingExecutor {
322322
return try await self.transport.withStream(
323323
descriptor: method,
324324
options: options
325-
) { stream -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
325+
) { stream, context -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
326326
return await withTaskGroup(of: _HedgingAttemptTaskResult<R, Output>.self) { group in
327327
group.addTask {
328328
do {
@@ -348,8 +348,8 @@ extension ClientRPCExecutor.HedgingExecutor {
348348

349349
let response = await ClientRPCExecutor._execute(
350350
in: &group,
351+
context: context,
351352
request: request,
352-
method: method,
353353
attempt: attempt,
354354
serializer: self.serializer,
355355
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,11 @@ extension ClientRPCExecutor.OneShotExecutor {
9898
) async -> Result<R, any Error> {
9999
return await withTaskGroup(of: Void.self, returning: Result<R, any Error>.self) { group in
100100
do {
101-
return try await self.transport.withStream(descriptor: method, options: options) { stream in
101+
return try await self.transport.withStream(descriptor: method, options: options) { stream, context in
102102
let response = await ClientRPCExecutor._execute(
103103
in: &group,
104+
context: context,
104105
request: request,
105-
method: method,
106106
attempt: 1,
107107
serializer: self.serializer,
108108
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ extension ClientRPCExecutor.RetryExecutor {
118118
let attemptResult = try await self.transport.withStream(
119119
descriptor: method,
120120
options: options
121-
) { stream in
121+
) { stream, context in
122122
group.addTask {
123123
var metadata = request.metadata
124124
// Work out the timeout from the deadline.
@@ -127,6 +127,7 @@ extension ClientRPCExecutor.RetryExecutor {
127127
}
128128

129129
return await self.executeAttempt(
130+
context: context,
130131
stream: stream,
131132
metadata: metadata,
132133
retryStream: retry.stream,
@@ -194,6 +195,7 @@ extension ClientRPCExecutor.RetryExecutor {
194195

195196
@inlinable
196197
func executeAttempt<R: Sendable>(
198+
context: ClientContext,
197199
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
198200
metadata: Metadata,
199201
retryStream: BroadcastAsyncSequence<Input>,
@@ -211,8 +213,8 @@ extension ClientRPCExecutor.RetryExecutor {
211213

212214
let response = await ClientRPCExecutor._execute(
213215
in: &group,
216+
context: context,
214217
request: request,
215-
method: method,
216218
attempt: attempt,
217219
serializer: self.serializer,
218220
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,25 +104,25 @@ extension ClientRPCExecutor {
104104
///
105105
/// - Parameters:
106106
/// - request: The request to execute.
107-
/// - method: A description of the method to execute the request against.
107+
/// - context: The ``ClientContext`` related to this request.
108108
/// - attempt: The attempt number of the request.
109109
/// - serializer: A serializer to convert input messages to bytes.
110110
/// - deserializer: A deserializer to convert bytes to output messages.
111111
/// - interceptors: An array of interceptors which the request and response pass through. The
112112
/// interceptors will be called in the order of the array.
113+
/// - stream: The stream to excecute the RPC on.
113114
/// - Returns: The deserialized response.
114115
@inlinable // would be private
115116
static func _execute<Input: Sendable, Output: Sendable>(
116117
in group: inout TaskGroup<Void>,
118+
context: ClientContext,
117119
request: StreamingClientRequest<Input>,
118-
method: MethodDescriptor,
119120
attempt: Int,
120121
serializer: some MessageSerializer<Input>,
121122
deserializer: some MessageDeserializer<Output>,
122123
interceptors: [any ClientInterceptor],
123124
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
124125
) async -> StreamingClientResponse<Output> {
125-
let context = ClientContext(descriptor: method)
126126

127127
if interceptors.isEmpty {
128128
return await ClientStreamExecutor.execute(

Sources/GRPCCore/Transport/ClientTransport.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ public protocol ClientTransport: Sendable {
5959
/// - Parameters:
6060
/// - descriptor: A description of the method to open a stream for.
6161
/// - options: Options specific to the stream.
62-
/// - closure: A closure that takes the opened stream as parameter.
62+
/// - closure: A closure that takes the opened stream and the client context as its parameters.
6363
/// - Returns: Whatever value was returned from `closure`.
6464
func withStream<T: Sendable>(
6565
descriptor: MethodDescriptor,
6666
options: CallOptions,
67-
_ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
67+
_ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
6868
) async throws -> T
6969

7070
/// Returns the configuration for a given method.

Sources/GRPCInProcessTransport/InProcessTransport+Client.swift

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,12 @@ extension InProcessTransport {
225225
/// - Parameters:
226226
/// - descriptor: A description of the method to open a stream for.
227227
/// - options: Options specific to the stream.
228-
/// - closure: A closure that takes the opened stream as parameter.
228+
/// - closure: A closure that takes the opened stream and the client context as its parameters.
229229
/// - Returns: Whatever value was returned from `closure`.
230230
public func withStream<T>(
231231
descriptor: MethodDescriptor,
232232
options: CallOptions,
233-
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
233+
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
234234
) async throws -> T {
235235
let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self)
236236
let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self)
@@ -297,11 +297,19 @@ extension InProcessTransport {
297297
}
298298
}
299299

300+
let clientContext = ClientContext(
301+
descriptor: descriptor,
302+
remotePeer: "<in-process>",
303+
localPeer: "<in-process>",
304+
serverHostname: "<in-process>",
305+
networkTransportMethod: "in-process"
306+
)
307+
300308
switch acceptStream {
301309
case .success(let streamID):
302310
let streamHandlingResult: Result<T, any Error>
303311
do {
304-
let result = try await closure(clientStream)
312+
let result = try await closure(clientStream, clientContext)
305313
streamHandlingResult = .success(result)
306314
} catch {
307315
streamHandlingResult = .failure(error)

Tests/GRPCCoreTests/GRPCServerTests.swift

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ final class GRPCServerTests: XCTestCase {
4848
try await client.withStream(
4949
descriptor: BinaryEcho.Methods.get,
5050
options: .defaults
51-
) { stream in
51+
) { stream, _ in
5252
try await stream.outbound.write(.metadata([:]))
5353
try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
5454
await stream.outbound.finish()
@@ -75,7 +75,7 @@ final class GRPCServerTests: XCTestCase {
7575
try await client.withStream(
7676
descriptor: BinaryEcho.Methods.collect,
7777
options: .defaults
78-
) { stream in
78+
) { stream, _ in
7979
try await stream.outbound.write(.metadata([:]))
8080
try await stream.outbound.write(.message([3]))
8181
try await stream.outbound.write(.message([1]))
@@ -106,7 +106,7 @@ final class GRPCServerTests: XCTestCase {
106106
try await client.withStream(
107107
descriptor: BinaryEcho.Methods.expand,
108108
options: .defaults
109-
) { stream in
109+
) { stream, _ in
110110
try await stream.outbound.write(.metadata([:]))
111111
try await stream.outbound.write(.message([3, 1, 4, 1, 5]))
112112
await stream.outbound.finish()
@@ -135,7 +135,7 @@ final class GRPCServerTests: XCTestCase {
135135
try await client.withStream(
136136
descriptor: BinaryEcho.Methods.update,
137137
options: .defaults
138-
) { stream in
138+
) { stream, _ in
139139
try await stream.outbound.write(.metadata([:]))
140140
for byte in [3, 1, 4, 1, 5] as [UInt8] {
141141
try await stream.outbound.write(.message([byte]))
@@ -166,7 +166,7 @@ final class GRPCServerTests: XCTestCase {
166166
try await client.withStream(
167167
descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
168168
options: .defaults
169-
) { stream in
169+
) { stream, _ in
170170
try await stream.outbound.write(.metadata([:]))
171171
await stream.outbound.finish()
172172

@@ -187,7 +187,7 @@ final class GRPCServerTests: XCTestCase {
187187
try await client.withStream(
188188
descriptor: BinaryEcho.Methods.get,
189189
options: .defaults
190-
) { stream in
190+
) { stream, _ in
191191
try await stream.outbound.write(.metadata([:]))
192192
try await stream.outbound.write(.message([i]))
193193
await stream.outbound.finish()
@@ -225,7 +225,7 @@ final class GRPCServerTests: XCTestCase {
225225
try await client.withStream(
226226
descriptor: BinaryEcho.Methods.get,
227227
options: .defaults
228-
) { stream in
228+
) { stream, _ in
229229
try await stream.outbound.write(.metadata([:]))
230230
await stream.outbound.finish()
231231

@@ -250,7 +250,7 @@ final class GRPCServerTests: XCTestCase {
250250
try await client.withStream(
251251
descriptor: MethodDescriptor(fullyQualifiedService: "not", method: "implemented"),
252252
options: .defaults
253-
) { stream in
253+
) { stream, _ in
254254
try await stream.outbound.write(.metadata([:]))
255255
await stream.outbound.finish()
256256

@@ -277,7 +277,7 @@ final class GRPCServerTests: XCTestCase {
277277
try await client.withStream(
278278
descriptor: BinaryEcho.Methods.get,
279279
options: .defaults
280-
) { stream in
280+
) { stream, _ in
281281
XCTFail("Stream shouldn't be opened")
282282
}
283283
} errorHandler: { error in
@@ -291,7 +291,7 @@ final class GRPCServerTests: XCTestCase {
291291
try await client.withStream(
292292
descriptor: BinaryEcho.Methods.update,
293293
options: .defaults
294-
) { stream in
294+
) { stream, _ in
295295
try await stream.outbound.write(.metadata([:]))
296296
var iterator = stream.inbound.makeAsyncIterator()
297297
// Don't need to validate the response, just that the server is running.
@@ -364,7 +364,7 @@ final class GRPCServerTests: XCTestCase {
364364
try await transport.withStream(
365365
descriptor: BinaryEcho.Methods.get,
366366
options: .defaults
367-
) { stream in
367+
) { stream, _ in
368368
try await stream.outbound.write(.metadata([:]))
369369
try await stream.outbound.write(.message([0]))
370370
await stream.outbound.finish()
@@ -407,7 +407,7 @@ struct ServerTests {
407407
try await client.withStream(
408408
descriptor: BinaryEcho.Methods.get,
409409
options: .defaults
410-
) { stream in
410+
) { stream, _ in
411411
try await stream.outbound.write(.metadata([:]))
412412
try await stream.outbound.write(.message(Array("hello".utf8)))
413413
await stream.outbound.finish()
@@ -437,7 +437,7 @@ struct ServerTests {
437437
try await client.withStream(
438438
descriptor: HelloWorld.Methods.sayHello,
439439
options: .defaults
440-
) { stream in
440+
) { stream, _ in
441441
try await stream.outbound.write(.metadata([:]))
442442
try await stream.outbound.write(.message(Array("Swift".utf8)))
443443
await stream.outbound.finish()
@@ -494,7 +494,7 @@ struct ServerTests {
494494
try await client.withStream(
495495
descriptor: BinaryEcho.Methods.get,
496496
options: .defaults
497-
) { stream in
497+
) { stream, _ in
498498
try await stream.outbound.write(.metadata([:]))
499499
try await stream.outbound.write(.message(Array("hello".utf8)))
500500
await stream.outbound.finish()
@@ -524,7 +524,7 @@ struct ServerTests {
524524
try await client.withStream(
525525
descriptor: BinaryEcho.Methods.collect,
526526
options: .defaults
527-
) { stream in
527+
) { stream, _ in
528528
try await stream.outbound.write(.metadata([:]))
529529
try await stream.outbound.write(.message(Array("hello".utf8)))
530530
await stream.outbound.finish()

Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
2424
@Sendable (
2525
_ method: MethodDescriptor,
2626
_ options: CallOptions,
27-
_ body: (RPCStream<Inbound, Outbound>) async throws -> (any Sendable)
27+
_ body: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> (any Sendable)
2828
) async throws -> Any
2929
private let _connect: @Sendable () async throws -> Void
3030
private let _close: @Sendable () -> Void
@@ -34,8 +34,8 @@ struct AnyClientTransport: ClientTransport, Sendable {
3434
where Transport.Inbound == Inbound, Transport.Outbound == Outbound {
3535
self._retryThrottle = { transport.retryThrottle }
3636
self._withStream = { descriptor, options, closure in
37-
try await transport.withStream(descriptor: descriptor, options: options) { stream in
38-
try await closure(stream) as (any Sendable)
37+
try await transport.withStream(descriptor: descriptor, options: options) { stream, context in
38+
try await closure(stream, context) as (any Sendable)
3939
}
4040
}
4141

@@ -67,7 +67,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
6767
func withStream<T>(
6868
descriptor: MethodDescriptor,
6969
options: CallOptions,
70-
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
70+
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
7171
) async throws -> T {
7272
let result = try await self._withStream(descriptor, options, closure)
7373
return result as! T

0 commit comments

Comments
 (0)