Skip to content

Commit 9168db9

Browse files
progress
1 parent 2db60a2 commit 9168db9

File tree

5 files changed

+202
-15
lines changed

5 files changed

+202
-15
lines changed

dotnet/src/Azure.Iot.Operations.Services/Observability/MetricsReporterService.cs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class MetricsReporterService : IAsyncDisposable, IMetricsReporterService
1919
private ITimer? _reportingTimer;
2020
private bool _isDisposed;
2121
private readonly SemaphoreSlim _reportingSemaphore = new(1, 1);
22+
private readonly SemaphoreSlim _timerSemaphore = new(1, 1);
2223

2324
public MetricsReporterService(
2425
IAkriObservabilityService observabilityService,
@@ -36,23 +37,42 @@ public MetricsReporterService(
3637

3738
public void Start(CancellationToken cancellationToken = default)
3839
{
39-
_reportingTimer = _timerFactory.CreateTimer();
40+
_timerSemaphore.Wait(cancellationToken);
4041

41-
_reportingTimer.Start(async _ => await ReportMetricsAsync(cancellationToken),
42-
cancellationToken,
43-
_options.ReportingInterval,
44-
_options.ReportingInterval);
42+
try
43+
{
44+
if (_reportingTimer == null)
45+
{
46+
_reportingTimer = _timerFactory.CreateTimer();
47+
48+
_reportingTimer.Start(async _ => await ReportMetricsAsync(cancellationToken),
49+
cancellationToken,
50+
_options.ReportingInterval,
51+
_options.ReportingInterval);
52+
}
53+
}
54+
finally
55+
{
56+
_timerSemaphore.Release();
57+
}
4558
}
4659

4760
public async Task StopAsync()
4861
{
49-
if (_reportingTimer == null)
62+
await _timerSemaphore.WaitAsync();
63+
64+
try
5065
{
51-
return;
66+
if (_reportingTimer != null)
67+
{
68+
await _reportingTimer.DisposeAsync();
69+
_reportingTimer = null;
70+
}
71+
}
72+
finally
73+
{
74+
_timerSemaphore.Release();
5275
}
53-
54-
await _reportingTimer.DisposeAsync();
55-
_reportingTimer = null;
5676
}
5777

5878
public ICounter CreateCounter(string name, Dictionary<string, string> labels, string? unit = null)
@@ -135,12 +155,9 @@ public async ValueTask DisposeAsync()
135155
return;
136156
}
137157

138-
if (_reportingTimer != null)
139-
{
140-
await _reportingTimer.DisposeAsync();
141-
_reportingTimer = null;
142-
}
158+
await StopAsync();
143159

160+
_timerSemaphore.Dispose();
144161
_reportingSemaphore.Dispose();
145162
_isDisposed = true;
146163
}

dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedCounterTests.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,32 @@ public void GetOperationsAndClear_WithEmptyQueue_ReturnsEmptyList()
124124
// Assert
125125
Assert.Empty(operations);
126126
}
127+
128+
[Fact]
129+
public async Task Concurrent_Operations_ThreadSafe()
130+
{
131+
// Arrange
132+
var counter = new CachedCounter(_name, _labels, _unit);
133+
var operationCount = 1000;
134+
var threads = 5;
135+
136+
// Act
137+
var tasks = new List<Task>();
138+
for (int t = 0; t < threads; t++)
139+
{
140+
tasks.Add(Task.Run(() =>
141+
{
142+
for (int i = 0; i < operationCount / threads; i++)
143+
{
144+
counter.Add(i);
145+
}
146+
}));
147+
}
148+
149+
await Task.WhenAll(tasks.ToArray());
150+
151+
// Assert
152+
var operations = counter.GetOperationsAndClear(operationCount);
153+
Assert.Equal(operationCount, operations.Count);
154+
}
127155
}

dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedGaugeTests.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,32 @@ public void GetOperationsAndClear_WithEmptyQueue_ReturnsEmptyList()
124124
// Assert
125125
Assert.Empty(operations);
126126
}
127+
128+
[Fact]
129+
public async Task Concurrent_Operations_ThreadSafe()
130+
{
131+
// Arrange
132+
var gauge = new CachedGauge(_name, _labels, _unit);
133+
var operationCount = 1000;
134+
var threads = 5;
135+
136+
// Act
137+
var tasks = new List<Task>();
138+
for (int t = 0; t < threads; t++)
139+
{
140+
tasks.Add(Task.Run(() =>
141+
{
142+
for (int i = 0; i < operationCount / threads; i++)
143+
{
144+
gauge.Record(i);
145+
}
146+
}));
147+
}
148+
149+
await Task.WhenAll(tasks.ToArray());
150+
151+
// Assert
152+
var operations = gauge.GetOperationsAndClear(operationCount);
153+
Assert.Equal(operationCount, operations.Count);
154+
}
127155
}

dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedHistogramTests.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,32 @@ public void GetOperationsAndClear_WithEmptyQueue_ReturnsEmptyList()
124124
// Assert
125125
Assert.Empty(operations);
126126
}
127+
128+
[Fact]
129+
public async Task Concurrent_Operations_ThreadSafe()
130+
{
131+
// Arrange
132+
var histogram = new CachedHistogram(_name, _labels, _unit);
133+
var operationCount = 1000;
134+
var threads = 5;
135+
136+
// Act
137+
var tasks = new List<Task>();
138+
for (int t = 0; t < threads; t++)
139+
{
140+
tasks.Add(Task.Run(() =>
141+
{
142+
for (int i = 0; i < operationCount / threads; i++)
143+
{
144+
histogram.Record(i);
145+
}
146+
}));
147+
}
148+
149+
await Task.WhenAll(tasks.ToArray());
150+
151+
// Assert
152+
var operations = histogram.GetOperationsAndClear(operationCount);
153+
Assert.Equal(operationCount, operations.Count);
154+
}
127155
}

dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/MetricsReporterServiceTests.cs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,36 @@ public void Start_CreatesAndStartsTimer()
9898
_options.ReportingInterval, _options.ReportingInterval), Times.Once);
9999
}
100100

101+
[Fact]
102+
public void Start_CalledMultipleTimes_OnlyOneTimerCreated()
103+
{
104+
// Arrange
105+
var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
106+
107+
// Act
108+
service.Start();
109+
service.Start();
110+
111+
// Assert
112+
_mockTimerFactory.Verify(f => f.CreateTimer(), Times.Once);
113+
_mockTimer.Verify(t => t.Start(
114+
It.IsAny<Func<CancellationToken, Task>>(),
115+
It.IsAny<CancellationToken>(),
116+
_options.ReportingInterval,
117+
_options.ReportingInterval),
118+
Times.Once);
119+
}
120+
121+
[Fact]
122+
public async Task StopAsync_CalledBeforeStart_DoesNotThrow()
123+
{
124+
// Arrange
125+
var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
126+
127+
// Act & Assert
128+
await service.StopAsync();
129+
}
130+
101131
[Fact]
102132
public async Task Stop_DisposesTimer()
103133
{
@@ -163,6 +193,62 @@ public async Task ReportMetrics_SendsMetricsToObservabilityService()
163193
Times.Once);
164194
}
165195

196+
[Fact]
197+
public async Task ReportMetrics_ObservabilityServiceThrows_DoesNotPropagateException()
198+
{
199+
// Arrange
200+
_mockObservabilityService
201+
.Setup(s => s.PublishMetricsAsync(It.IsAny<PublishMetricsRequestPayload>()))
202+
.Throws(new InvalidOperationException("Test exception"));
203+
204+
var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
205+
206+
// Create and update a metric
207+
var counter = service.CreateCounter("test_counter", _defaultLabels);
208+
counter.Add(5);
209+
210+
service.Start();
211+
212+
// Act & Assert - Should not throw
213+
if (_timerCallback != null)
214+
{
215+
await _timerCallback(CancellationToken.None);
216+
}
217+
218+
// Assert - Verify that throwing PublishMetricsAsync was called
219+
_mockObservabilityService.Verify(s => s.PublishMetricsAsync(It.IsAny<PublishMetricsRequestPayload>()), Times.Once);
220+
}
221+
222+
[Fact]
223+
public void ValidateMetricParameters_WithInvalidData_ThrowsExpectedException()
224+
{
225+
// Arrange
226+
var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
227+
228+
// Act & Assert - Test with null name
229+
Assert.Throws<ArgumentException>(() => service.CreateCounter(string.Empty, _defaultLabels));
230+
231+
// Act & Assert - Test with null labels
232+
Assert.Throws<ArgumentNullException>(() => service.CreateCounter("test_counter", null!));
233+
}
234+
235+
[Fact]
236+
public void CreateMetric_WithIdenticalKeyButDifferentUnit_UsesFirstUnit()
237+
{
238+
// Arrange
239+
var mockCache = new CounterMetricCache();
240+
var name = "test_counter";
241+
var labels = new Dictionary<string, string> { { "service", "test" } };
242+
243+
// Act
244+
var counter1 = mockCache.CreateCounter(name, labels, "bytes");
245+
var counter2 = mockCache.CreateCounter(name, labels, "kilobytes"); // Same name and labels, different unit
246+
247+
// Assert
248+
Assert.Same(counter1, counter2); // Should return the same instance
249+
Assert.Equal("bytes", counter1.Unit); // Should use the first unit
250+
}
251+
166252
[Fact]
167253
public async Task Dispose_CleanupResources()
168254
{

0 commit comments

Comments
 (0)