@@ -15,21 +15,36 @@ import OpenTelemetryApi
1515/// exports the spans to wake up and start a new export cycle.
1616/// This batchSpanProcessor can cause high contention in a very high traffic service.
1717public struct BatchSpanProcessor : SpanProcessor {
18+ fileprivate static let SPAN_PROCESSOR_TYPE_LABEL : String = " processorType "
19+ fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL : String = " dropped "
20+ fileprivate static let SPAN_PROCESSOR_TYPE_VALUE : String = BatchSpanProcessor . name
21+
22+ fileprivate var worker : BatchWorker
1823
19-
20- fileprivate var worker : BatchWorker
21-
22- public init ( spanExporter: SpanExporter , scheduleDelay: TimeInterval = 5 , exportTimeout: TimeInterval = 30 ,
23- maxQueueSize: Int = 2048 , maxExportBatchSize: Int = 512 , willExportCallback: ( ( inout [ SpanData ] ) -> Void ) ? = nil )
24- {
25- worker = BatchWorker ( spanExporter: spanExporter,
26- scheduleDelay: scheduleDelay,
27- exportTimeout: exportTimeout,
28- maxQueueSize: maxQueueSize,
29- maxExportBatchSize: maxExportBatchSize,
30- willExportCallback: willExportCallback)
31- worker. start ( )
32- }
24+ public static var name : String {
25+ String ( describing: Self . self)
26+ }
27+
28+ public init (
29+ spanExporter: SpanExporter ,
30+ meterProvider: StableMeterProvider ,
31+ scheduleDelay: TimeInterval = 5 ,
32+ exportTimeout: TimeInterval = 30 ,
33+ maxQueueSize: Int = 2048 ,
34+ maxExportBatchSize: Int = 512 ,
35+ willExportCallback: ( ( inout [ SpanData ] ) -> Void ) ? = nil
36+ ) {
37+ worker = BatchWorker (
38+ spanExporter: spanExporter,
39+ meterProvider: meterProvider,
40+ scheduleDelay: scheduleDelay,
41+ exportTimeout: exportTimeout,
42+ maxQueueSize: maxQueueSize,
43+ maxExportBatchSize: maxExportBatchSize,
44+ willExportCallback: willExportCallback
45+ )
46+ worker. start ( )
47+ }
3348
3449 public let isStartRequired = false
3550 public let isEndRequired = true
@@ -57,40 +72,105 @@ public struct BatchSpanProcessor: SpanProcessor {
5772/// the data.
5873/// The list of batched data is protected by a NSCondition which ensures full concurrency.
5974private class BatchWorker : Thread {
60- let spanExporter : SpanExporter
61- let scheduleDelay : TimeInterval
62- let maxQueueSize : Int
63- let exportTimeout : TimeInterval
64- let maxExportBatchSize : Int
65- let willExportCallback : ( ( inout [ SpanData ] ) -> Void ) ?
66- let halfMaxQueueSize : Int
67- private let cond = NSCondition ( )
68- var spanList = [ ReadableSpan] ( )
69- var queue : OperationQueue
70-
71- init ( spanExporter: SpanExporter , scheduleDelay: TimeInterval , exportTimeout: TimeInterval , maxQueueSize: Int , maxExportBatchSize: Int , willExportCallback: ( ( inout [ SpanData ] ) -> Void ) ? ) {
72- self . spanExporter = spanExporter
73- self . scheduleDelay = scheduleDelay
74- self . exportTimeout = exportTimeout
75- self . maxQueueSize = maxQueueSize
76- halfMaxQueueSize = maxQueueSize >> 1
77- self . maxExportBatchSize = maxExportBatchSize
78- self . willExportCallback = willExportCallback
79- queue = OperationQueue ( )
80- queue. name = " BatchWorker Queue "
81- queue. maxConcurrentOperationCount = 1
82- }
75+ let spanExporter : SpanExporter
76+ let meterProvider : StableMeterProvider
77+ let scheduleDelay : TimeInterval
78+ let maxQueueSize : Int
79+ let exportTimeout : TimeInterval
80+ let maxExportBatchSize : Int
81+ let willExportCallback : ( ( inout [ SpanData ] ) -> Void ) ?
82+ let halfMaxQueueSize : Int
83+ private let cond = NSCondition ( )
84+ var spanList = [ ReadableSpan] ( )
85+ var queue : OperationQueue
86+
87+ private var queueSizeGauge : ObservableLongGauge ?
88+ private var spanGaugeObserver : ObservableLongGauge ?
89+
90+ private var processedSpansCounter : LongCounter ?
91+ private let droppedAttrs : [ String : AttributeValue ]
92+ private let exportedAttrs : [ String : AttributeValue ]
93+ private let spanGaugeBuilder : LongGaugeBuilder
94+ init (
95+ spanExporter: SpanExporter ,
96+ meterProvider: StableMeterProvider ,
97+ scheduleDelay: TimeInterval ,
98+ exportTimeout: TimeInterval ,
99+ maxQueueSize: Int ,
100+ maxExportBatchSize: Int ,
101+ willExportCallback: ( ( inout [ SpanData ] ) -> Void ) ?
102+ ) {
103+ self . spanExporter = spanExporter
104+ self . meterProvider = meterProvider
105+ self . scheduleDelay = scheduleDelay
106+ self . exportTimeout = exportTimeout
107+ self . maxQueueSize = maxQueueSize
108+ halfMaxQueueSize = maxQueueSize >> 1
109+ self . maxExportBatchSize = maxExportBatchSize
110+ self . willExportCallback = willExportCallback
111+ queue = OperationQueue ( )
112+ queue. name = " BatchWorker Queue "
113+ queue. maxConcurrentOperationCount = 1
114+
115+ let meter = meterProvider. meterBuilder ( name: " io.opentelemetry.sdk.trace " ) . build ( )
116+
117+ var longGaugeSdk = meter. gaugeBuilder ( name: " queueSize " ) . ofLongs ( ) as? LongGaugeBuilderSdk
118+ longGaugeSdk = longGaugeSdk? . setDescription ( " The number of items queued " )
119+ longGaugeSdk = longGaugeSdk? . setUnit ( " 1 " )
120+ self . queueSizeGauge = longGaugeSdk? . buildWithCallback { result in
121+ result. record (
122+ value: maxQueueSize,
123+ attributes: [
124+ BatchSpanProcessor . SPAN_PROCESSOR_TYPE_LABEL: . string( BatchSpanProcessor . SPAN_PROCESSOR_TYPE_VALUE)
125+ ]
126+ )
127+ }
128+
129+ self . spanGaugeBuilder = meter. gaugeBuilder ( name: " spanSize " )
130+ . ofLongs ( )
131+
132+ var longCounterSdk = meter. counterBuilder ( name: " processedSpans " ) as? LongCounterMeterBuilderSdk
133+ longCounterSdk = longCounterSdk? . setUnit ( " 1 " )
134+ longCounterSdk = longCounterSdk? . setDescription ( " The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput] " )
135+ processedSpansCounter = longCounterSdk? . build ( )
136+
137+ droppedAttrs = [
138+ BatchSpanProcessor . SPAN_PROCESSOR_TYPE_LABEL: . string( BatchSpanProcessor . SPAN_PROCESSOR_TYPE_VALUE) ,
139+ BatchSpanProcessor . SPAN_PROCESSOR_DROPPED_LABEL: . bool( true )
140+ ]
141+ exportedAttrs = [
142+ BatchSpanProcessor . SPAN_PROCESSOR_TYPE_LABEL: . string( BatchSpanProcessor . SPAN_PROCESSOR_TYPE_VALUE) ,
143+ BatchSpanProcessor . SPAN_PROCESSOR_DROPPED_LABEL: . bool( false )
144+ ]
145+
146+ // Subscribe to new gauge observer
147+ self . spanGaugeObserver = self . spanGaugeBuilder
148+ . buildWithCallback { [ count = spanList. count] result in
149+ result. record (
150+ value: count,
151+ attributes: [
152+ BatchSpanProcessor . SPAN_PROCESSOR_TYPE_LABEL: . string( BatchSpanProcessor . SPAN_PROCESSOR_TYPE_VALUE)
153+ ]
154+ )
155+ }
156+ }
83157
158+ deinit {
159+ // Cleanup all gauge observer
160+ self . queueSizeGauge? . close ( )
161+ self . spanGaugeObserver? . close ( )
162+ }
163+
84164 func addSpan( span: ReadableSpan ) {
85165 cond. lock ( )
86166 defer { cond. unlock ( ) }
87167
88168 if spanList. count == maxQueueSize {
89- // TODO: Record a counter for dropped spans.
169+ processedSpansCounter ? . add ( value : 1 , attribute : droppedAttrs )
90170 return
91171 }
92- // TODO: Record a gauge for referenced spans.
93172 spanList. append ( span)
173+
94174 // Notify the worker thread that at half of the queue is available. It will take
95175 // time anyway for the thread to wake up.
96176 if spanList. count >= halfMaxQueueSize {
@@ -148,11 +228,16 @@ private class BatchWorker: Thread {
148228 timeoutTimer. cancel ( )
149229 }
150230
151- private func exportAction( spanList: [ ReadableSpan ] , explicitTimeout: TimeInterval ? = nil ) {
152- stride ( from: 0 , to: spanList. endIndex, by: maxExportBatchSize) . forEach {
153- var spansToExport = spanList [ $0 ..< min ( $0 + maxExportBatchSize, spanList. count) ] . map { $0. toSpanData ( ) }
154- willExportCallback ? ( & spansToExport)
155- spanExporter. export ( spans: spansToExport, explicitTimeout: explicitTimeout)
231+ private func exportAction( spanList: [ ReadableSpan ] , explicitTimeout: TimeInterval ? = nil ) {
232+ stride ( from: 0 , to: spanList. endIndex, by: maxExportBatchSize) . forEach {
233+ var spansToExport = spanList [ $0 ..< min ( $0 + maxExportBatchSize, spanList. count) ] . map { $0. toSpanData ( ) }
234+ willExportCallback ? ( & spansToExport)
235+ let result = spanExporter. export ( spans: spansToExport, explicitTimeout: explicitTimeout)
236+ if result == . success {
237+ cond. lock ( )
238+ processedSpansCounter? . add ( value: spanList. count, attribute: exportedAttrs)
239+ cond. unlock ( )
240+ }
241+ }
156242 }
157- }
158243}
0 commit comments