Skip to content

Commit 80d45c2

Browse files
authored
Add missing table storage read metrics (#1202)
1 parent 63d45d2 commit 80d45c2

File tree

3 files changed

+28
-10
lines changed

3 files changed

+28
-10
lines changed

src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public async IAsyncEnumerable<BlobPartitionLease> ListLeasesAsync([EnumeratorCan
7777
await foreach (Page<Blob> page in this.taskHubContainer.ListBlobsAsync(this.blobDirectoryName, cancellationToken: cancellationToken).AsPages())
7878
{
7979
// Start each of the Tasks in parallel
80-
Task<BlobPartitionLease>[] downloadTasks = page.Values.Select(b => this.DownloadLeaseBlob(b, cancellationToken)).ToArray();
80+
Task<BlobPartitionLease>[] downloadTasks = page.Values.Select(b => DownloadLeaseBlob(b, cancellationToken)).ToArray();
8181

8282
foreach (Task<BlobPartitionLease> downloadTask in downloadTasks)
8383
{
@@ -134,7 +134,7 @@ public async Task<BlobPartitionLease> GetLeaseAsync(string partitionId, Cancella
134134
Blob leaseBlob = this.taskHubContainer.GetBlobReference(partitionId, this.blobDirectoryName);
135135
if (await leaseBlob.ExistsAsync(cancellationToken))
136136
{
137-
return await this.DownloadLeaseBlob(leaseBlob, cancellationToken);
137+
return await DownloadLeaseBlob(leaseBlob, cancellationToken);
138138
}
139139

140140
return null;
@@ -315,7 +315,7 @@ async Task<TaskHubInfo> GetTaskHubInfoAsync(CancellationToken cancellationToken)
315315
return null;
316316
}
317317

318-
async Task<BlobPartitionLease> DownloadLeaseBlob(Blob blob, CancellationToken cancellationToken)
318+
static async Task<BlobPartitionLease> DownloadLeaseBlob(Blob blob, CancellationToken cancellationToken)
319319
{
320320
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
321321
BlobPartitionLease deserializedLease = Utils.DeserializeFromJson<BlobPartitionLease>(result.Content);

src/DurableTask.AzureStorage/Storage/Table.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public async Task<bool> ExistsAsync(CancellationToken cancellationToken = defaul
6262
{
6363
// TODO: Re-evaluate the use of an "Exists" method as it was intentional omitted from the client API
6464
List<TableItem> tables = await this.tableServiceClient
65-
.QueryAsync(filter: $"TableName eq '{tableClient.Name}'", cancellationToken: cancellationToken)
65+
.QueryAsync(filter: $"TableName eq '{this.tableClient.Name}'", cancellationToken: cancellationToken)
6666
.DecorateFailure()
6767
.ToListAsync(cancellationToken);
6868

@@ -173,9 +173,15 @@ public async Task<TableTransactionResults> ExecuteBatchAsync(IEnumerable<TableTr
173173
return new TableTransactionResults(batchResults, stopwatch.Elapsed);
174174
}
175175

176-
public TableQueryResponse<T> ExecuteQueryAsync<T>(string? filter = null, int? maxPerPage = null, IEnumerable<string>? select = null, CancellationToken cancellationToken = default) where T : class, ITableEntity, new()
176+
public TableQueryResponse<T> ExecuteQueryAsync<T>(
177+
string? filter = null,
178+
int? maxPerPage = null,
179+
IEnumerable<string>? select = null,
180+
CancellationToken cancellationToken = default) where T : class, ITableEntity, new()
177181
{
178-
return new TableQueryResponse<T>(this.tableClient.QueryAsync<T>(filter, maxPerPage, select, cancellationToken).DecorateFailure());
182+
return new TableQueryResponse<T>(
183+
this.tableClient.QueryAsync<T>(filter, maxPerPage, select, cancellationToken).DecorateFailure(),
184+
this.stats);
179185
}
180186
}
181187
}

src/DurableTask.AzureStorage/Storage/TableQueryResponse.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,37 @@ namespace DurableTask.AzureStorage.Storage
2020
using System.Threading;
2121
using System.Threading.Tasks;
2222
using Azure;
23+
using DurableTask.AzureStorage.Monitoring;
2324

2425
class TableQueryResponse<T> : AsyncPageable<T> where T : notnull
2526
{
2627
readonly AsyncPageable<T> query;
28+
readonly AzureStorageOrchestrationServiceStats stats;
2729

28-
public TableQueryResponse(AsyncPageable<T> query) =>
30+
public TableQueryResponse(AsyncPageable<T> query, AzureStorageOrchestrationServiceStats stats)
31+
{
2932
this.query = query ?? throw new ArgumentNullException(nameof(query));
33+
this.stats = stats ?? throw new ArgumentNullException(nameof(stats));
34+
}
3035

31-
public override IAsyncEnumerable<Page<T>> AsPages(string? continuationToken = null, int? pageSizeHint = null) =>
32-
this.query.AsPages(continuationToken, pageSizeHint);
36+
public override IAsyncEnumerable<Page<T>> AsPages(string? continuationToken = null, int? pageSizeHint = null)
37+
{
38+
return this.query.AsPages(continuationToken, pageSizeHint);
39+
}
3340

34-
public async Task<TableQueryResults<T>> GetResultsAsync(string? continuationToken = null, int? pageSizeHint = null, CancellationToken cancellationToken = default)
41+
public async Task<TableQueryResults<T>> GetResultsAsync(
42+
string? continuationToken = null,
43+
int? pageSizeHint = null,
44+
CancellationToken cancellationToken = default)
3545
{
3646
var sw = Stopwatch.StartNew();
3747

3848
int pages = 0;
3949
var entities = new List<T>();
4050
await foreach (Page<T> page in this.query.AsPages(continuationToken, pageSizeHint).WithCancellation(cancellationToken))
4151
{
52+
this.stats.TableEntitiesRead.Increment(page.Values.Count);
53+
4254
pages++;
4355
entities.AddRange(page.Values);
4456
}

0 commit comments

Comments
 (0)