Skip to content

Commit 89b90e7

Browse files
committed
Add option to include metadata in server interceptor
1 parent 13ea905 commit 89b90e7

File tree

4 files changed

+266
-10
lines changed

4 files changed

+266
-10
lines changed

Sources/GRPCOTelTracingInterceptors/Tracing/ClientOTelTracingInterceptor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,11 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
162162
let messageReceivedCounter = Atomic(1)
163163
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { part in
164164
switch part {
165-
case .message(let message):
165+
case .message:
166166
var event = SpanEvent(name: "rpc.message")
167167
event.attributes[GRPCTracingKeys.rpcMessageType] = "RECEIVED"
168168
event.attributes[GRPCTracingKeys.rpcMessageID] =
169-
messageReceivedCounter
169+
messageReceivedCounter
170170
.wrappingAdd(1, ordering: .sequentiallyConsistent)
171171
.oldValue
172172
span.addEvent(event)

Sources/GRPCOTelTracingInterceptors/Tracing/ServerOTelTracingInterceptor.swift

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@ package import Tracing
3030
/// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
3131
public struct ServerOTelTracingInterceptor: ServerInterceptor {
3232
private let extractor: ServerRequestExtractor
33-
private let traceEachMessage: Bool
3433
private var serverHostname: String
3534
private var networkTransportMethod: String
3635

36+
private let traceEachMessage: Bool
37+
private var includeRequestMetadata: Bool
38+
private var includeResponseMetadata: Bool
39+
3740
/// Create a new instance of a ``ServerOTelTracingInterceptor``.
3841
///
3942
/// - Parameters:
@@ -42,15 +45,24 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
4245
/// `network.transport` attribute in spans.
4346
/// - traceEachMessage: If `true`, each response part sent and request part received will be recorded as a separate
4447
/// event in a tracing span.
48+
/// - includeRequestMetadata: if `true`, **all** metadata keys with string values included in the request will be added to the span as attributes.
49+
/// - includeResponseMetadata: if `true`, **all** metadata keys with string values included in the response will be added to the span as attributes.
50+
///
51+
/// - Important: Be careful when setting `includeRequestMetadata` or `includeResponseMetadata` to `true`,
52+
/// as including all request/response metadata can be a security risk.
4553
public init(
4654
serverHostname: String,
4755
networkTransportMethod: String,
48-
traceEachMessage: Bool = true
56+
traceEachMessage: Bool = true,
57+
includeRequestMetadata: Bool = false,
58+
includeResponseMetadata: Bool = false
4959
) {
5060
self.extractor = ServerRequestExtractor()
5161
self.traceEachMessage = traceEachMessage
5262
self.serverHostname = serverHostname
5363
self.networkTransportMethod = networkTransportMethod
64+
self.includeRequestMetadata = includeRequestMetadata
65+
self.includeResponseMetadata = includeResponseMetadata
5466
}
5567

5668
/// This interceptor will extract whatever `ServiceContext` key-value pairs have been inserted into the
@@ -109,6 +121,10 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
109121
networkTransportMethod: self.networkTransportMethod
110122
)
111123

124+
if self.includeRequestMetadata {
125+
span.setMetadataStringAttributesAsRequestSpanAttributes(request.metadata)
126+
}
127+
112128
var request = request
113129
if self.traceEachMessage {
114130
let messageReceivedCounter = Atomic(1)
@@ -128,6 +144,10 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
128144

129145
var response = try await next(request, context)
130146

147+
if self.includeResponseMetadata {
148+
span.setMetadataStringAttributesAsResponseSpanAttributes(response.metadata)
149+
}
150+
131151
switch response.accepted {
132152
case .success(var success):
133153
let wrappedProducer = success.producer
@@ -148,11 +168,15 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
148168
}
149169
)
150170

151-
let wrappedResult = try await wrappedProducer(
171+
let trailingMetadata = try await wrappedProducer(
152172
RPCWriter(wrapping: eventEmittingWriter)
153173
)
154174

155-
return wrappedResult
175+
if self.includeResponseMetadata {
176+
span.setMetadataStringAttributesAsResponseSpanAttributes(trailingMetadata)
177+
}
178+
179+
return trailingMetadata
156180
}
157181
}
158182

Sources/GRPCOTelTracingInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ enum GRPCTracingKeys {
3737
static let networkPeerPort = "network.peer.port"
3838

3939
fileprivate static let requestMetadataPrefix = "rpc.grpc.request.metadata."
40+
fileprivate static let responseMetadataPrefix = "rpc.grpc.response.metadata."
4041
}
4142

4243
extension Span {
@@ -134,6 +135,13 @@ extension Span {
134135
)
135136
}
136137

138+
func setMetadataStringAttributesAsResponseSpanAttributes(_ metadata: Metadata) {
139+
self.setMetadataStringAttributesAsSpanAttributes(
140+
metadata,
141+
prefix: GRPCTracingKeys.responseMetadataPrefix
142+
)
143+
}
144+
137145
private func setMetadataStringAttributesAsSpanAttributes(_ metadata: Metadata, prefix: String) {
138146
for (key, value) in metadata {
139147
switch value {

Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift

Lines changed: 228 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ struct OTelTracingClientInterceptorTests {
241241
bodyParts: RPCAsyncSequence(
242242
wrapping: AsyncThrowingStream<StreamingClientResponse.Contents.BodyPart, any Error> {
243243
$0.yield(.message(["response"]))
244+
$0.yield(
245+
.trailingMetadata([
246+
"some-repeated-response-metadata": "some-repeated-response-value1",
247+
"some-repeated-response-metadata": "some-repeated-response-value2",
248+
])
249+
)
244250
$0.finish()
245251
}
246252
)
@@ -349,10 +355,12 @@ struct OTelTracingClientInterceptorTests {
349355
bodyParts: RPCAsyncSequence(
350356
wrapping: AsyncThrowingStream<StreamingClientResponse.Contents.BodyPart, any Error> {
351357
$0.yield(.message(["response"]))
352-
$0.yield(.trailingMetadata([
353-
"some-repeated-response-metadata": "some-repeated-response-value1",
354-
"some-repeated-response-metadata": "some-repeated-response-value2"
355-
]))
358+
$0.yield(
359+
.trailingMetadata([
360+
"some-repeated-response-metadata": "some-repeated-response-value1",
361+
"some-repeated-response-metadata": "some-repeated-response-value2",
362+
])
363+
)
356364
$0.finish()
357365
}
358366
)
@@ -845,6 +853,222 @@ struct OTelTracingServerInterceptorTests {
845853
}
846854
}
847855

856+
@Test("All string-valued request metadata is included if opted-in")
857+
func testRequestMetadataOptIn() async throws {
858+
let methodDescriptor = MethodDescriptor(
859+
fullyQualifiedService: "OTelTracingServerInterceptorTests",
860+
method: "testRequestMetadataOptIn"
861+
)
862+
let interceptor = ServerOTelTracingInterceptor(
863+
serverHostname: "someserver.com",
864+
networkTransportMethod: "tcp",
865+
includeRequestMetadata: true
866+
)
867+
let request = ServerRequest(
868+
metadata: [
869+
"some-request-metadata": "some-request-value",
870+
"some-repeated-request-metadata": "some-repeated-request-value1",
871+
"some-repeated-request-metadata": "some-repeated-request-value2",
872+
"some-request-metadata-bin": .binary([1]),
873+
],
874+
message: [UInt8]()
875+
)
876+
let response = try await interceptor.intercept(
877+
tracer: self.tracer,
878+
request: .init(single: request),
879+
context: ServerContext(
880+
descriptor: methodDescriptor,
881+
remotePeer: "ipv4:10.1.2.80:567",
882+
localPeer: "ipv4:10.1.2.90:123",
883+
cancellation: .init()
884+
)
885+
) { request, _ in
886+
for try await _ in request.messages {
887+
// We need to iterate over the messages for the span to be able to record the events.
888+
}
889+
890+
return StreamingServerResponse<String>(
891+
accepted: .success(
892+
.init(
893+
metadata: [
894+
"some-response-metadata": "some-response-value",
895+
"some-response-metadata-bin": .binary([2]),
896+
],
897+
producer: { writer in
898+
try await writer.write("response1")
899+
try await writer.write("response2")
900+
return [
901+
"some-repeated-response-metadata": "some-repeated-response-value1",
902+
"some-repeated-response-metadata": "some-repeated-response-value2",
903+
]
904+
}
905+
)
906+
)
907+
)
908+
}
909+
910+
// Get the response out into a response stream, and assert its contents
911+
let (responseStream, responseStreamContinuation) = AsyncStream<String>.makeStream()
912+
let responseContents = try response.accepted.get()
913+
let trailingMetadata = try await responseContents.producer(
914+
RPCWriter(wrapping: TestWriter(streamContinuation: responseStreamContinuation))
915+
)
916+
responseStreamContinuation.finish()
917+
918+
#expect(
919+
trailingMetadata == [
920+
"some-repeated-response-metadata": "some-repeated-response-value1",
921+
"some-repeated-response-metadata": "some-repeated-response-value2",
922+
]
923+
)
924+
await assertStreamContentsEqual(["response1", "response2"], responseStream)
925+
926+
assertTestSpanComponents(forMethod: methodDescriptor, tracer: self.tracer) { events in
927+
#expect(
928+
events == [
929+
// Recorded when request is received
930+
TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]),
931+
// Recorded when `response1` is sent
932+
TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]),
933+
// Recorded when `response2` is sent
934+
TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 2]),
935+
]
936+
)
937+
} assertAttributes: { attributes in
938+
#expect(
939+
attributes == [
940+
"rpc.system": "grpc",
941+
"rpc.method": .string(methodDescriptor.method),
942+
"rpc.service": .string(methodDescriptor.service.fullyQualifiedService),
943+
"server.address": "someserver.com",
944+
"server.port": 123,
945+
"network.peer.address": "10.1.2.90",
946+
"network.peer.port": 123,
947+
"network.transport": "tcp",
948+
"network.type": "ipv4",
949+
"client.address": "10.1.2.80",
950+
"client.port": 567,
951+
"rpc.grpc.request.metadata.some-request-metadata": "some-request-value",
952+
"rpc.grpc.request.metadata.some-repeated-request-metadata": .stringArray([
953+
"some-repeated-request-value1", "some-repeated-request-value2",
954+
]),
955+
]
956+
)
957+
} assertStatus: { status in
958+
#expect(status == nil)
959+
} assertErrors: { errors in
960+
#expect(errors.isEmpty)
961+
}
962+
}
963+
964+
@Test("All string-valued response metadata is included if opted-in")
965+
func testResponseMetadataOptIn() async throws {
966+
let methodDescriptor = MethodDescriptor(
967+
fullyQualifiedService: "OTelTracingServerInterceptorTests",
968+
method: "testResponseMetadataOptIn"
969+
)
970+
let interceptor = ServerOTelTracingInterceptor(
971+
serverHostname: "someserver.com",
972+
networkTransportMethod: "tcp",
973+
includeResponseMetadata: true
974+
)
975+
let request = ServerRequest(
976+
metadata: [
977+
"some-request-metadata": "some-request-value",
978+
"some-repeated-request-metadata": "some-repeated-request-value1",
979+
"some-repeated-request-metadata": "some-repeated-request-value2",
980+
"some-request-metadata-bin": .binary([1]),
981+
],
982+
message: [UInt8]()
983+
)
984+
let response = try await interceptor.intercept(
985+
tracer: self.tracer,
986+
request: .init(single: request),
987+
context: ServerContext(
988+
descriptor: methodDescriptor,
989+
remotePeer: "ipv4:10.1.2.80:567",
990+
localPeer: "ipv4:10.1.2.90:123",
991+
cancellation: .init()
992+
)
993+
) { request, _ in
994+
for try await _ in request.messages {
995+
// We need to iterate over the messages for the span to be able to record the events.
996+
}
997+
998+
return StreamingServerResponse<String>(
999+
accepted: .success(
1000+
.init(
1001+
metadata: [
1002+
"some-response-metadata": "some-response-value",
1003+
"some-response-metadata-bin": .binary([2]),
1004+
],
1005+
producer: { writer in
1006+
try await writer.write("response1")
1007+
try await writer.write("response2")
1008+
return [
1009+
"some-repeated-response-metadata": "some-repeated-response-value1",
1010+
"some-repeated-response-metadata": "some-repeated-response-value2",
1011+
]
1012+
}
1013+
)
1014+
)
1015+
)
1016+
}
1017+
1018+
// Get the response out into a response stream, and assert its contents
1019+
let (responseStream, responseStreamContinuation) = AsyncStream<String>.makeStream()
1020+
let responseContents = try response.accepted.get()
1021+
let trailingMetadata = try await responseContents.producer(
1022+
RPCWriter(wrapping: TestWriter(streamContinuation: responseStreamContinuation))
1023+
)
1024+
responseStreamContinuation.finish()
1025+
1026+
#expect(
1027+
trailingMetadata == [
1028+
"some-repeated-response-metadata": "some-repeated-response-value1",
1029+
"some-repeated-response-metadata": "some-repeated-response-value2",
1030+
]
1031+
)
1032+
await assertStreamContentsEqual(["response1", "response2"], responseStream)
1033+
1034+
assertTestSpanComponents(forMethod: methodDescriptor, tracer: self.tracer) { events in
1035+
#expect(
1036+
events == [
1037+
// Recorded when request is received
1038+
TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]),
1039+
// Recorded when `response1` is sent
1040+
TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]),
1041+
// Recorded when `response2` is sent
1042+
TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 2]),
1043+
]
1044+
)
1045+
} assertAttributes: { attributes in
1046+
#expect(
1047+
attributes == [
1048+
"rpc.system": "grpc",
1049+
"rpc.method": .string(methodDescriptor.method),
1050+
"rpc.service": .string(methodDescriptor.service.fullyQualifiedService),
1051+
"server.address": "someserver.com",
1052+
"server.port": 123,
1053+
"network.peer.address": "10.1.2.90",
1054+
"network.peer.port": 123,
1055+
"network.transport": "tcp",
1056+
"network.type": "ipv4",
1057+
"client.address": "10.1.2.80",
1058+
"client.port": 567,
1059+
"rpc.grpc.response.metadata.some-response-metadata": "some-response-value",
1060+
"rpc.grpc.response.metadata.some-repeated-response-metadata": .stringArray([
1061+
"some-repeated-response-value1", "some-repeated-response-value2",
1062+
]),
1063+
]
1064+
)
1065+
} assertStatus: { status in
1066+
#expect(status == nil)
1067+
} assertErrors: { errors in
1068+
#expect(errors.isEmpty)
1069+
}
1070+
}
1071+
8481072
@Test("RPC that throws is correctly recorded")
8491073
func testThrowingRPC() async throws {
8501074
let methodDescriptor = MethodDescriptor(

0 commit comments

Comments
 (0)