Skip to content

Commit 3728845

Browse files
authored
Add metric on extractor upload queue length (#874)
* Add metric on extractor upload queue length * Review comment
1 parent 999bb29 commit 3728845

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

Extractor/Utils/AsyncBlockingQueue.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading.Tasks;
66
using Microsoft.Extensions.Logging;
77
using Nito.AsyncEx;
8+
using Prometheus;
89

910
namespace Cognite.OpcUa.Utils
1011
{
@@ -32,6 +33,9 @@ public class AsyncBlockingQueue<T>
3233

3334
private ILogger log;
3435

36+
private static readonly Gauge queueLength = Metrics
37+
.CreateGauge("opcua_extractor_queue_length", "Length of the upload queues", "type");
38+
3539
/// <summary>
3640
/// Constructor.
3741
/// </summary>
@@ -52,6 +56,11 @@ private void NotifyOverflow()
5256
OnQueueOverflow?.Invoke(this, new EventArgs());
5357
}
5458

59+
private void UpdateMetrics()
60+
{
61+
queueLength.WithLabels(Name).Set(queue.Count);
62+
}
63+
5564
/// <summary>
5665
/// Enqeueue an item, blocking until the queue has capacity to accept the item.
5766
/// </summary>
@@ -67,6 +76,7 @@ public void Enqueue(T item, CancellationToken token = default)
6776
queueNotFull.Wait(token);
6877
}
6978
queue.Enqueue(item);
79+
UpdateMetrics();
7080
queueNotEmpty.Notify();
7181
if (Capacity > 0 && queue.Count >= Capacity) NotifyOverflow();
7282
}
@@ -86,13 +96,14 @@ public void Enqueue(IEnumerable<T> items, CancellationToken token = default)
8696
while (Capacity > 0 && queue.Count >= Capacity)
8797
{
8898
log.LogTrace("{} queue is full", Name);
99+
UpdateMetrics();
89100
queueNotFull.Wait(token);
90101
}
91102
queue.Enqueue(item);
92103
queueNotEmpty.Notify();
93104
if (Capacity > 0 && queue.Count >= Capacity) NotifyOverflow();
94105
}
95-
106+
UpdateMetrics();
96107
}
97108
}
98109

@@ -111,6 +122,7 @@ public async Task EnqueueAsync(T item, CancellationToken token = default)
111122
await queueNotFull.WaitAsync(token);
112123
}
113124
queue.Enqueue(item);
125+
UpdateMetrics();
114126
queueNotEmpty.Notify();
115127
if (Capacity > 0 && queue.Count >= Capacity) NotifyOverflow();
116128
}
@@ -130,12 +142,14 @@ public async Task EnqueueAsync(IEnumerable<T> items, CancellationToken token = d
130142
while (Capacity > 0 && queue.Count >= Capacity)
131143
{
132144
log.LogTrace("{} queue is full", Name);
145+
UpdateMetrics();
133146
await queueNotFull.WaitAsync(token);
134147
}
135148
queue.Enqueue(item);
136149
queueNotEmpty.Notify();
137150
if (Capacity > 0 && queue.Count >= Capacity) NotifyOverflow();
138151
}
152+
UpdateMetrics();
139153
}
140154
}
141155

@@ -152,6 +166,7 @@ public async IAsyncEnumerable<T> DrainAsync([EnumeratorCancellation] Cancellatio
152166
{
153167
yield return item;
154168
}
169+
UpdateMetrics();
155170
queueNotFull.NotifyAll();
156171
}
157172
}
@@ -169,6 +184,7 @@ public IEnumerable<T> Drain(CancellationToken token = default)
169184
{
170185
yield return item;
171186
}
187+
UpdateMetrics();
172188
queueNotFull.NotifyAll();
173189
}
174190
}
@@ -186,6 +202,7 @@ public bool TryDequeue(out T? item, CancellationToken token = default)
186202
using (queueMutex.Lock(token))
187203
{
188204
var r = queue.TryDequeue(out item);
205+
UpdateMetrics();
189206
if (r) queueNotFull.Notify();
190207
return r;
191208
}
@@ -202,6 +219,7 @@ public bool TryDequeue(out T? item, CancellationToken token = default)
202219
using (await queueMutex.LockAsync(token))
203220
{
204221
var r = queue.TryDequeue(out var item);
222+
UpdateMetrics();
205223
if (r) queueNotFull.Notify();
206224
else return default;
207225
return item;
@@ -221,6 +239,7 @@ public T Dequeue(CancellationToken token = default)
221239
{
222240
token.ThrowIfCancellationRequested();
223241
var r = queue.TryDequeue(out var item);
242+
UpdateMetrics();
224243
if (r)
225244
{
226245
queueNotFull.Notify();
@@ -244,6 +263,7 @@ public async Task<T> DequeueAsync(CancellationToken token = default)
244263
{
245264
token.ThrowIfCancellationRequested();
246265
var r = queue.TryDequeue(out var item);
266+
UpdateMetrics();
247267
if (r)
248268
{
249269
queueNotFull.Notify();
@@ -263,6 +283,7 @@ public async Task Clear(CancellationToken token = default)
263283
using (await queueMutex.LockAsync(token))
264284
{
265285
queue.Clear();
286+
UpdateMetrics();
266287
queueNotFull.NotifyAll();
267288
}
268289
}

manifest.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,12 @@ schema:
6767
- "https://raw.githubusercontent.com/"
6868

6969
versions:
70-
"2.28.2":
70+
"2.39.0":
71+
description: Add a metric on upload queue length.
72+
changelog:
73+
added:
74+
- Added a metric opcua_extractor_queue_length to monitor the length of the upload queues. This metric is labeled either "Events" or "Datapoints".
75+
"2.38.2":
7176
description: Avoid restarting history after rebrowse if no new nodes were discovered.
7277
changelog:
7378
fixed:

0 commit comments

Comments
 (0)