Skip to content

Commit 4bcdd3b

Browse files
committed
Add OTel attributes and rename to ClienOTelTracingInterceptor
1 parent efd8b66 commit 4bcdd3b

File tree

3 files changed

+254
-140
lines changed

3 files changed

+254
-140
lines changed

Sources/GRPCInterceptors/ClientTracingInterceptor.swift

Lines changed: 0 additions & 140 deletions
This file was deleted.
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
public import GRPCCore
18+
internal import Tracing
19+
internal import Synchronization
20+
21+
/// A client interceptor that injects tracing information into the request.
22+
///
23+
/// The tracing information is taken from the current `ServiceContext`, and injected into the request's
24+
/// metadata. It will then be picked up by the server-side ``ServerTracingInterceptor``.
25+
///
26+
/// For more information, refer to the documentation for `swift-distributed-tracing`.
27+
public struct ClientOTelTracingInterceptor: ClientInterceptor {
28+
private let injector: ClientRequestInjector
29+
private let emitEventOnEachWrite: Bool
30+
private var serverHostname: String
31+
private var networkTransportMethod: String
32+
33+
/// Create a new instance of a ``ClientOTelTracingInterceptor``.
34+
///
35+
/// - Parameters:
36+
/// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans.
37+
/// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the
38+
/// `network.transport` attribute in spans.
39+
/// - emitEventOnEachWrite: If `true`, each request part sent and response part received will be recorded as a separate
40+
/// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events.
41+
public init(
42+
serverHostname: String,
43+
networkTransportMethod: String,
44+
emitEventOnEachWrite: Bool = false
45+
) {
46+
self.injector = ClientRequestInjector()
47+
self.serverHostname = serverHostname
48+
self.networkTransportMethod = networkTransportMethod
49+
self.emitEventOnEachWrite = emitEventOnEachWrite
50+
}
51+
52+
/// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs
53+
/// have been made available by the tracing implementation bootstrapped in your application.
54+
///
55+
/// Which key-value pairs are injected will depend on the specific tracing implementation
56+
/// that has been configured when bootstrapping `swift-distributed-tracing` in your application.
57+
///
58+
/// It will also inject all required and recommended span and event attributes, and set span status, as defined by OpenTelemetry's
59+
/// documentation on:
60+
/// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans
61+
/// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
62+
public func intercept<Input, Output>(
63+
request: StreamingClientRequest<Input>,
64+
context: ClientContext,
65+
next: (
66+
StreamingClientRequest<Input>,
67+
ClientContext
68+
) async throws -> StreamingClientResponse<Output>
69+
) async throws -> StreamingClientResponse<Output> where Input: Sendable, Output: Sendable {
70+
var request = request
71+
let tracer = InstrumentationSystem.tracer
72+
let serviceContext = ServiceContext.current ?? .topLevel
73+
74+
tracer.inject(
75+
serviceContext,
76+
into: &request.metadata,
77+
using: self.injector
78+
)
79+
80+
return try await tracer.withSpan(
81+
context.descriptor.fullyQualifiedMethod,
82+
context: serviceContext,
83+
ofKind: .client
84+
) { span in
85+
self.setOTelSpanAttributes(into: span, context: context)
86+
87+
span.addEvent("Request started")
88+
89+
let wrappedProducer = request.producer
90+
if self.emitEventOnEachWrite {
91+
request.producer = { writer in
92+
let messageSentCounter = Atomic(1)
93+
let eventEmittingWriter = HookedWriter(
94+
wrapping: writer,
95+
beforeEachWrite: {},
96+
afterEachWrite: {
97+
var event = SpanEvent(name: "rpc.message")
98+
event.attributes.rpc.messageType = "SENT"
99+
event.attributes.rpc.messageID = messageSentCounter
100+
.wrappingAdd(1, ordering: .sequentiallyConsistent)
101+
.oldValue
102+
span.addEvent(event)
103+
}
104+
)
105+
try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter))
106+
span.addEvent("Request ended")
107+
}
108+
} else {
109+
request.producer = { writer in
110+
try await wrappedProducer(RPCWriter(wrapping: writer))
111+
span.addEvent("Request ended")
112+
}
113+
}
114+
115+
var response = try await next(request, context)
116+
switch response.accepted {
117+
case .success(var success):
118+
span.addEvent("Received response start")
119+
span.attributes.rpc.grpcStatusCode = 0
120+
if self.emitEventOnEachWrite {
121+
let messageReceivedCounter = Atomic(1)
122+
let onEachPartRecordingSequence = success.bodyParts.map { element in
123+
var event = SpanEvent(name: "rpc.message")
124+
event.attributes.rpc.messageType = "RECEIVED"
125+
event.attributes.rpc.messageID = messageReceivedCounter
126+
.wrappingAdd(1, ordering: .sequentiallyConsistent)
127+
.oldValue
128+
span.addEvent(event)
129+
return element
130+
}
131+
132+
let onFinishRecordingSequence = OnFinishAsyncSequence(
133+
wrapping: onEachPartRecordingSequence
134+
) {
135+
span.addEvent("Received response end")
136+
}
137+
138+
success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence)
139+
response.accepted = .success(success)
140+
} else {
141+
let onFinishRecordingSequence = OnFinishAsyncSequence(wrapping: success.bodyParts) {
142+
span.addEvent("Received response end")
143+
}
144+
145+
success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence)
146+
response.accepted = .success(success)
147+
}
148+
149+
case .failure(let error):
150+
span.attributes.rpc.grpcStatusCode = error.code.rawValue
151+
span.setStatus(SpanStatus(code: .error))
152+
span.addEvent("Received error response")
153+
span.recordError(error)
154+
}
155+
156+
return response
157+
}
158+
}
159+
160+
private func setOTelSpanAttributes(into span: any Span, context: ClientContext) {
161+
span.attributes.rpc.system = "grpc"
162+
span.attributes.rpc.service = context.descriptor.service.fullyQualifiedService
163+
span.attributes.rpc.method = context.descriptor.method
164+
span.attributes.rpc.serverAddress = self.serverHostname
165+
span.attributes.rpc.networkTransport = self.networkTransportMethod
166+
167+
let peer = context.remotePeer
168+
// We expect this address to be of either of these two formats:
169+
// - <type>:<host>:<port> for ipv4 and ipv6 addresses
170+
// - unix:<uds-pathname> for UNIX domain sockets
171+
let components = peer.split(separator: ":")
172+
if components.count == 2 {
173+
// This is the UDS case
174+
span.attributes.rpc.networkType = String(components[0])
175+
span.attributes.rpc.networkPeerAddress = String(components[1])
176+
} else if components.count == 3 {
177+
// This is the ipv4 or ipv6 case
178+
span.attributes.rpc.networkType = String(components[0])
179+
span.attributes.rpc.networkPeerAddress = String(components[1])
180+
span.attributes.rpc.networkPeerPort = Int(components[2])
181+
span.attributes.rpc.serverPort = Int(components[2])
182+
}
183+
}
184+
}
185+
186+
/// An injector responsible for injecting the required instrumentation keys from the `ServiceContext` into
187+
/// the request metadata.
188+
struct ClientRequestInjector: Instrumentation.Injector {
189+
typealias Carrier = Metadata
190+
191+
func inject(_ value: String, forKey key: String, into carrier: inout Carrier) {
192+
carrier.addString(value, forKey: key)
193+
}
194+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2025, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Tracing
18+
19+
@dynamicMemberLookup
20+
package struct RPCAttributes: SpanAttributeNamespace {
21+
var attributes: SpanAttributes
22+
23+
init(attributes: SpanAttributes) {
24+
self.attributes = attributes
25+
}
26+
27+
struct NestedSpanAttributes: NestedSpanAttributesProtocol {
28+
init() {}
29+
30+
var system: Key<String> { "rpc.system" }
31+
var method: Key<String> { "rpc.method" }
32+
var service: Key<String> { "rpc.service" }
33+
var messageID: Key<Int> { "rpc.message.id" }
34+
var messageType: Key<String> { "rpc.message.type" }
35+
var grpcStatusCode: Key<Int> { "rpc.grpc.status_code" }
36+
37+
var serverAddress: Key<String>{ "server.address" }
38+
var serverPort: Key<Int> { "server.port" }
39+
40+
var clientAddress: Key<String> { "client.address" }
41+
var clientPort: Key<Int> { "client.port" }
42+
43+
var networkTransport: Key<String> { "network.transport" }
44+
var networkType: Key<String> { "network.type" }
45+
var networkPeerAddress: Key<String> { "network.peer.address" }
46+
var networkPeerPort: Key<Int> { "network.peer.port" }
47+
}
48+
}
49+
50+
package extension SpanAttributes {
51+
/// Semantic conventions for RPC spans.
52+
var rpc: RPCAttributes {
53+
get {
54+
.init(attributes: self)
55+
}
56+
set {
57+
self = newValue.attributes
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)