Skip to content

Commit fe7ff10

Browse files
committed
Adopt ClientContext changes
1 parent 92ad3c0 commit fe7ff10

File tree

7 files changed

+100
-46
lines changed

7 files changed

+100
-46
lines changed

Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,14 @@ extension Connection {
421421

422422
private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
423423

424+
var peerInfo: String {
425+
self.http2Stream.channel.getRemoteAddressInfo()
426+
}
427+
428+
var localInfo: String {
429+
self.http2Stream.channel.getLocalAddressInfo()
430+
}
431+
424432
init(
425433
wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
426434
descriptor: MethodDescriptor

Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,11 @@ package final class GRPCChannel: ClientTransport {
198198
self.input.continuation.yield(.close)
199199
}
200200

201-
/// Opens a stream using the transport, and uses it as input into a user-provided closure.
201+
/// Opens a stream using the transport, and uses it as input into a user-provided closure, alongside the client's context.
202202
package func withStream<T: Sendable>(
203203
descriptor: MethodDescriptor,
204204
options: CallOptions,
205-
_ closure: (_ stream: RPCStream<Inbound, Outbound>) async throws -> T
205+
_ closure: (_ stream: RPCStream<Inbound, Outbound>, _ context: ClientContext) async throws -> T
206206
) async throws -> T {
207207
// Merge options from the call with those from the service config.
208208
let methodConfig = self.config(forMethod: descriptor)
@@ -218,7 +218,14 @@ package final class GRPCChannel: ClientTransport {
218218
inbound: RPCAsyncSequence<RPCResponsePart, any Error>(wrapping: inbound),
219219
outbound: RPCWriter.Closable(wrapping: outbound)
220220
)
221-
return try await closure(rpcStream)
221+
let context = ClientContext(
222+
descriptor: descriptor,
223+
remotePeer: stream.peerInfo ,
224+
localPeer: stream.localInfo,
225+
serverHostname: self.authority ?? "<unknown>",
226+
networkTransportMethod: "tcp"
227+
)
228+
return try await closure(rpcStream, context)
222229
}
223230

224231
case .tryAgain(let error):
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2025, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import NIOCore
18+
19+
extension Channel {
20+
func getRemoteAddressInfo() -> String {
21+
guard let remote = self.remoteAddress else {
22+
return "<unknown>"
23+
}
24+
25+
switch remote {
26+
case .v4(let address):
27+
// '!' is safe, v4 always has a port.
28+
return "ipv4:\(address.host):\(remote.port!)"
29+
30+
case .v6(let address):
31+
// '!' is safe, v6 always has a port.
32+
return "ipv6:[\(address.host)]:\(remote.port!)"
33+
34+
case .unixDomainSocket:
35+
// The pathname will be on the local address.
36+
guard let local = self.localAddress else {
37+
// UDS but no local address; this shouldn't ever happen but at least note the transport
38+
// as being UDS.
39+
return "unix:<unknown>"
40+
}
41+
42+
switch local {
43+
case .unixDomainSocket:
44+
// '!' is safe, UDS always has a path.
45+
return "unix:\(local.pathname!)"
46+
47+
case .v4, .v6:
48+
// Remote address is UDS but local isn't. This shouldn't ever happen.
49+
return "unix:<unknown>"
50+
}
51+
}
52+
}
53+
54+
func getLocalAddressInfo() -> String {
55+
guard let local = self.localAddress else {
56+
return "<unknown>"
57+
}
58+
59+
switch local {
60+
case .v4(let address):
61+
// '!' is safe, v4 always has a port.
62+
return "ipv4:\(address.host):\(local.port!)"
63+
64+
case .v6(let address):
65+
// '!' is safe, v6 always has a port.
66+
return "ipv6:[\(address.host)]:\(local.port!)"
67+
68+
case .unixDomainSocket:
69+
// '!' is safe, UDS always has a path.
70+
return "unix:\(local.pathname!)"
71+
}
72+
}
73+
}

Sources/GRPCNIOTransportCore/Server/CommonHTTP2ServerTransport.swift

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -191,40 +191,6 @@ package final class CommonHTTP2ServerTransport<
191191
}
192192
}
193193

