Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions src/AppCommon/Commands/AzureServiceBusCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ protected override async Task<QueueDetails> GetData(CancellationToken cancellati
{
try
{
var endTime = DateTime.UtcNow.Date.AddDays(1);
var startTime = endTime.AddDays(-30);
var endTime = DateOnly.FromDateTime(DateTime.UtcNow).AddDays(-1);
var startTime = endTime.AddDays(-90);
var results = new List<QueueThroughput>();

azure.ResetConnectionQueue();
Expand All @@ -79,30 +79,49 @@ protected override async Task<QueueDetails> GetData(CancellationToken cancellati

Out.WriteLine($"Gathering metrics for queue {i + 1}/{queueNames.Length}: {queueName}");

var metricValues = await azure.GetMetrics(queueName, startTime, endTime, cancellationToken);
var metricValues = (await azure.GetMetrics(queueName, startTime, endTime, cancellationToken)).OrderBy(m => m.TimeStamp).ToArray();

if (metricValues is not null)
{
var maxThroughput = metricValues.Select(timeEntry => timeEntry.Total).Max();

// Since we get 30 days of data, if there's no throughput in that amount of time, hard to legitimately call it an endpoint
// Since we get 90 days of data, if there's no throughput in that amount of time, hard to legitimately call it an endpoint
if (maxThroughput is not null and not 0)
{
results.Add(new QueueThroughput { QueueName = queueName, Throughput = (long?)maxThroughput });
var start = DateOnly.FromDateTime(metricValues.First().TimeStamp.UtcDateTime);
var end = DateOnly.FromDateTime(metricValues.Last().TimeStamp.UtcDateTime);
var currentDate = start;
var data = new Dictionary<DateOnly, DailyThroughput>();
while (currentDate <= end)
{
data.Add(currentDate, new DailyThroughput { MessageCount = 0, DateUTC = currentDate });

currentDate = currentDate.AddDays(1);
}

foreach (var metricValue in metricValues)
{
currentDate = DateOnly.FromDateTime(metricValue.TimeStamp.UtcDateTime);
data[currentDate] = new DailyThroughput { MessageCount = (long)(metricValue.Total ?? 0), DateUTC = currentDate };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we distinguish your initialized "0" above from the metrics "0"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to distinguish whether the zero comes from the default or is set later on?
There is no reason to distinguish those.
The reason we need to do (metricValue.Total ?? 0) is because metricValue.Total can return a null, that is all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know what circumstances lead to a null?

}

results.Add(new QueueThroughput { QueueName = queueName, Throughput = (long?)maxThroughput, DailyThroughputFromBroker = [.. data.Values] });
}
else
{
Out.WriteLine(" - No throughput detected in 30 days, ignoring");
Out.WriteLine(" - No throughput detected in 90 days, ignoring");
}
}
}

var s = new DateTimeOffset(startTime, TimeOnly.MinValue, TimeSpan.Zero);
var e = new DateTimeOffset(endTime, TimeOnly.MaxValue, TimeSpan.Zero);
return new QueueDetails
{
StartTime = new DateTimeOffset(startTime, TimeSpan.Zero),
EndTime = new DateTimeOffset(endTime, TimeSpan.Zero),
StartTime = s,
EndTime = e,
Queues = results.OrderBy(q => q.QueueName).ToArray(),
TimeOfObservation = TimeSpan.FromDays(1)
TimeOfObservation = e - s
};
}
catch (QueryException x)
Expand Down
2 changes: 1 addition & 1 deletion src/AppCommon/Commands/BaseCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async Task RunInternal(CancellationToken cancellationToken)
ScopeType = data.ScopeType,
StartTime = data.StartTime,
EndTime = data.EndTime,
ReportDuration = data.TimeOfObservation ?? data.EndTime - data.StartTime,
ReportDuration = data.TimeOfObservation ?? (data.EndTime - data.StartTime),
Queues = data.Queues,
TotalThroughput = data.Queues.Sum(q => q.Throughput ?? 0),
TotalQueues = data.Queues.Length,
Expand Down
40 changes: 34 additions & 6 deletions src/AppCommon/Commands/SqsCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,41 @@ protected override async Task<QueueDetails> GetData(CancellationToken cancellati

var tasks = queueNames.Select(async queueName =>
{
var maxThroughput = await aws.GetMaxThroughput(queueName, cancellationToken).ConfigureAwait(false);
var datapoints = (await aws.GetMMetricsData(queueName, cancellationToken)).OrderBy(d => d.Timestamp).ToArray();

// Since we get 30 days of data, if there's no throughput in that amount of time, hard to legitimately call it an endpoint
var maxThroughput = datapoints is { Length: > 0 } ?
(long)datapoints.Select(d => d.Sum.GetValueOrDefault(0)).Max() : 0L;
// Since we get 365 days of data, if there's no throughput in that amount of time, hard to legitimately call it an endpoint
if (maxThroughput > 0)
{
var startTime = DateOnly.FromDateTime(datapoints.First().Timestamp.Value);
var endTime = DateOnly.FromDateTime(datapoints.Last().Timestamp.Value);
DateOnly currentDate = startTime;
var dailyData = new Dictionary<DateOnly, DailyThroughput>();
while (currentDate <= endTime)
{
dailyData.Add(currentDate, new DailyThroughput { MessageCount = 0, DateUTC = currentDate });

currentDate = currentDate.AddDays(1);
}

foreach (var datapoint in datapoints)
{
// There is a bug in the AWS SDK. The timestamp is actually UTC time, eventhough the DateTime returned type says Local
// See https://github.com/aws/aws-sdk-net/issues/167
// So do not convert the timestamp to UTC time!
if (datapoint.Timestamp.HasValue)
{
currentDate = DateOnly.FromDateTime(datapoint.Timestamp.Value);
dailyData[currentDate] = new DailyThroughput { MessageCount = (long)datapoint.Sum.GetValueOrDefault(0), DateUTC = currentDate };
}
}

data.Add(new QueueThroughput
{
QueueName = queueName,
Throughput = maxThroughput
Throughput = maxThroughput,
DailyThroughputFromBroker = [.. dailyData.Values]
});
}

Expand All @@ -94,12 +120,14 @@ protected override async Task<QueueDetails> GetData(CancellationToken cancellati

Out.EndProgress();

var s = new DateTimeOffset(aws.StartDate.ToDateTime(TimeOnly.MinValue), TimeSpan.Zero);
var e = new DateTimeOffset(aws.EndDate.ToDateTime(TimeOnly.MaxValue), TimeSpan.Zero);
return new QueueDetails
{
StartTime = new DateTimeOffset(aws.StartDate.ToDateTime(TimeOnly.MinValue), TimeSpan.Zero),
EndTime = new DateTimeOffset(aws.EndDate.ToDateTime(TimeOnly.MaxValue), TimeSpan.Zero),
StartTime = s,
EndTime = e,
Queues = [.. data.OrderBy(q => q.QueueName)],
TimeOfObservation = TimeSpan.FromDays(1)
TimeOfObservation = e - s
};
}

Expand Down
7 changes: 3 additions & 4 deletions src/Query/AmazonSQS/AwsQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public AwsQuery()
QueueLimit = int.MaxValue
});
EndDate = DateOnly.FromDateTime(DateTime.UtcNow).AddDays(1);
StartDate = EndDate.AddDays(-30);
StartDate = EndDate.AddDays(-365);

sqs = new AmazonSQSClient();
cloudWatch = new AmazonCloudWatchClient();
Expand Down Expand Up @@ -83,7 +83,7 @@ public async Task<List<string>> GetQueueNames(Action<int> onProgress, Cancellati
}
}

public async Task<long> GetMaxThroughput(string queueName, CancellationToken cancellationToken = default)
public async Task<List<Datapoint>> GetMMetricsData(string queueName, CancellationToken cancellationToken = default)
{
var req = new GetMetricStatisticsRequest
{
Expand All @@ -100,8 +100,7 @@ public async Task<long> GetMaxThroughput(string queueName, CancellationToken can
using var lease = await rateLimiter.AcquireAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
var resp = await cloudWatch.GetMetricStatisticsAsync(req, cancellationToken).ConfigureAwait(false);

return resp.Datapoints is { Count: > 0 } ?
(long)resp.Datapoints.Select(d => d.Sum.GetValueOrDefault(0)).Max() : 0L;
return resp.Datapoints ?? [];
}
}
}
6 changes: 3 additions & 3 deletions src/Query/AzureServiceBus/AzureClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,18 @@ bool NextCredentials()
}
}

public Task<IReadOnlyList<MetricValue>> GetMetrics(string queueName, DateTime startTime, DateTime endTime, CancellationToken cancellationToken = default)
public Task<IReadOnlyList<MetricValue>> GetMetrics(string queueName, DateOnly startTime, DateOnly endTime, CancellationToken cancellationToken = default)
{
return GetDataWithCurrentCredentials(async token =>
{
try
{
var response = await currentClients.Metrics.QueryResourceAsync(resourceId,
new[] { "CompleteMessage" },
["CompleteMessage"],
new MetricsQueryOptions
{
Filter = $"EntityName eq '{queueName}'",
TimeRange = new QueryTimeRange(startTime, endTime),
TimeRange = new QueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
Granularity = TimeSpan.FromDays(1)
},
token).ConfigureAwait(false);
Expand Down