Skip to content

Commit 01186b1

Browse files
committed
PR changes
1 parent 34bbf9f commit 01186b1

File tree

6 files changed

+226
-258
lines changed

6 files changed

+226
-258
lines changed

Sources/GRPCInterceptors/HookedAsyncSequence.swift

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,23 @@ where Wrapped.Element: Sendable {
1919
private let wrapped: Wrapped
2020

2121
private let forEachElement: @Sendable (Wrapped.Element) -> Void
22-
private let onFinish: @Sendable () -> Void
23-
private let onFailure: @Sendable (any Error) -> Void
22+
private let onFinish: @Sendable ((any Error)?) -> Void
2423

2524
init(
2625
wrapping sequence: Wrapped,
2726
forEachElement: @escaping @Sendable (Wrapped.Element) -> Void,
28-
onFinish: @escaping @Sendable () -> Void,
29-
onFailure: @escaping @Sendable (any Error) -> Void
27+
onFinish: @escaping @Sendable ((any Error)?) -> Void
3028
) {
3129
self.wrapped = sequence
3230
self.forEachElement = forEachElement
3331
self.onFinish = onFinish
34-
self.onFailure = onFailure
3532
}
3633

3734
func makeAsyncIterator() -> HookedAsyncIterator {
3835
HookedAsyncIterator(
3936
self.wrapped,
4037
forEachElement: self.forEachElement,
41-
onFinish: self.onFinish,
42-
onFailure: self.onFailure
38+
onFinish: self.onFinish
4339
)
4440
}
4541

@@ -48,19 +44,16 @@ where Wrapped.Element: Sendable {
4844

4945
private var wrapped: Wrapped.AsyncIterator
5046
private let forEachElement: @Sendable (Wrapped.Element) -> Void
51-
private let onFinish: @Sendable () -> Void
52-
private let onFailure: @Sendable (any Error) -> Void
47+
private let onFinish: @Sendable ((any Error)?) -> Void
5348

5449
init(
5550
_ sequence: Wrapped,
5651
forEachElement: @escaping @Sendable (Wrapped.Element) -> Void,
57-
onFinish: @escaping @Sendable () -> Void,
58-
onFailure: @escaping @Sendable (any Error) -> Void
52+
onFinish: @escaping @Sendable ((any Error)?) -> Void
5953
) {
6054
self.wrapped = sequence.makeAsyncIterator()
6155
self.forEachElement = forEachElement
6256
self.onFinish = onFinish
63-
self.onFailure = onFailure
6457
}
6558

6659
mutating func next(
@@ -70,29 +63,18 @@ where Wrapped.Element: Sendable {
7063
if let element = try await self.wrapped.next(isolation: actor) {
7164
self.forEachElement(element)
7265
return element
66+
} else {
67+
self.onFinish(nil)
68+
return nil
7369
}
74-
75-
self.onFinish()
76-
return nil
7770
} catch {
78-
self.onFailure(error)
71+
self.onFinish(error)
7972
throw error
8073
}
8174
}
8275

8376
mutating func next() async throws -> Wrapped.Element? {
84-
do {
85-
if let element = try await self.wrapped.next() {
86-
self.forEachElement(element)
87-
return element
88-
}
89-
90-
self.onFinish()
91-
return nil
92-
} catch {
93-
self.onFailure(error)
94-
throw error
95-
}
77+
try await self.next(isolation: nil)
9678
}
9779
}
9880
}

Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
3434
///
3535
/// - Parameters:
3636
/// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans.
37-
/// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the
37+
/// - networkTransportMethod: The transport in use (e.g. "tcp", "unix"). This will be the value for the
3838
/// `network.transport` attribute in spans.
3939
/// - traceEachMessage: If `true`, each request part sent and response part received will be recorded as a separate
4040
/// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events.
@@ -113,8 +113,8 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
113113
wrapping: writer,
114114
afterEachWrite: {
115115
var event = SpanEvent(name: "rpc.message")
116-
event.attributes.rpc.messageType = "SENT"
117-
event.attributes.rpc.messageID =
116+
event.attributes[GRPCTracingKeys.rpcMessageType] = "SENT"
117+
event.attributes[GRPCTracingKeys.rpcMessageID] =
118118
messageSentCounter
119119
.wrappingAdd(1, ordering: .sequentiallyConsistent)
120120
.oldValue
@@ -136,40 +136,44 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
136136
let messageReceivedCounter = Atomic(1)
137137
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in
138138
var event = SpanEvent(name: "rpc.message")
139-
event.attributes.rpc.messageType = "RECEIVED"
140-
event.attributes.rpc.messageID =
139+
event.attributes[GRPCTracingKeys.rpcMessageType] = "RECEIVED"
140+
event.attributes[GRPCTracingKeys.rpcMessageID] =
141141
messageReceivedCounter
142142
.wrappingAdd(1, ordering: .sequentiallyConsistent)
143143
.oldValue
144144
span.addEvent(event)
145-
} onFinish: {
146-
span.attributes.rpc.grpcStatusCode = 0
147-
} onFailure: { error in
148-
if let rpcError = error as? RPCError {
149-
span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue
145+
} onFinish: { error in
146+
if let error {
147+
if let errorCode = error.grpcErrorCode {
148+
span.attributes[GRPCTracingKeys.grpcStatusCode] = errorCode.rawValue
149+
}
150+
span.setStatus(SpanStatus(code: .error))
151+
span.recordError(error)
152+
} else {
153+
span.attributes[GRPCTracingKeys.grpcStatusCode] = 0
150154
}
151-
span.setStatus(SpanStatus(code: .error))
152-
span.recordError(error)
153155
}
154156
} else {
155157
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in
156158
// Nothing to do if traceEachMessage is false
157-
} onFinish: {
158-
span.attributes.rpc.grpcStatusCode = 0
159-
} onFailure: { error in
160-
if let rpcError = error as? RPCError {
161-
span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue
159+
} onFinish: { error in
160+
if let error {
161+
if let errorCode = error.grpcErrorCode {
162+
span.attributes[GRPCTracingKeys.grpcStatusCode] = errorCode.rawValue
163+
}
164+
span.setStatus(SpanStatus(code: .error))
165+
span.recordError(error)
166+
} else {
167+
span.attributes[GRPCTracingKeys.grpcStatusCode] = 0
162168
}
163-
span.setStatus(SpanStatus(code: .error))
164-
span.recordError(error)
165169
}
166170
}
167171

