-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathBrokerThroughputCollectorHostedService.cs
More file actions
127 lines (105 loc) · 4.83 KB
/
BrokerThroughputCollectorHostedService.cs
File metadata and controls
127 lines (105 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
namespace Particular.LicensingComponent.BrokerThroughput;
using System.Collections.ObjectModel;
using Contracts;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Persistence;
using ServiceControl.Configuration;
using ServiceControl.Transports.BrokerThroughput;
using Shared;
public class BrokerThroughputCollectorHostedService(
ILogger<BrokerThroughputCollectorHostedService> logger,
IBrokerThroughputQuery brokerThroughputQuery,
ThroughputSettings throughputSettings,
ILicensingDataStore dataStore,
TimeProvider timeProvider)
: BackgroundService
{
public TimeSpan DelayStart { get; set; } = TimeSpan.FromSeconds(40);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
static ReadOnlyDictionary<string, string> LoadBrokerSettingValues(IEnumerable<KeyDescriptionPair> brokerKeys)
=> new(brokerKeys.Select(pair => KeyValuePair.Create(pair.Key, SettingsReader.Read<string>(ThroughputSettings.SettingsNamespace, pair.Key)))
.Where(pair => !string.IsNullOrEmpty(pair.Value)).ToDictionary());
brokerThroughputQuery.Initialize(LoadBrokerSettingValues(brokerThroughputQuery.Settings));
if (brokerThroughputQuery.HasInitialisationErrors(out var errorMessage))
{
logger.LogError($"Could not start {nameof(BrokerThroughputCollectorHostedService)}, due to initialisation errors:\n{errorMessage}");
return;
}
logger.LogInformation($"Starting {nameof(BrokerThroughputCollectorHostedService)}");
try
{
await Task.Delay(DelayStart, stoppingToken);
using PeriodicTimer timer = new(TimeSpan.FromDays(1), timeProvider);
do
{
try
{
await GatherThroughput(stoppingToken);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogError(ex, "Failed to gather throughput from broker");
}
} while (await timer.WaitForNextTickAsync(stoppingToken));
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(BrokerThroughputCollectorHostedService)}");
}
}
async Task GatherThroughput(CancellationToken stoppingToken)
{
logger.LogInformation("Gathering throughput from broker");
var waitingTasks = new List<Task>();
var postfixGenerator = new PostfixGenerator();
await foreach (var queueName in brokerThroughputQuery.GetQueueNames(stoppingToken))
{
if (PlatformEndpointHelper.IsPlatformEndpoint(queueName.SanitizedName, throughputSettings))
{
continue;
}
var postfix = postfixGenerator.GetPostfix(queueName.SanitizedName);
waitingTasks.Add(Exec(queueName, postfix));
}
await Task.WhenAll(waitingTasks);
await dataStore.SaveBrokerMetadata(new BrokerMetadata(brokerThroughputQuery.ScopeType, brokerThroughputQuery.Data), stoppingToken);
return;
async Task Exec(IBrokerQueue queueName, string postfix)
{
var endpointId = new EndpointIdentifier(queueName.QueueName, ThroughputSource.Broker);
var endpoint = await dataStore.GetEndpoint(endpointId, stoppingToken);
if (endpoint == null)
{
endpoint = new Endpoint(endpointId)
{
SanitizedName = queueName.SanitizedName + postfix,
Scope = queueName.Scope,
EndpointIndicators = [.. queueName.EndpointIndicators]
};
await dataStore.SaveEndpoint(endpoint, stoppingToken);
}
var defaultStartDate = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime).AddDays(-30);
var startDate = endpoint.LastCollectedDate < defaultStartDate
? defaultStartDate
: endpoint.LastCollectedDate.AddDays(1);
await foreach (var queueThroughput in brokerThroughputQuery.GetThroughputPerDay(queueName, startDate, stoppingToken))
{
await dataStore.RecordEndpointThroughput(queueName.QueueName, ThroughputSource.Broker, queueThroughput.DateUTC, queueThroughput.TotalThroughput, stoppingToken);
}
}
}
class PostfixGenerator
{
readonly Dictionary<string, int> names = new(StringComparer.OrdinalIgnoreCase);
public string GetPostfix(string sanitizedName)
{
if (!names.TryAdd(sanitizedName, 0))
{
names[sanitizedName]++;
}
return names[sanitizedName] == 0 ? string.Empty : names[sanitizedName].ToString();
}
}
}