194-
private func peerInfo(channel: any Channel) -> String {
195-
guard let remote = channel.remoteAddress else {
196-
return "<unknown>"
197-
}
198-
199-
switch remote {
200-
case .v4(let address):
201-
// '!' is safe, v4 always has a port.
202-
return "ipv4:\(address.host):\(remote.port!)"
203-
204-
case .v6(let address):
205-
// '!' is safe, v6 always has a port.
206-
return "ipv6:[\(address.host)]:\(remote.port!)"
207-
208-
case .unixDomainSocket:
209-
// The pathname will be on the local address.
210-
guard let local = channel.localAddress else {
211-
// UDS but no local address; this shouldn't ever happen but at least note the transport
212-
// as being UDS.
213-
return "unix:<unknown>"
214-
}
215-
216-
switch local {
217-
case .unixDomainSocket:
218-
// '!' is safe, UDS always has a path.
219-
return "unix:\(local.pathname!)"
220-
221-
case .v4, .v6:
222-
// Remote address is UDS but local isn't. This shouldn't ever happen.
223-
return "unix:<unknown>"
224-
}
225-
}
226-
}
227-
228194
private func handleConnection(
229195
_ connection: NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
230196
multiplexer: ChannelPipeline.SynchronousOperations.HTTP2StreamMultiplexer,
@@ -233,7 +199,7 @@ package final class CommonHTTP2ServerTransport<
233199
_ context: ServerContext
234200
) async -> Void
235201
) async throws {
236-
let peer = self.peerInfo(channel: connection.channel)
202+
let peer = connection.channel.getRemoteAddressInfo()
237203
try await connection.executeThenClose { inbound, _ in
238204
await withDiscardingTaskGroup { group in
239205
group.addTask {

Sources/GRPCNIOTransportHTTP2Posix/HTTP2ClientTransport+Posix.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ extension HTTP2ClientTransport {
120120
public func withStream<T: Sendable>(
121121
descriptor: MethodDescriptor,
122122
options: CallOptions,
123-
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
123+
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
124124
) async throws -> T {
125125
try await self.channel.withStream(descriptor: descriptor, options: options, closure)
126126
}

Sources/GRPCNIOTransportHTTP2TransportServices/HTTP2ClientTransport+TransportServices.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ extension HTTP2ClientTransport {
118118
public func withStream<T: Sendable>(
119119
descriptor: MethodDescriptor,
120120
options: CallOptions,
121-
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
121+
_ closure: (RPCStream<Inbound, Outbound>, ClientContext) async throws -> T
122122
) async throws -> T {
123123
try await self.channel.withStream(descriptor: descriptor, options: options, closure)
124124
}

Tests/GRPCNIOTransportCoreTests/Client/Connection/GRPCChannelTests.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ final class GRPCChannelTests: XCTestCase {
353353
await channel.connect()
354354
}
355355

356-
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
356+
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream, _ in
357357
try await stream.outbound.write(.metadata([:]))
358358

359359
var iterator = stream.inbound.makeAsyncIterator()
@@ -441,7 +441,7 @@ final class GRPCChannelTests: XCTestCase {
441441
// be queued though.
442442
for _ in 1 ... 100 {
443443
group.addTask {
444-
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
444+
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream, _ in
445445
try await stream.outbound.write(.metadata([:]))
446446
await stream.outbound.finish()
447447

@@ -510,7 +510,7 @@ final class GRPCChannelTests: XCTestCase {
510510
options.waitForReady = false
511511

512512
await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
513-
try await channel.withStream(descriptor: .echoGet, options: options) { _ in
513+
try await channel.withStream(descriptor: .echoGet, options: options) { _, _ in
514514
XCTFail("Unexpected stream")
515515
}
516516
} errorHandler: { error in
@@ -780,7 +780,7 @@ final class GRPCChannelTests: XCTestCase {
780780

781781
// Try to open a new stream.
782782
await XCTAssertThrowsErrorAsync(ofType: RPCError.self) {
783-
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
783+
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream, _ in
784784
XCTFail("Unexpected new stream")
785785
}
786786
} errorHandler: { error in
@@ -823,7 +823,7 @@ final class GRPCChannelTests: XCTestCase {
823823
}
824824

825825
func doAnRPC() async throws {
826-
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
826+
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream, _ in
827827
try await stream.outbound.write(.metadata([:]))
828828
await stream.outbound.finish()
829829

@@ -873,7 +873,7 @@ extension GRPCChannel {
873873
let values: Metadata.StringValues? = try await self.withStream(
874874
descriptor: .echoGet,
875875
options: .defaults
876-
) { stream in
876+
) { stream, _ in
877877
try await stream.outbound.write(.metadata([:]))
878878
await stream.outbound.finish()
879879

0 commit comments

Comments
 (0)