168172
success.bodyParts = RPCAsyncSequence(wrapping: hookedSequence)
169173
response.accepted = .success(success)
170174

171175
case .failure(let error):
172-
span.attributes.rpc.grpcStatusCode = error.code.rawValue
176+
span.attributes[GRPCTracingKeys.grpcStatusCode] = error.code.rawValue
173177
span.setStatus(SpanStatus(code: .error))
174178
span.recordError(error)
175179
}
@@ -188,3 +192,15 @@ struct ClientRequestInjector: Instrumentation.Injector {
188192
carrier.addString(value, forKey: key)
189193
}
190194
}
195+
196+
extension Error {
197+
var grpcErrorCode: RPCError.Code? {
198+
if let rpcError = self as? RPCError {
199+
return rpcError.code
200+
} else if let rpcError = self as? any RPCErrorConvertible {
201+
return rpcError.rpcErrorCode
202+
} else {
203+
return nil
204+
}
205+
}
206+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright 2025, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
internal import GRPCCore
18+
internal import Tracing
19+
20+
public enum GRPCTracingKeys {
21+
static let rpcSystem = "rpc.system"
22+
static let rpcMethod = "rpc.method"
23+
static let rpcService = "rpc.service"
24+
static let rpcMessageID = "rpc.message.id"
25+
static let rpcMessageType = "rpc.message.type"
26+
static let grpcStatusCode = "rpc.grpc.status_code"
27+
28+
static let serverAddress = "server.address"
29+
static let serverPort = "server.port"
30+
31+
static let clientAddress = "client.address"
32+
static let clientPort = "client.port"
33+
34+
static let networkTransport = "network.transport"
35+
static let networkType = "network.type"
36+
static let networkPeerAddress = "network.peer.address"
37+
static let networkPeerPort = "network.peer.port"
38+
}
39+
40+
extension Span {
41+
// See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/
42+
func setOTelClientSpanGRPCAttributes(
43+
context: ClientContext,
44+
serverHostname: String,
45+
networkTransportMethod: String
46+
) {
47+
self.attributes[GRPCTracingKeys.rpcSystem] = "grpc"
48+
self.attributes[GRPCTracingKeys.serverAddress] = serverHostname
49+
self.attributes[GRPCTracingKeys.networkTransport] = networkTransportMethod
50+
self.attributes[GRPCTracingKeys.rpcService] = context.descriptor.service.fullyQualifiedService
51+
self.attributes[GRPCTracingKeys.rpcMethod] = context.descriptor.method
52+
53+
// Set server address information
54+
switch PeerAddress(context.remotePeer) {
55+
case .ipv4(let address, let port):
56+
self.attributes[GRPCTracingKeys.networkType] = "ipv4"
57+
self.attributes[GRPCTracingKeys.networkPeerAddress] = address
58+
self.attributes[GRPCTracingKeys.networkPeerPort] = port
59+
self.attributes[GRPCTracingKeys.serverPort] = port
60+
61+
case .ipv6(let address, let port):
62+
self.attributes[GRPCTracingKeys.networkType] = "ipv6"
63+
self.attributes[GRPCTracingKeys.networkPeerAddress] = address
64+
self.attributes[GRPCTracingKeys.networkPeerPort] = port
65+
self.attributes[GRPCTracingKeys.serverPort] = port
66+
67+
case .unixDomainSocket(let path):
68+
self.attributes[GRPCTracingKeys.networkType] = "unix"
69+
self.attributes[GRPCTracingKeys.networkPeerAddress] = path
70+
71+
case .other(let address):
72+
// We can't nicely format the span attributes to contain the appropriate information here,
73+
// so include the whole thing as part of the peer address.
74+
self.attributes[GRPCTracingKeys.networkPeerAddress] = address
75+
}
76+
}
77+
}
78+
79+
package enum PeerAddress: Equatable {
80+
case ipv4(address: String, port: Int?)
81+
case ipv6(address: String, port: Int?)
82+
case unixDomainSocket(path: String)
83+
case other(String)
84+
85+
package init(_ address: String) {
86+
// We expect this address to be of one of these formats:
87+
// - ipv4:<host>:<port> for ipv4 addresses
88+
// - ipv6:[<host>]:<port> for ipv6 addresses
89+
// - unix:<uds-pathname> for UNIX domain sockets
90+
let addressComponents = address.split(separator: ":", omittingEmptySubsequences: false)
91+
92+
guard addressComponents.count > 1 else {
93+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
94+
self = .other(address)
95+
return
96+
}
97+
98+
// Check what type the transport is...
99+
switch addressComponents[0] {
100+
case "ipv4":
101+
guard addressComponents.count == 3, let port = Int(addressComponents[2]) else {
102+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
103+
self = .other(address)
104+
return
105+
}
106+
self = .ipv4(address: String(addressComponents[1]), port: port)
107+
108+
case "ipv6":
109+
guard addressComponents.count > 2, let port = Int(addressComponents.last!) else {
110+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
111+
self = .other(address)
112+
return
113+
}
114+
self = .ipv6(
115+
address: String(
116+
addressComponents[1 ..< addressComponents.count - 1].joined(separator: ":")
117+
),
118+
port: port
119+
)
120+
121+
case "unix":
122+
guard addressComponents.count == 2 else {
123+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
124+
self = .other(address)
125+
return
126+
}
127+
self = .unixDomainSocket(path: String(addressComponents[1]))
128+
129+
default:
130+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
131+
self = .other(address)
132+
}
133+
}
134+
}

0 commit comments

Comments
 (0)