Skip to content

Commit cc85720

Browse files
authored
Merge pull request #133 from PrometheusClientNet/fix-race-condition-in-summary
Fix race condition in summary
2 parents fa525e9 + 8593fa1 commit cc85720

File tree

10 files changed

+323
-7
lines changed

10 files changed

+323
-7
lines changed

src/Prometheus.Client/SummaryImpl/QuantileStream.cs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,20 @@ public QuantileStream(int bufferSize, TimeSpan streamDuration, int ageBuckets, I
4747

4848
public void Append(double value)
4949
{
50-
if (ShouldFlushBuffer())
51-
FlushBuffer();
52-
53-
lock (_bufferLock)
50+
while (true)
5451
{
55-
_buffer[_bufferPosition++] = value;
52+
if(ShouldFlushBuffer())
53+
FlushBuffer();
54+
55+
lock (_bufferLock)
56+
{
57+
// use this trick to make FlushBuffer outside of the lock
58+
if (ShouldFlushBuffer())
59+
continue;
60+
61+
_buffer[_bufferPosition++] = value;
62+
return;
63+
}
5664
}
5765
}
5866

@@ -62,10 +70,18 @@ public void Reset()
6270
{
6371
_bufferPosition = 0;
6472
_headStreamIndex = 0;
73+
}
6574

75+
_sampleStreamsLock.EnterWriteLock();
76+
try
77+
{
6678
for(var i = 0; i < _sampleStreams.Length; i++)
6779
_sampleStreams[i].Reset();
6880
}
81+
finally
82+
{
83+
_sampleStreamsLock.ExitWriteLock();
84+
}
6985
}
7086

