Skip to content

Commit 5259b1b

Browse files
committed
chore: Adjust after review
- Adjusts null propagation, naming, and comments. - Adjusts BatchExportTaskWorker and PeriodicExportingMetricReaderTaskWorker to start the Task with a known cancellation token
1 parent 24bec97 commit 5259b1b

11 files changed

+110
-80
lines changed

src/OpenTelemetry/BatchExportProcessor.cs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
using System.Diagnostics;
54
using System.Runtime.CompilerServices;
65
using OpenTelemetry.Internal;
76

@@ -20,7 +19,6 @@ public abstract class BatchExportProcessor<T> : BaseExportProcessor<T>
2019
internal const int DefaultMaxExportBatchSize = 512;
2120

2221
internal readonly int MaxExportBatchSize;
23-
2422
internal readonly int ScheduledDelayMilliseconds;
2523
internal readonly int ExporterTimeoutMilliseconds;
2624

@@ -65,18 +63,19 @@ protected BatchExportProcessor(
6563
: base(exporter)
6664
{
6765
Guard.ThrowIfNull(options);
68-
#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1
69-
Guard.ThrowIfOutOfRange(options.MaxQueueSize, min: 1);
70-
#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1
71-
Guard.ThrowIfOutOfRange(options.MaxExportBatchSize, min: 1, max: options.MaxQueueSize, maxName: nameof(options.MaxQueueSize));
72-
Guard.ThrowIfOutOfRange(options.ScheduledDelayMilliseconds, min: 1);
73-
Guard.ThrowIfOutOfRange(options.ExporterTimeoutMilliseconds, min: 0);
74-
75-
this.circularBuffer = new CircularBuffer<T>(options.MaxQueueSize);
76-
this.ScheduledDelayMilliseconds = options.ScheduledDelayMilliseconds;
77-
this.ExporterTimeoutMilliseconds = options.ExporterTimeoutMilliseconds;
78-
this.MaxExportBatchSize = options.MaxExportBatchSize;
79-
this.useThreads = options.UseThreads;
66+
67+
var maxQueueSize = options?.MaxQueueSize ?? 0;
68+
Guard.ThrowIfOutOfRange(maxQueueSize, min: 1);
69+
70+
this.circularBuffer = new CircularBuffer<T>(maxQueueSize);
71+
this.ScheduledDelayMilliseconds = options?.ScheduledDelayMilliseconds ?? 0;
72+
this.ExporterTimeoutMilliseconds = options?.ExporterTimeoutMilliseconds ?? -1;
73+
this.MaxExportBatchSize = options?.MaxExportBatchSize ?? 0;
74+
this.useThreads = options?.UseThreads ?? true;
75+
76+
Guard.ThrowIfOutOfRange(this.MaxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(options.MaxQueueSize));
77+
Guard.ThrowIfOutOfRange(this.ScheduledDelayMilliseconds, min: 1);
78+
Guard.ThrowIfOutOfRange(this.ExporterTimeoutMilliseconds, min: 0);
8079

8180
this.worker = this.CreateWorker();
8281
this.worker.Start();
@@ -145,9 +144,7 @@ protected override bool OnShutdown(int timeoutMilliseconds)
145144
return this.exporter.Shutdown(0) && result;
146145
}
147146

148-
var sw = Stopwatch.StartNew();
149-
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
150-
return this.exporter.Shutdown((int)Math.Max(timeout, 0)) && result;
147+
return this.exporter.Shutdown(timeoutMilliseconds) && result;
151148
}
152149

153150
/// <inheritdoc/>

src/OpenTelemetry/BatchExportProcessorOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class BatchExportProcessorOptions<T>
3131
public int MaxExportBatchSize { get; set; } = BatchExportProcessor<T>.DefaultMaxExportBatchSize;
3232

3333
/// <summary>
34-
/// Gets or sets a value indicating whether to use threads. Enables the use of <see cref="Thread" /> when true, <see cref="Task"/> when false. The default value is true.
34+
/// Gets or sets a value indicating whether to use threads. Enables the use of <see cref="Thread" /> when <see langword="true"/>; otherwise <see cref="Task"/> is used. The default value is <see langword="true"/>.
3535
/// </summary>
3636
public bool UseThreads { get; set; } = true;
3737
}

