Skip to content

Commit eb3c0b6

Browse files
committed
feat: Add optional Task support for BatchExportProcessor and PeriodicExportingMetricReader
This change adds the optional support for threads for BatchExportProcessor and PeriodicExportingMetricReader, in order to support .NET's WebAssembly single threaded mode.
1 parent f8dc06a commit eb3c0b6

15 files changed

+1189
-235
lines changed

src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ OpenTelemetry.Batch<T>.Enumerator.Enumerator() -> void
3131
OpenTelemetry.Batch<T>.Enumerator.MoveNext() -> bool
3232
OpenTelemetry.Batch<T>.Enumerator.Reset() -> void
3333
OpenTelemetry.BatchActivityExportProcessor
34-
OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter<System.Diagnostics.Activity!>! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void
34+
OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter<System.Diagnostics.Activity!>! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void
3535
OpenTelemetry.BatchExportProcessor<T>
36-
OpenTelemetry.BatchExportProcessor<T>.BatchExportProcessor(OpenTelemetry.BaseExporter<T!>! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void
36+
OpenTelemetry.BatchExportProcessor<T>.BatchExportProcessor(OpenTelemetry.BaseExporter<T!>! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void
3737
OpenTelemetry.BatchExportProcessorOptions<T>
3838
OpenTelemetry.BatchExportProcessorOptions<T>.BatchExportProcessorOptions() -> void
3939
OpenTelemetry.BatchExportProcessorOptions<T>.ExporterTimeoutMilliseconds.get -> int
@@ -45,7 +45,7 @@ OpenTelemetry.BatchExportProcessorOptions<T>.MaxQueueSize.set -> void
4545
OpenTelemetry.BatchExportProcessorOptions<T>.ScheduledDelayMilliseconds.get -> int
4646
OpenTelemetry.BatchExportProcessorOptions<T>.ScheduledDelayMilliseconds.set -> void
4747
OpenTelemetry.BatchLogRecordExportProcessor
48-
OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter<OpenTelemetry.Logs.LogRecord!>! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void
48+
OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter<OpenTelemetry.Logs.LogRecord!>! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void
4949
OpenTelemetry.CompositeProcessor<T>
5050
OpenTelemetry.CompositeProcessor<T>.AddProcessor(OpenTelemetry.BaseProcessor<T>! processor) -> OpenTelemetry.CompositeProcessor<T>!
5151
OpenTelemetry.CompositeProcessor<T>.CompositeProcessor(System.Collections.Generic.IEnumerable<OpenTelemetry.BaseProcessor<T>!>! processors) -> void
@@ -256,7 +256,7 @@ OpenTelemetry.Metrics.MetricType.LongSum = 26 -> OpenTelemetry.Metrics.MetricTyp
256256
OpenTelemetry.Metrics.MetricType.LongSumNonMonotonic = 138 -> OpenTelemetry.Metrics.MetricType
257257
OpenTelemetry.Metrics.MetricTypeExtensions
258258
OpenTelemetry.Metrics.PeriodicExportingMetricReader
259-
OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter<OpenTelemetry.Metrics.Metric!>! exporter, int exportIntervalMilliseconds = 60000, int exportTimeoutMilliseconds = 30000) -> void
259+
OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter<OpenTelemetry.Metrics.Metric!>! exporter, int exportIntervalMilliseconds, int exportTimeoutMilliseconds) -> void
260260
OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions
261261
OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds.get -> int?
262262
OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds.set -> void
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter<System.Diagnostics.Activity!>! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void
2+
OpenTelemetry.BatchExportProcessor<T>.BatchExportProcessor(OpenTelemetry.BaseExporter<T!>! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void
3+
OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter<OpenTelemetry.Logs.LogRecord!>! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void
4+
OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter<OpenTelemetry.Metrics.Metric!>! exporter, bool useThreads = true, int exportIntervalMilliseconds = 60000, int exportTimeoutMilliseconds = 30000) -> void

src/OpenTelemetry/BatchExportProcessor.cs

Lines changed: 55 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,13 @@ public abstract class BatchExportProcessor<T> : BaseExportProcessor<T>
2020
internal const int DefaultMaxExportBatchSize = 512;
2121

2222
internal readonly int MaxExportBatchSize;
23+
2324
internal readonly int ScheduledDelayMilliseconds;
2425
internal readonly int ExporterTimeoutMilliseconds;
2526

