Skip to content

Commit ec42fcd

Browse files
authored
passes timeout to export (#454)
1 parent f800114 commit ec42fcd

File tree

2 files changed

+121
-121
lines changed

2 files changed

+121
-121
lines changed

Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private class BatchWorker : Thread {
9797
logRecordsCopy = logRecordList
9898
logRecordList.removeAll()
9999
cond.unlock()
100-
self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: nil)
100+
self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout)
101101
}
102102
} while true
103103
}
@@ -114,7 +114,7 @@ private class BatchWorker : Thread {
114114

115115

116116
public func shutdown() {
117-
forceFlush(explicitTimeout: nil)
117+
forceFlush(explicitTimeout: exportTimeout)
118118
_ = logRecordExporter.shutdown()
119119
}
120120

Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift

Lines changed: 119 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -15,139 +15,139 @@ import OpenTelemetryApi
1515
/// exports the spans to wake up and start a new export cycle.
1616
/// This batchSpanProcessor can cause high contention in a very high traffic service.
1717
public struct BatchSpanProcessor: SpanProcessor {
18-
fileprivate var worker: BatchWorker
19-
20-
public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
21-
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
22-
{
23-
worker = BatchWorker(spanExporter: spanExporter,
24-
scheduleDelay: scheduleDelay,
25-
exportTimeout: exportTimeout,
26-
maxQueueSize: maxQueueSize,
27-
maxExportBatchSize: maxExportBatchSize,
28-
willExportCallback: willExportCallback)
29-
worker.start()
30-
}
31-
32-
public let isStartRequired = false
33-
public let isEndRequired = true
34-
35-
public func onStart(parentContext: SpanContext?, span: ReadableSpan) {}
36-
37-
public func onEnd(span: ReadableSpan) {
38-
if !span.context.traceFlags.sampled {
39-
return
40-
}
41-
worker.addSpan(span: span)
42-
}
43-
44-
public func shutdown() {
45-
worker.cancel()
46-
worker.shutdown()
47-
}
48-
49-
public func forceFlush(timeout: TimeInterval? = nil) {
50-
worker.forceFlush(explicitTimeout: timeout)
18+
fileprivate var worker: BatchWorker
19+
20+
public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
21+
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
22+
{
23+
worker = BatchWorker(spanExporter: spanExporter,
24+
scheduleDelay: scheduleDelay,
25+
exportTimeout: exportTimeout,
26+
maxQueueSize: maxQueueSize,
27+
maxExportBatchSize: maxExportBatchSize,
28+
willExportCallback: willExportCallback)
29+
worker.start()
30+
}
31+
32+
public let isStartRequired = false
33+
public let isEndRequired = true
34+
35+
public func onStart(parentContext: SpanContext?, span: ReadableSpan) {}
36+
37+
public func onEnd(span: ReadableSpan) {
38+
if !span.context.traceFlags.sampled {
39+
return
5140
}
41+
worker.addSpan(span: span)
42+
}
43+
44+
public func shutdown() {
45+
worker.cancel()
46+
worker.shutdown()
47+
}
48+
49+
public func forceFlush(timeout: TimeInterval? = nil) {
50+
worker.forceFlush(explicitTimeout: timeout)
51+
}
5252
}
5353

5454
/// BatchWorker is a thread that batches multiple spans and calls the registered SpanExporter to export
5555
/// the data.
5656
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
5757
private class BatchWorker: Thread {
58-
let spanExporter: SpanExporter
59-
let scheduleDelay: TimeInterval
60-
let maxQueueSize: Int
61-
let exportTimeout: TimeInterval
62-
let maxExportBatchSize: Int
63-
let willExportCallback: ((inout [SpanData]) -> Void)?
64-
let halfMaxQueueSize: Int
65-
private let cond = NSCondition()
66-
var spanList = [ReadableSpan]()
67-
var queue: OperationQueue
68-
69-
init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
70-
self.spanExporter = spanExporter
71-
self.scheduleDelay = scheduleDelay
72-
self.exportTimeout = exportTimeout
73-
self.maxQueueSize = maxQueueSize
74-
halfMaxQueueSize = maxQueueSize >> 1
75-
self.maxExportBatchSize = maxExportBatchSize
76-
self.willExportCallback = willExportCallback
77-
queue = OperationQueue()
78-
queue.name = "BatchWorker Queue"
79-
queue.maxConcurrentOperationCount = 1
58+
let spanExporter: SpanExporter
59+
let scheduleDelay: TimeInterval
60+
let maxQueueSize: Int
61+
let exportTimeout: TimeInterval
62+
let maxExportBatchSize: Int
63+
let willExportCallback: ((inout [SpanData]) -> Void)?
64+
let halfMaxQueueSize: Int
65+
private let cond = NSCondition()
66+
var spanList = [ReadableSpan]()
67+
var queue: OperationQueue
68+
69+
init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
70+
self.spanExporter = spanExporter
71+
self.scheduleDelay = scheduleDelay
72+
self.exportTimeout = exportTimeout
73+
self.maxQueueSize = maxQueueSize
74+
halfMaxQueueSize = maxQueueSize >> 1
75+
self.maxExportBatchSize = maxExportBatchSize
76+
self.willExportCallback = willExportCallback
77+
queue = OperationQueue()
78+
queue.name = "BatchWorker Queue"
79+
queue.maxConcurrentOperationCount = 1
80+
}
81+
82+
func addSpan(span: ReadableSpan) {
83+
cond.lock()
84+
defer { cond.unlock() }
85+
86+
if spanList.count == maxQueueSize {
87+
// TODO: Record a counter for dropped spans.
88+
return
8089
}
81-
82-
func addSpan(span: ReadableSpan) {
83-
cond.lock()
84-
defer { cond.unlock() }
85-
86-
if spanList.count == maxQueueSize {
87-
// TODO: Record a counter for dropped spans.
88-
return
89-
}
90-
// TODO: Record a gauge for referenced spans.
91-
spanList.append(span)
92-
// Notify the worker thread that at half of the queue is available. It will take
93-
// time anyway for the thread to wake up.
94-
if spanList.count >= halfMaxQueueSize {
95-
cond.broadcast()
96-
}
97-
}
98-
99-
override func main() {
100-
repeat {
101-
autoreleasepool {
102-
var spansCopy: [ReadableSpan]
103-
cond.lock()
104-
if spanList.count < maxExportBatchSize {
105-
repeat {
106-
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
107-
} while spanList.isEmpty
108-
}
109-
spansCopy = spanList
110-
spanList.removeAll()
111-
cond.unlock()
112-
self.exportBatch(spanList: spansCopy, explicitTimeout: nil)
113-
}
114-
} while true
90+
// TODO: Record a gauge for referenced spans.
91+
spanList.append(span)
92+
// Notify the worker thread that at half of the queue is available. It will take
93+
// time anyway for the thread to wake up.
94+
if spanList.count >= halfMaxQueueSize {
95+
cond.broadcast()
11596
}
116-
117-
func shutdown() {
118-
forceFlush(explicitTimeout: nil)
119-
spanExporter.shutdown()
120-
}
121-
122-
public func forceFlush(explicitTimeout: TimeInterval?) {
97+
}
98+
99+
override func main() {
100+
repeat {
101+
autoreleasepool {
123102
var spansCopy: [ReadableSpan]
124103
cond.lock()
104+
if spanList.count < maxExportBatchSize {
105+
repeat {
106+
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
107+
} while spanList.isEmpty
108+
}
125109
spansCopy = spanList
126110
spanList.removeAll()
127111
cond.unlock()
128-
// Execute the batch export outside the synchronized to not block all producers.
129-
exportBatch(spanList: spansCopy, explicitTimeout: explicitTimeout)
130-
}
131-
132-
private func exportBatch(spanList: [ReadableSpan], explicitTimeout: TimeInterval?) {
133-
let exportOperation = BlockOperation { [weak self] in
134-
self?.exportAction(spanList: spanList)
135-
}
136-
let timeoutTimer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
137-
timeoutTimer.setEventHandler { exportOperation.cancel() }
138-
let maxTimeOut = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout)
139-
timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(maxTimeOut.toMilliseconds)), leeway: .milliseconds(1))
140-
timeoutTimer.activate()
141-
queue.addOperation(exportOperation)
142-
queue.waitUntilAllOperationsAreFinished()
143-
timeoutTimer.cancel()
112+
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
113+
}
114+
} while true
115+
}
116+
117+
func shutdown() {
118+
forceFlush(explicitTimeout: self.exportTimeout)
119+
spanExporter.shutdown()
120+
}
121+
122+
public func forceFlush(explicitTimeout: TimeInterval?) {
123+
var spansCopy: [ReadableSpan]
124+
cond.lock()
125+
spansCopy = spanList
126+
spanList.removeAll()
127+
cond.unlock()
128+
// Execute the batch export outside the synchronized to not block all producers.
129+
exportBatch(spanList: spansCopy, explicitTimeout: explicitTimeout)
130+
}
131+
132+
private func exportBatch(spanList: [ReadableSpan], explicitTimeout: TimeInterval?) {
133+
let exportOperation = BlockOperation { [weak self] in
134+
self?.exportAction(spanList: spanList)
144135
}
145-
146-
private func exportAction(spanList: [ReadableSpan]) {
147-
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
148-
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
149-
willExportCallback?(&spansToExport)
150-
spanExporter.export(spans: spansToExport)
151-
}
136+
let timeoutTimer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
137+
timeoutTimer.setEventHandler { exportOperation.cancel() }
138+
let maxTimeOut = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout)
139+
timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(maxTimeOut.toMilliseconds)), leeway: .milliseconds(1))
140+
timeoutTimer.activate()
141+
queue.addOperation(exportOperation)
142+
queue.waitUntilAllOperationsAreFinished()
143+
timeoutTimer.cancel()
144+
}
145+
146+
private func exportAction(spanList: [ReadableSpan]) {
147+
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
148+
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
149+
willExportCallback?(&spansToExport)
150+
spanExporter.export(spans: spansToExport)
152151
}
152+
}
153153
}

0 commit comments

Comments
 (0)