Skip to content

Commit c21d63c

Browse files
ramonsmitsandreasohlundmauroservienti
authored
Improve SQS query time handling and CloudWatch metric collection logic (#5234)
Co-authored-by: Andreas Öhlund <[email protected]> Co-authored-by: Mauro Servienti <[email protected]>
1 parent 7ba444c commit c21d63c

File tree

4 files changed

+74
-48
lines changed

4 files changed

+74
-48
lines changed

src/Directory.Packages.props

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
</PropertyGroup>
66
<ItemGroup Label="Versions for direct package references">
77
<PackageVersion Include="Autofac" Version="8.2.0" />
8-
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
8+
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.6.1" />
99
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
1010
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
1111
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
@@ -27,6 +27,7 @@
2727
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.1" />
2828
<PackageVersion Include="Microsoft.Extensions.Logging.Configuration" Version="8.0.1" />
2929
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.3" />
30+
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.1" />
3031
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.10.0" />
3132
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="18.0.1" />
3233
<PackageVersion Include="Microsoft-WindowsAPICodePack-Shell" Version="1.1.5" />

src/ServiceControl.Transports.SQS.Tests/AmazonSQSQueryTests.cs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace ServiceControl.Transport.Tests;
66
using System.Linq;
77
using System.Threading;
88
using System.Threading.Tasks;
9-
using Microsoft.Extensions.Logging.Abstractions;
9+
using Microsoft.Extensions.Logging;
1010
using Microsoft.Extensions.Time.Testing;
1111
using NUnit.Framework;
1212
using Particular.Approvals;
@@ -32,7 +32,9 @@ public void Initialise()
3232
MaxConcurrency = 1,
3333
EndpointName = Guid.NewGuid().ToString("N")
3434
};
35-
query = new AmazonSQSQuery(NullLogger<AmazonSQSQuery>.Instance, provider, transportSettings);
35+
var loggerFactory = LoggerFactory.Create(builder => builder.AddSimpleConsole().SetMinimumLevel(LogLevel.Trace));
36+
var logger = loggerFactory.CreateLogger<AmazonSQSQuery>();
37+
query = new AmazonSQSQuery(logger, provider, transportSettings);
3638
}
3739

3840
[Test]
@@ -94,11 +96,9 @@ public async Task TestConnectionWithValidSettings()
9496
}
9597

9698
[Test]
99+
[CancelAfter(2 * 60 * 1000)]
97100
public async Task RunScenario()
98101
{
99-
// We need to wait a bit of time, to ensure AWS metrics are retrievable
100-
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(6));
101-
CancellationToken token = cancellationTokenSource.Token;
102102
const int numMessagesToIngest = 15;
103103

104104
await CreateTestQueue(transportSettings.EndpointName);
@@ -111,37 +111,52 @@ public async Task RunScenario()
111111
{
112112
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.AccessKey, connectionString.AccessKey);
113113
}
114+
114115
if (!string.IsNullOrEmpty(connectionString.SecretKey))
115116
{
116117
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.SecretKey, connectionString.SecretKey);
117118
}
119+
118120
if (!string.IsNullOrEmpty(connectionString.Region))
119121
{
120122
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.Region, connectionString.Region);
121123
}
122124

123-
query.Initialize(new ReadOnlyDictionary<string, string>(dictionary));
125+
query.Initialize(dictionary.AsReadOnly());
124126

125-
await Task.Delay(TimeSpan.FromMinutes(2), token);
127+
var startDate = DateOnly.FromDateTime(provider.GetUtcNow().DateTime);
128+
provider.Advance(TimeSpan.FromDays(1));
126129

127-
var queueNames = new List<IBrokerQueue>();
128-
await foreach (IBrokerQueue queueName in query.GetQueueNames(token))
130+
while (!TestContext.CurrentContext.CancellationToken.IsCancellationRequested)
129131
{
130-
queueNames.Add(queueName);
131-
}
132+
await Task.Delay(TimeSpan.FromSeconds(5), TestContext.CurrentContext.CancellationToken);
132133

133-
IBrokerQueue queue = queueNames.Find(name => name.QueueName == $"{connectionString.QueueNamePrefix}{transportSettings.EndpointName}");
134-
Assert.That(queue, Is.Not.Null);
134+
var queueNames = new List<IBrokerQueue>();
135+
await foreach (IBrokerQueue queueName in query.GetQueueNames(TestContext.CurrentContext.CancellationToken))
136+
{
137+
queueNames.Add(queueName);
138+
}
135139

136-
long total = 0L;
140+
IBrokerQueue queue = queueNames.Find(name => name.QueueName == $"{connectionString.QueueNamePrefix}{transportSettings.EndpointName}");
137141

138-
DateTime startDate = provider.GetUtcNow().DateTime;
139-
provider.Advance(TimeSpan.FromDays(1));
140-
await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, DateOnly.FromDateTime(startDate), token))
141-
{
142-
total += queueThroughput.TotalThroughput;
142+
if (queue == null)
143+
{
144+
continue;
145+
}
146+
147+
long total = 0L;
148+
149+
await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, startDate, TestContext.CurrentContext.CancellationToken))
150+
{
151+
total += queueThroughput.TotalThroughput;
152+
}
153+
154+
if (total == numMessagesToIngest)
155+
{
156+
return;
157+
}
143158
}
144159

