Skip to content

Commit 59ae884

Browse files
[infra/Prometheus] Fix flaky test and test deadlock (#6643)
1 parent: 0da377a · commit: 59ae884

File tree

2 files changed: +111 additions, -59 deletions (lines changed)

src/OpenTelemetry.Exporter.Prometheus.HttpListener/Internal/PrometheusCollectionManager.cs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,8 @@ public PrometheusCollectionManager(PrometheusExporter exporter)
3838
this.scopes = [];
3939
}
4040

41+
internal Func<DateTime> UtcNow { get; set; } = static () => DateTime.UtcNow;
42+
4143
#if NET
4244
public ValueTask<CollectionResponse> EnterCollect(bool openMetricsRequested)
4345
#else
@@ -58,7 +60,7 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)
5860

5961
if (previousDataViewGeneratedAtUtc.HasValue
6062
&& this.scrapeResponseCacheDurationMilliseconds > 0
61-
&& previousDataViewGeneratedAtUtc.Value.AddMilliseconds(this.scrapeResponseCacheDurationMilliseconds) >= DateTime.UtcNow)
63+
&& previousDataViewGeneratedAtUtc.Value.AddMilliseconds(this.scrapeResponseCacheDurationMilliseconds) >= this.UtcNow())
6264
{
6365
#if NET
6466
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: true));
@@ -103,13 +105,15 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)
103105
var result = this.ExecuteCollect(openMetricsRequested);
104106
if (result)
105107
{
108+
var generatedAt = this.UtcNow();
109+
106110
if (openMetricsRequested)
107111
{
108-
this.previousOpenMetricsDataViewGeneratedAtUtc = DateTime.UtcNow;
112+
this.previousOpenMetricsDataViewGeneratedAtUtc = generatedAt;
109113
}
110114
else
111115
{
112-
this.previousPlainTextDataViewGeneratedAtUtc = DateTime.UtcNow;
116+
this.previousPlainTextDataViewGeneratedAtUtc = generatedAt;
113117
}
114118

115119
previousDataViewGeneratedAtUtc = openMetricsRequested

test/OpenTelemetry.Exporter.Prometheus.HttpListener.Tests/PrometheusCollectionManagerTests.cs

Lines changed: 104 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,9 @@ public sealed class PrometheusCollectionManagerTests
1919
#endif
2020
public async Task EnterExitCollectTest(int scrapeResponseCacheDurationMilliseconds, bool openMetricsRequested)
2121
{
22+
var testTimeout = TimeSpan.FromMinutes(1);
23+
using var cts = new CancellationTokenSource(testTimeout);
24+
2225
bool cacheEnabled = scrapeResponseCacheDurationMilliseconds != 0;
2326
using var meter = new Meter(Utils.GetCurrentMethodName());
2427

@@ -44,66 +47,126 @@ public async Task EnterExitCollectTest(int scrapeResponseCacheDurationMillisecon
4447
{
4548
bool result = collectFunc!(timeout);
4649
runningCollectCount++;
50+
51+
cts.Token.ThrowIfCancellationRequested();
4752
Thread.Sleep(5000);
53+
4854
return result;
4955
};
5056

57+
var utcNow = DateTime.UtcNow;
58+
59+
if (cacheEnabled)
60+
{
61+
// Override the cache to ensure the cache is always seen again during its validity period.
62+
exporter.CollectionManager.UtcNow = () => utcNow;
63+
}
64+
5165
var counter = meter.CreateCounter<int>("counter_int", description: "Prometheus help text goes here \n escaping.");
5266
counter.Add(100);
5367

54-
Task<Response>[] collectTasks = new Task<Response>[10];
55-
for (int i = 0; i < collectTasks.Length; i++)
68+
async Task<Response> CollectAsync(bool advanceClock)
5669
{
57-
collectTasks[i] = Task.Run(async () =>
70+
cts.Token.ThrowIfCancellationRequested();
71+
72+
if (advanceClock)
5873
{
59-
var response = await exporter.CollectionManager.EnterCollect(openMetricsRequested);
60-
try
61-
{
62-
return new Response
63-
{
64-
CollectionResponse = response,
65-
ViewPayload = openMetricsRequested ? [.. response.OpenMetricsView] : [.. response.PlainTextView],
66-
};
67-
}
68-
finally
74+
// Tick the clock forward - it should still be well within the cache duration.
75+
utcNow = utcNow.AddMilliseconds(1);
76+
}
77+
78+
var response = await exporter.CollectionManager.EnterCollect(openMetricsRequested);
79+
try
80+
{
81+
return new()
6982
{
70-
exporter.CollectionManager.ExitCollect();
71-
}
72-
});
83+
CollectionResponse = response,
84+
ViewPayload = openMetricsRequested ? [.. response.OpenMetricsView] : [.. response.PlainTextView],
85+
};
86+
}
87+
finally
88+
{
89+
exporter.CollectionManager.ExitCollect();
90+
}
7391
}
7492

75-
await Task.WhenAll(collectTasks);
93+
async Task<Task<Response>[]> CollectInParallelAsync(bool advanceClock)
94+
{
95+
// Avoid deadlocks by limiting parallelism to a reasonable level based on CPU count.
96+
// Always use at least 2 to ensure concurrency happens. Running on a single core machine is unlikely.
97+
var parallelism = Math.Max((Environment.ProcessorCount + 1) / 2, 2);
98+
99+
#if NET
100+
var bag = new System.Collections.Concurrent.ConcurrentBag<Response>();
101+
102+
var parallel = Parallel.ForAsync(
103+
0,
104+
parallelism,
105+
cts.Token,
106+
async (_, _) => bag.Add(await CollectAsync(advanceClock)));
107+
108+
await Task.WhenAny(parallel, Task.Delay(testTimeout, cts.Token));
109+
110+
cts.Token.ThrowIfCancellationRequested();
111+
112+
await parallel;
113+
114+
return [.. bag.Select((r) => Task.FromResult(r))];
115+
#else
116+
117+
Task<Response>[] tasks = new Task<Response>[parallelism];
118+
119+
for (int i = 0; i < tasks.Length; i++)
120+
{
121+
tasks[i] = Task.Run(() => CollectAsync(advanceClock), cts.Token);
122+
}
123+
124+
var all = Task.WhenAll(tasks);
125+
await Task.WhenAny(all, Task.Delay(testTimeout, cts.Token));
126+
127+
cts.Token.ThrowIfCancellationRequested();
128+
129+
await all;
130+
131+
return tasks;
132+
#endif
133+
}
134+
135+
var collectTasks = await CollectInParallelAsync(advanceClock: true);
76136

77137
Assert.Equal(1, runningCollectCount);
78138

79139
var firstResponse = await collectTasks[0];
80140

81-
Assert.False(firstResponse.CollectionResponse.FromCache);
141+
Assert.False(firstResponse.CollectionResponse.FromCache, "Response was served from the cache.");
82142

83143
for (int i = 1; i < collectTasks.Length; i++)
84144
{
85-
Assert.Equal(firstResponse.ViewPayload, (await collectTasks[i]).ViewPayload);
86-
Assert.Equal(firstResponse.CollectionResponse.GeneratedAtUtc, (await collectTasks[i]).CollectionResponse.GeneratedAtUtc);
145+
var response = await collectTasks[i];
146+
147+
Assert.Equal(firstResponse.ViewPayload, response.ViewPayload);
148+
Assert.Equal(firstResponse.CollectionResponse.GeneratedAtUtc, response.CollectionResponse.GeneratedAtUtc);
87149
}
88150

89151
counter.Add(100);
90152

91-
// This should use the cache and ignore the second counter update.
92-
var task = exporter.CollectionManager.EnterCollect(openMetricsRequested);
93-
Assert.True(task.IsCompleted);
94-
var response = await task;
95153
try
96154
{
155+
// This should use the cache and ignore the second counter update.
156+
var task = exporter.CollectionManager.EnterCollect(openMetricsRequested);
157+
Assert.True(task.IsCompleted, "Collection did not complete.");
158+
var response = await task;
159+
97160
if (cacheEnabled)
98161
{
99162
Assert.Equal(1, runningCollectCount);
100-
Assert.True(response.FromCache);
163+
Assert.True(response.FromCache, "Response was not served from the cache.");
101164
Assert.Equal(firstResponse.CollectionResponse.GeneratedAtUtc, response.GeneratedAtUtc);
102165
}
103166
else
104167
{
105168
Assert.Equal(2, runningCollectCount);
106-
Assert.False(response.FromCache);
169+
Assert.False(response.FromCache, "Response was served from the cache.");
107170
Assert.True(firstResponse.CollectionResponse.GeneratedAtUtc < response.GeneratedAtUtc);
108171
}
109172
}
@@ -112,47 +175,32 @@ public async Task EnterExitCollectTest(int scrapeResponseCacheDurationMillisecon
112175
exporter.CollectionManager.ExitCollect();
113176
}
114177

115-
#pragma warning disable CA1849 // 'Thread.Sleep(int)' synchronously blocks. Use await instead.
116-
// Changing to await Task.Delay leads to test instability.
117-
Thread.Sleep(exporter.ScrapeResponseCacheDurationMilliseconds);
118-
#pragma warning restore CA1849 // 'Thread.Sleep(int)' synchronously blocks. Use await instead.
119-
120-
counter.Add(100);
121-
122-
for (int i = 0; i < collectTasks.Length; i++)
178+
if (cacheEnabled)
123179
{
124-
collectTasks[i] = Task.Run(async () =>
125-
{
126-
var collectionResponse = await exporter.CollectionManager.EnterCollect(openMetricsRequested);
127-
try
128-
{
129-
return new Response
130-
{
131-
CollectionResponse = collectionResponse,
132-
ViewPayload = openMetricsRequested ? [.. collectionResponse.OpenMetricsView] : [.. collectionResponse.PlainTextView],
133-
};
134-
}
135-
finally
136-
{
137-
exporter.CollectionManager.ExitCollect();
138-
}
139-
});
180+
// Progress time beyond the cache duration to force cache expiry.
181+
utcNow = utcNow.AddMilliseconds(exporter.ScrapeResponseCacheDurationMilliseconds + 1);
140182
}
141183

142-
await Task.WhenAll(collectTasks);
184+
counter.Add(100);
185+
186+
collectTasks = await CollectInParallelAsync(advanceClock: false);
143187

144188
Assert.Equal(cacheEnabled ? 2 : 3, runningCollectCount);
145-
Assert.NotEqual(firstResponse.ViewPayload, (await collectTasks[0]).ViewPayload);
146-
Assert.NotEqual(firstResponse.CollectionResponse.GeneratedAtUtc, (await collectTasks[0]).CollectionResponse.GeneratedAtUtc);
147189

190+
var original = firstResponse;
148191
firstResponse = await collectTasks[0];
149192

150-
Assert.False(firstResponse.CollectionResponse.FromCache);
193+
Assert.NotEqual(original.ViewPayload, firstResponse.ViewPayload);
194+
Assert.NotEqual(original.CollectionResponse.GeneratedAtUtc, firstResponse.CollectionResponse.GeneratedAtUtc);
195+
196+
Assert.False(firstResponse.CollectionResponse.FromCache, "Response was served from the cache.");
151197

152198
for (int i = 1; i < collectTasks.Length; i++)
153199
{
154-
Assert.Equal(firstResponse.ViewPayload, (await collectTasks[i]).ViewPayload);
155-
Assert.Equal(firstResponse.CollectionResponse.GeneratedAtUtc, (await collectTasks[i]).CollectionResponse.GeneratedAtUtc);
200+
var response = await collectTasks[i];
201+
202+
Assert.Equal(firstResponse.ViewPayload, response.ViewPayload);
203+
Assert.Equal(firstResponse.CollectionResponse.GeneratedAtUtc, response.CollectionResponse.GeneratedAtUtc);
156204
}
157205
}
158206
}

0 commit comments

Comments
 (0)