Skip to content

Commit b005418

Browse files
authored
Merge branch 'main' into v2/conditional-interceptor
2 parents 85f92f6 + 4214504 commit b005418

21 files changed

+186
-72
lines changed

Sources/GRPCCodeGen/Internal/StructuredSwift+ServiceMetadata.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ extension VariableDescription {
4545
/// static let descriptor = GRPCCore.MethodDescriptor(
4646
/// service: GRPCCore.ServiceDescriptor(fullyQualifiedServiceName: "<literalFullyQualifiedService>"),
4747
/// method: "<literalMethodName>"
48+
/// )
4849
/// ```
4950
package static func methodDescriptor(
5051
accessModifier: AccessModifier? = nil,

Sources/GRPCCore/Call/Client/ClientContext.swift

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,45 @@ public struct ClientContext: Sendable {
1919
/// A description of the method being called.
2020
public var descriptor: MethodDescriptor
2121

22+
/// A description of the remote peer.
23+
///
24+
/// The format of the description should follow the pattern "<transport>:<address>" where
25+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
26+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
27+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
28+
///
29+
/// Some examples include:
30+
/// - "ipv4:127.0.0.1:31415",
31+
/// - "ipv6:[::1]:443",
32+
/// - "in-process:27182".
33+
public var remotePeer: String
34+
35+
/// A description of the local peer.
36+
///
37+
/// The format of the description should follow the pattern "<transport>:<address>" where
38+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
39+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
40+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
41+
///
42+
/// Some examples include:
43+
/// - "ipv4:127.0.0.1:31415",
44+
/// - "ipv6:[::1]:443",
45+
/// - "in-process:27182".
46+
public var localPeer: String
47+
2248
/// Create a new client interceptor context.
23-
public init(descriptor: MethodDescriptor) {
49+
///
50+
/// - Parameters:
51+
/// - descriptor: A description of the method being called.
52+
/// - remotePeer: A description of the remote peer.
53+
/// - localPeer: A description of the local peer.
54+
public init(
55+
descriptor: MethodDescriptor,
56+
remotePeer: String,
57+
localPeer: String
58+
) {
2459
self.descriptor = descriptor
60+
self.remotePeer = remotePeer
61+
self.localPeer = localPeer
2562
}
2663
}

Sources/GRPCCore/Call/Client/ClientInterceptor.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,21 @@
1414
* limitations under the License.
1515
*/
1616

17+
// - FIXME: Update example and documentation to show how to register an interceptor.
18+
1719
/// A type that intercepts requests and response for clients.
1820
///
1921
/// Interceptors allow you to inspect and modify requests and responses. Requests are intercepted
2022
/// before they are handed to a transport and responses are intercepted after they have been
2123
/// received from the transport. They are typically used for cross-cutting concerns like injecting
2224
/// metadata, validating messages, logging additional data, and tracing.
2325
///
24-
/// Interceptors are registered with the server via ``ConditionalInterceptor``s.
26+
/// Interceptors are registered with the client via ``ConditionalInterceptor``s.
2527
/// You may register them for all services registered with a server, for RPCs directed to specific services, or
2628
/// for RPCs directed to specific methods. If you need to modify the behavior of an interceptor on a
2729
/// per-RPC basis in more detail, then you can use the ``ClientContext/descriptor`` to determine
2830
/// which RPC is being called and conditionalise behavior accordingly.
2931
///
30-
/// - TODO: Update example and documentation to show how to register an interceptor.
31-
///
3232
/// Some examples of simple interceptors follow.
3333
///
3434
/// ## Metadata injection

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ extension ClientRPCExecutor.HedgingExecutor {
322322
return try await self.transport.withStream(
323323
descriptor: method,
324324
options: options
325-
) { stream -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
325+
) { stream, context -> _HedgingAttemptTaskResult<R, Output>.AttemptResult in
326326
return await withTaskGroup(of: _HedgingAttemptTaskResult<R, Output>.self) { group in
327327
group.addTask {
328328
do {
@@ -348,8 +348,8 @@ extension ClientRPCExecutor.HedgingExecutor {
348348

349349
let response = await ClientRPCExecutor._execute(
350350
in: &group,
351+
context: context,
351352
request: request,
352-
method: method,
353353
attempt: attempt,
354354
serializer: self.serializer,
355355
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,14 @@ extension ClientRPCExecutor.OneShotExecutor {
9898
) async -> Result<R, any Error> {
9999
return await withTaskGroup(of: Void.self, returning: Result<R, any Error>.self) { group in
100100
do {
101-
return try await self.transport.withStream(descriptor: method, options: options) { stream in
101+
return try await self.transport.withStream(
102+
descriptor: method,
103+
options: options
104+
) { stream, context in
102105
let response = await ClientRPCExecutor._execute(
103106
in: &group,
107+
context: context,
104108
request: request,
105-
method: method,
106109
attempt: 1,
107110
serializer: self.serializer,
108111
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ extension ClientRPCExecutor.RetryExecutor {
118118
let attemptResult = try await self.transport.withStream(
119119
descriptor: method,
120120
options: options
121-
) { stream in
121+
) { stream, context in
122122
group.addTask {
123123
var metadata = request.metadata
124124
// Work out the timeout from the deadline.
@@ -127,6 +127,7 @@ extension ClientRPCExecutor.RetryExecutor {
127127
}
128128

129129
return await self.executeAttempt(
130+
context: context,
130131
stream: stream,
131132
metadata: metadata,
132133
retryStream: retry.stream,
@@ -194,6 +195,7 @@ extension ClientRPCExecutor.RetryExecutor {
194195

195196
@inlinable
196197
func executeAttempt<R: Sendable>(
198+
context: ClientContext,
197199
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
198200
metadata: Metadata,
199201
retryStream: BroadcastAsyncSequence<Input>,
@@ -211,8 +213,8 @@ extension ClientRPCExecutor.RetryExecutor {
211213

212214
let response = await ClientRPCExecutor._execute(
213215
in: &group,
216+
context: context,
214217
request: request,
215-
method: method,
216218
attempt: attempt,
217219
serializer: self.serializer,
218220
deserializer: self.deserializer,

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,25 +104,25 @@ extension ClientRPCExecutor {
104104
///
105105
/// - Parameters:
106106
/// - request: The request to execute.
107-
/// - method: A description of the method to execute the request against.
107+
/// - context: The ``ClientContext`` related to this request.
108108
/// - attempt: The attempt number of the request.
109109
/// - serializer: A serializer to convert input messages to bytes.
110110
/// - deserializer: A deserializer to convert bytes to output messages.
111111
/// - interceptors: An array of interceptors which the request and response pass through. The
112112
/// interceptors will be called in the order of the array.
113+
/// - stream: The stream to excecute the RPC on.
113114
/// - Returns: The deserialized response.
114115
@inlinable // would be private
115116
static func _execute<Input: Sendable, Output: Sendable>(
116117
in group: inout TaskGroup<Void>,
118+
context: ClientContext,
117119
request: StreamingClientRequest<Input>,
118-
method: MethodDescriptor,
119120
attempt: Int,
120121
serializer: some MessageSerializer<Input>,
121122
deserializer: some MessageDeserializer<Output>,
122123
interceptors: [any ClientInterceptor],
123124
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
124125
) async -> StreamingClientResponse<Output> {
125-
let context = ClientContext(descriptor: method)
126126

127127
if interceptors.isEmpty {
128128
return await ClientStreamExecutor.execute(

Sources/GRPCCore/Call/Server/ServerContext.swift

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,37 @@ public struct ServerContext: Sendable {
3030
/// - "ipv4:127.0.0.1:31415",
3131
/// - "ipv6:[::1]:443",
3232
/// - "in-process:27182".
33-
public var peer: String
33+
@available(*, deprecated, renamed: "remotePeer")
34+
public var peer: String {
35+
get { remotePeer }
36+
set { remotePeer = newValue }
37+
}
38+
39+
/// A description of the remote peer.
40+
///
41+
/// The format of the description should follow the pattern "<transport>:<address>" where
42+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
43+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
44+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
45+
///
46+
/// Some examples include:
47+
/// - "ipv4:127.0.0.1:31415",
48+
/// - "ipv6:[::1]:443",
49+
/// - "in-process:27182".
50+
public var remotePeer: String
51+
52+
/// A description of the local peer.
53+
///
54+
/// The format of the description should follow the pattern "<transport>:<address>" where
55+
/// "<transport>" indicates the underlying network transport (such as "ipv4", "unix", or
56+
/// "in-process"). This is a guideline for how descriptions should be formatted; different
57+
/// implementations may not follow this format so you shouldn't make assumptions based on it.
58+
///
59+
/// Some examples include:
60+
/// - "ipv4:127.0.0.1:31415",
61+
/// - "ipv6:[::1]:443",
62+
/// - "in-process:27182".
63+
public var localPeer: String
3464

3565
/// A handle for checking the cancellation status of an RPC.
3666
public var cancellation: RPCCancellationHandle
@@ -39,16 +69,19 @@ public struct ServerContext: Sendable {
3969
///
4070
/// - Parameters:
4171
/// - descriptor: A description of the method being called.
42-
/// - peer: A description of the remote peer.
72+
/// - remotePeer: A description of the remote peer.
73+
/// - localPeer: A description of the local peer.
4374
/// - cancellation: A cancellation handle. You can create a cancellation handle
4475
/// using ``withServerContextRPCCancellationHandle(_:)``.
4576
public init(
4677
descriptor: MethodDescriptor,
47-
peer: String,
78+
remotePeer: String,
79+
localPeer: String,
4880
cancellation: RPCCancellationHandle
4981
) {
5082
self.descriptor = descriptor
51-
self.peer = peer
83+
self.remotePeer = remotePeer
84+
self.localPeer = localPeer
5285
self.cancellation = cancellation
5386
}
5487
}

Sources/GRPCCore/Transport/ClientTransport.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public protocol ClientTransport: Sendable {
4747
/// running ``connect()``.
4848
func beginGracefulShutdown()
4949

50-
/// Opens a stream using the transport, and uses it as input into a user-provided closure.
50+
/// Opens a stream using the transport, and uses it as input into a user-provided closure alongisde the given context.
5151
///
5252
/// - Important: The opened stream is closed after the closure is finished.
5353
///
@@ -59,12 +59,12 @@ public protocol ClientTransport: Sendable {
5959
/// - Parameters:
6060
/// - descriptor: A description of the method to open a stream for.
6161
/// - options: Options specific to the stream.
62-
/// - closure: A closure that takes the opened stream as parameter.
62+
/// - closure: A closure that takes the opened stream and the client context as its parameters.
6363
/// - Returns: Whatever value was returned from `closure`.
6464
func withStream<T: Sendable>(
6565
descriptor: MethodDescriptor,
6666
options: CallOptions,
67-
_ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
67+
_ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
6868
) async throws -> T
6969

7070
/// Returns the configuration for a given method.

Sources/GRPCInProcessTransport/InProcessTransport+Client.swift

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,19 +103,23 @@ extension InProcessTransport {
103103

104104
private let methodConfig: MethodConfigs
105105
private let state: Mutex<State>
106+
private let peer: String
106107

107108
/// Creates a new in-process client transport.
108109
///
109110
/// - Parameters:
110111
/// - server: The in-process server transport to connect to.
111112
/// - serviceConfig: Service configuration.
113+
/// - peer: The system's PID for the running client and server.
112114
package init(
113115
server: InProcessTransport.Server,
114-
serviceConfig: ServiceConfig = ServiceConfig()
116+
serviceConfig: ServiceConfig = ServiceConfig(),
117+
peer: String
115118
) {
116119
self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
117120
self.methodConfig = MethodConfigs(serviceConfig: serviceConfig)
118121
self.state = Mutex(.unconnected(.init(serverTransport: server)))
122+
self.peer = peer
119123
}
120124

121125
/// Establish and maintain a connection to the remote destination.
@@ -225,12 +229,12 @@ extension InProcessTransport {
225229
/// - Parameters:
226230
/// - descriptor: A description of the method to open a stream for.
227231
/// - options: Options specific to the stream.
228-
/// - closure: A closure that takes the opened stream as parameter.
232+
/// - closure: A closure that takes the opened stream and the client context as its parameters.
229233
/// - Returns: Whatever value was returned from `closure`.
230234
public func withStream<T>(
231235
descriptor: MethodDescriptor,
232236
options: CallOptions,
233-
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
237+
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
234238
) async throws -> T {
235239
let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self)
236240
let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self)
@@ -297,11 +301,17 @@ extension InProcessTransport {
297301
}
298302
}
299303

304+
let clientContext = ClientContext(
305+
descriptor: descriptor,
306+
remotePeer: self.peer,
307+
localPeer: self.peer
308+
)
309+
300310
switch acceptStream {
301311
case .success(let streamID):
302312
let streamHandlingResult: Result<T, any Error>
303313
do {
304-
let result = try await closure(clientStream)
314+
let result = try await closure(clientStream, clientContext)
305315
streamHandlingResult = .success(result)
306316
} catch {
307317
streamHandlingResult = .failure(error)

0 commit comments

Comments
 (0)