145-
Assert.That(total, Is.EqualTo(numMessagesToIngest));
160+
Assert.Fail("Timeout waiting for expected throughput to be report");
146161
}
147162
}

src/ServiceControl.Transports.SQS.Tests/ServiceControl.Transports.SQS.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
<ItemGroup>
1515
<PackageReference Include="GitHubActionsTestLogger" />
16+
<PackageReference Include="Microsoft.Extensions.Logging.Console" />
1617
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" />
1718
<PackageReference Include="Microsoft.NET.Test.Sdk" />
1819
<PackageReference Include="NServiceBus.AcceptanceTesting" />

src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -195,53 +195,62 @@ class AwsHttpClientFactory : HttpClientFactory
195195

196196
public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBrokerQueue brokerQueue,
197197
DateOnly startDate,
198-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
198+
[EnumeratorCancellation] CancellationToken cancellationToken)
199199
{
200-
var endDate = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime).AddDays(-1);
200+
var utcNow = timeProvider.GetUtcNow();
201+
var endDate = DateOnly.FromDateTime(utcNow.DateTime).AddDays(-1); // Query date up to but not including today
202+
203+
var isBeforeStartDate = endDate < startDate;
201204

202-
if (endDate < startDate)
205+
if (isBeforeStartDate)
203206
{
207+
logger.LogTrace("Skipping {Start} {End} {UtcNow}, ", startDate, endDate, utcNow);
204208
yield break;
205209
}
206210

211+
// Convert DATES that state up to but INCLUDING the TO value to a timestamp from X to Y EXCLUDING
212+
// Example: 2025-01-01 to 2025-01-10 => 2025-01-01T00:00:00 to 2025-01-11T00:00:00
213+
214+
var queryStartUtc = startDate.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc);
215+
var queryEndUtc = endDate
216+
.AddDays(1) // Convert from INCLUDING to EXCLUDING, thus need to bump one day, using ToDateTime(TimeOnly.MaxValue) would be wrong
217+
.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc);
218+
219+
logger.LogDebug("GetThroughputPerDay {QueueName} {UtcNow} {StartDate} {EndDate} {QueryStart} {QueryEnd}", brokerQueue.QueueName, utcNow, startDate, endDate, queryStartUtc, queryEndUtc);
220+
221+
const int SecondsInDay = 24 * 60 * 60;
222+
207223
var req = new GetMetricStatisticsRequest
208224
{
209225
Namespace = "AWS/SQS",
210226
MetricName = "NumberOfMessagesDeleted",
211-
StartTime = startDate.ToDateTime(TimeOnly.MinValue),
212-
EndTime = endDate.ToDateTime(TimeOnly.MaxValue),
213-
Period = 24 * 60 * 60, // 1 day
227+
StartTime = queryStartUtc,
228+
EndTime = queryEndUtc, // The value specified is exclusive; results include data points up to the specified time stamp.
229+
Period = SecondsInDay,
214230
Statistics = ["Sum"],
215-
Dimensions = [
216-
new Dimension { Name = "QueueName", Value = brokerQueue.QueueName }
231+
Dimensions =
232+
[
233+
new Dimension
234+
{
235+
Name = "QueueName", Value = brokerQueue.QueueName
236+
}
217237
]
218238
};
219239

220240
var resp = await cloudWatch!.GetMetricStatisticsAsync(req, cancellationToken);
241+
var dataPoints = resp.Datapoints.ToDictionary(x => DateOnly.FromDateTime(x.Timestamp!.Value.Date), x => (long)(x.Sum ?? 0));
221242

222-
DateOnly currentDate = startDate;
223-
var data = new Dictionary<DateOnly, QueueThroughput>();
224-
while (currentDate <= endDate)
243+
for (DateOnly currentDate = startDate; currentDate <= endDate; currentDate = currentDate.AddDays(1))
225244
{
226-
data.Add(currentDate, new QueueThroughput { TotalThroughput = 0, DateUTC = currentDate });
245+
dataPoints.TryGetValue(currentDate, out var sum);
227246

228-
currentDate = currentDate.AddDays(1);
229-
}
247+
logger.LogTrace("Queue throughput {QueueName} {Date} {Total}", brokerQueue.QueueName, currentDate, sum);
230248

231-
foreach (var datapoint in resp.Datapoints ?? [])
232-
{
233-
// There is a bug in the AWS SDK. The timestamp is actually UTC time, eventhough the DateTime returned type says Local
234-
// See https://github.com/aws/aws-sdk-net/issues/167
235-
// So do not convert the timestamp to UTC time!
236-
if (datapoint.Timestamp.HasValue)
249+
yield return new QueueThroughput
237250
{
238-
data[DateOnly.FromDateTime(datapoint.Timestamp.Value)].TotalThroughput = (long)datapoint.Sum.GetValueOrDefault(0);
239-
}
240-
}
241-
242-
foreach (QueueThroughput queueThroughput in data.Values)
243-
{
244-
yield return queueThroughput;
251+
TotalThroughput = sum,
252+
DateUTC = currentDate
253+
};
245254
}
246255
}
247256

0 commit comments

Comments
 (0)