src/OpenTelemetry/Internal/BatchExportTaskWorker.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public override void Start()
4242
{
4343
this.workerTask = Task.Factory.StartNew(
4444
this.ExporterProcAsync,
45-
CancellationToken.None,
45+
this.cancellationTokenSource.Token,
4646
TaskCreationOptions.LongRunning,
4747
TaskScheduler.Default).Unwrap();
4848
}
@@ -74,8 +74,8 @@ public override bool TriggerExport()
7474
/// <inheritdoc/>
7575
public override bool WaitForExport(int timeoutMilliseconds)
7676
{
77-
var tail = this.circularBuffer.RemovedCount;
78-
var head = this.circularBuffer.AddedCount;
77+
var tail = this.CircularBuffer.RemovedCount;
78+
var head = this.CircularBuffer.AddedCount;
7979

8080
if (head == tail)
8181
{
@@ -98,7 +98,7 @@ public override bool WaitForExport(int timeoutMilliseconds)
9898
/// <inheritdoc/>
9999
public override bool Shutdown(int timeoutMilliseconds)
100100
{
101-
this.SetShutdownDrainTarget(this.circularBuffer.AddedCount);
101+
this.SetShutdownDrainTarget(this.CircularBuffer.AddedCount);
102102

103103
try
104104
{
@@ -160,7 +160,7 @@ private async Task<bool> WaitForExportAsync(int timeoutMilliseconds, long target
160160
var remaining = timeoutMilliseconds - sw.ElapsedMilliseconds;
161161
if (remaining <= 0)
162162
{
163-
return this.circularBuffer.RemovedCount >= targetHead;
163+
return this.CircularBuffer.RemovedCount >= targetHead;
164164
}
165165

166166
timeout = Math.Min((int)remaining, pollingMilliseconds);
@@ -187,7 +187,7 @@ await Task.WhenAny(
187187
// Expected when timeout or shutdown occurs
188188
}
189189

190-
if (this.circularBuffer.RemovedCount >= targetHead)
190+
if (this.CircularBuffer.RemovedCount >= targetHead)
191191
{
192192
return true;
193193
}
@@ -208,13 +208,13 @@ private async Task ExporterProcAsync()
208208
while (true)
209209
{
210210
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
211-
if (this.circularBuffer.Count < this.maxExportBatchSize)
211+
if (this.CircularBuffer.Count < this.MaxExportBatchSize)
212212
{
213213
try
214214
{
215215
await Task.WhenAny(
216216
this.exportTrigger.WaitAsync(cancellationToken),
217-
Task.Delay(this.scheduledDelayMilliseconds, cancellationToken)).ConfigureAwait(false);
217+
Task.Delay(this.ScheduledDelayMilliseconds, cancellationToken)).ConfigureAwait(false);
218218
}
219219
catch (OperationCanceledException)
220220
{

src/OpenTelemetry/Internal/BatchExportThreadWorker.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ public override bool TriggerExport()
6666
/// <inheritdoc/>
6767
public override bool WaitForExport(int timeoutMilliseconds)
6868
{
69-
var tail = this.circularBuffer.RemovedCount;
70-
var head = this.circularBuffer.AddedCount;
69+
var tail = this.CircularBuffer.RemovedCount;
70+
var head = this.CircularBuffer.AddedCount;
7171

7272
if (head == tail)
7373
{
@@ -113,7 +113,7 @@ public override bool WaitForExport(int timeoutMilliseconds)
113113

114114
if (timeout <= 0)
115115
{
116-
return this.circularBuffer.RemovedCount >= head;
116+
return this.CircularBuffer.RemovedCount >= head;
117117
}
118118

119119
try
@@ -126,7 +126,7 @@ public override bool WaitForExport(int timeoutMilliseconds)
126126
}
127127
}
128128

129-
if (this.circularBuffer.RemovedCount >= head)
129+
if (this.CircularBuffer.RemovedCount >= head)
130130
{
131131
return true;
132132
}
@@ -141,7 +141,7 @@ public override bool WaitForExport(int timeoutMilliseconds)
141141
/// <inheritdoc/>
142142
public override bool Shutdown(int timeoutMilliseconds)
143143
{
144-
this.SetShutdownDrainTarget(this.circularBuffer.AddedCount);
144+
this.SetShutdownDrainTarget(this.CircularBuffer.AddedCount);
145145

146146
try
147147
{
@@ -188,11 +188,11 @@ private void ExporterProc()
188188
while (true)
189189
{
190190
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
191-
if (this.circularBuffer.Count < this.maxExportBatchSize)
191+
if (this.CircularBuffer.Count < this.MaxExportBatchSize)
192192
{
193193
try
194194
{
195-
WaitHandle.WaitAny(triggers, this.scheduledDelayMilliseconds);
195+
WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds);
196196
}
197197
catch (ObjectDisposedException)
198198
{

src/OpenTelemetry/Internal/BatchExportWorker.cs

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,6 @@ namespace OpenTelemetry.Internal;
1010
internal abstract class BatchExportWorker<T> : IDisposable
1111
where T : class
1212
{
13-
protected readonly CircularBuffer<T> circularBuffer;
14-
protected readonly BaseExporter<T> exporter;
15-
protected readonly int maxExportBatchSize;
16-
protected readonly int scheduledDelayMilliseconds;
17-
protected readonly int exporterTimeoutMilliseconds;
18-
1913
private long shutdownDrainTarget = long.MaxValue;
2014
private long droppedCount;
2115

@@ -34,18 +28,49 @@ protected BatchExportWorker(
3428
int scheduledDelayMilliseconds,
3529
int exporterTimeoutMilliseconds)
3630
{
37-
this.circularBuffer = circularBuffer;
38-
this.exporter = exporter;
39-
this.maxExportBatchSize = maxExportBatchSize;
40-
this.scheduledDelayMilliseconds = scheduledDelayMilliseconds;
41-
this.exporterTimeoutMilliseconds = exporterTimeoutMilliseconds;
31+
this.CircularBuffer = circularBuffer;
32+
this.Exporter = exporter;
33+
this.MaxExportBatchSize = maxExportBatchSize;
34+
this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds;
35+
this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds;
36+
}
37+
38+
~BatchExportWorker()
39+
{
40+
// Finalizer to ensure resources are cleaned up if Dispose is not called
41+
this.Dispose(false);
4242
}
4343

4444
/// <summary>
4545
/// Gets the number of telemetry objects dropped by the processor.
4646
/// </summary>
4747
internal long DroppedCount => Volatile.Read(ref this.droppedCount);
4848

49+
/// <summary>
50+
/// Gets the circular buffer for storing telemetry objects.
51+
/// </summary>
52+
protected CircularBuffer<T> CircularBuffer { get; }
53+
54+
/// <summary>
55+
/// Gets the exporter instance.
56+
/// </summary>
57+
protected BaseExporter<T> Exporter { get; }
58+
59+
/// <summary>
60+
/// Gets the maximum batch size for exports.
61+
/// </summary>
62+
protected int MaxExportBatchSize { get; }
63+
64+
/// <summary>
65+
/// Gets the delay between exports in milliseconds.
66+
/// </summary>
67+
protected int ScheduledDelayMilliseconds { get; }
68+
69+
/// <summary>
70+
/// Gets the timeout for export operations in milliseconds.
71+
/// </summary>
72+
protected int ExporterTimeoutMilliseconds { get; }
73+
4974
/// <summary>
5075
/// Gets the shutdown drain target.
5176
/// </summary>
@@ -59,7 +84,7 @@ protected BatchExportWorker(
5984
/// <summary>
6085
/// Triggers an export operation.
6186
/// </summary>
62-
/// <returns>True if the export was triggered successfully; otherwise, false.</returns>
87+
/// <returns><see langword="true"/> if the export was triggered successfully; otherwise, <see langword="false"/>.</returns>
6388
public abstract bool TriggerExport();
6489

6590
/// <summary>
@@ -105,11 +130,11 @@ protected void SetShutdownDrainTarget(long target)
105130
/// </summary>
106131
protected void PerformExport()
107132
{
108-
if (this.circularBuffer.Count > 0)
133+
if (this.CircularBuffer.Count > 0)
109134
{
110-
using (var batch = new Batch<T>(this.circularBuffer, this.maxExportBatchSize))
135+
using (var batch = new Batch<T>(this.CircularBuffer, this.MaxExportBatchSize))
111136
{
112-
this.exporter.Export(batch);
137+
this.Exporter.Export(batch);
113138
}
114139
}
115140
}
@@ -120,7 +145,7 @@ protected void PerformExport()
120145
/// <returns>True if shutdown should occur; otherwise, false.</returns>
121146
protected bool ShouldShutdown()
122147
{
123-
return this.circularBuffer.RemovedCount >= this.ShutdownDrainTarget;
148+
return this.CircularBuffer.RemovedCount >= this.ShutdownDrainTarget;
124149
}
125150

126151
/// <summary>

src/OpenTelemetry/Internal/PeriodicExportingMetricReaderTaskWorker.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public override void Start()
3636
{
3737
this.workerTask = Task.Factory.StartNew(
3838
this.ExporterProcAsync,
39-
CancellationToken.None,
39+
this.cancellationTokenSource.Token,
4040
TaskCreationOptions.LongRunning,
4141
TaskScheduler.Default).Unwrap();
4242
}
@@ -122,7 +122,7 @@ private async Task ExporterProcAsync()
122122
{
123123
while (!cancellationToken.IsCancellationRequested)
124124
{
125-
var timeout = (int)(this.exportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.exportIntervalMilliseconds));
125+
var timeout = (int)(this.ExportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.ExportIntervalMilliseconds));
126126

127127
var exportTriggerTask = this.exportTrigger.WaitAsync(cancellationToken);
128128
Task? triggeredTask = null;
@@ -141,7 +141,7 @@ private async Task ExporterProcAsync()
141141
if (cancellationToken.IsCancellationRequested)
142142
{
143143
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered.");
144-
this.metricReader.Collect(this.exportTimeoutMilliseconds);
144+
this.MetricReader.Collect(this.ExportTimeoutMilliseconds);
145145
break;
146146
}
147147

@@ -157,7 +157,7 @@ private async Task ExporterProcAsync()
157157
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed.");
158158
}
159159

160-
this.metricReader.Collect(this.exportTimeoutMilliseconds);
160+
this.MetricReader.Collect(this.ExportTimeoutMilliseconds);
161161
}
162162
}
163163
catch (Exception ex)

src/OpenTelemetry/Internal/PeriodicExportingMetricReaderThreadWorker.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private void ExporterProc()
109109

110110
while (true)
111111
{
112-
timeout = (int)(this.exportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.exportIntervalMilliseconds));
112+
timeout = (int)(this.ExportIntervalMilliseconds - (sw.ElapsedMilliseconds % this.ExportIntervalMilliseconds));
113113

114114
try
115115
{
@@ -124,15 +124,15 @@ private void ExporterProc()
124124
{
125125
case 0: // export
126126
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Export was triggered.");
127-
this.metricReader.Collect(this.exportTimeoutMilliseconds);
127+
this.MetricReader.Collect(this.ExportTimeoutMilliseconds);
128128
break;
129129
case 1: // shutdown
130130
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because Shutdown was triggered.");
131-
this.metricReader.Collect(this.exportTimeoutMilliseconds);
131+
this.MetricReader.Collect(this.ExportTimeoutMilliseconds);
132132
return;
133133
case WaitHandle.WaitTimeout: // timer
134134
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("PeriodicExportingMetricReader calling MetricReader.Collect because the export interval has elapsed.");
135-
this.metricReader.Collect(this.exportTimeoutMilliseconds);
135+
this.MetricReader.Collect(this.ExportTimeoutMilliseconds);
136136
break;
137137
}
138138
}

0 commit comments

Comments
 (0)