1616
1717public import GRPCCore
1818internal import Synchronization
19- internal import Tracing
19+ package import Tracing
2020
2121/// A server interceptor that extracts tracing information from the request.
2222///
@@ -30,7 +30,7 @@ internal import Tracing
3030/// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
3131public struct ServerOTelTracingInterceptor : ServerInterceptor {
3232 private let extractor : ServerRequestExtractor
33- private let emitEventOnEachWrite : Bool
33+ private let traceEachMessage : Bool
3434 private var serverHostname : String
3535 private var networkTransportMethod : String
3636
@@ -40,15 +40,15 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
4040 /// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans.
4141 /// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the
4242 /// `network.transport` attribute in spans.
43- /// - emitEventOnEachWrite : If `true`, each response part sent and request part received will be recorded as a separate
44- /// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events.
43+ /// - traceEachMessage : If `true`, each response part sent and request part received will be recorded as a separate
44+ /// event in a tracing span.
4545 public init (
4646 serverHostname: String ,
4747 networkTransportMethod: String ,
48- emitEventOnEachWrite : Bool = false
48+ traceEachMessage : Bool = true
4949 ) {
5050 self . extractor = ServerRequestExtractor ( )
51- self . emitEventOnEachWrite = emitEventOnEachWrite
51+ self . traceEachMessage = traceEachMessage
5252 self . serverHostname = serverHostname
5353 self . networkTransportMethod = networkTransportMethod
5454 }
@@ -65,13 +65,28 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
6565 /// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans
6666 /// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
6767 public func intercept< Input, Output> (
68+ request: StreamingServerRequest < Input > ,
69+ context: ServerContext ,
70+ next: @Sendable ( StreamingServerRequest < Input > , ServerContext ) async throws ->
71+ StreamingServerResponse < Output >
72+ ) async throws -> StreamingServerResponse < Output > where Input: Sendable , Output: Sendable {
73+ try await self . intercept (
74+ tracer: InstrumentationSystem . tracer,
75+ request: request,
76+ context: context,
77+ next: next
78+ )
79+ }
80+
81+ /// Same as ``intercept(request:context:next:)``, but allows specifying a `Tracer` for testing purposes.
82+ package func intercept< Input, Output> (
83+ tracer: any Tracer ,
6884 request: StreamingServerRequest < Input > ,
6985 context: ServerContext ,
7086 next: @Sendable ( StreamingServerRequest < Input > , ServerContext ) async throws ->
7187 StreamingServerResponse < Output >
7288 ) async throws -> StreamingServerResponse < Output > where Input: Sendable , Output: Sendable {
7389 var serviceContext = ServiceContext . topLevel
74- let tracer = InstrumentationSystem . tracer
7590
7691 tracer. extract (
7792 request. metadata,
@@ -94,17 +109,14 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
94109 networkTransportMethod: self . networkTransportMethod
95110 )
96111
97- span. addEvent ( " Received request " )
98-
99112 var request = request
100- if self . emitEventOnEachWrite {
113+ if self . traceEachMessage {
101114 let messageReceivedCounter = Atomic ( 1 )
102115 request. messages = RPCAsyncSequence (
103116 wrapping: request. messages. map { element in
104117 var event = SpanEvent ( name: " rpc.message " )
105- event. attributes. rpc. messageType = " RECEIVED "
106- event. attributes. rpc. messageID =
107- messageReceivedCounter
118+ event. attributes [ GRPCTracingKeys . rpcMessageType] = " RECEIVED "
119+ event. attributes [ GRPCTracingKeys . rpcMessageID] = messageReceivedCounter
108120 . wrappingAdd ( 1 , ordering: . sequentiallyConsistent)
109121 . oldValue
110122 span. addEvent ( event)
@@ -115,22 +127,19 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
115127
116128 var response = try await next ( request, context)
117129
118- span. addEvent ( " Finished processing request " )
119-
120130 switch response. accepted {
121131 case . success( var success) :
122132 let wrappedProducer = success. producer
123133
124- if self . emitEventOnEachWrite {
134+ if self . traceEachMessage {
125135 success. producer = { writer in
126136 let messageSentCounter = Atomic ( 1 )
127137 let eventEmittingWriter = HookedWriter (
128138 wrapping: writer,
129139 afterEachWrite: {
130140 var event = SpanEvent ( name: " rpc.message " )
131- event. attributes. rpc. messageType = " SENT "
132- event. attributes. rpc. messageID =
133- messageSentCounter
141+ event. attributes [ GRPCTracingKeys . rpcMessageType] = " SENT "
142+ event. attributes [ GRPCTracingKeys . rpcMessageID] = messageSentCounter
134143 . wrappingAdd ( 1 , ordering: . sequentiallyConsistent)
135144 . oldValue
136145 span. addEvent ( event)
@@ -141,23 +150,19 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
141150 RPCWriter ( wrapping: eventEmittingWriter)
142151 )
143152
144- span. addEvent ( " Sent response end " )
145153 return wrappedResult
146154 }
147155 } else {
148156 success. producer = { writer in
149- let wrappedResult = try await wrappedProducer ( writer)
150- span. addEvent ( " Sent response end " )
151- return wrappedResult
157+ return try await wrappedProducer ( writer)
152158 }
153159 }
154160
155161 response = . init( accepted: . success( success) )
156162
157163 case . failure( let error) :
158- span. attributes. rpc . grpcStatusCode = error. code. rawValue
164+ span. attributes [ GRPCTracingKeys . grpcStatusCode] = error. code. rawValue
159165 span. setStatus ( SpanStatus ( code: . error) )
160- span. addEvent ( " Sent error response " )
161166 span. recordError ( error)
162167 }
163168
0 commit comments