1515 */
1616
1717public import GRPCCore
18- internal import Tracing
18+ internal import Synchronization
19+ package import Tracing
1920
2021/// A server interceptor that extracts tracing information from the request.
2122///
2223/// The extracted tracing information is made available to user code via the current `ServiceContext`.
24+ ///
2325/// For more information, refer to the documentation for `swift-distributed-tracing`.
24- public struct ServerTracingInterceptor : ServerInterceptor {
26+ ///
27+ /// This interceptor will also inject all required and recommended span and event attributes, and set span status, as defined by
28+ /// OpenTelemetry's documentation on:
29+ /// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans
30+ /// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
31+ public struct ServerOTelTracingInterceptor : ServerInterceptor {
2532 private let extractor : ServerRequestExtractor
26- private let emitEventOnEachWrite : Bool
33+ private let traceEachMessage : Bool
34+ private var serverHostname : String
35+ private var networkTransportMethod : String
2736
28- /// Create a new instance of a ``ServerTracingInterceptor ``.
37+ /// Create a new instance of a ``ServerOTelTracingInterceptor ``.
2938 ///
30- /// - Parameter emitEventOnEachWrite: If `true`, each response part sent and request part
31- /// received will be recorded as a separate event in a tracing span. Otherwise, only the request/response
32- /// start and end will be recorded as events.
33- public init ( emitEventOnEachWrite: Bool = false ) {
39+ /// - Parameters:
40+ /// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans.
41+ /// - networkTransportMethod: The transport in use (e.g. "tcp", "unix"). This will be the value for the
42+ /// `network.transport` attribute in spans.
43+ /// - traceEachMessage: If `true`, each response part sent and request part received will be recorded as a separate
44+ /// event in a tracing span.
45+ public init (
46+ serverHostname: String ,
47+ networkTransportMethod: String ,
48+ traceEachMessage: Bool = true
49+ ) {
3450 self . extractor = ServerRequestExtractor ( )
35- self . emitEventOnEachWrite = emitEventOnEachWrite
51+ self . traceEachMessage = traceEachMessage
52+ self . serverHostname = serverHostname
53+ self . networkTransportMethod = networkTransportMethod
3654 }
3755
3856 /// This interceptor will extract whatever `ServiceContext` key-value pairs have been inserted into the
@@ -41,14 +59,34 @@ public struct ServerTracingInterceptor: ServerInterceptor {
4159 ///
4260 /// Which key-value pairs are extracted and made available will depend on the specific tracing implementation
4361 /// that has been configured when bootstrapping `swift-distributed-tracing` in your application.
62+ ///
63+ /// It will also inject all required and recommended span and event attributes, and set span status, as defined by OpenTelemetry's
64+ /// documentation on:
65+ /// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans
66+ /// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
4467 public func intercept< Input, Output> (
4568 request: StreamingServerRequest < Input > ,
4669 context: ServerContext ,
4770 next: @Sendable ( StreamingServerRequest < Input > , ServerContext ) async throws ->
4871 StreamingServerResponse < Output >
72+ ) async throws -> StreamingServerResponse < Output > where Input: Sendable , Output: Sendable {
73+ try await self . intercept (
74+ tracer: InstrumentationSystem . tracer,
75+ request: request,
76+ context: context,
77+ next: next
78+ )
79+ }
80+
81+ /// Same as ``intercept(request:context:next:)``, but allows specifying a `Tracer` for testing purposes.
82+ package func intercept< Input, Output> (
83+ tracer: any Tracer ,
84+ request: StreamingServerRequest < Input > ,
85+ context: ServerContext ,
86+ next: @Sendable ( StreamingServerRequest < Input > , ServerContext ) async throws ->
87+ StreamingServerResponse < Output >
4988 ) async throws -> StreamingServerResponse < Output > where Input: Sendable , Output: Sendable {
5089 var serviceContext = ServiceContext . topLevel
51- let tracer = InstrumentationSystem . tracer
5290
5391 tracer. extract (
5492 request. metadata,
@@ -65,55 +103,65 @@ public struct ServerTracingInterceptor: ServerInterceptor {
65103 context: serviceContext,
66104 ofKind: . server
67105 ) { span in
68- span. addEvent ( " Received request start " )
106+ span. setOTelServerSpanGRPCAttributes (
107+ context: context,
108+ serverHostname: self . serverHostname,
109+ networkTransportMethod: self . networkTransportMethod
110+ )
69111
70112 var request = request
71-
72- if self . emitEventOnEachWrite {
113+ if self . traceEachMessage {
114+ let messageReceivedCounter = Atomic ( 1 )
73115 request. messages = RPCAsyncSequence (
74116 wrapping: request. messages. map { element in
75- span. addEvent ( " Received request part " )
117+ var event = SpanEvent ( name: " rpc.message " )
118+ event. attributes [ GRPCTracingKeys . rpcMessageType] = " RECEIVED "
119+ event. attributes [ GRPCTracingKeys . rpcMessageID] =
120+ messageReceivedCounter
121+ . wrappingAdd ( 1 , ordering: . sequentiallyConsistent)
122+ . oldValue
123+ span. addEvent ( event)
76124 return element
77125 }
78126 )
79127 }
80128
81129 var response = try await next ( request, context)
82130
83- span. addEvent ( " Received request end " )
84-
85131 switch response. accepted {
86132 case . success( var success) :
87133 let wrappedProducer = success. producer
88134
89- if self . emitEventOnEachWrite {
135+ if self . traceEachMessage {
90136 success. producer = { writer in
137+ let messageSentCounter = Atomic ( 1 )
91138 let eventEmittingWriter = HookedWriter (
92139 wrapping: writer,
93140 afterEachWrite: {
94- span. addEvent ( " Sent response part " )
141+ var event = SpanEvent ( name: " rpc.message " )
142+ event. attributes [ GRPCTracingKeys . rpcMessageType] = " SENT "
143+ event. attributes [ GRPCTracingKeys . rpcMessageID] =
144+ messageSentCounter
145+ . wrappingAdd ( 1 , ordering: . sequentiallyConsistent)
146+ . oldValue
147+ span. addEvent ( event)
95148 }
96149 )
97150
98151 let wrappedResult = try await wrappedProducer (
99152 RPCWriter ( wrapping: eventEmittingWriter)
100153 )
101154
102- span. addEvent ( " Sent response end " )
103- return wrappedResult
104- }
105- } else {
106- success. producer = { writer in
107- let wrappedResult = try await wrappedProducer ( writer)
108- span. addEvent ( " Sent response end " )
109155 return wrappedResult
110156 }
111157 }
112158
113159 response = . init( accepted: . success( success) )
114160
115- case . failure:
116- span. addEvent ( " Sent error response " )
161+ case . failure( let error) :
162+ span. attributes [ GRPCTracingKeys . grpcStatusCode] = error. code. rawValue
163+ span. setStatus ( SpanStatus ( code: . error) )
164+ span. recordError ( error)
117165 }
118166
119167 return response
0 commit comments