2627
private readonly CircularBuffer<T> circularBuffer;
27-
private readonly Thread exporterThread;
28-
private readonly AutoResetEvent exportTrigger = new(false);
29-
private readonly ManualResetEvent dataExportedNotification = new(false);
30-
private readonly ManualResetEvent shutdownTrigger = new(false);
31-
private long shutdownDrainTarget = long.MaxValue;
32-
private long droppedCount;
28+
private readonly BatchExportWorker<T> worker;
29+
private readonly bool useThreads;
3330
private bool disposed;
3431

3532
/// <summary>
@@ -42,6 +39,26 @@ public abstract class BatchExportProcessor<T> : BaseExportProcessor<T>
4239
/// <param name="maxExportBatchSize">The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512.</param>
4340
protected BatchExportProcessor(
4441
BaseExporter<T> exporter,
42+
int maxQueueSize,
43+
int scheduledDelayMilliseconds,
44+
int exporterTimeoutMilliseconds,
45+
int maxExportBatchSize)
46+
: this(exporter, true, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, maxExportBatchSize)
47+
{
48+
}
49+
50+
/// <summary>
51+
/// Initializes a new instance of the <see cref="BatchExportProcessor{T}"/> class.
52+
/// </summary>
53+
/// <param name="exporter">Exporter instance.</param>
54+
/// <param name="useThreads">Enables the use of <see cref="Thread" /> when true, <see cref="Task"/> when false.</param>
55+
/// <param name="maxQueueSize">The maximum queue size. After the size is reached data are dropped. The default value is 2048.</param>
56+
/// <param name="scheduledDelayMilliseconds">The delay interval in milliseconds between two consecutive exports. The default value is 5000.</param>
57+
/// <param name="exporterTimeoutMilliseconds">How long the export can run before it is cancelled. The default value is 30000.</param>
58+
/// <param name="maxExportBatchSize">The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512.</param>
59+
protected BatchExportProcessor(
60+
BaseExporter<T> exporter,
61+
bool useThreads = true,
4562
int maxQueueSize = DefaultMaxQueueSize,
4663
int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds,
4764
int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds,
@@ -57,20 +74,16 @@ protected BatchExportProcessor(
5774
this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds;
5875
this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds;
5976
this.MaxExportBatchSize = maxExportBatchSize;
60-
this.exporterThread = new Thread(this.ExporterProc)
61-
{
62-
IsBackground = true,
63-
#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1
64-
Name = $"OpenTelemetry-{nameof(BatchExportProcessor<T>)}-{exporter.GetType().Name}",
65-
#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1
66-
};
67-
this.exporterThread.Start();
77+
this.useThreads = useThreads;
78+
79+
this.worker = this.CreateWorker();
80+
this.worker.Start();
6881
}
6982

7083
/// <summary>
7184
/// Gets the number of telemetry objects dropped by the processor.
7285
/// </summary>
73-
internal long DroppedCount => Volatile.Read(ref this.droppedCount);
86+
internal long DroppedCount => this.worker.DroppedCount;
7487

7588
/// <summary>
7689
/// Gets the number of telemetry objects received by the processor.
@@ -89,20 +102,14 @@ internal bool TryExport(T data)
89102
{
90103
if (this.circularBuffer.Count >= this.MaxExportBatchSize)
91104
{
92-
try
93-
{
94-
this.exportTrigger.Set();
95-
}
96-
catch (ObjectDisposedException)
97-
{
98-
}
105+
this.worker.TriggerExport();
99106
}
100107

101108
return true; // enqueue succeeded
102109
}
103110

104111
// either the queue is full or exceeded the spin limit, drop the item on the floor
105-
Interlocked.Increment(ref this.droppedCount);
112+
this.worker.IncrementDroppedCount();
106113

107114
return false;
108115
}
@@ -116,113 +123,29 @@ protected override void OnExport(T data)
116123
/// <inheritdoc/>
117124
protected override bool OnForceFlush(int timeoutMilliseconds)
118125
{
119-
var tail = this.circularBuffer.RemovedCount;
120-
var head = this.circularBuffer.AddedCount;
121-
122-
if (head == tail)
123-
{
124-
return true; // nothing to flush
125-
}
126-
127-
try
128-
{
129-
this.exportTrigger.Set();
130-
}
131-
catch (ObjectDisposedException)
132-
{
133-
return false;
134-
}
135-
136-
if (timeoutMilliseconds == 0)
137-
{
138-
return false;
139-
}
140-
141-
var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger };
142-
143-
var sw = timeoutMilliseconds == Timeout.Infinite
144-
? null
145-
: Stopwatch.StartNew();
146-
147-
// There is a chance that the export thread finished processing all the data from the queue,
148-
// and signaled before we enter wait here, use polling to prevent being blocked indefinitely.
149-
const int pollingMilliseconds = 1000;
150-
151-
while (true)
152-
{
153-
if (sw == null)
154-
{
155-
try
156-
{
157-
WaitHandle.WaitAny(triggers, pollingMilliseconds);
158-
}
159-
catch (ObjectDisposedException)
160-
{
161-
return false;
162-
}
163-
}
164-
else
165-
{
166-
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
167-
168-
if (timeout <= 0)
169-
{
170-
return this.circularBuffer.RemovedCount >= head;
171-
}
172-
173-
try
174-
{
175-
WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds));
176-
}
177-
catch (ObjectDisposedException)
178-
{
179-
return false;
180-
}
181-
}
182-
183-
if (this.circularBuffer.RemovedCount >= head)
184-
{
185-
return true;
186-
}
187-
188-
if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue)
189-
{
190-
return false;
191-
}
192-
}
126+
return this.worker.WaitForExport(timeoutMilliseconds);
193127
}
194128

