Skip to content

Commit 3b0a7c5

Browse files
authored
chore(csharp): add JobId tracing tags (#153)
## What's Changed - adds tracing to tag the JobId to provide reference to the Google JobId. - simplifies by removing unnecessary async / await in nested async calls.
1 parent 2cd823e commit 3b0a7c5

File tree

2 files changed

+42
-33
lines changed

2 files changed

+42
-33
lines changed

csharp/src/BigQueryConnection.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -569,9 +569,16 @@ internal void UpdateClientToken()
569569
try
570570
{
571571
activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, sql, IsSafeToTrace);
572-
573-
Func<Task<BigQueryResults?>> func = () => Client.ExecuteQueryAsync(sql, parameters ?? Enumerable.Empty<BigQueryParameter>(), queryOptions, resultsOptions);
574-
BigQueryResults? result = ExecuteWithRetriesAsync<BigQueryResults?>(func, activity).GetAwaiter().GetResult();
572+
Task<BigQueryResults> func()
573+
{
574+
return this.TraceActivityAsync(async (activity) =>
575+
{
576+
BigQueryJob job = await Client.CreateQueryJobAsync(sql, parameters ?? Enumerable.Empty<BigQueryParameter>(), queryOptions);
577+
activity?.AddBigQueryTag("job_id", job.Reference.JobId);
578+
return await job.GetQueryResultsAsync(resultsOptions);
579+
}, ClassName + "." + nameof(ExecuteQuery) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
580+
}
581+
BigQueryResults? result = ExecuteWithRetriesAsync(func, activity).GetAwaiter().GetResult();
575582

576583
return result;
577584
}

csharp/src/BigQueryStatement.cs

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,16 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
138138

139139
// We can't checkJobStatus, Otherwise, the timeout in QueryResultsOptions is meaningless.
140140
// When encountering a long-running job, it should be controlled by the timeout in the Google SDK instead of blocking in a while loop.
141-
Func<Task<BigQueryResults>> getJobResults = async () =>
141+
Task<BigQueryResults> getJobResults()
142142
{
143-
return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) =>
143+
return ExecuteCancellableJobAsync(cancellationContext, activity, async (context, jobActivity) =>
144144
{
145-
return await this.TraceActivityAsync(async activity =>
146-
{
147-
// if the authentication token was reset, then we need a new job with the latest token
148-
context.Job = await Client.GetJobAsync(jobReference, cancellationToken: context.CancellationToken).ConfigureAwait(false);
149-
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false);
150-
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
151-
}).ConfigureAwait(false);
152-
};
145+
// if the authentication token was reset, then we need a new job with the latest token
146+
context.Job = await Client.GetJobAsync(jobReference, cancellationToken: context.CancellationToken).ConfigureAwait(false);
147+
jobActivity?.AddBigQueryTag("job_id", context.Job.Reference.JobId);
148+
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false);
149+
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
150+
}
153151

154152
BigQueryResults results = await ExecuteWithRetriesAsync(getJobResults, activity, cancellationContext.CancellationToken).ConfigureAwait(false);
155153

@@ -181,7 +179,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
181179
evaluationKind = evaluationKindString;
182180
}
183181

184-
Func<Task<BigQueryResults>> getMultiJobResults = async () =>
182+
Task<BigQueryResults> getMultiJobResults()
185183
{
186184
// To get the results of all statements in a multi-statement query, enumerate the child jobs. Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
187185
// Can filter by StatementType and EvaluationKind. Related public docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2, https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
@@ -202,14 +200,15 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
202200
}
203201
BigQueryJob indexedJob = joblist[statementIndex - 1];
204202
cancellationContext.Job = indexedJob;
205-
return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) =>
203+
return ExecuteCancellableJobAsync(cancellationContext, activity, (context, jobActivity) =>
206204
{
207-
return await indexedJob.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false);
208-
}).ConfigureAwait(false);
205+
jobActivity?.AddBigQueryTag("job_id", context.Job?.Reference.JobId);
206+
return indexedJob.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken);
207+
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync) + ".MultiJobResults");
209208
}
210209

