Skip to content

Commit 236138b

Browse files
committed
Add OTel attributes to server tracing interceptor and rename
1 parent 1d1f018 commit 236138b

File tree

5 files changed

+760
-219
lines changed

5 files changed

+760
-219
lines changed

Sources/GRPCInterceptors/HookedWriter.swift

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,22 @@ internal import Tracing
1818

1919
struct HookedWriter<Element: Sendable>: RPCWriterProtocol {
2020
private let writer: any RPCWriterProtocol<Element>
21-
private let beforeEachWrite: @Sendable () -> Void
2221
private let afterEachWrite: @Sendable () -> Void
2322

2423
init(
2524
wrapping other: some RPCWriterProtocol<Element>,
26-
beforeEachWrite: @Sendable @escaping () -> Void,
2725
afterEachWrite: @Sendable @escaping () -> Void
2826
) {
2927
self.writer = other
30-
self.beforeEachWrite = beforeEachWrite
3128
self.afterEachWrite = afterEachWrite
3229
}
3330

3431
func write(_ element: Element) async throws {
35-
self.beforeEachWrite()
3632
try await self.writer.write(element)
3733
self.afterEachWrite()
3834
}
3935

4036
func write(contentsOf elements: some Sequence<Element>) async throws {
41-
self.beforeEachWrite()
4237
try await self.writer.write(contentsOf: elements)
4338
self.afterEachWrite()
4439
}

Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024, gRPC Authors All rights reserved.
2+
* Copyright 2024-2025, gRPC Authors All rights reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,9 +21,14 @@ internal import Tracing
2121
/// A client interceptor that injects tracing information into the request.
2222
///
2323
/// The tracing information is taken from the current `ServiceContext`, and injected into the request's
24-
/// metadata. It will then be picked up by the server-side ``ServerTracingInterceptor``.
24+
/// metadata. It will then be picked up by the server-side ``ServerOTelTracingInterceptor``.
2525
///
2626
/// For more information, refer to the documentation for `swift-distributed-tracing`.
27+
///
28+
/// This interceptor will also inject all required and recommended span and event attributes, and set span status, as defined by
29+
/// OpenTelemetry's documentation on:
30+
/// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans
31+
/// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
2732
public struct ClientOTelTracingInterceptor: ClientInterceptor {
2833
private let injector: ClientRequestInjector
2934
private let emitEventOnEachWrite: Bool
@@ -82,7 +87,11 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
8287
context: serviceContext,
8388
ofKind: .client
8489
) { span in
85-
self.setOTelSpanAttributes(into: span, context: context)
90+
span.setOTelClientSpanGRPCAttributes(
91+
context: context,
92+
serverHostname: self.serverHostname,
93+
networkTransportMethod: self.networkTransportMethod
94+
)
8695

8796
span.addEvent("Request started")
8897

@@ -92,7 +101,6 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
92101
let messageSentCounter = Atomic(1)
93102
let eventEmittingWriter = HookedWriter(
94103
wrapping: writer,
95-
beforeEachWrite: {},
96104
afterEachWrite: {
97105
var event = SpanEvent(name: "rpc.message")
98106
event.attributes.rpc.messageType = "SENT"
@@ -158,31 +166,6 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
158166
return response
159167
}
160168
}
161-
162-
private func setOTelSpanAttributes(into span: any Span, context: ClientContext) {
163-
span.attributes.rpc.system = "grpc"
164-
span.attributes.rpc.service = context.descriptor.service.fullyQualifiedService
165-
span.attributes.rpc.method = context.descriptor.method
166-
span.attributes.rpc.serverAddress = self.serverHostname
167-
span.attributes.rpc.networkTransport = self.networkTransportMethod
168-
169-
let peer = context.remotePeer
170-
// We expect this address to be of either of these two formats:
171-
// - <type>:<host>:<port> for ipv4 and ipv6 addresses
172-
// - unix:<uds-pathname> for UNIX domain sockets
173-
let components = peer.split(separator: ":")
174-
if components.count == 2 {
175-
// This is the UDS case
176-
span.attributes.rpc.networkType = String(components[0])
177-
span.attributes.rpc.networkPeerAddress = String(components[1])
178-
} else if components.count == 3 {
179-
// This is the ipv4 or ipv6 case
180-
span.attributes.rpc.networkType = String(components[0])
181-
span.attributes.rpc.networkPeerAddress = String(components[1])
182-
span.attributes.rpc.networkPeerPort = Int(components[2])
183-
span.attributes.rpc.serverPort = Int(components[2])
184-
}
185-
}
186169
}
187170

188171
/// An injector responsible for injecting the required instrumentation keys from the `ServiceContext` into

Sources/GRPCInterceptors/Tracing/ServerTracingInterceptor.swift renamed to Sources/GRPCInterceptors/Tracing/ServerOTelTracingInterceptor.swift

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,41 @@
1616

1717
public import GRPCCore
1818
internal import Tracing
19+
internal import Synchronization
1920

2021
/// A server interceptor that extracts tracing information from the request.
2122
///
2223
/// The extracted tracing information is made available to user code via the current `ServiceContext`.
24+
///
2325
/// For more information, refer to the documentation for `swift-distributed-tracing`.
24-
public struct ServerTracingInterceptor: ServerInterceptor {
26+
///
27+
/// This interceptor will also inject all required and recommended span and event attributes, and set span status, as defined by
28+
/// OpenTelemetry's documentation on:
29+
/// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans
30+
/// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
31+
public struct ServerOTelTracingInterceptor: ServerInterceptor {
2532
private let extractor: ServerRequestExtractor
2633
private let emitEventOnEachWrite: Bool
34+
private var serverHostname: String
35+
private var networkTransportMethod: String
2736

28-
/// Create a new instance of a ``ServerTracingInterceptor``.
37+
/// Create a new instance of a ``ServerOTelTracingInterceptor``.
2938
///
30-
/// - Parameter emitEventOnEachWrite: If `true`, each response part sent and request part
31-
/// received will be recorded as a separate event in a tracing span. Otherwise, only the request/response
32-
/// start and end will be recorded as events.
33-
public init(emitEventOnEachWrite: Bool = false) {
39+
/// - Parameters:
40+
/// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans.
41+
/// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the
42+
/// `network.transport` attribute in spans.
43+
/// - emitEventOnEachWrite: If `true`, each response part sent and request part received will be recorded as a separate
44+
/// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events.
45+
public init(
46+
serverHostname: String,
47+
networkTransportMethod: String,
48+
emitEventOnEachWrite: Bool = false
49+
) {
3450
self.extractor = ServerRequestExtractor()
3551
self.emitEventOnEachWrite = emitEventOnEachWrite
52+
self.serverHostname = serverHostname
53+
self.networkTransportMethod = networkTransportMethod
3654
}
3755

3856
/// This interceptor will extract whatever `ServiceContext` key-value pairs have been inserted into the
@@ -41,6 +59,11 @@ public struct ServerTracingInterceptor: ServerInterceptor {
4159
///
4260
/// Which key-value pairs are extracted and made available will depend on the specific tracing implementation
4361
/// that has been configured when bootstrapping `swift-distributed-tracing` in your application.
62+
///
63+
/// It will also inject all required and recommended span and event attributes, and set span status, as defined by OpenTelemetry's
64+
/// documentation on:
65+
/// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans
66+
/// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
4467
public func intercept<Input, Output>(
4568
request: StreamingServerRequest<Input>,
4669
context: ServerContext,
@@ -65,36 +88,52 @@ public struct ServerTracingInterceptor: ServerInterceptor {
6588
context: serviceContext,
6689
ofKind: .server
6790
) { span in
68-
span.addEvent("Received request start")
91+
span.setOTelServerSpanGRPCAttributes(
92+
context: context,
93+
serverHostname: self.serverHostname,
94+
networkTransportMethod: self.networkTransportMethod
95+
)
6996

70-
var request = request
97+
span.addEvent("Received request")
7198

99+
var request = request
72100
if self.emitEventOnEachWrite {
101+
let messageReceivedCounter = Atomic(1)
73102
request.messages = RPCAsyncSequence(
74103
wrapping: request.messages.map { element in
75-
span.addEvent("Received request part")
104+
var event = SpanEvent(name: "rpc.message")
105+
event.attributes.rpc.messageType = "RECEIVED"
106+
event.attributes.rpc.messageID =
107+
messageReceivedCounter
108+
.wrappingAdd(1, ordering: .sequentiallyConsistent)
109+
.oldValue
110+
span.addEvent(event)
76111
return element
77112
}
78113
)
79114
}
80115

81116
var response = try await next(request, context)
82117

83-
span.addEvent("Received request end")
118+
span.addEvent("Finished processing request")
84119

85120
switch response.accepted {
86121
case .success(var success):
87122
let wrappedProducer = success.producer
88123

89124
if self.emitEventOnEachWrite {
90125
success.producer = { writer in
126+
let messageSentCounter = Atomic(1)
91127
let eventEmittingWriter = HookedWriter(
92128
wrapping: writer,
93-
beforeEachWrite: {
94-
span.addEvent("Sending response part")
95-
},
96129
afterEachWrite: {
97-
span.addEvent("Sent response part")
130+
var event = SpanEvent(name: "rpc.message")
131+
event.attributes.rpc.messageType = "SENT"
132+
event.attributes.rpc.messageID =
133+
messageSentCounter
134+
.wrappingAdd(1, ordering: .sequentiallyConsistent)
135+
.oldValue
136+
span.addEvent(event)
98137
}
99138
)
100139

@@ -115,8 +154,11 @@ public struct ServerTracingInterceptor: ServerInterceptor {
115154

116155
response = .init(accepted: .success(success))
117156

118-
case .failure:
157+
case .failure(let error):
158+
span.attributes.rpc.grpcStatusCode = error.code.rawValue
159+
span.setStatus(SpanStatus(code: .error))
119160
span.addEvent("Sent error response")
161+
span.recordError(error)
120162
}
121163

122164
return response

Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17-
import Tracing
17+
internal import Tracing
18+
internal import GRPCCore
1819

1920
@dynamicMemberLookup
2021
package struct RPCAttributes: SpanAttributeNamespace {
@@ -58,3 +59,149 @@ extension SpanAttributes {
5859
}
5960
}
6061
}
62+
63+
extension Span {
64+
// See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/
65+
func setOTelClientSpanGRPCAttributes(
66+
context: ClientContext,
67+
serverHostname: String,
68+
networkTransportMethod: String
69+
) {
70+
self.attributes.rpc.system = "grpc"
71+
self.attributes.rpc.serverAddress = serverHostname
72+
self.attributes.rpc.networkTransport = networkTransportMethod
73+
self.attributes.rpc.service = context.descriptor.service.fullyQualifiedService
74+
self.attributes.rpc.method = context.descriptor.method
75+
76+
// Set server address information
77+
switch PeerAddress(context.remotePeer) {
78+
case .ipv4(let address, let port):
79+
self.attributes.rpc.networkType = "ipv4"
80+
self.attributes.rpc.networkPeerAddress = address
81+
self.attributes.rpc.networkPeerPort = port
82+
self.attributes.rpc.serverPort = port
83+
84+
case .ipv6(let address, let port):
85+
self.attributes.rpc.networkType = "ipv6"
86+
self.attributes.rpc.networkPeerAddress = address
87+
self.attributes.rpc.networkPeerPort = port
88+
self.attributes.rpc.serverPort = port
89+
90+
case .unixDomainSocket(let path):
91+
self.attributes.rpc.networkType = "unix"
92+
self.attributes.rpc.networkPeerAddress = path
93+
94+
case .other(let address):
95+
// We can't nicely format the span attributes to contain the appropriate information here,
96+
// so include the whole thing as part of the server address.
97+
self.attributes.rpc.serverAddress = address
98+
}
99+
}
100+
101+
func setOTelServerSpanGRPCAttributes(
102+
context: ServerContext,
103+
serverHostname: String,
104+
networkTransportMethod: String
105+
) {
106+
self.attributes.rpc.system = "grpc"
107+
self.attributes.rpc.serverAddress = serverHostname
108+
self.attributes.rpc.networkTransport = networkTransportMethod
109+
self.attributes.rpc.service = context.descriptor.service.fullyQualifiedService
110+
self.attributes.rpc.method = context.descriptor.method
111+
112+
// Set server address information
113+
switch PeerAddress(context.localPeer) {
114+
case .ipv4(let address, let port):
115+
self.attributes.rpc.networkType = "ipv4"
116+
self.attributes.rpc.networkPeerAddress = address
117+
self.attributes.rpc.networkPeerPort = port
118+
self.attributes.rpc.serverPort = port
119+
120+
case .ipv6(let address, let port):
121+
self.attributes.rpc.networkType = "ipv6"
122+
self.attributes.rpc.networkPeerAddress = address
123+
self.attributes.rpc.networkPeerPort = port
124+
self.attributes.rpc.serverPort = port
125+
126+
case .unixDomainSocket(let path):
127+
self.attributes.rpc.networkType = "unix"
128+
self.attributes.rpc.networkPeerAddress = path
129+
130+
case .other(let address):
131+
// We can't nicely format the span attributes to contain the appropriate information here,
132+
// so include the whole thing as part of the server address.
133+
self.attributes.rpc.serverAddress = address
134+
}
135+
136+
switch PeerAddress(context.remotePeer) {
137+
case .ipv4(let address, let port):
138+
self.attributes.rpc.clientAddress = address
139+
self.attributes.rpc.clientPort = port
140+
141+
case .ipv6(let address, let port):
142+
self.attributes.rpc.clientAddress = address
143+
self.attributes.rpc.clientPort = port
144+
145+
case .unixDomainSocket(let path):
146+
self.attributes.rpc.clientAddress = path
147+
148+
case .other(let address):
149+
self.attributes.rpc.clientAddress = address
150+
}
151+
}
152+
}
153+
154+
private enum PeerAddress {
155+
case ipv4(address: String, port: Int?)
156+
case ipv6(address: String, port: Int?)
157+
case unixDomainSocket(path: String)
158+
case other(String)
159+
160+
init(_ address: String) {
161+
// We expect this address to be of one of these formats:
162+
// - ipv4:<host>:<port> for ipv4 addresses
163+
// - ipv6:[<host>]:<port> for ipv6 addresses
164+
// - unix:<uds-pathname> for UNIX domain sockets
165+
let addressComponents = address.split(separator: ":", omittingEmptySubsequences: false)
166+
167+
guard addressComponents.count > 1 else {
168+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
169+
self = .other(address)
170+
return
171+
}
172+
173+
// Check what type the transport is...
174+
switch addressComponents[0] {
175+
case "ipv4":
176+
guard addressComponents.count == 3, let port = Int(addressComponents[2]) else {
177+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
178+
self = .other(address)
179+
return
180+
}
181+
self = .ipv4(address: String(addressComponents[1]), port: port)
182+
183+
case "ipv6":
184+
guard addressComponents.count > 2, let port = Int(addressComponents.last!) else {
185+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
186+
self = .other(address)
187+
return
188+
}
189+
self = .ipv6(
190+
address: String(addressComponents[1..<addressComponents.count-1].joined(separator: ":")),
191+
port: port
192+
)
193+
194+
case "unix":
195+
guard addressComponents.count == 2 else {
196+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
197+
self = .other(address)
198+
return
199+
}
200+
self = .unixDomainSocket(path: String(addressComponents[1]))
201+
202+
default:
203+
// This is some unexpected/unknown format, so we have no way of splitting it up nicely.
204+
self = .other(address)
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)