195129
/// <inheritdoc/>
196130
protected override bool OnShutdown(int timeoutMilliseconds)
197131
{
198-
Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount);
199-
200-
try
201-
{
202-
this.shutdownTrigger.Set();
203-
}
204-
catch (ObjectDisposedException)
205-
{
206-
return false;
207-
}
132+
var result = this.worker.Shutdown(timeoutMilliseconds);
208133

209134
OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount);
210135

211136
if (timeoutMilliseconds == Timeout.Infinite)
212137
{
213-
this.exporterThread.Join();
214-
return this.exporter.Shutdown();
138+
return this.exporter.Shutdown() && result;
215139
}
216140

217141
if (timeoutMilliseconds == 0)
218142
{
219-
return this.exporter.Shutdown(0);
143+
return this.exporter.Shutdown(0) && result;
220144
}
221145

222146
var sw = Stopwatch.StartNew();
223-
this.exporterThread.Join(timeoutMilliseconds);
224147
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
225-
return this.exporter.Shutdown((int)Math.Max(timeout, 0));
148+
return this.exporter.Shutdown((int)Math.Max(timeout, 0)) && result;
226149
}
227150

228151
/// <inheritdoc/>
@@ -232,9 +155,7 @@ protected override void Dispose(bool disposing)
232155
{
233156
if (disposing)
234157
{
235-
this.exportTrigger.Dispose();
236-
this.dataExportedNotification.Dispose();
237-
this.shutdownTrigger.Dispose();
158+
this.worker?.Dispose();
238159
}
239160

240161
this.disposed = true;
@@ -243,49 +164,27 @@ protected override void Dispose(bool disposing)
243164
base.Dispose(disposing);
244165
}
245166

246-
private void ExporterProc()
167+
private BatchExportWorker<T> CreateWorker()
247168
{
248-
var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger };
249-
250-
while (true)
169+
#if NET
170+
// Use task-based worker for browser platform where threading may be limited
171+
if (OperatingSystem.IsBrowser() || !this.useThreads)
251172
{
252-
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
253-
if (this.circularBuffer.Count < this.MaxExportBatchSize)
254-
{
255-
try
256-
{
257-
WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds);
258-
}
259-
catch (ObjectDisposedException)
260-
{
261-
// the exporter is somehow disposed before the worker thread could finish its job
262-
return;
263-
}
264-
}
265-
266-
if (this.circularBuffer.Count > 0)
267-
{
268-
using (var batch = new Batch<T>(this.circularBuffer, this.MaxExportBatchSize))
269-
{
270-
this.exporter.Export(batch);
271-
}
272-
273-
try
274-
{
275-
this.dataExportedNotification.Set();
276-
this.dataExportedNotification.Reset();
277-
}
278-
catch (ObjectDisposedException)
279-
{
280-
// the exporter is somehow disposed before the worker thread could finish its job
281-
return;
282-
}
283-
}
284-
285-
if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget))
286-
{
287-
return;
288-
}
173+
return new BatchExportTaskWorker<T>(
174+
this.circularBuffer,
175+
this.exporter,
176+
this.MaxExportBatchSize,
177+
this.ScheduledDelayMilliseconds,
178+
this.ExporterTimeoutMilliseconds);
289179
}
180+
#endif
181+
182+
// Use thread-based worker for all other platforms
183+
return new BatchExportThreadWorker<T>(
184+
this.circularBuffer,
185+
this.exporter,
186+
this.MaxExportBatchSize,
187+
this.ScheduledDelayMilliseconds,
188+
this.ExporterTimeoutMilliseconds);
290189
}
291190
}

0 commit comments

Comments
 (0)