From af3e82ba2ecce7c719a997c3f1ed658ce3bcb7f3 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Mon, 14 Jul 2025 23:25:20 -0400 Subject: [PATCH 01/14] feat: Add optional Task support for BatchExportProcessor and PeriodicExportingMetricReader This change adds the optional support for threads for BatchExportProcessor and PeriodicExportingMetricReader, in order to support .NET's WebAssembly single threaded mode. --- .../.publicApi/Stable/PublicAPI.Shipped.txt | 8 +- .../.publicApi/Stable/PublicAPI.Unshipped.txt | 4 + src/OpenTelemetry/BatchExportProcessor.cs | 211 ++++----------- .../Internal/BatchExportTaskWorker.cs | 254 ++++++++++++++++++ .../Internal/BatchExportThreadWorker.cs | 223 +++++++++++++++ .../Internal/BatchExportWorker.cs | 133 +++++++++ ...PeriodicExportingMetricReaderTaskWorker.cs | 178 ++++++++++++ ...riodicExportingMetricReaderThreadWorker.cs | 140 ++++++++++ .../PeriodicExportingMetricReaderWorker.cs | 67 +++++ .../BatchLogRecordExportProcessor.cs | 27 ++ .../Reader/PeriodicExportingMetricReader.cs | 111 ++++---- .../Processor/BatchActivityExportProcessor.cs | 37 ++- .../PeriodicExportingMetricReaderHelper.cs | 5 +- ...eriodicExportingMetricReaderHelperTests.cs | 16 +- .../BatchLogRecordExportProcessorTests.cs | 10 +- 15 files changed, 1189 insertions(+), 235 deletions(-) create mode 100644 src/OpenTelemetry/Internal/BatchExportTaskWorker.cs create mode 100644 src/OpenTelemetry/Internal/BatchExportThreadWorker.cs create mode 100644 src/OpenTelemetry/Internal/BatchExportWorker.cs create mode 100644 src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs create mode 100644 src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs create mode 100644 src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs diff --git a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt index 43b209cd7e6..00fdde4298e 100644 --- a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt +++ b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt @@ -31,9 +31,9 @@ OpenTelemetry.Batch.Enumerator.Enumerator() -> void OpenTelemetry.Batch.Enumerator.MoveNext() -> bool OpenTelemetry.Batch.Enumerator.Reset() -> void OpenTelemetry.BatchActivityExportProcessor -OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void +OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void OpenTelemetry.BatchExportProcessor -OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void +OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void OpenTelemetry.BatchExportProcessorOptions OpenTelemetry.BatchExportProcessorOptions.BatchExportProcessorOptions() -> void OpenTelemetry.BatchExportProcessorOptions.ExporterTimeoutMilliseconds.get -> int @@ -45,7 +45,7 @@ OpenTelemetry.BatchExportProcessorOptions.MaxQueueSize.set -> void OpenTelemetry.BatchExportProcessorOptions.ScheduledDelayMilliseconds.get -> int OpenTelemetry.BatchExportProcessorOptions.ScheduledDelayMilliseconds.set -> void OpenTelemetry.BatchLogRecordExportProcessor -OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void +OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void OpenTelemetry.CompositeProcessor OpenTelemetry.CompositeProcessor.AddProcessor(OpenTelemetry.BaseProcessor! processor) -> OpenTelemetry.CompositeProcessor! OpenTelemetry.CompositeProcessor.CompositeProcessor(System.Collections.Generic.IEnumerable!>! processors) -> void @@ -256,7 +256,7 @@ OpenTelemetry.Metrics.MetricType.LongSum = 26 -> OpenTelemetry.Metrics.MetricTyp OpenTelemetry.Metrics.MetricType.LongSumNonMonotonic = 138 -> OpenTelemetry.Metrics.MetricType OpenTelemetry.Metrics.MetricTypeExtensions OpenTelemetry.Metrics.PeriodicExportingMetricReader -OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, int exportIntervalMilliseconds = 60000, int exportTimeoutMilliseconds = 30000) -> void +OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, int exportIntervalMilliseconds, int exportTimeoutMilliseconds) -> void OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds.get -> int? OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds.set -> void diff --git a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt index e69de29bb2d..e6f4ea1ced1 100644 --- a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt @@ -0,0 +1,4 @@ +OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void +OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void +OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void +OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, bool useThreads = true, int exportIntervalMilliseconds = 60000, int exportTimeoutMilliseconds = 30000) -> void diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index 46a64b8e042..047fc3cfbc4 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -20,16 +20,13 @@ public abstract class BatchExportProcessor : BaseExportProcessor internal const int DefaultMaxExportBatchSize = 512; internal readonly int MaxExportBatchSize; + internal readonly int ScheduledDelayMilliseconds; internal readonly int ExporterTimeoutMilliseconds; private readonly CircularBuffer circularBuffer; - private readonly Thread exporterThread; - private readonly AutoResetEvent exportTrigger = new(false); - private readonly ManualResetEvent dataExportedNotification = new(false); - private readonly ManualResetEvent shutdownTrigger = new(false); - private long shutdownDrainTarget = long.MaxValue; - private long droppedCount; + private readonly BatchExportWorker worker; + private readonly bool useThreads; private bool disposed; /// @@ -42,6 +39,26 @@ public abstract class BatchExportProcessor : BaseExportProcessor /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. protected BatchExportProcessor( BaseExporter exporter, + int maxQueueSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds, + int maxExportBatchSize) + : this(exporter, true, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, maxExportBatchSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Exporter instance. + /// Enables the use of when true, when false. + /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. + /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. + /// How long the export can run before it is cancelled. The default value is 30000. + /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. + protected BatchExportProcessor( + BaseExporter exporter, + bool useThreads = true, int maxQueueSize = DefaultMaxQueueSize, int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, @@ -57,20 +74,16 @@ protected BatchExportProcessor( this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds; this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds; this.MaxExportBatchSize = maxExportBatchSize; - this.exporterThread = new Thread(this.ExporterProc) - { - IsBackground = true, -#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 - Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", -#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 - }; - this.exporterThread.Start(); + this.useThreads = useThreads; + + this.worker = this.CreateWorker(); + this.worker.Start(); } /// /// Gets the number of telemetry objects dropped by the processor. /// - internal long DroppedCount => Volatile.Read(ref this.droppedCount); + internal long DroppedCount => this.worker.DroppedCount; /// /// Gets the number of telemetry objects received by the processor. @@ -89,20 +102,14 @@ internal bool TryExport(T data) { if (this.circularBuffer.Count >= this.MaxExportBatchSize) { - try - { - this.exportTrigger.Set(); - } - catch (ObjectDisposedException) - { - } + this.worker.TriggerExport(); } return true; // enqueue succeeded } // either the queue is full or exceeded the spin limit, drop the item on the floor - Interlocked.Increment(ref this.droppedCount); + this.worker.IncrementDroppedCount(); return false; } @@ -116,113 +123,29 @@ protected override void OnExport(T data) /// protected override bool OnForceFlush(int timeoutMilliseconds) { - var tail = this.circularBuffer.RemovedCount; - var head = this.circularBuffer.AddedCount; - - if (head == tail) - { - return true; // nothing to flush - } - - try - { - this.exportTrigger.Set(); - } - catch (ObjectDisposedException) - { - return false; - } - - if (timeoutMilliseconds == 0) - { - return false; - } - - var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger }; - - var sw = timeoutMilliseconds == Timeout.Infinite - ? null - : Stopwatch.StartNew(); - - // There is a chance that the export thread finished processing all the data from the queue, - // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. - const int pollingMilliseconds = 1000; - - while (true) - { - if (sw == null) - { - try - { - WaitHandle.WaitAny(triggers, pollingMilliseconds); - } - catch (ObjectDisposedException) - { - return false; - } - } - else - { - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - - if (timeout <= 0) - { - return this.circularBuffer.RemovedCount >= head; - } - - try - { - WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds)); - } - catch (ObjectDisposedException) - { - return false; - } - } - - if (this.circularBuffer.RemovedCount >= head) - { - return true; - } - - if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue) - { - return false; - } - } + return this.worker.WaitForExport(timeoutMilliseconds); } /// protected override bool OnShutdown(int timeoutMilliseconds) { - Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount); - - try - { - this.shutdownTrigger.Set(); - } - catch (ObjectDisposedException) - { - return false; - } + var result = this.worker.Shutdown(timeoutMilliseconds); OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount); if (timeoutMilliseconds == Timeout.Infinite) { - this.exporterThread.Join(); - return this.exporter.Shutdown(); + return this.exporter.Shutdown() && result; } if (timeoutMilliseconds == 0) { - return this.exporter.Shutdown(0); + return this.exporter.Shutdown(0) && result; } var sw = Stopwatch.StartNew(); - this.exporterThread.Join(timeoutMilliseconds); var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - return this.exporter.Shutdown((int)Math.Max(timeout, 0)); + return this.exporter.Shutdown((int)Math.Max(timeout, 0)) && result; } /// @@ -232,9 +155,7 @@ protected override void Dispose(bool disposing) { if (disposing) { - this.exportTrigger.Dispose(); - this.dataExportedNotification.Dispose(); - this.shutdownTrigger.Dispose(); + this.worker?.Dispose(); } this.disposed = true; @@ -243,49 +164,27 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private void ExporterProc() + private BatchExportWorker CreateWorker() { - var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; - - while (true) +#if NET + // Use task-based worker for browser platform where threading may be limited + if (OperatingSystem.IsBrowser() || !this.useThreads) { - // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously - if (this.circularBuffer.Count < this.MaxExportBatchSize) - { - try - { - WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds); - } - catch (ObjectDisposedException) - { - // the exporter is somehow disposed before the worker thread could finish its job - return; - } - } - - if (this.circularBuffer.Count > 0) - { - using (var batch = new Batch(this.circularBuffer, this.MaxExportBatchSize)) - { - this.exporter.Export(batch); - } - - try - { - this.dataExportedNotification.Set(); - this.dataExportedNotification.Reset(); - } - catch (ObjectDisposedException) - { - // the exporter is somehow disposed before the worker thread could finish its job - return; - } - } - - if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget)) - { - return; - } + return new BatchExportTaskWorker( + this.circularBuffer, + this.exporter, + this.MaxExportBatchSize, + this.ScheduledDelayMilliseconds, + this.ExporterTimeoutMilliseconds); } +#endif + + // Use thread-based worker for all other platforms + return new BatchExportThreadWorker( + this.circularBuffer, + this.exporter, + this.MaxExportBatchSize, + this.ScheduledDelayMilliseconds, + this.ExporterTimeoutMilliseconds); } } diff --git a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs new file mode 100644 index 00000000000..7b0bf500a5f --- /dev/null +++ b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs @@ -0,0 +1,254 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; + +namespace OpenTelemetry.Internal; + +/// +/// Task-based implementation of batch export worker for environments where threading may be limited. +/// +/// The type of telemetry object to be exported. +internal sealed class BatchExportTaskWorker : BatchExportWorker + where T : class +{ + private readonly CancellationTokenSource cancellationTokenSource = new(); + private readonly SemaphoreSlim exportTrigger = new(0, 1); + private readonly TaskCompletionSource shutdownCompletionSource = new(); + private TaskCompletionSource dataExportedNotification = new(); + private Task? workerTask; + private volatile bool isShutdownRequested; + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The circular buffer for storing telemetry objects. + /// The exporter instance. + /// The maximum batch size for exports. + /// The delay between exports in milliseconds. + /// The timeout for export operations in milliseconds. + public BatchExportTaskWorker( + CircularBuffer circularBuffer, + BaseExporter exporter, + int maxExportBatchSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds) + : base(circularBuffer, exporter, maxExportBatchSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds) + { + } + + /// + public override void Start() + { + this.workerTask = Task.Factory.StartNew( + this.ExporterProcAsync, + CancellationToken.None, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).Unwrap(); + } + + /// + public override bool TriggerExport() + { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return false; + } + + try + { + this.exportTrigger.Release(); + return true; + } + catch (ObjectDisposedException) + { + return false; + } + catch (SemaphoreFullException) + { + // Semaphore is already signaled, export is pending + return true; + } + } + + /// + public override bool WaitForExport(int timeoutMilliseconds) + { + var tail = this.circularBuffer.RemovedCount; + var head = this.circularBuffer.AddedCount; + + if (head == tail) + { + return true; // nothing to flush + } + + if (!this.TriggerExport()) + { + return false; + } + + if (timeoutMilliseconds == 0) + { + return false; + } + + return this.WaitForExportAsync(timeoutMilliseconds, head).GetAwaiter().GetResult(); + } + + /// + public override bool Shutdown(int timeoutMilliseconds) + { + this.SetShutdownDrainTarget(this.circularBuffer.AddedCount); + this.isShutdownRequested = true; + + try + { + this.cancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (this.workerTask == null) + { + return true; + } + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.workerTask.Wait(); + return true; + } + + if (timeoutMilliseconds == 0) + { + return true; + } + + return this.workerTask.Wait(timeoutMilliseconds); + } + + /// + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (!this.disposed) + { + this.disposed = true; + + this.cancellationTokenSource.Dispose(); + this.exportTrigger.Dispose(); + } + } + + private async Task WaitForExportAsync(int timeoutMilliseconds, long targetHead) + { + var sw = timeoutMilliseconds == Timeout.Infinite + ? null + : Stopwatch.StartNew(); + + // There is a chance that the export task finished processing all the data from the queue, + // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. + const int pollingMilliseconds = 1000; + + while (true) + { + var timeout = pollingMilliseconds; + if (sw != null) + { + var remaining = timeoutMilliseconds - sw.ElapsedMilliseconds; + if (remaining <= 0) + { + return this.circularBuffer.RemovedCount >= targetHead; + } + + timeout = Math.Min((int)remaining, pollingMilliseconds); + } + + try + { + using var cts = new CancellationTokenSource(timeout); + using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( + cts.Token, + this.cancellationTokenSource.Token); + + await Task.WhenAny( + this.dataExportedNotification.Task, + this.shutdownCompletionSource.Task, + Task.Delay(timeout, combinedTokenSource.Token)).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Expected when timeout or shutdown occurs + } + + if (this.circularBuffer.RemovedCount >= targetHead) + { + return true; + } + + if (this.ShutdownDrainTarget != long.MaxValue) + { + return false; + } + } + } + + private async Task ExporterProcAsync() + { + var cancellationToken = this.cancellationTokenSource.Token; + + try + { + while (!cancellationToken.IsCancellationRequested && !this.isShutdownRequested) + { + // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously + if (this.circularBuffer.Count < this.maxExportBatchSize) + { + try + { + await Task.WhenAny( + this.exportTrigger.WaitAsync(cancellationToken), + Task.Delay(this.scheduledDelayMilliseconds, cancellationToken)).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + if (this.isShutdownRequested) + { + break; + } + + // Continue to check if there's data to export before exiting + } + } + + this.PerformExport(); + + // Signal that data has been exported + var previousTcs = this.dataExportedNotification; + var newTcs = new TaskCompletionSource(); + if (Interlocked.CompareExchange(ref this.dataExportedNotification, newTcs, previousTcs) == previousTcs) + { + previousTcs.TrySetResult(true); + } + + if (this.ShouldShutdown()) + { + break; + } + } + } + catch (Exception ex) + { + // Log the exception if needed + OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExporterProcAsync), ex); + } + finally + { + this.shutdownCompletionSource.TrySetResult(true); + } + } +} diff --git a/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs b/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs new file mode 100644 index 00000000000..95b3328406e --- /dev/null +++ b/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs @@ -0,0 +1,223 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; + +namespace OpenTelemetry.Internal; + +/// +/// Thread-based implementation of batch export worker. +/// +/// The type of telemetry object to be exported. +internal sealed class BatchExportThreadWorker : BatchExportWorker + where T : class +{ + private readonly Thread exporterThread; + private readonly AutoResetEvent exportTrigger = new(false); + private readonly ManualResetEvent dataExportedNotification = new(false); + private readonly ManualResetEvent shutdownTrigger = new(false); + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The circular buffer for storing telemetry objects. + /// The exporter instance. + /// The maximum batch size for exports. + /// The delay between exports in milliseconds. + /// The timeout for export operations in milliseconds. + public BatchExportThreadWorker( + CircularBuffer circularBuffer, + BaseExporter exporter, + int maxExportBatchSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds) + : base(circularBuffer, exporter, maxExportBatchSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds) + { + this.exporterThread = new Thread(this.ExporterProc) + { + IsBackground = true, +#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 + Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", +#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 + }; + } + + /// + public override void Start() + { + this.exporterThread.Start(); + } + + /// + public override bool TriggerExport() + { + try + { + this.exportTrigger.Set(); + return true; + } + catch (ObjectDisposedException) + { + return false; + } + } + + /// + public override bool WaitForExport(int timeoutMilliseconds) + { + var tail = this.circularBuffer.RemovedCount; + var head = this.circularBuffer.AddedCount; + + if (head == tail) + { + return true; // nothing to flush + } + + if (!this.TriggerExport()) + { + return false; + } + + if (timeoutMilliseconds == 0) + { + return false; + } + + var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger }; + + var sw = timeoutMilliseconds == Timeout.Infinite + ? null + : Stopwatch.StartNew(); + + // There is a chance that the export thread finished processing all the data from the queue, + // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. + const int pollingMilliseconds = 1000; + + while (true) + { + if (sw == null) + { + try + { + WaitHandle.WaitAny(triggers, pollingMilliseconds); + } + catch (ObjectDisposedException) + { + return false; + } + } + else + { + var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; + + if (timeout <= 0) + { + return this.circularBuffer.RemovedCount >= head; + } + + try + { + WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds)); + } + catch (ObjectDisposedException) + { + return false; + } + } + + if (this.circularBuffer.RemovedCount >= head) + { + return true; + } + + if (this.ShutdownDrainTarget != long.MaxValue) + { + return false; + } + } + } + + /// + public override bool Shutdown(int timeoutMilliseconds) + { + this.SetShutdownDrainTarget(this.circularBuffer.AddedCount); + + try + { + this.shutdownTrigger.Set(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.exporterThread.Join(); + return true; + } + + if (timeoutMilliseconds == 0) + { + return true; + } + + return this.exporterThread.Join(timeoutMilliseconds); + } + + /// + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (!this.disposed) + { + this.disposed = true; + + this.exportTrigger.Dispose(); + this.dataExportedNotification.Dispose(); + this.shutdownTrigger.Dispose(); + } + } + + private void ExporterProc() + { + var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; + + while (true) + { + // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously + if (this.circularBuffer.Count < this.maxExportBatchSize) + { + try + { + WaitHandle.WaitAny(triggers, this.scheduledDelayMilliseconds); + } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } + } + + this.PerformExport(); + + try + { + this.dataExportedNotification.Set(); + this.dataExportedNotification.Reset(); + } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } + + if (this.ShouldShutdown()) + { + return; + } + } + } +} diff --git a/src/OpenTelemetry/Internal/BatchExportWorker.cs b/src/OpenTelemetry/Internal/BatchExportWorker.cs new file mode 100644 index 00000000000..1ff5c08cdd5 --- /dev/null +++ b/src/OpenTelemetry/Internal/BatchExportWorker.cs @@ -0,0 +1,133 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Internal; + +/// +/// Abstract base class for batch export workers that handle the threading and synchronization logic for batch export processors. +/// +/// The type of telemetry object to be exported. +internal abstract class BatchExportWorker : IDisposable + where T : class +{ + protected readonly CircularBuffer circularBuffer; + protected readonly BaseExporter exporter; + protected readonly int maxExportBatchSize; + protected readonly int scheduledDelayMilliseconds; + protected readonly int exporterTimeoutMilliseconds; + + private long shutdownDrainTarget = long.MaxValue; + private long droppedCount; + + /// + /// Initializes a new instance of the class. + /// + /// The circular buffer for storing telemetry objects. + /// The exporter instance. + /// The maximum batch size for exports. + /// The delay between exports in milliseconds. + /// The timeout for export operations in milliseconds. + protected BatchExportWorker( + CircularBuffer circularBuffer, + BaseExporter exporter, + int maxExportBatchSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds) + { + this.circularBuffer = circularBuffer; + this.exporter = exporter; + this.maxExportBatchSize = maxExportBatchSize; + this.scheduledDelayMilliseconds = scheduledDelayMilliseconds; + this.exporterTimeoutMilliseconds = exporterTimeoutMilliseconds; + } + + /// + /// Gets the number of telemetry objects dropped by the processor. + /// + internal long DroppedCount => Volatile.Read(ref this.droppedCount); + + /// + /// Gets the shutdown drain target. + /// + protected long ShutdownDrainTarget => Volatile.Read(ref this.shutdownDrainTarget); + + /// + /// Starts the worker. + /// + public abstract void Start(); + + /// + /// Triggers an export operation. + /// + /// True if the export was triggered successfully; otherwise, false. + public abstract bool TriggerExport(); + + /// + /// Waits for export to complete. + /// + /// The timeout in milliseconds. + /// True if the export completed within the timeout; otherwise, false. + public abstract bool WaitForExport(int timeoutMilliseconds); + + /// + /// Initiates shutdown and waits for completion. + /// + /// The timeout in milliseconds. + /// True if shutdown completed within the timeout; otherwise, false. + public abstract bool Shutdown(int timeoutMilliseconds); + + /// + /// Increments the dropped count. + /// + public void IncrementDroppedCount() + { + Interlocked.Increment(ref this.droppedCount); + } + + /// + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Sets the shutdown drain target. + /// + /// The target count. + protected void SetShutdownDrainTarget(long target) + { + Volatile.Write(ref this.shutdownDrainTarget, target); + } + + /// + /// Performs the export operation. + /// + protected void PerformExport() + { + if (this.circularBuffer.Count > 0) + { + using (var batch = new Batch(this.circularBuffer, this.maxExportBatchSize)) + { + this.exporter.Export(batch); + } + } + } + + /// + /// Checks if shutdown should occur. + /// + /// True if shutdown should occur; otherwise, false. + protected bool ShouldShutdown() + { + return this.circularBuffer.RemovedCount >= this.ShutdownDrainTarget; + } + + /// + /// Releases the unmanaged resources used by this class and optionally releases the managed resources. + /// + /// True to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + } +} diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs new file mode 100644 index 00000000000..a119b8d446d --- /dev/null +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs @@ -0,0 +1,178 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Internal; + +/// +/// Task-based implementation of periodic exporting metric reader worker for environments where threading may be limited. +/// +internal sealed class PeriodicExportingMetricReaderTaskWorker : PeriodicExportingMetricReaderWorker +{ + private readonly CancellationTokenSource cancellationTokenSource = new(); + private readonly SemaphoreSlim exportTrigger = new(0, 1); + private readonly TaskCompletionSource shutdownCompletionSource = new(); + private Task? workerTask; + private volatile bool isShutdownRequested; + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The metric reader instance. + /// The interval in milliseconds between two consecutive exports. + /// How long the export can run before it is cancelled. + public PeriodicExportingMetricReaderTaskWorker( + BaseExportingMetricReader metricReader, + int exportIntervalMilliseconds, + int exportTimeoutMilliseconds) + : base(metricReader, exportIntervalMilliseconds, exportTimeoutMilliseconds) + { + } + + /// + public override void Start() + { + this.workerTask = Task.Factory.StartNew( + this.ExporterProcAsync, + CancellationToken.None, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).Unwrap(); + } + + /// + public override bool TriggerExport() + { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return false; + } + + try + { + this.exportTrigger.Release(); + return true; + } + catch (ObjectDisposedException) + { + return false; + } + catch (SemaphoreFullException) + { + // Semaphore is already signaled, export is pending + return true; + } + } + + /// + public override bool Shutdown(int timeoutMilliseconds) + { + this.isShutdownRequested = true; + + try + { + this.cancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (this.workerTask == null) + { + return true; + } + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.workerTask.Wait(); + return true; + } + + if (timeoutMilliseconds == 0) + { + return true; + } + + return this.workerTask.Wait(timeoutMilliseconds); + } + + /// + protected override void Dispose(bool disposing) + { + if (!this.disposed) + { + if (disposing) + { + this.cancellationTokenSource.Dispose(); + this.exportTrigger.Dispose(); + } + + this.disposed = true; + } + + base.Dispose(disposing); + } + + private async Task ExporterProcAsync() + { + var cancellationToken = this.cancellationTokenSource.Token; + var sw = Stopwatch.StartNew(); + + try + { + while (!cancellationToken.IsCancellationRequested && !this.isShutdownRequested) + { + var timeout = (int)(this.exportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.exportIntervalMilliseconds)); + + try + { + await Task.WhenAny( + this.exportTrigger.WaitAsync(cancellationToken), + Task.Delay(timeout, cancellationToken)).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + if (this.isShutdownRequested) + { + break; + } + + // Continue to check if shutdown was requested + } + + if (cancellationToken.IsCancellationRequested || this.isShutdownRequested) + { + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); + this.metricReader.Collect(this.exportTimeoutMilliseconds); + break; + } + + // Check if the trigger was signaled by trying to acquire it with a timeout of 0 + var wasTriggered = await this.exportTrigger.WaitAsync(0, cancellationToken).ConfigureAwait(false); + + if (wasTriggered) + { + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered."); + } + else + { + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed."); + } + + this.metricReader.Collect(this.exportTimeoutMilliseconds); + } + } + catch (Exception ex) + { + // Log the exception if needed + OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExporterProcAsync), ex); + } + finally + { + this.shutdownCompletionSource.TrySetResult(true); + } + } +} diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs new file mode 100644 index 00000000000..f04c56c3565 --- /dev/null +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs @@ -0,0 +1,140 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Internal; + +/// +/// Thread-based implementation of periodic exporting metric reader worker. +/// +internal sealed class PeriodicExportingMetricReaderThreadWorker : PeriodicExportingMetricReaderWorker +{ + private readonly Thread exporterThread; + private readonly AutoResetEvent exportTrigger = new(false); + private readonly ManualResetEvent shutdownTrigger = new(false); + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The metric reader instance. + /// The interval in milliseconds between two consecutive exports. + /// How long the export can run before it is cancelled. + public PeriodicExportingMetricReaderThreadWorker( + BaseExportingMetricReader metricReader, + int exportIntervalMilliseconds, + int exportTimeoutMilliseconds) + : base(metricReader, exportIntervalMilliseconds, exportTimeoutMilliseconds) + { + this.exporterThread = new Thread(this.ExporterProc) + { + IsBackground = true, +#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 + Name = $"OpenTelemetry-{nameof(PeriodicExportingMetricReader)}-{metricReader.GetType().Name}", +#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 + }; + } + + /// + public override void Start() + { + this.exporterThread.Start(); + } + + /// + public override bool TriggerExport() + { + try + { + this.exportTrigger.Set(); + return true; + } + catch (ObjectDisposedException) + { + return false; + } + } + + /// + public override bool Shutdown(int timeoutMilliseconds) + { + try + { + this.shutdownTrigger.Set(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.exporterThread.Join(); + return true; + } + + if (timeoutMilliseconds == 0) + { + return true; + } + + return this.exporterThread.Join(timeoutMilliseconds); + } + + /// + protected override void Dispose(bool disposing) + { + if (!this.disposed) + { + if (disposing) + { + this.exportTrigger.Dispose(); + this.shutdownTrigger.Dispose(); + } + + this.disposed = true; + } + + base.Dispose(disposing); + } + + private void ExporterProc() + { + int index; + int timeout; + var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; + var sw = Stopwatch.StartNew(); + + while (true) + { + timeout = (int)(this.exportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.exportIntervalMilliseconds)); + + try + { + index = WaitHandle.WaitAny(triggers, timeout); + } + catch (ObjectDisposedException) + { + return; + } + + switch (index) + { + case 0: // export + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered."); + this.metricReader.Collect(this.exportTimeoutMilliseconds); + break; + case 1: // shutdown + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); + this.metricReader.Collect(this.exportTimeoutMilliseconds); + return; + case WaitHandle.WaitTimeout: // timer + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed."); + this.metricReader.Collect(this.exportTimeoutMilliseconds); + break; + } + } + } +} diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs new file mode 100644 index 00000000000..a5ff61cc84e --- /dev/null +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Internal; + +/// +/// Abstract base class for periodic exporting metric reader workers that handle the threading and synchronization logic. +/// +internal abstract class PeriodicExportingMetricReaderWorker : IDisposable +{ + protected readonly BaseExportingMetricReader metricReader; + protected readonly int exportIntervalMilliseconds; + protected readonly int exportTimeoutMilliseconds; + + /// + /// Initializes a new instance of the class. + /// + /// The metric reader instance. + /// The interval in milliseconds between two consecutive exports. + /// How long the export can run before it is cancelled. + protected PeriodicExportingMetricReaderWorker( + BaseExportingMetricReader metricReader, + int exportIntervalMilliseconds, + int exportTimeoutMilliseconds) + { + this.metricReader = metricReader; + this.exportIntervalMilliseconds = exportIntervalMilliseconds; + this.exportTimeoutMilliseconds = exportTimeoutMilliseconds; + } + + /// + /// Starts the worker. + /// + public abstract void Start(); + + /// + /// Triggers an export operation. + /// + /// True if the export was triggered successfully; otherwise, false. + public abstract bool TriggerExport(); + + /// + /// Initiates shutdown and waits for completion. + /// + /// The timeout in milliseconds. + /// True if the shutdown completed within the timeout; otherwise, false. + public abstract bool Shutdown(int timeoutMilliseconds); + + /// + /// Disposes of the worker and its resources. + /// + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases the unmanaged resources used by this class and optionally releases the managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + } +} diff --git a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs index c1e341585a8..7c1980bb9c4 100644 --- a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs @@ -21,12 +21,39 @@ public class BatchLogRecordExportProcessor : BatchExportProcessor /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. public BatchLogRecordExportProcessor( BaseExporter exporter, + int maxQueueSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds, + int maxExportBatchSize) + : this( + exporter, + true, + maxQueueSize, + scheduledDelayMilliseconds, + exporterTimeoutMilliseconds, + maxExportBatchSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Log record exporter. + /// Enables the use of when true, when false. + /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. + /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. + /// How long the export can run before it is cancelled. The default value is 30000. + /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. + public BatchLogRecordExportProcessor( + BaseExporter exporter, + bool useThreads = true, int maxQueueSize = DefaultMaxQueueSize, int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, int maxExportBatchSize = DefaultMaxExportBatchSize) : base( exporter, + useThreads, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs index 0596e18af18..2d2b5a7f89b 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs @@ -18,9 +18,8 @@ public class PeriodicExportingMetricReader : BaseExportingMetricReader internal readonly int ExportIntervalMilliseconds; internal readonly int ExportTimeoutMilliseconds; - private readonly Thread exporterThread; - private readonly AutoResetEvent exportTrigger = new(false); - private readonly ManualResetEvent shutdownTrigger = new(false); + private readonly PeriodicExportingMetricReaderWorker worker; + private readonly bool useThreads; private bool disposed; /// @@ -31,6 +30,22 @@ public class PeriodicExportingMetricReader : BaseExportingMetricReader /// How long the export can run before it is cancelled. The default value is 30000. public PeriodicExportingMetricReader( BaseExporter exporter, + int exportIntervalMilliseconds, + int exportTimeoutMilliseconds) + : this(exporter, true, exportIntervalMilliseconds, exportTimeoutMilliseconds) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Exporter instance to export Metrics to. + /// Enables the use of when true, when false. + /// The interval in milliseconds between two consecutive exports. The default value is 60000. + /// How long the export can run before it is cancelled. The default value is 30000. + public PeriodicExportingMetricReader( + BaseExporter exporter, + bool useThreads = true, int exportIntervalMilliseconds = DefaultExportIntervalMilliseconds, int exportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds) : base(exporter) @@ -46,45 +61,30 @@ public PeriodicExportingMetricReader( this.ExportIntervalMilliseconds = exportIntervalMilliseconds; this.ExportTimeoutMilliseconds = exportTimeoutMilliseconds; + this.useThreads = useThreads; - this.exporterThread = new Thread(this.ExporterProc) - { - IsBackground = true, -#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 - Name = $"OpenTelemetry-{nameof(PeriodicExportingMetricReader)}-{exporter.GetType().Name}", -#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 - }; - this.exporterThread.Start(); + this.worker = this.CreateWorker(); + this.worker.Start(); } /// protected override bool OnShutdown(int timeoutMilliseconds) { - var result = true; - - try - { - this.shutdownTrigger.Set(); - } - catch (ObjectDisposedException) - { - return false; - } + var result = this.worker.Shutdown(timeoutMilliseconds); if (timeoutMilliseconds == Timeout.Infinite) { - this.exporterThread.Join(); - result = this.exporter.Shutdown() && result; + return this.exporter.Shutdown() && result; } - else + + if (timeoutMilliseconds == 0) { - var sw = Stopwatch.StartNew(); - result = this.exporterThread.Join(timeoutMilliseconds) && result; - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - result = this.exporter.Shutdown((int)Math.Max(timeout, 0)) && result; + return this.exporter.Shutdown(0) && result; } - return result; + var sw = Stopwatch.StartNew(); + var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; + return this.exporter.Shutdown((int)Math.Max(timeout, 0)) && result; } /// @@ -94,8 +94,7 @@ protected override void Dispose(bool disposing) { if (disposing) { - this.exportTrigger.Dispose(); - this.shutdownTrigger.Dispose(); + this.worker?.Dispose(); } this.disposed = true; @@ -104,41 +103,25 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private void ExporterProc() +#pragma warning disable CA1859 // Change return type of method 'CreateWorker' from 'PeriodicExportingMetricReaderWorker' to 'PeriodicExportingMetricReaderThreadWorker' for improved performance + private PeriodicExportingMetricReaderWorker CreateWorker() +#pragma warning disable CA1859 { - int index; - int timeout; - var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; - var sw = Stopwatch.StartNew(); - - while (true) +#if NET + // Use task-based worker for browser platform where threading may be limited + if (OperatingSystem.IsBrowser() || !this.useThreads) { - timeout = (int)(this.ExportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.ExportIntervalMilliseconds)); - - try - { - index = WaitHandle.WaitAny(triggers, timeout); - } - catch (ObjectDisposedException) - { - return; - } - - switch (index) - { - case 0: // export - OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered."); - this.Collect(this.ExportTimeoutMilliseconds); - break; - case 1: // shutdown - OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); - this.Collect(this.ExportTimeoutMilliseconds); // TODO: do we want to use the shutdown timeout here? - return; - case WaitHandle.WaitTimeout: // timer - OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed."); - this.Collect(this.ExportTimeoutMilliseconds); - break; - } + return new PeriodicExportingMetricReaderTaskWorker( + this, + this.ExportIntervalMilliseconds, + this.ExportTimeoutMilliseconds); } +#endif + + // Use thread-based worker for all other platforms + return new PeriodicExportingMetricReaderThreadWorker( + this, + this.ExportIntervalMilliseconds, + this.ExportTimeoutMilliseconds); } } diff --git a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs index 73e7968c78f..f68208c82b0 100644 --- a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs +++ b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs @@ -14,19 +14,46 @@ public class BatchActivityExportProcessor : BatchExportProcessor /// /// Initializes a new instance of the class. /// - /// - /// - /// - /// - /// + /// + /// + /// + /// + /// public BatchActivityExportProcessor( BaseExporter exporter, + int maxQueueSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds, + int maxExportBatchSize) + : this( + exporter, + true, + maxQueueSize, + scheduledDelayMilliseconds, + exporterTimeoutMilliseconds, + maxExportBatchSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + /// + /// + /// + public BatchActivityExportProcessor( + BaseExporter exporter, + bool useThreads = true, int maxQueueSize = DefaultMaxQueueSize, int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, int maxExportBatchSize = DefaultMaxExportBatchSize) : base( exporter, + useThreads, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, diff --git a/src/Shared/PeriodicExportingMetricReaderHelper.cs b/src/Shared/PeriodicExportingMetricReaderHelper.cs index e7dc244a94c..d71f87c37af 100644 --- a/src/Shared/PeriodicExportingMetricReaderHelper.cs +++ b/src/Shared/PeriodicExportingMetricReaderHelper.cs @@ -12,7 +12,8 @@ internal static PeriodicExportingMetricReader CreatePeriodicExportingMetricReade BaseExporter exporter, MetricReaderOptions options, int defaultExportIntervalMilliseconds = DefaultExportIntervalMilliseconds, - int defaultExportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds) + int defaultExportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds, + bool useThreads = true) { var exportInterval = options.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds ?? defaultExportIntervalMilliseconds; @@ -20,7 +21,7 @@ internal static PeriodicExportingMetricReader CreatePeriodicExportingMetricReade var exportTimeout = options.PeriodicExportingMetricReaderOptions.ExportTimeoutMilliseconds ?? defaultExportTimeoutMilliseconds; - var metricReader = new PeriodicExportingMetricReader(exporter, exportInterval, exportTimeout) + var metricReader = new PeriodicExportingMetricReader(exporter, useThreads, exportInterval, exportTimeout) { TemporalityPreference = options.TemporalityPreference, }; diff --git a/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs b/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs index 236cbe1e727..b04d0783ec4 100644 --- a/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs +++ b/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs @@ -33,6 +33,18 @@ public void CreatePeriodicExportingMetricReader_Defaults() Assert.Equal(MetricReaderTemporalityPreference.Cumulative, reader.TemporalityPreference); } + [Fact] + public void CreatePeriodicExportingMetricReader_Defaults_WithTask() + { +#pragma warning disable CA2000 // Dispose objects before losing scope + var reader = CreatePeriodicExportingMetricReader(useThreads: false); +#pragma warning restore CA2000 // Dispose objects before losing scope + + Assert.Equal(60000, reader.ExportIntervalMilliseconds); + Assert.Equal(30000, reader.ExportTimeoutMilliseconds); + Assert.Equal(MetricReaderTemporalityPreference.Cumulative, reader.TemporalityPreference); + } + [Fact] public void CreatePeriodicExportingMetricReader_TemporalityPreference_FromOptions() { @@ -130,13 +142,13 @@ private static void ClearEnvVars() } private static PeriodicExportingMetricReader CreatePeriodicExportingMetricReader( - MetricReaderOptions? options = null) + MetricReaderOptions? options = null, bool useThreads = true) { options ??= new(); #pragma warning disable CA2000 // Dispose objects before losing scope var dummyMetricExporter = new InMemoryExporter(Array.Empty()); #pragma warning restore CA2000 // Dispose objects before losing scope - return PeriodicExportingMetricReaderHelper.CreatePeriodicExportingMetricReader(dummyMetricExporter, options); + return PeriodicExportingMetricReaderHelper.CreatePeriodicExportingMetricReader(dummyMetricExporter, options, useThreads: true); } } diff --git a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs index cd76a4c8880..f21d1bf5369 100644 --- a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs @@ -10,8 +10,10 @@ namespace OpenTelemetry.Logs.Tests; public sealed class BatchLogRecordExportProcessorTests { - [Fact] - public void StateValuesAndScopeBufferingTest() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void StateValuesAndScopeBufferingTest(bool useThread) { var scopeProvider = new LoggerExternalScopeProvider(); @@ -21,6 +23,10 @@ public void StateValuesAndScopeBufferingTest() #pragma warning disable CA2000 // Dispose objects before losing scope new InMemoryExporter(exportedItems), #pragma warning restore CA2000 // Dispose objects before losing scope + useThreads: useThread, + maxQueueSize: BatchLogRecordExportProcessor.DefaultMaxQueueSize, + maxExportBatchSize: BatchLogRecordExportProcessor.DefaultMaxExportBatchSize, + exporterTimeoutMilliseconds: BatchLogRecordExportProcessor.DefaultExporterTimeoutMilliseconds, scheduledDelayMilliseconds: int.MaxValue); using var scope = scopeProvider.Push(exportedItems); From d0282c627622f45f0817e4f6104fee2869a60695 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Tue, 15 Jul 2025 13:42:16 -0400 Subject: [PATCH 02/14] chore: Adjust parameters ordering to avoid unnamed parameters breaking change --- .../.publicApi/Stable/PublicAPI.Unshipped.txt | 8 +++---- src/OpenTelemetry/BatchExportProcessor.cs | 6 ++--- .../BatchLogRecordExportProcessor.cs | 12 +++++----- .../Reader/PeriodicExportingMetricReader.cs | 6 ++--- .../Processor/BatchActivityExportProcessor.cs | 24 +++++++++---------- .../PeriodicExportingMetricReaderHelper.cs | 2 +- 6 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt index e6f4ea1ced1..b005d35e671 100644 --- a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt @@ -1,4 +1,4 @@ -OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void -OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void -OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, bool useThreads = true, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void -OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, bool useThreads = true, int exportIntervalMilliseconds = 60000, int exportTimeoutMilliseconds = 30000) -> void +OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512, bool useThreads = true) -> void +OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512, bool useThreads = true) -> void +OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512, bool useThreads = true) -> void +OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, int exportIntervalMilliseconds = 60000, int exportTimeoutMilliseconds = 30000, bool useThreads = true) -> void diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index 047fc3cfbc4..17495d2542f 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -43,7 +43,7 @@ protected BatchExportProcessor( int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) - : this(exporter, true, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, maxExportBatchSize) + : this(exporter, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, maxExportBatchSize, true) { } @@ -58,11 +58,11 @@ protected BatchExportProcessor( /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. protected BatchExportProcessor( BaseExporter exporter, - bool useThreads = true, int maxQueueSize = DefaultMaxQueueSize, int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, - int maxExportBatchSize = DefaultMaxExportBatchSize) + int maxExportBatchSize = DefaultMaxExportBatchSize, + bool useThreads = true) : base(exporter) { Guard.ThrowIfOutOfRange(maxQueueSize, min: 1); diff --git a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs index 7c1980bb9c4..fa959cc9dd4 100644 --- a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs @@ -27,11 +27,11 @@ public BatchLogRecordExportProcessor( int maxExportBatchSize) : this( exporter, - true, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, - maxExportBatchSize) + maxExportBatchSize, + true) { } @@ -46,18 +46,18 @@ public BatchLogRecordExportProcessor( /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. public BatchLogRecordExportProcessor( BaseExporter exporter, - bool useThreads = true, int maxQueueSize = DefaultMaxQueueSize, int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, - int maxExportBatchSize = DefaultMaxExportBatchSize) + int maxExportBatchSize = DefaultMaxExportBatchSize, + bool useThreads = true) : base( exporter, - useThreads, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, - maxExportBatchSize) + maxExportBatchSize, + useThreads) { } diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs index 2d2b5a7f89b..2f482ac1f2d 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs @@ -32,7 +32,7 @@ public PeriodicExportingMetricReader( BaseExporter exporter, int exportIntervalMilliseconds, int exportTimeoutMilliseconds) - : this(exporter, true, exportIntervalMilliseconds, exportTimeoutMilliseconds) + : this(exporter, exportIntervalMilliseconds, exportTimeoutMilliseconds, true) { } @@ -45,9 +45,9 @@ public PeriodicExportingMetricReader( /// How long the export can run before it is cancelled. The default value is 30000. public PeriodicExportingMetricReader( BaseExporter exporter, - bool useThreads = true, int exportIntervalMilliseconds = DefaultExportIntervalMilliseconds, - int exportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds) + int exportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds, + bool useThreads = true) : base(exporter) { Guard.ThrowIfInvalidTimeout(exportIntervalMilliseconds); diff --git a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs index f68208c82b0..89760afcea9 100644 --- a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs +++ b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs @@ -27,37 +27,37 @@ public BatchActivityExportProcessor( int maxExportBatchSize) : this( exporter, - true, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, - maxExportBatchSize) + maxExportBatchSize, + true) { } /// /// Initializes a new instance of the class. /// - /// - /// - /// - /// - /// - /// + /// + /// + /// + /// + /// + /// public BatchActivityExportProcessor( BaseExporter exporter, - bool useThreads = true, int maxQueueSize = DefaultMaxQueueSize, int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, - int maxExportBatchSize = DefaultMaxExportBatchSize) + int maxExportBatchSize = DefaultMaxExportBatchSize, + bool useThreads = true) : base( exporter, - useThreads, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, - maxExportBatchSize) + maxExportBatchSize, + useThreads) { } diff --git a/src/Shared/PeriodicExportingMetricReaderHelper.cs b/src/Shared/PeriodicExportingMetricReaderHelper.cs index d71f87c37af..3ccdc0dc1db 100644 --- a/src/Shared/PeriodicExportingMetricReaderHelper.cs +++ b/src/Shared/PeriodicExportingMetricReaderHelper.cs @@ -21,7 +21,7 @@ internal static PeriodicExportingMetricReader CreatePeriodicExportingMetricReade var exportTimeout = options.PeriodicExportingMetricReaderOptions.ExportTimeoutMilliseconds ?? defaultExportTimeoutMilliseconds; - var metricReader = new PeriodicExportingMetricReader(exporter, useThreads, exportInterval, exportTimeout) + var metricReader = new PeriodicExportingMetricReader(exporter, exportInterval, exportTimeout, useThreads) { TemporalityPreference = options.TemporalityPreference, }; From d01884e60b5d38ca23fac5d7c9f1a2ebee9f9138 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Tue, 15 Jul 2025 21:22:53 -0400 Subject: [PATCH 03/14] chore: Add processor dispose validation test --- .../Internal/BatchExportTaskWorker.cs | 14 ++++--- .../BatchLogRecordExportProcessorTests.cs | 39 +++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs index 7b0bf500a5f..dc11fa54045 100644 --- a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs +++ b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs @@ -180,6 +180,10 @@ await Task.WhenAny( this.shutdownCompletionSource.Task, Task.Delay(timeout, combinedTokenSource.Token)).ConfigureAwait(false); } + catch (ObjectDisposedException) + { + return false; // The worker has been disposed + } catch (OperationCanceledException) { // Expected when timeout or shutdown occurs @@ -216,13 +220,13 @@ await Task.WhenAny( } catch (OperationCanceledException) { - if (this.isShutdownRequested) - { - break; - } - // Continue to check if there's data to export before exiting } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } } this.PerformExport(); diff --git a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs index f21d1bf5369..59f94848414 100644 --- a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs @@ -154,5 +154,44 @@ public void LogRecordAddedToBatchIfNotFromAnyPoolTest() Assert.Single(exportedItems); Assert.Same(logRecord, exportedItems[0]); } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void DisposeWithoutShutdown(bool useThread) + { + var scopeProvider = new LoggerExternalScopeProvider(); + + List exportedItems = new(); + + var processor = new BatchLogRecordExportProcessor( +#pragma warning disable CA2000 // Dispose objects before losing scope + new InMemoryExporter(exportedItems), +#pragma warning restore CA2000 // Dispose objects before losing scope + useThreads: useThread, + maxQueueSize: BatchLogRecordExportProcessor.DefaultMaxQueueSize, + maxExportBatchSize: BatchLogRecordExportProcessor.DefaultMaxExportBatchSize, + exporterTimeoutMilliseconds: BatchLogRecordExportProcessor.DefaultExporterTimeoutMilliseconds, + scheduledDelayMilliseconds: int.MaxValue); + + processor.Dispose(); + + using var scope = scopeProvider.Push(exportedItems); + + var pool = LogRecordSharedPool.Current; + + var logRecord = pool.Rent(); + + var state = new LogRecordTests.DisposingState("Hello world"); + + logRecord.ILoggerData.ScopeProvider = scopeProvider; + logRecord.StateValues = state; + + processor.OnEnd(logRecord); + + state.Dispose(); + + Assert.Empty(exportedItems); + } } #endif From da9d6f47bb0e73ea02c774e87e2bedce1a1c4b00 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Wed, 16 Jul 2025 21:04:30 -0400 Subject: [PATCH 04/14] chore: Remove early exit of the exporter Allow for slow thread startup to process exports if the processor is shut down quickly --- src/OpenTelemetry/Internal/BatchExportTaskWorker.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs index dc11fa54045..76d56f08c89 100644 --- a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs +++ b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs @@ -17,7 +17,6 @@ internal sealed class BatchExportTaskWorker : BatchExportWorker private readonly TaskCompletionSource shutdownCompletionSource = new(); private TaskCompletionSource dataExportedNotification = new(); private Task? workerTask; - private volatile bool isShutdownRequested; private bool disposed; /// @@ -100,7 +99,6 @@ public override bool WaitForExport(int timeoutMilliseconds) public override bool Shutdown(int timeoutMilliseconds) { this.SetShutdownDrainTarget(this.circularBuffer.AddedCount); - this.isShutdownRequested = true; try { @@ -207,7 +205,7 @@ private async Task ExporterProcAsync() try { - while (!cancellationToken.IsCancellationRequested && !this.isShutdownRequested) + while (true) { // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously if (this.circularBuffer.Count < this.maxExportBatchSize) From f9d705939768f3f4b497e4b4c2aa292d15bb5f01 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Thu, 17 Jul 2025 07:49:01 -0400 Subject: [PATCH 05/14] chore: Adjust exit condition for ExporterProcAsync This will avoid exiting on an exceptional flow on Task.WhenAny --- src/OpenTelemetry/Internal/BatchExportTaskWorker.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs index 76d56f08c89..476c3ac3cee 100644 --- a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs +++ b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs @@ -237,7 +237,7 @@ await Task.WhenAny( previousTcs.TrySetResult(true); } - if (this.ShouldShutdown()) + if (this.ShouldShutdown() || cancellationToken.IsCancellationRequested) { break; } From 32c5eb3cfdb08390db84a51e9a0d4ffb7d53cb39 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Thu, 17 Jul 2025 07:50:08 -0400 Subject: [PATCH 06/14] chore: Reorder test parameters in xml doc --- src/OpenTelemetry/BatchExportProcessor.cs | 2 +- .../Logs/Processor/BatchLogRecordExportProcessor.cs | 2 +- .../Metrics/Reader/PeriodicExportingMetricReader.cs | 4 ++-- .../Trace/Processor/BatchActivityExportProcessor.cs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index 17495d2542f..4c68a5f5546 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -51,11 +51,11 @@ protected BatchExportProcessor( /// Initializes a new instance of the class. /// /// Exporter instance. - /// Enables the use of when true, when false. /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. /// How long the export can run before it is cancelled. The default value is 30000. /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. + /// Enables the use of when true, when false. protected BatchExportProcessor( BaseExporter exporter, int maxQueueSize = DefaultMaxQueueSize, diff --git a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs index fa959cc9dd4..1888c60b948 100644 --- a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs @@ -39,11 +39,11 @@ public BatchLogRecordExportProcessor( /// Initializes a new instance of the class. /// /// Log record exporter. - /// Enables the use of when true, when false. /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. /// How long the export can run before it is cancelled. The default value is 30000. /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. + /// Enables the use of when true, when false. public BatchLogRecordExportProcessor( BaseExporter exporter, int maxQueueSize = DefaultMaxQueueSize, diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs index 2f482ac1f2d..8ceb16483f0 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs @@ -40,9 +40,9 @@ public PeriodicExportingMetricReader( /// Initializes a new instance of the class. /// /// Exporter instance to export Metrics to. - /// Enables the use of when true, when false. /// The interval in milliseconds between two consecutive exports. The default value is 60000. /// How long the export can run before it is cancelled. The default value is 30000. + /// Enables the use of when true, when false. public PeriodicExportingMetricReader( BaseExporter exporter, int exportIntervalMilliseconds = DefaultExportIntervalMilliseconds, @@ -105,7 +105,7 @@ protected override void Dispose(bool disposing) #pragma warning disable CA1859 // Change return type of method 'CreateWorker' from 'PeriodicExportingMetricReaderWorker' to 'PeriodicExportingMetricReaderThreadWorker' for improved performance private PeriodicExportingMetricReaderWorker CreateWorker() -#pragma warning disable CA1859 +#pragma warning restore CA1859 { #if NET // Use task-based worker for browser platform where threading may be limited diff --git a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs index 89760afcea9..41fd5fff2a7 100644 --- a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs +++ b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs @@ -39,11 +39,11 @@ public BatchActivityExportProcessor( /// Initializes a new instance of the class. /// /// - /// /// /// /// /// + /// public BatchActivityExportProcessor( BaseExporter exporter, int maxQueueSize = DefaultMaxQueueSize, From 8e6585af497f54a26b7bdc277417c27df4815bb8 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Thu, 17 Jul 2025 07:52:28 -0400 Subject: [PATCH 07/14] chore: Use Task.WhenAny's output to avoid a double await Also enable tests for PeriodicExportingMetricReaderTaskWorker to improve coverage --- .../PeriodicExportingMetricReaderTaskWorker.cs | 11 +++++++---- .../PeriodicExportingMetricReaderHelperTests.cs | 2 +- .../Metrics/MetricPointReclaimTests.cs | 11 +++++++---- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs index a119b8d446d..7af3ae8e252 100644 --- a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs @@ -127,10 +127,13 @@ private async Task ExporterProcAsync() { var timeout = (int)(this.exportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.exportIntervalMilliseconds)); + var exportTriggerTask = this.exportTrigger.WaitAsync(cancellationToken); + Task? triggeredTask = null; + try { - await Task.WhenAny( - this.exportTrigger.WaitAsync(cancellationToken), + triggeredTask = await Task.WhenAny( + exportTriggerTask, Task.Delay(timeout, cancellationToken)).ConfigureAwait(false); } catch (OperationCanceledException) @@ -151,9 +154,9 @@ await Task.WhenAny( } // Check if the trigger was signaled by trying to acquire it with a timeout of 0 - var wasTriggered = await this.exportTrigger.WaitAsync(0, cancellationToken).ConfigureAwait(false); + var exportWasTriggered = triggeredTask == exportTriggerTask; - if (wasTriggered) + if (exportWasTriggered) { OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered."); } diff --git a/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs b/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs index b04d0783ec4..4a375126cc0 100644 --- a/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs +++ b/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs @@ -149,6 +149,6 @@ private static PeriodicExportingMetricReader CreatePeriodicExportingMetricReader #pragma warning disable CA2000 // Dispose objects before losing scope var dummyMetricExporter = new InMemoryExporter(Array.Empty()); #pragma warning restore CA2000 // Dispose objects before losing scope - return PeriodicExportingMetricReaderHelper.CreatePeriodicExportingMetricReader(dummyMetricExporter, options, useThreads: true); + return PeriodicExportingMetricReaderHelper.CreatePeriodicExportingMetricReader(dummyMetricExporter, options, useThreads: useThreads); } } diff --git a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs index 3ffcd82d6d9..8352969c84d 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 using System.Diagnostics.Metrics; +using Microsoft.ApplicationInsights.Extensibility.Implementation; using OpenTelemetry.Tests; using Xunit; @@ -10,9 +11,11 @@ namespace OpenTelemetry.Metrics.Tests; public class MetricPointReclaimTests { [Theory] - [InlineData(false)] - [InlineData(true)] - public void MeasurementsAreNotDropped(bool emitMetricWithNoDimensions) + [InlineData(false, false)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(true, true)] + public void MeasurementsAreNotDropped(bool emitMetricWithNoDimensions, bool useThreads) { using var meter = new Meter(Utils.GetCurrentMethodName()); var counter = meter.CreateCounter("MyFruitCounter"); @@ -21,7 +24,7 @@ public void MeasurementsAreNotDropped(bool emitMetricWithNoDimensions) int maxNumberofDistinctMetricPoints = 4000; // Default max MetricPoints * 2 using var exporter = new CustomExporter(assertNoDroppedMeasurements: true); - using var metricReader = new PeriodicExportingMetricReader(exporter, exportIntervalMilliseconds: 10) + using var metricReader = new PeriodicExportingMetricReader(exporter, exportIntervalMilliseconds: 10, useThreads: useThreads) { TemporalityPreference = MetricReaderTemporalityPreference.Delta, }; From 74bc141ccf16216732e7e7f993a5342ee92f421d Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Thu, 17 Jul 2025 09:18:52 -0400 Subject: [PATCH 08/14] chore: Adjust volatile for potential cross-thread access In theory, there should not be multiple threads access to this field, but this won't hurt to add this qualifier. --- src/OpenTelemetry/Internal/BatchExportTaskWorker.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs index 476c3ac3cee..87dba034af5 100644 --- a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs +++ b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs @@ -15,7 +15,7 @@ internal sealed class BatchExportTaskWorker : BatchExportWorker private readonly CancellationTokenSource cancellationTokenSource = new(); private readonly SemaphoreSlim exportTrigger = new(0, 1); private readonly TaskCompletionSource shutdownCompletionSource = new(); - private TaskCompletionSource dataExportedNotification = new(); + private volatile TaskCompletionSource dataExportedNotification = new(); private Task? workerTask; private bool disposed; From 124f871778f6f6e8bb93f286f5bdc2a83d377fb5 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Thu, 17 Jul 2025 09:19:27 -0400 Subject: [PATCH 09/14] chore: Remove isShutdownRequested It's a duplicate of cancellationToken.IsCancellationRequested --- .../PeriodicExportingMetricReaderTaskWorker.cs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs index 7af3ae8e252..5b5882ed3f0 100644 --- a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs @@ -15,7 +15,6 @@ internal sealed class PeriodicExportingMetricReaderTaskWorker : PeriodicExportin private readonly SemaphoreSlim exportTrigger = new(0, 1); private readonly TaskCompletionSource shutdownCompletionSource = new(); private Task? workerTask; - private volatile bool isShutdownRequested; private bool disposed; /// @@ -69,8 +68,6 @@ public override bool TriggerExport() /// public override bool Shutdown(int timeoutMilliseconds) { - this.isShutdownRequested = true; - try { this.cancellationTokenSource.Cancel(); @@ -123,7 +120,7 @@ private async Task ExporterProcAsync() try { - while (!cancellationToken.IsCancellationRequested && !this.isShutdownRequested) + while (!cancellationToken.IsCancellationRequested) { var timeout = (int)(this.exportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.exportIntervalMilliseconds)); @@ -138,15 +135,10 @@ private async Task ExporterProcAsync() } catch (OperationCanceledException) { - if (this.isShutdownRequested) - { - break; - } - // Continue to check if shutdown was requested } - if (cancellationToken.IsCancellationRequested || this.isShutdownRequested) + if (cancellationToken.IsCancellationRequested) { OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); this.metricReader.Collect(this.exportTimeoutMilliseconds); From 554fe623a53bca134278c3d175315699dcafb127 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Thu, 17 Jul 2025 09:19:48 -0400 Subject: [PATCH 10/14] chore; Explain the reason for the disabled warning --- .../Metrics/Reader/PeriodicExportingMetricReader.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs index 8ceb16483f0..7f49eae457e 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs @@ -103,7 +103,10 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } + // The pragma is required by the fact that this method is compiled on both .NET Framework and .NET Core, and the CA1859 warning is only relevant for .NET Framework. + // The warning suggests changing the return type to PeriodicExportingMetricReaderThreadWorker for improved performance, but we want to keep the method signature consistent across platforms. #pragma warning disable CA1859 // Change return type of method 'CreateWorker' from 'PeriodicExportingMetricReaderWorker' to 'PeriodicExportingMetricReaderThreadWorker' for improved performance + private PeriodicExportingMetricReaderWorker CreateWorker() #pragma warning restore CA1859 { From 823622a6301148973d906965e32f69ecf99e385d Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Thu, 17 Jul 2025 09:42:51 -0400 Subject: [PATCH 11/14] chore: Remove unused namespace --- test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs index 8352969c84d..9f12cbf3110 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 using System.Diagnostics.Metrics; -using Microsoft.ApplicationInsights.Extensibility.Implementation; using OpenTelemetry.Tests; using Xunit; From 8dcf5aa52bab7ead90fe8065ebf34c943d70957e Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Mon, 21 Jul 2025 23:20:29 -0400 Subject: [PATCH 12/14] chore: Move to BatchLogRecordExportProcessorOptions and PeriodicExportingMetricReaderOptions This avoids introducing a potentially ambiguous constructor, restores the original shipped API signatures. --- .../.publicApi/Stable/PublicAPI.Shipped.txt | 8 +-- .../.publicApi/Stable/PublicAPI.Unshipped.txt | 12 +++-- src/OpenTelemetry/BatchExportProcessor.cs | 52 ++++++++++--------- .../BatchExportProcessorOptions.cs | 5 ++ .../BatchLogRecordExportProcessor.cs | 41 ++++++--------- .../Reader/PeriodicExportingMetricReader.cs | 28 ++++++---- .../PeriodicExportingMetricReaderOptions.cs | 5 ++ .../Processor/BatchActivityExportProcessor.cs | 43 ++++++--------- .../PeriodicExportingMetricReaderHelper.cs | 13 ++--- .../BatchLogRecordExportProcessorTests.cs | 28 ++++++---- .../Metrics/MetricPointReclaimTests.cs | 8 ++- 11 files changed, 130 insertions(+), 113 deletions(-) diff --git a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt index 00fdde4298e..43b209cd7e6 100644 --- a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt +++ b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Shipped.txt @@ -31,9 +31,9 @@ OpenTelemetry.Batch.Enumerator.Enumerator() -> void OpenTelemetry.Batch.Enumerator.MoveNext() -> bool OpenTelemetry.Batch.Enumerator.Reset() -> void OpenTelemetry.BatchActivityExportProcessor -OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void +OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void OpenTelemetry.BatchExportProcessor -OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void +OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void OpenTelemetry.BatchExportProcessorOptions OpenTelemetry.BatchExportProcessorOptions.BatchExportProcessorOptions() -> void OpenTelemetry.BatchExportProcessorOptions.ExporterTimeoutMilliseconds.get -> int @@ -45,7 +45,7 @@ OpenTelemetry.BatchExportProcessorOptions.MaxQueueSize.set -> void OpenTelemetry.BatchExportProcessorOptions.ScheduledDelayMilliseconds.get -> int OpenTelemetry.BatchExportProcessorOptions.ScheduledDelayMilliseconds.set -> void OpenTelemetry.BatchLogRecordExportProcessor -OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize, int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds, int maxExportBatchSize) -> void +OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512) -> void OpenTelemetry.CompositeProcessor OpenTelemetry.CompositeProcessor.AddProcessor(OpenTelemetry.BaseProcessor! processor) -> OpenTelemetry.CompositeProcessor! OpenTelemetry.CompositeProcessor.CompositeProcessor(System.Collections.Generic.IEnumerable!>! processors) -> void @@ -256,7 +256,7 @@ OpenTelemetry.Metrics.MetricType.LongSum = 26 -> OpenTelemetry.Metrics.MetricTyp OpenTelemetry.Metrics.MetricType.LongSumNonMonotonic = 138 -> OpenTelemetry.Metrics.MetricType OpenTelemetry.Metrics.MetricTypeExtensions OpenTelemetry.Metrics.PeriodicExportingMetricReader -OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, int exportIntervalMilliseconds, int exportTimeoutMilliseconds) -> void +OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, int exportIntervalMilliseconds = 60000, int exportTimeoutMilliseconds = 30000) -> void OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds.get -> int? OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds.set -> void diff --git a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt index b005d35e671..50c99d5b1d4 100644 --- a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt @@ -1,4 +1,8 @@ -OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512, bool useThreads = true) -> void -OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512, bool useThreads = true) -> void -OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, int maxQueueSize = 2048, int scheduledDelayMilliseconds = 5000, int exporterTimeoutMilliseconds = 30000, int maxExportBatchSize = 512, bool useThreads = true) -> void -OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, int exportIntervalMilliseconds = 60000, int exportTimeoutMilliseconds = 30000, bool useThreads = true) -> void +OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, OpenTelemetry.BatchExportProcessorOptions! options) -> void +OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, OpenTelemetry.BatchExportProcessorOptions! options) -> void +OpenTelemetry.BatchExportProcessorOptions.UseThreads.get -> bool +OpenTelemetry.BatchExportProcessorOptions.UseThreads.set -> void +OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, OpenTelemetry.BatchExportProcessorOptions! options) -> void +OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions! options) -> void +OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.UseThreads.get -> bool +OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.UseThreads.set -> void diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index 4c68a5f5546..c302f252322 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -39,11 +39,18 @@ public abstract class BatchExportProcessor : BaseExportProcessor /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. protected BatchExportProcessor( BaseExporter exporter, - int maxQueueSize, - int scheduledDelayMilliseconds, - int exporterTimeoutMilliseconds, - int maxExportBatchSize) - : this(exporter, maxQueueSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds, maxExportBatchSize, true) + int maxQueueSize = DefaultMaxQueueSize, + int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, + int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, + int maxExportBatchSize = DefaultMaxExportBatchSize) + : this(exporter, new BatchExportProcessorOptions + { + MaxQueueSize = maxQueueSize, + ScheduledDelayMilliseconds = scheduledDelayMilliseconds, + ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds, + MaxExportBatchSize = maxExportBatchSize, + UseThreads = true, + }) { } @@ -51,30 +58,25 @@ protected BatchExportProcessor( /// Initializes a new instance of the class. /// /// Exporter instance. - /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. - /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. - /// How long the export can run before it is cancelled. The default value is 30000. - /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. - /// Enables the use of when true, when false. + /// Configuration options for the batch export processor. protected BatchExportProcessor( BaseExporter exporter, - int maxQueueSize = DefaultMaxQueueSize, - int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, - int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, - int maxExportBatchSize = DefaultMaxExportBatchSize, - bool useThreads = true) + BatchExportProcessorOptions options) : base(exporter) { - Guard.ThrowIfOutOfRange(maxQueueSize, min: 1); - Guard.ThrowIfOutOfRange(maxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(maxQueueSize)); - Guard.ThrowIfOutOfRange(scheduledDelayMilliseconds, min: 1); - Guard.ThrowIfOutOfRange(exporterTimeoutMilliseconds, min: 0); - - this.circularBuffer = new CircularBuffer(maxQueueSize); - this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds; - this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds; - this.MaxExportBatchSize = maxExportBatchSize; - this.useThreads = useThreads; + Guard.ThrowIfNull(options); +#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 + Guard.ThrowIfOutOfRange(options.MaxQueueSize, min: 1); +#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 + Guard.ThrowIfOutOfRange(options.MaxExportBatchSize, min: 1, max: options.MaxQueueSize, maxName: nameof(options.MaxQueueSize)); + Guard.ThrowIfOutOfRange(options.ScheduledDelayMilliseconds, min: 1); + Guard.ThrowIfOutOfRange(options.ExporterTimeoutMilliseconds, min: 0); + + this.circularBuffer = new CircularBuffer(options.MaxQueueSize); + this.ScheduledDelayMilliseconds = options.ScheduledDelayMilliseconds; + this.ExporterTimeoutMilliseconds = options.ExporterTimeoutMilliseconds; + this.MaxExportBatchSize = options.MaxExportBatchSize; + this.useThreads = options.UseThreads; this.worker = this.CreateWorker(); this.worker.Start(); diff --git a/src/OpenTelemetry/BatchExportProcessorOptions.cs b/src/OpenTelemetry/BatchExportProcessorOptions.cs index 6695c0c6a56..ecd967f102c 100644 --- a/src/OpenTelemetry/BatchExportProcessorOptions.cs +++ b/src/OpenTelemetry/BatchExportProcessorOptions.cs @@ -29,4 +29,9 @@ public class BatchExportProcessorOptions /// Gets or sets the maximum batch size of every export. It must be smaller or equal to . The default value is 512. /// public int MaxExportBatchSize { get; set; } = BatchExportProcessor.DefaultMaxExportBatchSize; + + /// + /// Gets or sets a value indicating whether to use threads. Enables the use of when true, when false. The default value is true. + /// + public bool UseThreads { get; set; } = true; } diff --git a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs index 1888c60b948..9dceeefe119 100644 --- a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs @@ -21,17 +21,20 @@ public class BatchLogRecordExportProcessor : BatchExportProcessor /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. public BatchLogRecordExportProcessor( BaseExporter exporter, - int maxQueueSize, - int scheduledDelayMilliseconds, - int exporterTimeoutMilliseconds, - int maxExportBatchSize) + int maxQueueSize = DefaultMaxQueueSize, + int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, + int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, + int maxExportBatchSize = DefaultMaxExportBatchSize) : this( exporter, - maxQueueSize, - scheduledDelayMilliseconds, - exporterTimeoutMilliseconds, - maxExportBatchSize, - true) + new BatchExportProcessorOptions + { + MaxQueueSize = maxQueueSize, + ScheduledDelayMilliseconds = scheduledDelayMilliseconds, + ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds, + MaxExportBatchSize = maxExportBatchSize, + UseThreads = true, + }) { } @@ -39,25 +42,11 @@ public BatchLogRecordExportProcessor( /// Initializes a new instance of the class. /// /// Log record exporter. - /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. - /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. - /// How long the export can run before it is cancelled. The default value is 30000. - /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. - /// Enables the use of when true, when false. + /// Configuration options for the batch export processor. public BatchLogRecordExportProcessor( BaseExporter exporter, - int maxQueueSize = DefaultMaxQueueSize, - int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, - int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, - int maxExportBatchSize = DefaultMaxExportBatchSize, - bool useThreads = true) - : base( - exporter, - maxQueueSize, - scheduledDelayMilliseconds, - exporterTimeoutMilliseconds, - maxExportBatchSize, - useThreads) + BatchExportProcessorOptions options) + : base(exporter, options) { } diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs index 7f49eae457e..be4f3e5d351 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs @@ -30,9 +30,14 @@ public class PeriodicExportingMetricReader : BaseExportingMetricReader /// How long the export can run before it is cancelled. The default value is 30000. public PeriodicExportingMetricReader( BaseExporter exporter, - int exportIntervalMilliseconds, - int exportTimeoutMilliseconds) - : this(exporter, exportIntervalMilliseconds, exportTimeoutMilliseconds, true) + int exportIntervalMilliseconds = DefaultExportIntervalMilliseconds, + int exportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds) + : this(exporter, new PeriodicExportingMetricReaderOptions + { + ExportIntervalMilliseconds = exportIntervalMilliseconds, + ExportTimeoutMilliseconds = exportTimeoutMilliseconds, + UseThreads = true, + }) { } @@ -40,16 +45,19 @@ public PeriodicExportingMetricReader( /// Initializes a new instance of the class. /// /// Exporter instance to export Metrics to. - /// The interval in milliseconds between two consecutive exports. The default value is 60000. - /// How long the export can run before it is cancelled. The default value is 30000. - /// Enables the use of when true, when false. + /// Configuration options for the periodic exporting metric reader. public PeriodicExportingMetricReader( BaseExporter exporter, - int exportIntervalMilliseconds = DefaultExportIntervalMilliseconds, - int exportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds, - bool useThreads = true) + PeriodicExportingMetricReaderOptions options) : base(exporter) { + Guard.ThrowIfNull(options); + +#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 + var exportIntervalMilliseconds = options!.ExportIntervalMilliseconds ?? DefaultExportIntervalMilliseconds; +#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 + var exportTimeoutMilliseconds = options.ExportTimeoutMilliseconds ?? DefaultExportTimeoutMilliseconds; + Guard.ThrowIfInvalidTimeout(exportIntervalMilliseconds); Guard.ThrowIfZero(exportIntervalMilliseconds); Guard.ThrowIfInvalidTimeout(exportTimeoutMilliseconds); @@ -61,7 +69,7 @@ public PeriodicExportingMetricReader( this.ExportIntervalMilliseconds = exportIntervalMilliseconds; this.ExportTimeoutMilliseconds = exportTimeoutMilliseconds; - this.useThreads = useThreads; + this.useThreads = options.UseThreads; this.worker = this.CreateWorker(); this.worker.Start(); diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs index b10967f2dc0..75d8b1d829f 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs @@ -52,4 +52,9 @@ internal PeriodicExportingMetricReaderOptions(IConfiguration configuration) /// associated with the metric reader. /// public int? ExportTimeoutMilliseconds { get; set; } + + /// + /// Gets or sets a value indicating whether to use threads. Enables the use of when true, when false. The default value is true. + /// + public bool UseThreads { get; set; } = true; } diff --git a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs index 41fd5fff2a7..06f1be1489f 100644 --- a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs +++ b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs @@ -21,43 +21,32 @@ public class BatchActivityExportProcessor : BatchExportProcessor /// public BatchActivityExportProcessor( BaseExporter exporter, - int maxQueueSize, - int scheduledDelayMilliseconds, - int exporterTimeoutMilliseconds, - int maxExportBatchSize) + int maxQueueSize = DefaultMaxQueueSize, + int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, + int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, + int maxExportBatchSize = DefaultMaxExportBatchSize) : this( exporter, - maxQueueSize, - scheduledDelayMilliseconds, - exporterTimeoutMilliseconds, - maxExportBatchSize, - true) + new BatchExportProcessorOptions + { + MaxQueueSize = maxQueueSize, + ScheduledDelayMilliseconds = scheduledDelayMilliseconds, + ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds, + MaxExportBatchSize = maxExportBatchSize, + UseThreads = true, + }) { } /// /// Initializes a new instance of the class. /// - /// - /// - /// - /// - /// - /// + /// Activity exporter. + /// Configuration options for the batch export processor. public BatchActivityExportProcessor( BaseExporter exporter, - int maxQueueSize = DefaultMaxQueueSize, - int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, - int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, - int maxExportBatchSize = DefaultMaxExportBatchSize, - bool useThreads = true) - : base( - exporter, - maxQueueSize, - scheduledDelayMilliseconds, - exporterTimeoutMilliseconds, - maxExportBatchSize, - useThreads) + BatchExportProcessorOptions options) + : base(exporter, options) { } diff --git a/src/Shared/PeriodicExportingMetricReaderHelper.cs b/src/Shared/PeriodicExportingMetricReaderHelper.cs index 3ccdc0dc1db..e3e0fcbb65e 100644 --- a/src/Shared/PeriodicExportingMetricReaderHelper.cs +++ b/src/Shared/PeriodicExportingMetricReaderHelper.cs @@ -15,13 +15,14 @@ internal static PeriodicExportingMetricReader CreatePeriodicExportingMetricReade int defaultExportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds, bool useThreads = true) { - var exportInterval = - options.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds ?? defaultExportIntervalMilliseconds; - - var exportTimeout = - options.PeriodicExportingMetricReaderOptions.ExportTimeoutMilliseconds ?? defaultExportTimeoutMilliseconds; + var periodicOptions = new PeriodicExportingMetricReaderOptions + { + ExportIntervalMilliseconds = options.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds ?? defaultExportIntervalMilliseconds, + ExportTimeoutMilliseconds = options.PeriodicExportingMetricReaderOptions.ExportTimeoutMilliseconds ?? defaultExportTimeoutMilliseconds, + UseThreads = useThreads, + }; - var metricReader = new PeriodicExportingMetricReader(exporter, exportInterval, exportTimeout, useThreads) + var metricReader = new PeriodicExportingMetricReader(exporter, periodicOptions) { TemporalityPreference = options.TemporalityPreference, }; diff --git a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs index 59f94848414..0efd8bc0dda 100644 --- a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs @@ -23,11 +23,15 @@ public void StateValuesAndScopeBufferingTest(bool useThread) #pragma warning disable CA2000 // Dispose objects before losing scope new InMemoryExporter(exportedItems), #pragma warning restore CA2000 // Dispose objects before losing scope - useThreads: useThread, - maxQueueSize: BatchLogRecordExportProcessor.DefaultMaxQueueSize, - maxExportBatchSize: BatchLogRecordExportProcessor.DefaultMaxExportBatchSize, - exporterTimeoutMilliseconds: BatchLogRecordExportProcessor.DefaultExporterTimeoutMilliseconds, - scheduledDelayMilliseconds: int.MaxValue); + new() + { + // Use the default values for the other parameters, but allow overriding useThreads + MaxQueueSize = BatchLogRecordExportProcessor.DefaultMaxQueueSize, + MaxExportBatchSize = BatchLogRecordExportProcessor.DefaultMaxExportBatchSize, + ExporterTimeoutMilliseconds = BatchLogRecordExportProcessor.DefaultExporterTimeoutMilliseconds, + ScheduledDelayMilliseconds = int.MaxValue, + UseThreads = useThread, + }); using var scope = scopeProvider.Push(exportedItems); @@ -168,11 +172,15 @@ public void DisposeWithoutShutdown(bool useThread) #pragma warning disable CA2000 // Dispose objects before losing scope new InMemoryExporter(exportedItems), #pragma warning restore CA2000 // Dispose objects before losing scope - useThreads: useThread, - maxQueueSize: BatchLogRecordExportProcessor.DefaultMaxQueueSize, - maxExportBatchSize: BatchLogRecordExportProcessor.DefaultMaxExportBatchSize, - exporterTimeoutMilliseconds: BatchLogRecordExportProcessor.DefaultExporterTimeoutMilliseconds, - scheduledDelayMilliseconds: int.MaxValue); + new() + { + // Use the default values for the other parameters, but allow overriding useThreads + MaxQueueSize = BatchLogRecordExportProcessor.DefaultMaxQueueSize, + MaxExportBatchSize = BatchLogRecordExportProcessor.DefaultMaxExportBatchSize, + ExporterTimeoutMilliseconds = BatchLogRecordExportProcessor.DefaultExporterTimeoutMilliseconds, + ScheduledDelayMilliseconds = int.MaxValue, + UseThreads = useThread, + }); processor.Dispose(); diff --git a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs index 9f12cbf3110..a3c9a88260f 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs @@ -23,7 +23,13 @@ public void MeasurementsAreNotDropped(bool emitMetricWithNoDimensions, bool useT int maxNumberofDistinctMetricPoints = 4000; // Default max MetricPoints * 2 using var exporter = new CustomExporter(assertNoDroppedMeasurements: true); - using var metricReader = new PeriodicExportingMetricReader(exporter, exportIntervalMilliseconds: 10, useThreads: useThreads) + using var metricReader = new PeriodicExportingMetricReader( + exporter, + new() + { + ExportIntervalMilliseconds = 10, + UseThreads = useThreads, + }) { TemporalityPreference = MetricReaderTemporalityPreference.Delta, }; From 276d81c5d5adc91a60d21c1246b9915fe7608ed4 Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Wed, 23 Jul 2025 21:58:00 -0400 Subject: [PATCH 13/14] chore: Adjust after review - Adjusts null propagation, naming, and comments. - Adjusts BatchExportTaskWorker and PeriodicExportingMetricReaderTaskWorker to start the Task with a known cancellation token --- src/OpenTelemetry/BatchExportProcessor.cs | 31 +++++----- .../BatchExportProcessorOptions.cs | 2 +- .../Internal/BatchExportTaskWorker.cs | 16 +++--- .../Internal/BatchExportThreadWorker.cs | 14 ++--- .../Internal/BatchExportWorker.cs | 57 +++++++++++++------ ...PeriodicExportingMetricReaderTaskWorker.cs | 8 +-- ...riodicExportingMetricReaderThreadWorker.cs | 8 +-- .../PeriodicExportingMetricReaderWorker.cs | 33 ++++++++--- .../Reader/PeriodicExportingMetricReader.cs | 15 ++--- .../PeriodicExportingMetricReaderOptions.cs | 2 +- ...eriodicExportingMetricReaderHelperTests.cs | 4 +- 11 files changed, 110 insertions(+), 80 deletions(-) diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index c302f252322..9f5715e58f3 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -1,7 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System.Diagnostics; using System.Runtime.CompilerServices; using OpenTelemetry.Internal; @@ -20,7 +19,6 @@ public abstract class BatchExportProcessor : BaseExportProcessor internal const int DefaultMaxExportBatchSize = 512; internal readonly int MaxExportBatchSize; - internal readonly int ScheduledDelayMilliseconds; internal readonly int ExporterTimeoutMilliseconds; @@ -65,18 +63,19 @@ protected BatchExportProcessor( : base(exporter) { Guard.ThrowIfNull(options); -#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 - Guard.ThrowIfOutOfRange(options.MaxQueueSize, min: 1); -#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 - Guard.ThrowIfOutOfRange(options.MaxExportBatchSize, min: 1, max: options.MaxQueueSize, maxName: nameof(options.MaxQueueSize)); - Guard.ThrowIfOutOfRange(options.ScheduledDelayMilliseconds, min: 1); - Guard.ThrowIfOutOfRange(options.ExporterTimeoutMilliseconds, min: 0); - - this.circularBuffer = new CircularBuffer(options.MaxQueueSize); - this.ScheduledDelayMilliseconds = options.ScheduledDelayMilliseconds; - this.ExporterTimeoutMilliseconds = options.ExporterTimeoutMilliseconds; - this.MaxExportBatchSize = options.MaxExportBatchSize; - this.useThreads = options.UseThreads; + + var maxQueueSize = options?.MaxQueueSize ?? 0; + Guard.ThrowIfOutOfRange(maxQueueSize, min: 1); + + this.circularBuffer = new CircularBuffer(maxQueueSize); + this.ScheduledDelayMilliseconds = options?.ScheduledDelayMilliseconds ?? 0; + this.ExporterTimeoutMilliseconds = options?.ExporterTimeoutMilliseconds ?? -1; + this.MaxExportBatchSize = options?.MaxExportBatchSize ?? 0; + this.useThreads = options?.UseThreads ?? true; + + Guard.ThrowIfOutOfRange(this.MaxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(options.MaxQueueSize)); + Guard.ThrowIfOutOfRange(this.ScheduledDelayMilliseconds, min: 1); + Guard.ThrowIfOutOfRange(this.ExporterTimeoutMilliseconds, min: 0); this.worker = this.CreateWorker(); this.worker.Start(); @@ -145,9 +144,7 @@ protected override bool OnShutdown(int timeoutMilliseconds) return this.exporter.Shutdown(0) && result; } - var sw = Stopwatch.StartNew(); - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - return this.exporter.Shutdown((int)Math.Max(timeout, 0)) && result; + return this.exporter.Shutdown(timeoutMilliseconds) && result; } /// diff --git a/src/OpenTelemetry/BatchExportProcessorOptions.cs b/src/OpenTelemetry/BatchExportProcessorOptions.cs index ecd967f102c..4383e1567a8 100644 --- a/src/OpenTelemetry/BatchExportProcessorOptions.cs +++ b/src/OpenTelemetry/BatchExportProcessorOptions.cs @@ -31,7 +31,7 @@ public class BatchExportProcessorOptions public int MaxExportBatchSize { get; set; } = BatchExportProcessor.DefaultMaxExportBatchSize; /// - /// Gets or sets a value indicating whether to use threads. Enables the use of when true, when false. The default value is true. + /// Gets or sets a value indicating whether to use threads. Enables the use of when ; otherwise is used. The default value is . /// public bool UseThreads { get; set; } = true; } diff --git a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs index 87dba034af5..f323f88f57c 100644 --- a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs +++ b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs @@ -42,7 +42,7 @@ public override void Start() { this.workerTask = Task.Factory.StartNew( this.ExporterProcAsync, - CancellationToken.None, + this.cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap(); } @@ -74,8 +74,8 @@ public override bool TriggerExport() /// public override bool WaitForExport(int timeoutMilliseconds) { - var tail = this.circularBuffer.RemovedCount; - var head = this.circularBuffer.AddedCount; + var tail = this.CircularBuffer.RemovedCount; + var head = this.CircularBuffer.AddedCount; if (head == tail) { @@ -98,7 +98,7 @@ public override bool WaitForExport(int timeoutMilliseconds) /// public override bool Shutdown(int timeoutMilliseconds) { - this.SetShutdownDrainTarget(this.circularBuffer.AddedCount); + this.SetShutdownDrainTarget(this.CircularBuffer.AddedCount); try { @@ -160,7 +160,7 @@ private async Task WaitForExportAsync(int timeoutMilliseconds, long target var remaining = timeoutMilliseconds - sw.ElapsedMilliseconds; if (remaining <= 0) { - return this.circularBuffer.RemovedCount >= targetHead; + return this.CircularBuffer.RemovedCount >= targetHead; } timeout = Math.Min((int)remaining, pollingMilliseconds); @@ -187,7 +187,7 @@ await Task.WhenAny( // Expected when timeout or shutdown occurs } - if (this.circularBuffer.RemovedCount >= targetHead) + if (this.CircularBuffer.RemovedCount >= targetHead) { return true; } @@ -208,13 +208,13 @@ private async Task ExporterProcAsync() while (true) { // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously - if (this.circularBuffer.Count < this.maxExportBatchSize) + if (this.CircularBuffer.Count < this.MaxExportBatchSize) { try { await Task.WhenAny( this.exportTrigger.WaitAsync(cancellationToken), - Task.Delay(this.scheduledDelayMilliseconds, cancellationToken)).ConfigureAwait(false); + Task.Delay(this.ScheduledDelayMilliseconds, cancellationToken)).ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs b/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs index 95b3328406e..6731d3d4aaf 100644 --- a/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs +++ b/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs @@ -66,8 +66,8 @@ public override bool TriggerExport() /// public override bool WaitForExport(int timeoutMilliseconds) { - var tail = this.circularBuffer.RemovedCount; - var head = this.circularBuffer.AddedCount; + var tail = this.CircularBuffer.RemovedCount; + var head = this.CircularBuffer.AddedCount; if (head == tail) { @@ -113,7 +113,7 @@ public override bool WaitForExport(int timeoutMilliseconds) if (timeout <= 0) { - return this.circularBuffer.RemovedCount >= head; + return this.CircularBuffer.RemovedCount >= head; } try @@ -126,7 +126,7 @@ public override bool WaitForExport(int timeoutMilliseconds) } } - if (this.circularBuffer.RemovedCount >= head) + if (this.CircularBuffer.RemovedCount >= head) { return true; } @@ -141,7 +141,7 @@ public override bool WaitForExport(int timeoutMilliseconds) /// public override bool Shutdown(int timeoutMilliseconds) { - this.SetShutdownDrainTarget(this.circularBuffer.AddedCount); + this.SetShutdownDrainTarget(this.CircularBuffer.AddedCount); try { @@ -188,11 +188,11 @@ private void ExporterProc() while (true) { // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously - if (this.circularBuffer.Count < this.maxExportBatchSize) + if (this.CircularBuffer.Count < this.MaxExportBatchSize) { try { - WaitHandle.WaitAny(triggers, this.scheduledDelayMilliseconds); + WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds); } catch (ObjectDisposedException) { diff --git a/src/OpenTelemetry/Internal/BatchExportWorker.cs b/src/OpenTelemetry/Internal/BatchExportWorker.cs index 1ff5c08cdd5..e2b92023429 100644 --- a/src/OpenTelemetry/Internal/BatchExportWorker.cs +++ b/src/OpenTelemetry/Internal/BatchExportWorker.cs @@ -10,12 +10,6 @@ namespace OpenTelemetry.Internal; internal abstract class BatchExportWorker : IDisposable where T : class { - protected readonly CircularBuffer circularBuffer; - protected readonly BaseExporter exporter; - protected readonly int maxExportBatchSize; - protected readonly int scheduledDelayMilliseconds; - protected readonly int exporterTimeoutMilliseconds; - private long shutdownDrainTarget = long.MaxValue; private long droppedCount; @@ -34,11 +28,17 @@ protected BatchExportWorker( int scheduledDelayMilliseconds, int exporterTimeoutMilliseconds) { - this.circularBuffer = circularBuffer; - this.exporter = exporter; - this.maxExportBatchSize = maxExportBatchSize; - this.scheduledDelayMilliseconds = scheduledDelayMilliseconds; - this.exporterTimeoutMilliseconds = exporterTimeoutMilliseconds; + this.CircularBuffer = circularBuffer; + this.Exporter = exporter; + this.MaxExportBatchSize = maxExportBatchSize; + this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds; + this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds; + } + + ~BatchExportWorker() + { + // Finalizer to ensure resources are cleaned up if Dispose is not called + this.Dispose(false); } /// @@ -46,6 +46,31 @@ protected BatchExportWorker( /// internal long DroppedCount => Volatile.Read(ref this.droppedCount); + /// + /// Gets the circular buffer for storing telemetry objects. + /// + protected CircularBuffer CircularBuffer { get; } + + /// + /// Gets the exporter instance. + /// + protected BaseExporter Exporter { get; } + + /// + /// Gets the maximum batch size for exports. + /// + protected int MaxExportBatchSize { get; } + + /// + /// Gets the delay between exports in milliseconds. + /// + protected int ScheduledDelayMilliseconds { get; } + + /// + /// Gets the timeout for export operations in milliseconds. + /// + protected int ExporterTimeoutMilliseconds { get; } + /// /// Gets the shutdown drain target. /// @@ -59,7 +84,7 @@ protected BatchExportWorker( /// /// Triggers an export operation. /// - /// True if the export was triggered successfully; otherwise, false. + /// if the export was triggered successfully; otherwise, . public abstract bool TriggerExport(); /// @@ -105,11 +130,11 @@ protected void SetShutdownDrainTarget(long target) /// protected void PerformExport() { - if (this.circularBuffer.Count > 0) + if (this.CircularBuffer.Count > 0) { - using (var batch = new Batch(this.circularBuffer, this.maxExportBatchSize)) + using (var batch = new Batch(this.CircularBuffer, this.MaxExportBatchSize)) { - this.exporter.Export(batch); + this.Exporter.Export(batch); } } } @@ -120,7 +145,7 @@ protected void PerformExport() /// True if shutdown should occur; otherwise, false. protected bool ShouldShutdown() { - return this.circularBuffer.RemovedCount >= this.ShutdownDrainTarget; + return this.CircularBuffer.RemovedCount >= this.ShutdownDrainTarget; } /// diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs index 5b5882ed3f0..63cc9eba923 100644 --- a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs @@ -36,7 +36,7 @@ public override void Start() { this.workerTask = Task.Factory.StartNew( this.ExporterProcAsync, - CancellationToken.None, + this.cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap(); } @@ -122,7 +122,7 @@ private async Task ExporterProcAsync() { while (!cancellationToken.IsCancellationRequested) { - var timeout = (int)(this.exportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.exportIntervalMilliseconds)); + var timeout = (int)(this.ExportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.ExportIntervalMilliseconds)); var exportTriggerTask = this.exportTrigger.WaitAsync(cancellationToken); Task? triggeredTask = null; @@ -141,7 +141,7 @@ private async Task ExporterProcAsync() if (cancellationToken.IsCancellationRequested) { OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); - this.metricReader.Collect(this.exportTimeoutMilliseconds); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); break; } @@ -157,7 +157,7 @@ private async Task ExporterProcAsync() OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed."); } - this.metricReader.Collect(this.exportTimeoutMilliseconds); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); } } catch (Exception ex) diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs index f04c56c3565..5dffebcd2d9 100644 --- a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs @@ -109,7 +109,7 @@ private void ExporterProc() while (true) { - timeout = (int)(this.exportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.exportIntervalMilliseconds)); + timeout = (int)(this.ExportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.ExportIntervalMilliseconds)); try { @@ -124,15 +124,15 @@ private void ExporterProc() { case 0: // export OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered."); - this.metricReader.Collect(this.exportTimeoutMilliseconds); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); break; case 1: // shutdown OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); - this.metricReader.Collect(this.exportTimeoutMilliseconds); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); return; case WaitHandle.WaitTimeout: // timer OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed."); - this.metricReader.Collect(this.exportTimeoutMilliseconds); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); break; } } diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs index a5ff61cc84e..7af5538e5c1 100644 --- a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs @@ -10,10 +10,6 @@ namespace OpenTelemetry.Internal; /// internal abstract class PeriodicExportingMetricReaderWorker : IDisposable { - protected readonly BaseExportingMetricReader metricReader; - protected readonly int exportIntervalMilliseconds; - protected readonly int exportTimeoutMilliseconds; - /// /// Initializes a new instance of the class. /// @@ -25,11 +21,32 @@ protected PeriodicExportingMetricReaderWorker( int exportIntervalMilliseconds, int exportTimeoutMilliseconds) { - this.metricReader = metricReader; - this.exportIntervalMilliseconds = exportIntervalMilliseconds; - this.exportTimeoutMilliseconds = exportTimeoutMilliseconds; + this.MetricReader = metricReader; + this.ExportIntervalMilliseconds = exportIntervalMilliseconds; + this.ExportTimeoutMilliseconds = exportTimeoutMilliseconds; + } + + ~PeriodicExportingMetricReaderWorker() + { + // Finalizer to ensure resources are cleaned up if Dispose is not called + this.Dispose(false); } + /// + /// Gets the metric reader instance. + /// + protected BaseExportingMetricReader MetricReader { get; } + + /// + /// Gets he interval in milliseconds between two consecutive exports. + /// + protected int ExportIntervalMilliseconds { get; } + + /// + /// Gets how long the export can run before it is cancelled. + /// + protected int ExportTimeoutMilliseconds { get; } + /// /// Starts the worker. /// @@ -38,7 +55,7 @@ protected PeriodicExportingMetricReaderWorker( /// /// Triggers an export operation. /// - /// True if the export was triggered successfully; otherwise, false. + /// if the shutdown completed within the timeout; otherwise, . public abstract bool TriggerExport(); /// diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs index be4f3e5d351..1f760d12f65 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs @@ -1,7 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System.Diagnostics; using OpenTelemetry.Internal; namespace OpenTelemetry.Metrics; @@ -53,10 +52,8 @@ public PeriodicExportingMetricReader( { Guard.ThrowIfNull(options); -#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 - var exportIntervalMilliseconds = options!.ExportIntervalMilliseconds ?? DefaultExportIntervalMilliseconds; -#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 - var exportTimeoutMilliseconds = options.ExportTimeoutMilliseconds ?? DefaultExportTimeoutMilliseconds; + var exportIntervalMilliseconds = options?.ExportIntervalMilliseconds ?? DefaultExportIntervalMilliseconds; + var exportTimeoutMilliseconds = options?.ExportTimeoutMilliseconds ?? DefaultExportTimeoutMilliseconds; Guard.ThrowIfInvalidTimeout(exportIntervalMilliseconds); Guard.ThrowIfZero(exportIntervalMilliseconds); @@ -69,7 +66,7 @@ public PeriodicExportingMetricReader( this.ExportIntervalMilliseconds = exportIntervalMilliseconds; this.ExportTimeoutMilliseconds = exportTimeoutMilliseconds; - this.useThreads = options.UseThreads; + this.useThreads = options?.UseThreads ?? false; this.worker = this.CreateWorker(); this.worker.Start(); @@ -90,9 +87,7 @@ protected override bool OnShutdown(int timeoutMilliseconds) return this.exporter.Shutdown(0) && result; } - var sw = Stopwatch.StartNew(); - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - return this.exporter.Shutdown((int)Math.Max(timeout, 0)) && result; + return this.exporter.Shutdown(timeoutMilliseconds) && result; } /// @@ -111,8 +106,6 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - // The pragma is required by the fact that this method is compiled on both .NET Framework and .NET Core, and the CA1859 warning is only relevant for .NET Framework. - // The warning suggests changing the return type to PeriodicExportingMetricReaderThreadWorker for improved performance, but we want to keep the method signature consistent across platforms. #pragma warning disable CA1859 // Change return type of method 'CreateWorker' from 'PeriodicExportingMetricReaderWorker' to 'PeriodicExportingMetricReaderThreadWorker' for improved performance private PeriodicExportingMetricReaderWorker CreateWorker() diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs index 75d8b1d829f..300c33c23a8 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs @@ -54,7 +54,7 @@ internal PeriodicExportingMetricReaderOptions(IConfiguration configuration) public int? ExportTimeoutMilliseconds { get; set; } /// - /// Gets or sets a value indicating whether to use threads. Enables the use of when true, when false. The default value is true. + /// Gets or sets a value indicating whether to use threads. Enables the use of when ; otherwise is used. The default value is . /// public bool UseThreads { get; set; } = true; } diff --git a/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs b/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs index 4a375126cc0..85538c020bd 100644 --- a/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs +++ b/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs @@ -36,9 +36,7 @@ public void CreatePeriodicExportingMetricReader_Defaults() [Fact] public void CreatePeriodicExportingMetricReader_Defaults_WithTask() { -#pragma warning disable CA2000 // Dispose objects before losing scope - var reader = CreatePeriodicExportingMetricReader(useThreads: false); -#pragma warning restore CA2000 // Dispose objects before losing scope + using var reader = CreatePeriodicExportingMetricReader(useThreads: false); Assert.Equal(60000, reader.ExportIntervalMilliseconds); Assert.Equal(30000, reader.ExportTimeoutMilliseconds); From 36aa05a2ac57dba3bc7e4c80fbb0a970fe0f103d Mon Sep 17 00:00:00 2001 From: Jerome Laban Date: Tue, 9 Sep 2025 22:10:55 -0400 Subject: [PATCH 14/14] chore: use IsThreadingDisabled instead of browser only --- src/OpenTelemetry/BatchExportProcessor.cs | 2 +- src/OpenTelemetry/Internal/ThreadingHelper.cs | 15 +++++++++++++++ .../Reader/PeriodicExportingMetricReader.cs | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 src/OpenTelemetry/Internal/ThreadingHelper.cs diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index 9f5715e58f3..a9399447eab 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -167,7 +167,7 @@ private BatchExportWorker CreateWorker() { #if NET // Use task-based worker for browser platform where threading may be limited - if (OperatingSystem.IsBrowser() || !this.useThreads) + if (ThreadingHelper.IsThreadingDisabled() || !this.useThreads) { return new BatchExportTaskWorker( this.circularBuffer, diff --git a/src/OpenTelemetry/Internal/ThreadingHelper.cs b/src/OpenTelemetry/Internal/ThreadingHelper.cs new file mode 100644 index 00000000000..422bbd1ab34 --- /dev/null +++ b/src/OpenTelemetry/Internal/ThreadingHelper.cs @@ -0,0 +1,15 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Internal; + +internal class ThreadingHelper +{ + internal static bool IsThreadingDisabled() + { + // if the threadpool isn't using threads assume they aren't enabled + ThreadPool.GetMaxThreads(out int workerThreads, out int completionPortThreads); + + return workerThreads == 1 && completionPortThreads == 1; + } +} diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs index 1f760d12f65..74109eee7f0 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs @@ -113,7 +113,7 @@ private PeriodicExportingMetricReaderWorker CreateWorker() { #if NET // Use task-based worker for browser platform where threading may be limited - if (OperatingSystem.IsBrowser() || !this.useThreads) + if (ThreadingHelper.IsThreadingDisabled() || !this.useThreads) { return new PeriodicExportingMetricReaderTaskWorker( this,