211210
throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
212-
};
211+
}
213212

214213
results = await ExecuteWithRetriesAsync(getMultiJobResults, activity, cancellationContext.CancellationToken).ConfigureAwait(false);
215214
}
@@ -236,17 +235,15 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
236235

237236
long totalRows = results.TotalRows == null ? -1L : (long)results.TotalRows.Value;
238237

239-
Func<Task<IEnumerable<IArrowReader>>> getArrowReadersFunc = async () =>
238+
Task<IEnumerable<IArrowReader>> getArrowReadersFunc()
240239
{
241-
return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) =>
240+
return ExecuteCancellableJobAsync(cancellationContext, activity, (context, jobActivity) =>
242241
{
243-
return await this.TraceActivityAsync(async activity =>
244-
{
245-
// Cancelling this step may leave the server with unread streams.
246-
return await GetArrowReaders(clientMgr, table, results.TableReference.ProjectId, maxStreamCount, activity, context.CancellationToken).ConfigureAwait(false);
247-
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(GetArrowReaders));
248-
}).ConfigureAwait(false);
249-
};
242+
// Cancelling this step may leave the server with unread streams.
243+
jobActivity?.AddBigQueryTag("job_id", context.Job?.Reference.JobId);
244+
return GetArrowReaders(clientMgr, table, results.TableReference.ProjectId, maxStreamCount, jobActivity, context.CancellationToken);
245+
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(GetArrowReaders));
246+
}
250247
IEnumerable<IArrowReader> readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity, cancellationContext.CancellationToken).ConfigureAwait(false);
251248

252249
// Note: MultiArrowReader must dispose the cancellationContext.
@@ -751,14 +748,15 @@ private async Task<UpdateResult> ExecuteUpdateInternalAsync()
751748

752749
using JobCancellationContext context = new(cancellationRegistry);
753750
// Cannot set destination table in jobs with DDL statements, otherwise an error will be prompted
754-
Func<Task<BigQueryResults?>> getQueryResultsAsyncFunc = async () =>
751+
Task<BigQueryResults> getQueryResultsAsyncFunc()
755752
{
756-
return await ExecuteCancellableJobAsync(context, activity, async (context) =>
753+
return ExecuteCancellableJobAsync(context, activity, async (context, jobActivity) =>
757754
{
758755
context.Job = await this.Client.CreateQueryJobAsync(SqlQuery, null, null, context.CancellationToken).ConfigureAwait(false);
756+
jobActivity?.AddBigQueryTag("job_id", context.Job.Reference.JobId);
759757
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, context.CancellationToken).ConfigureAwait(false);
760-
}).ConfigureAwait(false);
761-
};
758+
}, ClassName + "." + nameof(ExecuteUpdateInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
759+
}
762760
BigQueryResults? result = await ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity, context.CancellationToken);
763761
long updatedRows = result?.NumDmlAffectedRows.HasValue == true ? result.NumDmlAffectedRows.Value : -1L;
764762

@@ -1056,11 +1054,15 @@ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity?
10561054
private async Task<T> ExecuteCancellableJobAsync<T>(
10571055
JobCancellationContext context,
10581056
Activity? activity,
1059-
Func<JobCancellationContext, Task<T>> func)
1057+
Func<JobCancellationContext, Activity?, Task<T>> func,
1058+
string activityName)
10601059
{
10611060
try
10621061
{
1063-
return await func(context).ConfigureAwait(false);
1062+
return await this.TraceActivityAsync(jobActivity =>
1063+
{
1064+
return func(context, jobActivity);
1065+
}, activityName).ConfigureAwait(false);
10641066
}
10651067
catch (Exception ex)
10661068
when (context.CancellationToken.IsCancellationRequested &&

0 commit comments

Comments
 (0)