diff --git a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt index e69de29bb2d..50c99d5b1d4 100644 --- a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt @@ -0,0 +1,8 @@ +OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter! exporter, OpenTelemetry.BatchExportProcessorOptions! options) -> void +OpenTelemetry.BatchExportProcessor.BatchExportProcessor(OpenTelemetry.BaseExporter! exporter, OpenTelemetry.BatchExportProcessorOptions! options) -> void +OpenTelemetry.BatchExportProcessorOptions.UseThreads.get -> bool +OpenTelemetry.BatchExportProcessorOptions.UseThreads.set -> void +OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter! exporter, OpenTelemetry.BatchExportProcessorOptions! options) -> void +OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter! exporter, OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions! options) -> void +OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.UseThreads.get -> bool +OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions.UseThreads.set -> void diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index 46a64b8e042..a9399447eab 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -1,7 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System.Diagnostics; using System.Runtime.CompilerServices; using OpenTelemetry.Internal; @@ -24,12 +23,8 @@ public abstract class BatchExportProcessor : BaseExportProcessor internal readonly int ExporterTimeoutMilliseconds; private readonly CircularBuffer circularBuffer; - private readonly Thread exporterThread; - private readonly AutoResetEvent exportTrigger = new(false); - private readonly ManualResetEvent dataExportedNotification = new(false); - private readonly ManualResetEvent shutdownTrigger = new(false); - private long shutdownDrainTarget = long.MaxValue; - private long droppedCount; + private readonly BatchExportWorker worker; + private readonly bool useThreads; private bool disposed; /// @@ -46,31 +41,50 @@ protected BatchExportProcessor( int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, int maxExportBatchSize = DefaultMaxExportBatchSize) + : this(exporter, new BatchExportProcessorOptions + { + MaxQueueSize = maxQueueSize, + ScheduledDelayMilliseconds = scheduledDelayMilliseconds, + ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds, + MaxExportBatchSize = maxExportBatchSize, + UseThreads = true, + }) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Exporter instance. + /// Configuration options for the batch export processor. + protected BatchExportProcessor( + BaseExporter exporter, + BatchExportProcessorOptions options) : base(exporter) { + Guard.ThrowIfNull(options); + + var maxQueueSize = options?.MaxQueueSize ?? 0; Guard.ThrowIfOutOfRange(maxQueueSize, min: 1); - Guard.ThrowIfOutOfRange(maxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(maxQueueSize)); - Guard.ThrowIfOutOfRange(scheduledDelayMilliseconds, min: 1); - Guard.ThrowIfOutOfRange(exporterTimeoutMilliseconds, min: 0); this.circularBuffer = new CircularBuffer(maxQueueSize); - this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds; - this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds; - this.MaxExportBatchSize = maxExportBatchSize; - this.exporterThread = new Thread(this.ExporterProc) - { - IsBackground = true, -#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 - Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", -#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 - }; - this.exporterThread.Start(); + this.ScheduledDelayMilliseconds = options?.ScheduledDelayMilliseconds ?? 0; + this.ExporterTimeoutMilliseconds = options?.ExporterTimeoutMilliseconds ?? -1; + this.MaxExportBatchSize = options?.MaxExportBatchSize ?? 0; + this.useThreads = options?.UseThreads ?? true; + + Guard.ThrowIfOutOfRange(this.MaxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(options.MaxQueueSize)); + Guard.ThrowIfOutOfRange(this.ScheduledDelayMilliseconds, min: 1); + Guard.ThrowIfOutOfRange(this.ExporterTimeoutMilliseconds, min: 0); + + this.worker = this.CreateWorker(); + this.worker.Start(); } /// /// Gets the number of telemetry objects dropped by the processor. /// - internal long DroppedCount => Volatile.Read(ref this.droppedCount); + internal long DroppedCount => this.worker.DroppedCount; /// /// Gets the number of telemetry objects received by the processor. @@ -89,20 +103,14 @@ internal bool TryExport(T data) { if (this.circularBuffer.Count >= this.MaxExportBatchSize) { - try - { - this.exportTrigger.Set(); - } - catch (ObjectDisposedException) - { - } + this.worker.TriggerExport(); } return true; // enqueue succeeded } // either the queue is full or exceeded the spin limit, drop the item on the floor - Interlocked.Increment(ref this.droppedCount); + this.worker.IncrementDroppedCount(); return false; } @@ -116,113 +124,27 @@ protected override void OnExport(T data) /// protected override bool OnForceFlush(int timeoutMilliseconds) { - var tail = this.circularBuffer.RemovedCount; - var head = this.circularBuffer.AddedCount; - - if (head == tail) - { - return true; // nothing to flush - } - - try - { - this.exportTrigger.Set(); - } - catch (ObjectDisposedException) - { - return false; - } - - if (timeoutMilliseconds == 0) - { - return false; - } - - var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger }; - - var sw = timeoutMilliseconds == Timeout.Infinite - ? null - : Stopwatch.StartNew(); - - // There is a chance that the export thread finished processing all the data from the queue, - // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. - const int pollingMilliseconds = 1000; - - while (true) - { - if (sw == null) - { - try - { - WaitHandle.WaitAny(triggers, pollingMilliseconds); - } - catch (ObjectDisposedException) - { - return false; - } - } - else - { - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - - if (timeout <= 0) - { - return this.circularBuffer.RemovedCount >= head; - } - - try - { - WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds)); - } - catch (ObjectDisposedException) - { - return false; - } - } - - if (this.circularBuffer.RemovedCount >= head) - { - return true; - } - - if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue) - { - return false; - } - } + return this.worker.WaitForExport(timeoutMilliseconds); } /// protected override bool OnShutdown(int timeoutMilliseconds) { - Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount); - - try - { - this.shutdownTrigger.Set(); - } - catch (ObjectDisposedException) - { - return false; - } + var result = this.worker.Shutdown(timeoutMilliseconds); OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount); if (timeoutMilliseconds == Timeout.Infinite) { - this.exporterThread.Join(); - return this.exporter.Shutdown(); + return this.exporter.Shutdown() && result; } if (timeoutMilliseconds == 0) { - return this.exporter.Shutdown(0); + return this.exporter.Shutdown(0) && result; } - var sw = Stopwatch.StartNew(); - this.exporterThread.Join(timeoutMilliseconds); - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - return this.exporter.Shutdown((int)Math.Max(timeout, 0)); + return this.exporter.Shutdown(timeoutMilliseconds) && result; } /// @@ -232,9 +154,7 @@ protected override void Dispose(bool disposing) { if (disposing) { - this.exportTrigger.Dispose(); - this.dataExportedNotification.Dispose(); - this.shutdownTrigger.Dispose(); + this.worker?.Dispose(); } this.disposed = true; @@ -243,49 +163,27 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private void ExporterProc() + private BatchExportWorker CreateWorker() { - var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; - - while (true) +#if NET + // Use task-based worker for browser platform where threading may be limited + if (ThreadingHelper.IsThreadingDisabled() || !this.useThreads) { - // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously - if (this.circularBuffer.Count < this.MaxExportBatchSize) - { - try - { - WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds); - } - catch (ObjectDisposedException) - { - // the exporter is somehow disposed before the worker thread could finish its job - return; - } - } - - if (this.circularBuffer.Count > 0) - { - using (var batch = new Batch(this.circularBuffer, this.MaxExportBatchSize)) - { - this.exporter.Export(batch); - } - - try - { - this.dataExportedNotification.Set(); - this.dataExportedNotification.Reset(); - } - catch (ObjectDisposedException) - { - // the exporter is somehow disposed before the worker thread could finish its job - return; - } - } - - if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget)) - { - return; - } + return new BatchExportTaskWorker( + this.circularBuffer, + this.exporter, + this.MaxExportBatchSize, + this.ScheduledDelayMilliseconds, + this.ExporterTimeoutMilliseconds); } +#endif + + // Use thread-based worker for all other platforms + return new BatchExportThreadWorker( + this.circularBuffer, + this.exporter, + this.MaxExportBatchSize, + this.ScheduledDelayMilliseconds, + this.ExporterTimeoutMilliseconds); } } diff --git a/src/OpenTelemetry/BatchExportProcessorOptions.cs b/src/OpenTelemetry/BatchExportProcessorOptions.cs index 6695c0c6a56..4383e1567a8 100644 --- a/src/OpenTelemetry/BatchExportProcessorOptions.cs +++ b/src/OpenTelemetry/BatchExportProcessorOptions.cs @@ -29,4 +29,9 @@ public class BatchExportProcessorOptions /// Gets or sets the maximum batch size of every export. It must be smaller or equal to . The default value is 512. /// public int MaxExportBatchSize { get; set; } = BatchExportProcessor.DefaultMaxExportBatchSize; + + /// + /// Gets or sets a value indicating whether to use threads. Enables the use of when ; otherwise is used. The default value is . + /// + public bool UseThreads { get; set; } = true; } diff --git a/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs new file mode 100644 index 00000000000..f323f88f57c --- /dev/null +++ b/src/OpenTelemetry/Internal/BatchExportTaskWorker.cs @@ -0,0 +1,256 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; + +namespace OpenTelemetry.Internal; + +/// +/// Task-based implementation of batch export worker for environments where threading may be limited. +/// +/// The type of telemetry object to be exported. +internal sealed class BatchExportTaskWorker : BatchExportWorker + where T : class +{ + private readonly CancellationTokenSource cancellationTokenSource = new(); + private readonly SemaphoreSlim exportTrigger = new(0, 1); + private readonly TaskCompletionSource shutdownCompletionSource = new(); + private volatile TaskCompletionSource dataExportedNotification = new(); + private Task? workerTask; + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The circular buffer for storing telemetry objects. + /// The exporter instance. + /// The maximum batch size for exports. + /// The delay between exports in milliseconds. + /// The timeout for export operations in milliseconds. + public BatchExportTaskWorker( + CircularBuffer circularBuffer, + BaseExporter exporter, + int maxExportBatchSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds) + : base(circularBuffer, exporter, maxExportBatchSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds) + { + } + + /// + public override void Start() + { + this.workerTask = Task.Factory.StartNew( + this.ExporterProcAsync, + this.cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).Unwrap(); + } + + /// + public override bool TriggerExport() + { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return false; + } + + try + { + this.exportTrigger.Release(); + return true; + } + catch (ObjectDisposedException) + { + return false; + } + catch (SemaphoreFullException) + { + // Semaphore is already signaled, export is pending + return true; + } + } + + /// + public override bool WaitForExport(int timeoutMilliseconds) + { + var tail = this.CircularBuffer.RemovedCount; + var head = this.CircularBuffer.AddedCount; + + if (head == tail) + { + return true; // nothing to flush + } + + if (!this.TriggerExport()) + { + return false; + } + + if (timeoutMilliseconds == 0) + { + return false; + } + + return this.WaitForExportAsync(timeoutMilliseconds, head).GetAwaiter().GetResult(); + } + + /// + public override bool Shutdown(int timeoutMilliseconds) + { + this.SetShutdownDrainTarget(this.CircularBuffer.AddedCount); + + try + { + this.cancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (this.workerTask == null) + { + return true; + } + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.workerTask.Wait(); + return true; + } + + if (timeoutMilliseconds == 0) + { + return true; + } + + return this.workerTask.Wait(timeoutMilliseconds); + } + + /// + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (!this.disposed) + { + this.disposed = true; + + this.cancellationTokenSource.Dispose(); + this.exportTrigger.Dispose(); + } + } + + private async Task WaitForExportAsync(int timeoutMilliseconds, long targetHead) + { + var sw = timeoutMilliseconds == Timeout.Infinite + ? null + : Stopwatch.StartNew(); + + // There is a chance that the export task finished processing all the data from the queue, + // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. + const int pollingMilliseconds = 1000; + + while (true) + { + var timeout = pollingMilliseconds; + if (sw != null) + { + var remaining = timeoutMilliseconds - sw.ElapsedMilliseconds; + if (remaining <= 0) + { + return this.CircularBuffer.RemovedCount >= targetHead; + } + + timeout = Math.Min((int)remaining, pollingMilliseconds); + } + + try + { + using var cts = new CancellationTokenSource(timeout); + using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( + cts.Token, + this.cancellationTokenSource.Token); + + await Task.WhenAny( + this.dataExportedNotification.Task, + this.shutdownCompletionSource.Task, + Task.Delay(timeout, combinedTokenSource.Token)).ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + return false; // The worker has been disposed + } + catch (OperationCanceledException) + { + // Expected when timeout or shutdown occurs + } + + if (this.CircularBuffer.RemovedCount >= targetHead) + { + return true; + } + + if (this.ShutdownDrainTarget != long.MaxValue) + { + return false; + } + } + } + + private async Task ExporterProcAsync() + { + var cancellationToken = this.cancellationTokenSource.Token; + + try + { + while (true) + { + // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously + if (this.CircularBuffer.Count < this.MaxExportBatchSize) + { + try + { + await Task.WhenAny( + this.exportTrigger.WaitAsync(cancellationToken), + Task.Delay(this.ScheduledDelayMilliseconds, cancellationToken)).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Continue to check if there's data to export before exiting + } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } + } + + this.PerformExport(); + + // Signal that data has been exported + var previousTcs = this.dataExportedNotification; + var newTcs = new TaskCompletionSource(); + if (Interlocked.CompareExchange(ref this.dataExportedNotification, newTcs, previousTcs) == previousTcs) + { + previousTcs.TrySetResult(true); + } + + if (this.ShouldShutdown() || cancellationToken.IsCancellationRequested) + { + break; + } + } + } + catch (Exception ex) + { + // Log the exception if needed + OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExporterProcAsync), ex); + } + finally + { + this.shutdownCompletionSource.TrySetResult(true); + } + } +} diff --git a/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs b/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs new file mode 100644 index 00000000000..6731d3d4aaf --- /dev/null +++ b/src/OpenTelemetry/Internal/BatchExportThreadWorker.cs @@ -0,0 +1,223 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; + +namespace OpenTelemetry.Internal; + +/// +/// Thread-based implementation of batch export worker. +/// +/// The type of telemetry object to be exported. +internal sealed class BatchExportThreadWorker : BatchExportWorker + where T : class +{ + private readonly Thread exporterThread; + private readonly AutoResetEvent exportTrigger = new(false); + private readonly ManualResetEvent dataExportedNotification = new(false); + private readonly ManualResetEvent shutdownTrigger = new(false); + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The circular buffer for storing telemetry objects. + /// The exporter instance. + /// The maximum batch size for exports. + /// The delay between exports in milliseconds. + /// The timeout for export operations in milliseconds. + public BatchExportThreadWorker( + CircularBuffer circularBuffer, + BaseExporter exporter, + int maxExportBatchSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds) + : base(circularBuffer, exporter, maxExportBatchSize, scheduledDelayMilliseconds, exporterTimeoutMilliseconds) + { + this.exporterThread = new Thread(this.ExporterProc) + { + IsBackground = true, +#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 + Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", +#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 + }; + } + + /// + public override void Start() + { + this.exporterThread.Start(); + } + + /// + public override bool TriggerExport() + { + try + { + this.exportTrigger.Set(); + return true; + } + catch (ObjectDisposedException) + { + return false; + } + } + + /// + public override bool WaitForExport(int timeoutMilliseconds) + { + var tail = this.CircularBuffer.RemovedCount; + var head = this.CircularBuffer.AddedCount; + + if (head == tail) + { + return true; // nothing to flush + } + + if (!this.TriggerExport()) + { + return false; + } + + if (timeoutMilliseconds == 0) + { + return false; + } + + var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger }; + + var sw = timeoutMilliseconds == Timeout.Infinite + ? null + : Stopwatch.StartNew(); + + // There is a chance that the export thread finished processing all the data from the queue, + // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. + const int pollingMilliseconds = 1000; + + while (true) + { + if (sw == null) + { + try + { + WaitHandle.WaitAny(triggers, pollingMilliseconds); + } + catch (ObjectDisposedException) + { + return false; + } + } + else + { + var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; + + if (timeout <= 0) + { + return this.CircularBuffer.RemovedCount >= head; + } + + try + { + WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds)); + } + catch (ObjectDisposedException) + { + return false; + } + } + + if (this.CircularBuffer.RemovedCount >= head) + { + return true; + } + + if (this.ShutdownDrainTarget != long.MaxValue) + { + return false; + } + } + } + + /// + public override bool Shutdown(int timeoutMilliseconds) + { + this.SetShutdownDrainTarget(this.CircularBuffer.AddedCount); + + try + { + this.shutdownTrigger.Set(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.exporterThread.Join(); + return true; + } + + if (timeoutMilliseconds == 0) + { + return true; + } + + return this.exporterThread.Join(timeoutMilliseconds); + } + + /// + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (!this.disposed) + { + this.disposed = true; + + this.exportTrigger.Dispose(); + this.dataExportedNotification.Dispose(); + this.shutdownTrigger.Dispose(); + } + } + + private void ExporterProc() + { + var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; + + while (true) + { + // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously + if (this.CircularBuffer.Count < this.MaxExportBatchSize) + { + try + { + WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds); + } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } + } + + this.PerformExport(); + + try + { + this.dataExportedNotification.Set(); + this.dataExportedNotification.Reset(); + } + catch (ObjectDisposedException) + { + // the exporter is somehow disposed before the worker thread could finish its job + return; + } + + if (this.ShouldShutdown()) + { + return; + } + } + } +} diff --git a/src/OpenTelemetry/Internal/BatchExportWorker.cs b/src/OpenTelemetry/Internal/BatchExportWorker.cs new file mode 100644 index 00000000000..e2b92023429 --- /dev/null +++ b/src/OpenTelemetry/Internal/BatchExportWorker.cs @@ -0,0 +1,158 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Internal; + +/// +/// Abstract base class for batch export workers that handle the threading and synchronization logic for batch export processors. +/// +/// The type of telemetry object to be exported. +internal abstract class BatchExportWorker : IDisposable + where T : class +{ + private long shutdownDrainTarget = long.MaxValue; + private long droppedCount; + + /// + /// Initializes a new instance of the class. + /// + /// The circular buffer for storing telemetry objects. + /// The exporter instance. + /// The maximum batch size for exports. + /// The delay between exports in milliseconds. + /// The timeout for export operations in milliseconds. + protected BatchExportWorker( + CircularBuffer circularBuffer, + BaseExporter exporter, + int maxExportBatchSize, + int scheduledDelayMilliseconds, + int exporterTimeoutMilliseconds) + { + this.CircularBuffer = circularBuffer; + this.Exporter = exporter; + this.MaxExportBatchSize = maxExportBatchSize; + this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds; + this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds; + } + + ~BatchExportWorker() + { + // Finalizer to ensure resources are cleaned up if Dispose is not called + this.Dispose(false); + } + + /// + /// Gets the number of telemetry objects dropped by the processor. + /// + internal long DroppedCount => Volatile.Read(ref this.droppedCount); + + /// + /// Gets the circular buffer for storing telemetry objects. + /// + protected CircularBuffer CircularBuffer { get; } + + /// + /// Gets the exporter instance. + /// + protected BaseExporter Exporter { get; } + + /// + /// Gets the maximum batch size for exports. + /// + protected int MaxExportBatchSize { get; } + + /// + /// Gets the delay between exports in milliseconds. + /// + protected int ScheduledDelayMilliseconds { get; } + + /// + /// Gets the timeout for export operations in milliseconds. + /// + protected int ExporterTimeoutMilliseconds { get; } + + /// + /// Gets the shutdown drain target. + /// + protected long ShutdownDrainTarget => Volatile.Read(ref this.shutdownDrainTarget); + + /// + /// Starts the worker. + /// + public abstract void Start(); + + /// + /// Triggers an export operation. + /// + /// if the export was triggered successfully; otherwise, . + public abstract bool TriggerExport(); + + /// + /// Waits for export to complete. + /// + /// The timeout in milliseconds. + /// True if the export completed within the timeout; otherwise, false. + public abstract bool WaitForExport(int timeoutMilliseconds); + + /// + /// Initiates shutdown and waits for completion. + /// + /// The timeout in milliseconds. + /// True if shutdown completed within the timeout; otherwise, false. + public abstract bool Shutdown(int timeoutMilliseconds); + + /// + /// Increments the dropped count. + /// + public void IncrementDroppedCount() + { + Interlocked.Increment(ref this.droppedCount); + } + + /// + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Sets the shutdown drain target. + /// + /// The target count. + protected void SetShutdownDrainTarget(long target) + { + Volatile.Write(ref this.shutdownDrainTarget, target); + } + + /// + /// Performs the export operation. + /// + protected void PerformExport() + { + if (this.CircularBuffer.Count > 0) + { + using (var batch = new Batch(this.CircularBuffer, this.MaxExportBatchSize)) + { + this.Exporter.Export(batch); + } + } + } + + /// + /// Checks if shutdown should occur. + /// + /// True if shutdown should occur; otherwise, false. + protected bool ShouldShutdown() + { + return this.CircularBuffer.RemovedCount >= this.ShutdownDrainTarget; + } + + /// + /// Releases the unmanaged resources used by this class and optionally releases the managed resources. + /// + /// True to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + } +} diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs new file mode 100644 index 00000000000..63cc9eba923 --- /dev/null +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs @@ -0,0 +1,173 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Internal; + +/// +/// Task-based implementation of periodic exporting metric reader worker for environments where threading may be limited. +/// +internal sealed class PeriodicExportingMetricReaderTaskWorker : PeriodicExportingMetricReaderWorker +{ + private readonly CancellationTokenSource cancellationTokenSource = new(); + private readonly SemaphoreSlim exportTrigger = new(0, 1); + private readonly TaskCompletionSource shutdownCompletionSource = new(); + private Task? workerTask; + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The metric reader instance. + /// The interval in milliseconds between two consecutive exports. + /// How long the export can run before it is cancelled. + public PeriodicExportingMetricReaderTaskWorker( + BaseExportingMetricReader metricReader, + int exportIntervalMilliseconds, + int exportTimeoutMilliseconds) + : base(metricReader, exportIntervalMilliseconds, exportTimeoutMilliseconds) + { + } + + /// + public override void Start() + { + this.workerTask = Task.Factory.StartNew( + this.ExporterProcAsync, + this.cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).Unwrap(); + } + + /// + public override bool TriggerExport() + { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return false; + } + + try + { + this.exportTrigger.Release(); + return true; + } + catch (ObjectDisposedException) + { + return false; + } + catch (SemaphoreFullException) + { + // Semaphore is already signaled, export is pending + return true; + } + } + + /// + public override bool Shutdown(int timeoutMilliseconds) + { + try + { + this.cancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (this.workerTask == null) + { + return true; + } + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.workerTask.Wait(); + return true; + } + + if (timeoutMilliseconds == 0) + { + return true; + } + + return this.workerTask.Wait(timeoutMilliseconds); + } + + /// + protected override void Dispose(bool disposing) + { + if (!this.disposed) + { + if (disposing) + { + this.cancellationTokenSource.Dispose(); + this.exportTrigger.Dispose(); + } + + this.disposed = true; + } + + base.Dispose(disposing); + } + + private async Task ExporterProcAsync() + { + var cancellationToken = this.cancellationTokenSource.Token; + var sw = Stopwatch.StartNew(); + + try + { + while (!cancellationToken.IsCancellationRequested) + { + var timeout = (int)(this.ExportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.ExportIntervalMilliseconds)); + + var exportTriggerTask = this.exportTrigger.WaitAsync(cancellationToken); + Task? triggeredTask = null; + + try + { + triggeredTask = await Task.WhenAny( + exportTriggerTask, + Task.Delay(timeout, cancellationToken)).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Continue to check if shutdown was requested + } + + if (cancellationToken.IsCancellationRequested) + { + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); + break; + } + + // Check if the trigger was signaled by trying to acquire it with a timeout of 0 + var exportWasTriggered = triggeredTask == exportTriggerTask; + + if (exportWasTriggered) + { + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered."); + } + else + { + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed."); + } + + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); + } + } + catch (Exception ex) + { + // Log the exception if needed + OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExporterProcAsync), ex); + } + finally + { + this.shutdownCompletionSource.TrySetResult(true); + } + } +} diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs new file mode 100644 index 00000000000..5dffebcd2d9 --- /dev/null +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs @@ -0,0 +1,140 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Internal; + +/// +/// Thread-based implementation of periodic exporting metric reader worker. +/// +internal sealed class PeriodicExportingMetricReaderThreadWorker : PeriodicExportingMetricReaderWorker +{ + private readonly Thread exporterThread; + private readonly AutoResetEvent exportTrigger = new(false); + private readonly ManualResetEvent shutdownTrigger = new(false); + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The metric reader instance. + /// The interval in milliseconds between two consecutive exports. + /// How long the export can run before it is cancelled. + public PeriodicExportingMetricReaderThreadWorker( + BaseExportingMetricReader metricReader, + int exportIntervalMilliseconds, + int exportTimeoutMilliseconds) + : base(metricReader, exportIntervalMilliseconds, exportTimeoutMilliseconds) + { + this.exporterThread = new Thread(this.ExporterProc) + { + IsBackground = true, +#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 + Name = $"OpenTelemetry-{nameof(PeriodicExportingMetricReader)}-{metricReader.GetType().Name}", +#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 + }; + } + + /// + public override void Start() + { + this.exporterThread.Start(); + } + + /// + public override bool TriggerExport() + { + try + { + this.exportTrigger.Set(); + return true; + } + catch (ObjectDisposedException) + { + return false; + } + } + + /// + public override bool Shutdown(int timeoutMilliseconds) + { + try + { + this.shutdownTrigger.Set(); + } + catch (ObjectDisposedException) + { + return false; + } + + if (timeoutMilliseconds == Timeout.Infinite) + { + this.exporterThread.Join(); + return true; + } + + if (timeoutMilliseconds == 0) + { + return true; + } + + return this.exporterThread.Join(timeoutMilliseconds); + } + + /// + protected override void Dispose(bool disposing) + { + if (!this.disposed) + { + if (disposing) + { + this.exportTrigger.Dispose(); + this.shutdownTrigger.Dispose(); + } + + this.disposed = true; + } + + base.Dispose(disposing); + } + + private void ExporterProc() + { + int index; + int timeout; + var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; + var sw = Stopwatch.StartNew(); + + while (true) + { + timeout = (int)(this.ExportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.ExportIntervalMilliseconds)); + + try + { + index = WaitHandle.WaitAny(triggers, timeout); + } + catch (ObjectDisposedException) + { + return; + } + + switch (index) + { + case 0: // export + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered."); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); + break; + case 1: // shutdown + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); + return; + case WaitHandle.WaitTimeout: // timer + OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed."); + this.MetricReader.Collect(this.ExportTimeoutMilliseconds); + break; + } + } + } +} diff --git a/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs new file mode 100644 index 00000000000..7af5538e5c1 --- /dev/null +++ b/src/OpenTelemetry/Internal/PeriodicExportingMetricReaderWorker.cs @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Internal; + +/// +/// Abstract base class for periodic exporting metric reader workers that handle the threading and synchronization logic. +/// +internal abstract class PeriodicExportingMetricReaderWorker : IDisposable +{ + /// + /// Initializes a new instance of the class. + /// + /// The metric reader instance. + /// The interval in milliseconds between two consecutive exports. + /// How long the export can run before it is cancelled. + protected PeriodicExportingMetricReaderWorker( + BaseExportingMetricReader metricReader, + int exportIntervalMilliseconds, + int exportTimeoutMilliseconds) + { + this.MetricReader = metricReader; + this.ExportIntervalMilliseconds = exportIntervalMilliseconds; + this.ExportTimeoutMilliseconds = exportTimeoutMilliseconds; + } + + ~PeriodicExportingMetricReaderWorker() + { + // Finalizer to ensure resources are cleaned up if Dispose is not called + this.Dispose(false); + } + + /// + /// Gets the metric reader instance. + /// + protected BaseExportingMetricReader MetricReader { get; } + + /// + /// Gets he interval in milliseconds between two consecutive exports. + /// + protected int ExportIntervalMilliseconds { get; } + + /// + /// Gets how long the export can run before it is cancelled. + /// + protected int ExportTimeoutMilliseconds { get; } + + /// + /// Starts the worker. + /// + public abstract void Start(); + + /// + /// Triggers an export operation. + /// + /// if the shutdown completed within the timeout; otherwise, . + public abstract bool TriggerExport(); + + /// + /// Initiates shutdown and waits for completion. + /// + /// The timeout in milliseconds. + /// True if the shutdown completed within the timeout; otherwise, false. + public abstract bool Shutdown(int timeoutMilliseconds); + + /// + /// Disposes of the worker and its resources. + /// + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases the unmanaged resources used by this class and optionally releases the managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + } +} diff --git a/src/OpenTelemetry/Internal/ThreadingHelper.cs b/src/OpenTelemetry/Internal/ThreadingHelper.cs new file mode 100644 index 00000000000..422bbd1ab34 --- /dev/null +++ b/src/OpenTelemetry/Internal/ThreadingHelper.cs @@ -0,0 +1,15 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Internal; + +internal class ThreadingHelper +{ + internal static bool IsThreadingDisabled() + { + // if the threadpool isn't using threads assume they aren't enabled + ThreadPool.GetMaxThreads(out int workerThreads, out int completionPortThreads); + + return workerThreads == 1 && completionPortThreads == 1; + } +} diff --git a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs index c1e341585a8..9dceeefe119 100644 --- a/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/Processor/BatchLogRecordExportProcessor.cs @@ -25,12 +25,28 @@ public BatchLogRecordExportProcessor( int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, int maxExportBatchSize = DefaultMaxExportBatchSize) - : base( + : this( exporter, - maxQueueSize, - scheduledDelayMilliseconds, - exporterTimeoutMilliseconds, - maxExportBatchSize) + new BatchExportProcessorOptions + { + MaxQueueSize = maxQueueSize, + ScheduledDelayMilliseconds = scheduledDelayMilliseconds, + ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds, + MaxExportBatchSize = maxExportBatchSize, + UseThreads = true, + }) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Log record exporter. + /// Configuration options for the batch export processor. + public BatchLogRecordExportProcessor( + BaseExporter exporter, + BatchExportProcessorOptions options) + : base(exporter, options) { } diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs index 0596e18af18..74109eee7f0 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReader.cs @@ -1,7 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System.Diagnostics; using OpenTelemetry.Internal; namespace OpenTelemetry.Metrics; @@ -18,9 +17,8 @@ public class PeriodicExportingMetricReader : BaseExportingMetricReader internal readonly int ExportIntervalMilliseconds; internal readonly int ExportTimeoutMilliseconds; - private readonly Thread exporterThread; - private readonly AutoResetEvent exportTrigger = new(false); - private readonly ManualResetEvent shutdownTrigger = new(false); + private readonly PeriodicExportingMetricReaderWorker worker; + private readonly bool useThreads; private bool disposed; /// @@ -33,8 +31,30 @@ public PeriodicExportingMetricReader( BaseExporter exporter, int exportIntervalMilliseconds = DefaultExportIntervalMilliseconds, int exportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds) + : this(exporter, new PeriodicExportingMetricReaderOptions + { + ExportIntervalMilliseconds = exportIntervalMilliseconds, + ExportTimeoutMilliseconds = exportTimeoutMilliseconds, + UseThreads = true, + }) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Exporter instance to export Metrics to. + /// Configuration options for the periodic exporting metric reader. + public PeriodicExportingMetricReader( + BaseExporter exporter, + PeriodicExportingMetricReaderOptions options) : base(exporter) { + Guard.ThrowIfNull(options); + + var exportIntervalMilliseconds = options?.ExportIntervalMilliseconds ?? DefaultExportIntervalMilliseconds; + var exportTimeoutMilliseconds = options?.ExportTimeoutMilliseconds ?? DefaultExportTimeoutMilliseconds; + Guard.ThrowIfInvalidTimeout(exportIntervalMilliseconds); Guard.ThrowIfZero(exportIntervalMilliseconds); Guard.ThrowIfInvalidTimeout(exportTimeoutMilliseconds); @@ -46,45 +66,28 @@ public PeriodicExportingMetricReader( this.ExportIntervalMilliseconds = exportIntervalMilliseconds; this.ExportTimeoutMilliseconds = exportTimeoutMilliseconds; + this.useThreads = options?.UseThreads ?? false; - this.exporterThread = new Thread(this.ExporterProc) - { - IsBackground = true, -#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1 - Name = $"OpenTelemetry-{nameof(PeriodicExportingMetricReader)}-{exporter.GetType().Name}", -#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1 - }; - this.exporterThread.Start(); + this.worker = this.CreateWorker(); + this.worker.Start(); } /// protected override bool OnShutdown(int timeoutMilliseconds) { - var result = true; - - try - { - this.shutdownTrigger.Set(); - } - catch (ObjectDisposedException) - { - return false; - } + var result = this.worker.Shutdown(timeoutMilliseconds); if (timeoutMilliseconds == Timeout.Infinite) { - this.exporterThread.Join(); - result = this.exporter.Shutdown() && result; + return this.exporter.Shutdown() && result; } - else + + if (timeoutMilliseconds == 0) { - var sw = Stopwatch.StartNew(); - result = this.exporterThread.Join(timeoutMilliseconds) && result; - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - result = this.exporter.Shutdown((int)Math.Max(timeout, 0)) && result; + return this.exporter.Shutdown(0) && result; } - return result; + return this.exporter.Shutdown(timeoutMilliseconds) && result; } /// @@ -94,8 +97,7 @@ protected override void Dispose(bool disposing) { if (disposing) { - this.exportTrigger.Dispose(); - this.shutdownTrigger.Dispose(); + this.worker?.Dispose(); } this.disposed = true; @@ -104,41 +106,26 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private void ExporterProc() - { - int index; - int timeout; - var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; - var sw = Stopwatch.StartNew(); +#pragma warning disable CA1859 // Change return type of method 'CreateWorker' from 'PeriodicExportingMetricReaderWorker' to 'PeriodicExportingMetricReaderThreadWorker' for improved performance - while (true) + private PeriodicExportingMetricReaderWorker CreateWorker() +#pragma warning restore CA1859 + { +#if NET + // Use task-based worker for browser platform where threading may be limited + if (ThreadingHelper.IsThreadingDisabled() || !this.useThreads) { - timeout = (int)(this.ExportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.ExportIntervalMilliseconds)); - - try - { - index = WaitHandle.WaitAny(triggers, timeout); - } - catch (ObjectDisposedException) - { - return; - } - - switch (index) - { - case 0: // export - OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered."); - this.Collect(this.ExportTimeoutMilliseconds); - break; - case 1: // shutdown - OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered."); - this.Collect(this.ExportTimeoutMilliseconds); // TODO: do we want to use the shutdown timeout here? - return; - case WaitHandle.WaitTimeout: // timer - OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed."); - this.Collect(this.ExportTimeoutMilliseconds); - break; - } + return new PeriodicExportingMetricReaderTaskWorker( + this, + this.ExportIntervalMilliseconds, + this.ExportTimeoutMilliseconds); } +#endif + + // Use thread-based worker for all other platforms + return new PeriodicExportingMetricReaderThreadWorker( + this, + this.ExportIntervalMilliseconds, + this.ExportTimeoutMilliseconds); } } diff --git a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs index b10967f2dc0..300c33c23a8 100644 --- a/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs +++ b/src/OpenTelemetry/Metrics/Reader/PeriodicExportingMetricReaderOptions.cs @@ -52,4 +52,9 @@ internal PeriodicExportingMetricReaderOptions(IConfiguration configuration) /// associated with the metric reader. /// public int? ExportTimeoutMilliseconds { get; set; } + + /// + /// Gets or sets a value indicating whether to use threads. Enables the use of when ; otherwise is used. The default value is . + /// + public bool UseThreads { get; set; } = true; } diff --git a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs index 73e7968c78f..06f1be1489f 100644 --- a/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs +++ b/src/OpenTelemetry/Trace/Processor/BatchActivityExportProcessor.cs @@ -14,23 +14,39 @@ public class BatchActivityExportProcessor : BatchExportProcessor /// /// Initializes a new instance of the class. /// - /// - /// - /// - /// - /// + /// + /// + /// + /// + /// public BatchActivityExportProcessor( BaseExporter exporter, int maxQueueSize = DefaultMaxQueueSize, int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, int maxExportBatchSize = DefaultMaxExportBatchSize) - : base( + : this( exporter, - maxQueueSize, - scheduledDelayMilliseconds, - exporterTimeoutMilliseconds, - maxExportBatchSize) + new BatchExportProcessorOptions + { + MaxQueueSize = maxQueueSize, + ScheduledDelayMilliseconds = scheduledDelayMilliseconds, + ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds, + MaxExportBatchSize = maxExportBatchSize, + UseThreads = true, + }) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Activity exporter. + /// Configuration options for the batch export processor. + public BatchActivityExportProcessor( + BaseExporter exporter, + BatchExportProcessorOptions options) + : base(exporter, options) { } diff --git a/src/Shared/PeriodicExportingMetricReaderHelper.cs b/src/Shared/PeriodicExportingMetricReaderHelper.cs index e7dc244a94c..e3e0fcbb65e 100644 --- a/src/Shared/PeriodicExportingMetricReaderHelper.cs +++ b/src/Shared/PeriodicExportingMetricReaderHelper.cs @@ -12,15 +12,17 @@ internal static PeriodicExportingMetricReader CreatePeriodicExportingMetricReade BaseExporter exporter, MetricReaderOptions options, int defaultExportIntervalMilliseconds = DefaultExportIntervalMilliseconds, - int defaultExportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds) + int defaultExportTimeoutMilliseconds = DefaultExportTimeoutMilliseconds, + bool useThreads = true) { - var exportInterval = - options.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds ?? defaultExportIntervalMilliseconds; - - var exportTimeout = - options.PeriodicExportingMetricReaderOptions.ExportTimeoutMilliseconds ?? defaultExportTimeoutMilliseconds; + var periodicOptions = new PeriodicExportingMetricReaderOptions + { + ExportIntervalMilliseconds = options.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds ?? defaultExportIntervalMilliseconds, + ExportTimeoutMilliseconds = options.PeriodicExportingMetricReaderOptions.ExportTimeoutMilliseconds ?? defaultExportTimeoutMilliseconds, + UseThreads = useThreads, + }; - var metricReader = new PeriodicExportingMetricReader(exporter, exportInterval, exportTimeout) + var metricReader = new PeriodicExportingMetricReader(exporter, periodicOptions) { TemporalityPreference = options.TemporalityPreference, }; diff --git a/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs b/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs index 236cbe1e727..85538c020bd 100644 --- a/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs +++ b/test/OpenTelemetry.Tests/Internal/PeriodicExportingMetricReaderHelperTests.cs @@ -33,6 +33,16 @@ public void CreatePeriodicExportingMetricReader_Defaults() Assert.Equal(MetricReaderTemporalityPreference.Cumulative, reader.TemporalityPreference); } + [Fact] + public void CreatePeriodicExportingMetricReader_Defaults_WithTask() + { + using var reader = CreatePeriodicExportingMetricReader(useThreads: false); + + Assert.Equal(60000, reader.ExportIntervalMilliseconds); + Assert.Equal(30000, reader.ExportTimeoutMilliseconds); + Assert.Equal(MetricReaderTemporalityPreference.Cumulative, reader.TemporalityPreference); + } + [Fact] public void CreatePeriodicExportingMetricReader_TemporalityPreference_FromOptions() { @@ -130,13 +140,13 @@ private static void ClearEnvVars() } private static PeriodicExportingMetricReader CreatePeriodicExportingMetricReader( - MetricReaderOptions? options = null) + MetricReaderOptions? options = null, bool useThreads = true) { options ??= new(); #pragma warning disable CA2000 // Dispose objects before losing scope var dummyMetricExporter = new InMemoryExporter(Array.Empty()); #pragma warning restore CA2000 // Dispose objects before losing scope - return PeriodicExportingMetricReaderHelper.CreatePeriodicExportingMetricReader(dummyMetricExporter, options); + return PeriodicExportingMetricReaderHelper.CreatePeriodicExportingMetricReader(dummyMetricExporter, options, useThreads: useThreads); } } diff --git a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs index cd76a4c8880..0efd8bc0dda 100644 --- a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs @@ -10,8 +10,10 @@ namespace OpenTelemetry.Logs.Tests; public sealed class BatchLogRecordExportProcessorTests { - [Fact] - public void StateValuesAndScopeBufferingTest() + [Theory] + [InlineData(true)] + [InlineData(false)] + public void StateValuesAndScopeBufferingTest(bool useThread) { var scopeProvider = new LoggerExternalScopeProvider(); @@ -21,7 +23,15 @@ public void StateValuesAndScopeBufferingTest() #pragma warning disable CA2000 // Dispose objects before losing scope new InMemoryExporter(exportedItems), #pragma warning restore CA2000 // Dispose objects before losing scope - scheduledDelayMilliseconds: int.MaxValue); + new() + { + // Use the default values for the other parameters, but allow overriding useThreads + MaxQueueSize = BatchLogRecordExportProcessor.DefaultMaxQueueSize, + MaxExportBatchSize = BatchLogRecordExportProcessor.DefaultMaxExportBatchSize, + ExporterTimeoutMilliseconds = BatchLogRecordExportProcessor.DefaultExporterTimeoutMilliseconds, + ScheduledDelayMilliseconds = int.MaxValue, + UseThreads = useThread, + }); using var scope = scopeProvider.Push(exportedItems); @@ -148,5 +158,48 @@ public void LogRecordAddedToBatchIfNotFromAnyPoolTest() Assert.Single(exportedItems); Assert.Same(logRecord, exportedItems[0]); } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void DisposeWithoutShutdown(bool useThread) + { + var scopeProvider = new LoggerExternalScopeProvider(); + + List exportedItems = new(); + + var processor = new BatchLogRecordExportProcessor( +#pragma warning disable CA2000 // Dispose objects before losing scope + new InMemoryExporter(exportedItems), +#pragma warning restore CA2000 // Dispose objects before losing scope + new() + { + // Use the default values for the other parameters, but allow overriding useThreads + MaxQueueSize = BatchLogRecordExportProcessor.DefaultMaxQueueSize, + MaxExportBatchSize = BatchLogRecordExportProcessor.DefaultMaxExportBatchSize, + ExporterTimeoutMilliseconds = BatchLogRecordExportProcessor.DefaultExporterTimeoutMilliseconds, + ScheduledDelayMilliseconds = int.MaxValue, + UseThreads = useThread, + }); + + processor.Dispose(); + + using var scope = scopeProvider.Push(exportedItems); + + var pool = LogRecordSharedPool.Current; + + var logRecord = pool.Rent(); + + var state = new LogRecordTests.DisposingState("Hello world"); + + logRecord.ILoggerData.ScopeProvider = scopeProvider; + logRecord.StateValues = state; + + processor.OnEnd(logRecord); + + state.Dispose(); + + Assert.Empty(exportedItems); + } } #endif diff --git a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs index 3ffcd82d6d9..a3c9a88260f 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs @@ -10,9 +10,11 @@ namespace OpenTelemetry.Metrics.Tests; public class MetricPointReclaimTests { [Theory] - [InlineData(false)] - [InlineData(true)] - public void MeasurementsAreNotDropped(bool emitMetricWithNoDimensions) + [InlineData(false, false)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(true, true)] + public void MeasurementsAreNotDropped(bool emitMetricWithNoDimensions, bool useThreads) { using var meter = new Meter(Utils.GetCurrentMethodName()); var counter = meter.CreateCounter("MyFruitCounter"); @@ -21,7 +23,13 @@ public void MeasurementsAreNotDropped(bool emitMetricWithNoDimensions) int maxNumberofDistinctMetricPoints = 4000; // Default max MetricPoints * 2 using var exporter = new CustomExporter(assertNoDroppedMeasurements: true); - using var metricReader = new PeriodicExportingMetricReader(exporter, exportIntervalMilliseconds: 10) + using var metricReader = new PeriodicExportingMetricReader( + exporter, + new() + { + ExportIntervalMilliseconds = 10, + UseThreads = useThreads, + }) { TemporalityPreference = MetricReaderTemporalityPreference.Delta, };