Skip to content

Commit 90c2c46

Browse files
Update to SQS 7.4.0 (#4996)
* Update to SQS 7.4.0 and v4 SDK * Validate region name is within allowed names This is required because RegionEndpoint.GetBySystemName() returns `unknown` instead of throwing. --------- Co-authored-by: John Simons <[email protected]>
1 parent 7369efd commit 90c2c46

File tree

3 files changed

+31
-21
lines changed

3 files changed

+31
-21
lines changed

src/Directory.Packages.props

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
</PropertyGroup>
66
<ItemGroup Label="Versions for direct package references">
77
<PackageVersion Include="Autofac" Version="8.2.0" />
8-
<PackageVersion Include="AWSSDK.CloudWatch" Version="3.7.402.63" />
9-
<PackageVersion Include="AWSSDK.SecurityToken" Version="3.7.401.78" />
8+
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
9+
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
1010
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
1111
<PackageVersion Include="Azure.Monitor.Query" Version="1.6.0" />
1212
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
@@ -31,7 +31,7 @@
3131
<PackageVersion Include="NLog.Extensions.Logging" Version="5.4.0" />
3232
<PackageVersion Include="NServiceBus" Version="9.2.7" />
3333
<PackageVersion Include="NServiceBus.AcceptanceTesting" Version="9.2.7" />
34-
<PackageVersion Include="NServiceBus.AmazonSQS" Version="7.3.0" />
34+
<PackageVersion Include="NServiceBus.AmazonSQS" Version="7.4.0" />
3535
<PackageVersion Include="NServiceBus.CustomChecks" Version="5.0.1" />
3636
<PackageVersion Include="NServiceBus.Extensions.Hosting" Version="3.0.1" />
3737
<PackageVersion Include="NServiceBus.Extensions.Logging" Version="3.0.1" />
@@ -91,4 +91,4 @@
9191
<GlobalPackageReference Include="Microsoft.Build.CopyOnWrite" Version="1.0.334" />
9292
<GlobalPackageReference Include="Particular.Packaging" Version="4.2.2" />
9393
</ItemGroup>
94-
</Project>
94+
</Project>

src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ namespace ServiceControl.Transports.SQS;
1414
using Amazon.CloudWatch.Model;
1515
using Amazon.Runtime;
1616
using Amazon.Runtime.CredentialManagement;
17+
using Amazon.Runtime.Credentials;
1718
using Amazon.SQS;
1819
using Amazon.SQS.Model;
1920
using BrokerThroughput;
@@ -29,7 +30,7 @@ public class AmazonSQSQuery(ILogger<AmazonSQSQuery> logger, TimeProvider timePro
2930
protected override void InitializeCore(ReadOnlyDictionary<string, string> settings)
3031
{
3132
var sqsConnectionString = new SQSTransportConnectionString(transportSettings.ConnectionString);
32-
AWSCredentials credentials = FallbackCredentialsFactory.GetCredentials();
33+
AWSCredentials credentials = DefaultAWSCredentialsIdentityResolver.GetCredentials();
3334
RegionEndpoint? regionEndpoint = null;
3435
if (settings.TryGetValue(AmazonSQSSettings.Profile, out string? profile))
3536
{
@@ -91,9 +92,15 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
9192
}
9293
}
9394

95+
bool IsValidAwsRegion(string region) => RegionEndpoint.EnumerableAllRegions.Any(r => r.SystemName.Equals(region, StringComparison.OrdinalIgnoreCase));
96+
9497
if (settings.TryGetValue(AmazonSQSSettings.Region, out string? region))
9598
{
9699
string? previousSetSystemName = regionEndpoint?.SystemName;
100+
if (!IsValidAwsRegion(region))
101+
{
102+
throw new ArgumentException("Invalid region endpoint provided");
103+
}
97104
regionEndpoint = RegionEndpoint.GetBySystemName(region);
98105

99106
Diagnostics.Append($"Region set to \"{regionEndpoint.SystemName}\"");
@@ -108,6 +115,10 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
108115
{
109116
if (sqsConnectionString.Region != null)
110117
{
118+
if (!IsValidAwsRegion(sqsConnectionString.Region))
119+
{
120+
throw new ArgumentException("Invalid region endpoint provided");
121+
}
111122
regionEndpoint = RegionEndpoint.GetBySystemName(sqsConnectionString.Region);
112123
Diagnostics.AppendLine(
113124
$"Region not set, defaulted to using \"{regionEndpoint.SystemName}\" from the ConnectionString used by instance");
@@ -197,8 +208,8 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
197208
{
198209
Namespace = "AWS/SQS",
199210
MetricName = "NumberOfMessagesDeleted",
200-
StartTimeUtc = startDate.ToDateTime(TimeOnly.MinValue),
201-
EndTimeUtc = endDate.ToDateTime(TimeOnly.MaxValue),
211+
StartTime = startDate.ToDateTime(TimeOnly.MinValue),
212+
EndTime = endDate.ToDateTime(TimeOnly.MaxValue),
202213
Period = 24 * 60 * 60, // 1 day
203214
Statistics = ["Sum"],
204215
Dimensions = [
@@ -217,12 +228,15 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
217228
currentDate = currentDate.AddDays(1);
218229
}
219230

220-
foreach (var datapoint in resp.Datapoints)
231+
foreach (var datapoint in resp.Datapoints ?? [])
221232
{
222233
// There is a bug in the AWS SDK. The timestamp is actually UTC time, eventhough the DateTime returned type says Local
223234
// See https://github.com/aws/aws-sdk-net/issues/167
224235
// So do not convert the timestamp to UTC time!
225-
data[DateOnly.FromDateTime(datapoint.Timestamp)].TotalThroughput = (long)datapoint.Sum;
236+
if (datapoint.Timestamp.HasValue)
237+
{
238+
data[DateOnly.FromDateTime(datapoint.Timestamp.Value)].TotalThroughput = (long)datapoint.Sum.GetValueOrDefault(0);
239+
}
226240
}
227241

228242
foreach (QueueThroughput queueThroughput in data.Values)
@@ -244,7 +258,7 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
244258
{
245259
var response = await sqs!.ListQueuesAsync(request, cancellationToken);
246260

247-
foreach (var queue in response.QueueUrls.Select(url => url.Split('/')[4]))
261+
foreach (var queue in (response.QueueUrls ?? []).Select(url => url.Split('/')[4]))
248262
{
249263
if (!queue.EndsWith("-delay.fifo", StringComparison.OrdinalIgnoreCase))
250264
{

src/ServiceControl.Transports.SQS/QueueAttributesRequestCache.cs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,8 @@
66
using Amazon.SQS;
77
using Amazon.SQS.Model;
88

9-
class QueueAttributesRequestCache
9+
class QueueAttributesRequestCache(IAmazonSQS sqsClient)
1010
{
11-
public QueueAttributesRequestCache(IAmazonSQS sqsClient)
12-
{
13-
cache = new ConcurrentDictionary<string, GetQueueAttributesRequest>();
14-
this.sqsClient = sqsClient;
15-
}
16-
1711
public async Task<GetQueueAttributesRequest> GetQueueAttributesRequest(string queueName, CancellationToken cancellationToken = default)
1812
{
1913
if (cache.TryGetValue(queueName, out var attReq))
@@ -23,8 +17,11 @@ public async Task<GetQueueAttributesRequest> GetQueueAttributesRequest(string qu
2317

2418
var queueUrl = await GetQueueUrl(queueName, cancellationToken);
2519

26-
attReq = new GetQueueAttributesRequest { QueueUrl = queueUrl };
27-
attReq.AttributeNames.Add("ApproximateNumberOfMessages");
20+
attReq = new GetQueueAttributesRequest
21+
{
22+
QueueUrl = queueUrl,
23+
AttributeNames = ["ApproximateNumberOfMessages"]
24+
};
2825

2926
cache[queueName] = attReq;
3027

@@ -37,7 +34,6 @@ async Task<string> GetQueueUrl(string queueName, CancellationToken cancellationT
3734
return response.QueueUrl;
3835
}
3936

40-
ConcurrentDictionary<string, GetQueueAttributesRequest> cache;
41-
IAmazonSQS sqsClient;
37+
readonly ConcurrentDictionary<string, GetQueueAttributesRequest> cache = new();
4238
}
4339
}

0 commit comments

Comments
 (0)