7187
public void FlushBuffer()
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
6+
namespace Prometheus.Client.Tests.CounterInt64Tests
7+
{
8+
public class ThreadingTests
9+
{
10+
[Theory]
11+
[InlineData(10000, 1)]
12+
[InlineData(10000, 10)]
13+
[InlineData(10000, 100)]
14+
public async Task ObserveInParallel(int observations, int threads)
15+
{
16+
var metric = CreateCounter();
17+
18+
var tasks = Enumerable.Range(0, threads)
19+
.Select(n => Task.Run(() =>
20+
{
21+
long vl = 0;
22+
var rnd = new Random();
23+
for (var i = 0; i < observations; i++)
24+
{
25+
metric.Inc(rnd.Next());
26+
if(i % 100 == 0)
27+
vl = metric.Value;
28+
}
29+
30+
metric.Reset();
31+
}))
32+
.ToArray();
33+
34+
await Task.WhenAll(tasks);
35+
}
36+
37+
private CounterInt64 CreateCounter()
38+
{
39+
var config = new MetricConfiguration("test", string.Empty, Array.Empty<string>(), false);
40+
return new CounterInt64(config, Array.Empty<string>());
41+
}
42+
}
43+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
6+
namespace Prometheus.Client.Tests.CounterTests
7+
{
8+
public class ThreadingTests
9+
{
10+
[Theory]
11+
[InlineData(10000, 1)]
12+
[InlineData(10000, 10)]
13+
[InlineData(10000, 100)]
14+
public async Task ObserveInParallel(int observations, int threads)
15+
{
16+
var metric = CreateCounter();
17+
18+
var tasks = Enumerable.Range(0, threads)
19+
.Select(n => Task.Run(() =>
20+
{
21+
double vl = 0;
22+
var rnd = new Random();
23+
for (var i = 0; i < observations; i++)
24+
{
25+
metric.Inc(rnd.NextDouble());
26+
if(i % 100 == 0)
27+
vl = metric.Value;
28+
}
29+
30+
metric.Reset();
31+
}))
32+
.ToArray();
33+
34+
await Task.WhenAll(tasks);
35+
}
36+
37+
private Counter CreateCounter()
38+
{
39+
var config = new MetricConfiguration("test", string.Empty, Array.Empty<string>(), false);
40+
return new Counter(config, Array.Empty<string>());
41+
}
42+
}
43+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
6+
namespace Prometheus.Client.Tests.GaugeInt64Tests
7+
{
8+
public class ThreadingTests
9+
{
10+
[Theory]
11+
[InlineData(10000, 1)]
12+
[InlineData(10000, 10)]
13+
[InlineData(10000, 100)]
14+
public async Task ObserveInParallel(int observations, int threads)
15+
{
16+
var metric = CreateGauge();
17+
18+
var tasks = Enumerable.Range(0, threads)
19+
.Select(n => Task.Run(() =>
20+
{
21+
long vl;
22+
var rnd = new Random();
23+
for (var i = 0; i < observations; i++)
24+
{
25+
metric.Inc(rnd.Next());
26+
if (i % 100 == 0)
27+
vl = metric.Value;
28+
}
29+
30+
metric.Reset();
31+
}))
32+
.ToArray();
33+
34+
await Task.WhenAll(tasks);
35+
}
36+
37+
private IGauge<long> CreateGauge()
38+
{
39+
var config = new MetricConfiguration("test", string.Empty, Array.Empty<string>(), false);
40+
return new GaugeInt64(config, Array.Empty<string>());
41+
}
42+
}
43+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
6+
namespace Prometheus.Client.Tests.GaugeTests
7+
{
8+
public class ThreadingTests
9+
{
10+
[Theory]
11+
[InlineData(10000, 1)]
12+
[InlineData(10000, 10)]
13+
[InlineData(10000, 100)]
14+
public async Task ObserveInParallel(int observations, int threads)
15+
{
16+
var metric = CreateGauge();
17+
18+
var tasks = Enumerable.Range(0, threads)
19+
.Select(n => Task.Run(() =>
20+
{
21+
double vl;
22+
var rnd = new Random();
23+
for (var i = 0; i < observations; i++)
24+
{
25+
metric.Inc(rnd.NextDouble());
26+
if (i % 100 == 0)
27+
vl = metric.Value;
28+
}
29+
30+
metric.Reset();
31+
}))
32+
.ToArray();
33+
34+
await Task.WhenAll(tasks);
35+
}
36+
37+
private IGauge CreateGauge()
38+
{
39+
var config = new MetricConfiguration("test", string.Empty, Array.Empty<string>(), false);
40+
return new Gauge(config, Array.Empty<string>());
41+
}
42+
}
43+
}

tests/Prometheus.Client.Tests/HistogramTests/HistogramConfigurationTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Linq;
32
using Xunit;
43

54
namespace Prometheus.Client.Tests.HistogramTests
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
6+
namespace Prometheus.Client.Tests.HistogramTests
7+
{
8+
public class ThreadingTests
9+
{
10+
[Theory]
11+
[InlineData(10000, 1)]
12+
[InlineData(10000, 10)]
13+
[InlineData(10000, 100)]
14+
public async Task ObserveInParallel(int observations, int threads)
15+
{
16+
var metric = CreateHistogram();
17+
18+
var tasks = Enumerable.Range(0, threads)
19+
.Select(n => Task.Run(() =>
20+
{
21+
HistogramState vl;
22+
var rnd = new Random();
23+
for (var i = 0; i < observations; i++)
24+
{
25+
metric.Observe(rnd.NextDouble());
26+
if (i % 100 == 0)
27+
vl = metric.Value;
28+
}
29+
30+
metric.Reset();
31+
}))
32+
.ToArray();
33+
34+
await Task.WhenAll(tasks);
35+
}
36+
37+
private IHistogram CreateHistogram(double[] buckets = null)
38+
{
39+
var config = new HistogramConfiguration("test", string.Empty, Array.Empty<string>(), buckets, false);
40+
return new Histogram(config, Array.Empty<string>());
41+
}
42+
}
43+
}

tests/Prometheus.Client.Tests/MetricFactoryLegacyExtensionsTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using NSubstitute;
2-
using Prometheus.Client;
32
using Xunit;
43

54
namespace Prometheus.Client.Tests
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
6+
namespace Prometheus.Client.Tests.SummaryTests
7+
{
8+
public class ThreadingTests
9+
{
10+
[Theory]
11+
[InlineData(10000, 1)]
12+
[InlineData(10000, 10)]
13+
[InlineData(10000, 100)]
14+
public async Task ObserveInParallel(int observations, int threads)
15+
{
16+
var metric = CreateSummary();
17+
18+
var tasks = Enumerable.Range(0, threads)
19+
.Select(n => Task.Run(() =>
20+
{
21+
SummaryState vl;
22+
var rnd = new Random();
23+
for (var i = 0; i < observations; i++)
24+
{
25+
metric.Observe(rnd.NextDouble());
26+
27+
if (i % 100 == 0)
28+
vl = metric.Value;
29+
}
30+
31+
metric.Reset();
32+
}))
33+
.ToArray();
34+
35+
await Task.WhenAll(tasks);
36+
}
37+
38+
private Summary CreateSummary()
39+
{
40+
var config = new SummaryConfiguration("test", string.Empty, Array.Empty<string>(), false);
41+
return new Summary(config, Array.Empty<string>());
42+
}
43+
}
44+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
6+
namespace Prometheus.Client.Tests.UntypedTests
7+
{
8+
public class ThreadingTests
9+
{
10+
[Theory]
11+
[InlineData(10000, 1)]
12+
[InlineData(10000, 10)]
13+
[InlineData(10000, 100)]
14+
public async Task ObserveInParallel(int observations, int threads)
15+
{
16+
var metric = CreateUntyped();
17+
18+
var tasks = Enumerable.Range(0, threads)
19+
.Select(n => Task.Run(() =>
20+
{
21+
double vl;
22+
var rnd = new Random();
23+
for (var i = 0; i < observations; i++)
24+
{
25+
metric.Set(rnd.NextDouble());
26+
if (i % 100 == 0)
27+
vl = metric.Value;
28+
}
29+
30+
metric.Reset();
31+
}))
32+
.ToArray();
33+
34+
await Task.WhenAll(tasks);
35+
}
36+
37+
private IUntyped CreateUntyped()
38+
{
39+
var config = new MetricConfiguration("test", string.Empty, Array.Empty<string>(), false);
40+
return new Untyped(config, Array.Empty<string>());
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)