-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathQueueLengthProvider.cs
More file actions
115 lines (97 loc) · 4.34 KB
/
QueueLengthProvider.cs
File metadata and controls
115 lines (97 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
namespace ServiceControl.Transports.ASBS
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Logging;
class QueueLengthProvider : AbstractQueueLengthProvider
{
public QueueLengthProvider(TransportSettings settings, Action<QueueLengthEntry[], EndpointToQueueMapping> store, ILogger<QueueLengthProvider> logger) : base(settings, store)
{
var connectionSettings = ConnectionStringParser.Parse(ConnectionString);
queryDelayInterval = connectionSettings.QueryDelayInterval ?? TimeSpan.FromMilliseconds(500);
managementClient = connectionSettings.AuthenticationMethod.BuildManagementClient();
this.logger = logger;
}
public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack) =>
endpointQueueMappings.AddOrUpdate(
queueToTrack.InputQueue,
id => queueToTrack.EndpointName,
(id, old) => queueToTrack.EndpointName
);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
logger.LogDebug("Waiting for next interval");
await Task.Delay(queryDelayInterval, stoppingToken);
logger.LogDebug("Querying management client.");
var queueRuntimeInfos = await GetQueueList(stoppingToken);
logger.LogDebug("Retrieved details of {QueueCount} queues", queueRuntimeInfos.Count);
UpdateAllQueueLengths(queueRuntimeInfos);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// no-op
}
catch (Exception e)
{
logger.LogError(e, "Error querying Azure Service Bus queue sizes.");
}
}
}
async Task<IReadOnlyDictionary<string, QueueRuntimeProperties>> GetQueueList(CancellationToken cancellationToken)
{
var queuePathToRuntimeInfo = new Dictionary<string, QueueRuntimeProperties>(StringComparer.InvariantCultureIgnoreCase);
var queuesRuntimeProperties = managementClient.GetQueuesRuntimePropertiesAsync(cancellationToken);
var enumerator = queuesRuntimeProperties.GetAsyncEnumerator(cancellationToken);
try
{
while (await enumerator.MoveNextAsync())
{
var queueRuntimeProperties = enumerator.Current;
queuePathToRuntimeInfo[queueRuntimeProperties.Name] = queueRuntimeProperties; // Assuming last write is most up to date
}
}
finally
{
await enumerator.DisposeAsync();
}
return queuePathToRuntimeInfo;
}
void UpdateAllQueueLengths(IReadOnlyDictionary<string, QueueRuntimeProperties> queues)
{
foreach (var eq in endpointQueueMappings)
{
UpdateQueueLength(eq, queues);
}
}
void UpdateQueueLength(KeyValuePair<string, string> monitoredEndpoint, IReadOnlyDictionary<string, QueueRuntimeProperties> queues)
{
var endpointName = monitoredEndpoint.Value;
var queueName = monitoredEndpoint.Key;
if (!queues.TryGetValue(queueName, out var runtimeInfo))
{
return;
}
var entries = new[]
{
new QueueLengthEntry
{
DateTicks = DateTime.UtcNow.Ticks,
Value = runtimeInfo.ActiveMessageCount
}
};
Store(entries, new EndpointToQueueMapping(endpointName, queueName));
}
readonly ConcurrentDictionary<string, string> endpointQueueMappings = new ConcurrentDictionary<string, string>();
readonly ServiceBusAdministrationClient managementClient;
readonly TimeSpan queryDelayInterval;
readonly ILogger<QueueLengthProvider> logger;
}
}