Skip to content

Commit 6caf83f

Browse files
committed
Update SQS query logic to improve time handling and CloudWatch metric collection
- Adjusted `Initialize` to use the most eastern timezone UTC offset for accurate time handling to validate internally using TUTC - Fixed Start/End timestamps to correctly be Utc - Upgraded `AWSSDK.CloudWatch` package to version 4.0.6.1 for latest fixes and improvements. - Replaced manual cancellation handling by utilizing `TestContext.CurrentContext.CancellationToken` to improve test cancellation flow.
1 parent 99d8a55 commit 6caf83f

File tree

3 files changed

+25
-20
lines changed

3 files changed

+25
-20
lines changed

src/Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
</PropertyGroup>
66
<ItemGroup Label="Versions for direct package references">
77
<PackageVersion Include="Autofac" Version="8.2.0" />
8-
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
8+
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.6.1" />
99
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
1010
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
1111
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />

src/ServiceControl.Transports.SQS.Tests/AmazonSQSQueryTests.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ class AmazonSQSQueryTests : TransportTestFixture
2525
public void Initialise()
2626
{
2727
provider = new();
28-
provider.SetUtcNow(DateTimeOffset.UtcNow);
28+
29+
var kiribati = TimeZoneInfo.FindSystemTimeZoneById("Pacific/Kiritimati");
30+
var furthestAhead = TimeZoneInfo.ConvertTime(DateTimeOffset.UtcNow, kiribati);
31+
provider.SetUtcNow(furthestAhead);
2932
transportSettings = new TransportSettings
3033
{
31-
ConnectionString = configuration.ConnectionString,
32-
MaxConcurrency = 1,
33-
EndpointName = Guid.NewGuid().ToString("N")
34+
ConnectionString = configuration.ConnectionString, MaxConcurrency = 1, EndpointName = Guid.NewGuid().ToString("N")
3435
};
3536
query = new AmazonSQSQuery(NullLogger<AmazonSQSQuery>.Instance, provider, transportSettings);
3637
}
@@ -94,11 +95,9 @@ public async Task TestConnectionWithValidSettings()
9495
}
9596

9697
[Test]
98+
[CancelAfter(6 * 60 * 1000)]
9799
public async Task RunScenario()
98100
{
99-
// We need to wait a bit of time, to ensure AWS metrics are retrievable
100-
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(6));
101-
CancellationToken token = cancellationTokenSource.Token;
102101
const int numMessagesToIngest = 15;
103102

104103
await CreateTestQueue(transportSettings.EndpointName);
@@ -120,12 +119,13 @@ public async Task RunScenario()
120119
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.Region, connectionString.Region);
121120
}
122121

123-
query.Initialize(new ReadOnlyDictionary<string, string>(dictionary));
122+
query.Initialize(dictionary.AsReadOnly());
124123

125-
await Task.Delay(TimeSpan.FromMinutes(2), token);
124+
// Wait for metrics to become visible, usually takes 20-30 seconds
125+
await Task.Delay(TimeSpan.FromMinutes(1), TestContext.CurrentContext.CancellationToken);
126126

127127
var queueNames = new List<IBrokerQueue>();
128-
await foreach (IBrokerQueue queueName in query.GetQueueNames(token))
128+
await foreach (IBrokerQueue queueName in query.GetQueueNames(TestContext.CurrentContext.CancellationToken))
129129
{
130130
queueNames.Add(queueName);
131131
}
@@ -137,7 +137,7 @@ public async Task RunScenario()
137137

138138
DateTime startDate = provider.GetUtcNow().DateTime;
139139
provider.Advance(TimeSpan.FromDays(1));
140-
await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, DateOnly.FromDateTime(startDate), token))
140+
await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, DateOnly.FromDateTime(startDate), TestContext.CurrentContext.CancellationToken))
141141
{
142142
total += queueThroughput.TotalThroughput;
143143
}

src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -197,24 +197,30 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
197197
DateOnly startDate,
198198
[EnumeratorCancellation] CancellationToken cancellationToken = default)
199199
{
200-
var endDate = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime).AddDays(-1);
200+
var utcNow = timeProvider.GetUtcNow();
201+
var endDate = DateOnly.FromDateTime(utcNow.DateTime).AddDays(-1); // Query date up to but not including today
201202

202203
if (endDate < startDate)
203204
{
204205
yield break;
205206
}
206207

208+
var startUtc = startDate.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc);
209+
var endUtc = endDate.AddDays(1).ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc);
210+
211+
const int SecondsInDay = 24 * 60 * 60;;
212+
207213
var req = new GetMetricStatisticsRequest
208214
{
209215
Namespace = "AWS/SQS",
210216
MetricName = "NumberOfMessagesDeleted",
211-
StartTime = startDate.ToDateTime(TimeOnly.MinValue),
212-
EndTime = endDate.ToDateTime(TimeOnly.MaxValue),
213-
Period = 24 * 60 * 60, // 1 day
217+
StartTime = startUtc,
218+
EndTime = endUtc, // exclusive
219+
Period = SecondsInDay,
214220
Statistics = ["Sum"],
215221
Dimensions = [
216222
new Dimension { Name = "QueueName", Value = brokerQueue.QueueName }
217-
]
223+
],
218224
};
219225

220226
var resp = await cloudWatch!.GetMetricStatisticsAsync(req, cancellationToken);
@@ -228,11 +234,10 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
228234
currentDate = currentDate.AddDays(1);
229235
}
230236

237+
// Cloudwatch returns data points per 5 minutes in UTC
231238
foreach (var datapoint in resp.Datapoints ?? [])
232239
{
233-
// There is a bug in the AWS SDK. The timestamp is actually UTC time, eventhough the DateTime returned type says Local
234-
// See https://github.com/aws/aws-sdk-net/issues/167
235-
// So do not convert the timestamp to UTC time!
240+
logger.LogInformation("\tDatapoint {Timestamp:O} {Sum} {Unit}", datapoint.Timestamp, datapoint.Sum, datapoint.Unit);
236241
if (datapoint.Timestamp.HasValue)
237242
{
238243
data[DateOnly.FromDateTime(datapoint.Timestamp.Value)].TotalThroughput = (long)datapoint.Sum.GetValueOrDefault(0);

0 commit comments

Comments
 (0)