Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions csharp/src/BigQueryConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,16 @@ internal void UpdateClientToken()
try
{
activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, sql, IsSafeToTrace);

Func<Task<BigQueryResults?>> func = () => Client.ExecuteQueryAsync(sql, parameters ?? Enumerable.Empty<BigQueryParameter>(), queryOptions, resultsOptions);
BigQueryResults? result = ExecuteWithRetriesAsync<BigQueryResults?>(func, activity).GetAwaiter().GetResult();
Task<BigQueryResults> func()
{
return this.TraceActivityAsync(async (activity) =>
{
BigQueryJob job = await Client.CreateQueryJobAsync(sql, parameters ?? Enumerable.Empty<BigQueryParameter>(), queryOptions);
activity?.AddBigQueryTag("job_id", job.Reference.JobId);
return await job.GetQueryResultsAsync(resultsOptions);
}, ClassName + "." + nameof(ExecuteQuery) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
}
BigQueryResults? result = ExecuteWithRetriesAsync(func, activity).GetAwaiter().GetResult();

return result;
}
Expand Down
62 changes: 32 additions & 30 deletions csharp/src/BigQueryStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,16 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()

// We can't checkJobStatus, Otherwise, the timeout in QueryResultsOptions is meaningless.
// When encountering a long-running job, it should be controlled by the timeout in the Google SDK instead of blocking in a while loop.
Func<Task<BigQueryResults>> getJobResults = async () =>
Task<BigQueryResults> getJobResults()
{
return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) =>
return ExecuteCancellableJobAsync(cancellationContext, activity, async (context, jobActivity) =>
{
return await this.TraceActivityAsync(async activity =>
{
// if the authentication token was reset, then we need a new job with the latest token
context.Job = await Client.GetJobAsync(jobReference, cancellationToken: context.CancellationToken).ConfigureAwait(false);
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false);
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
}).ConfigureAwait(false);
};
// if the authentication token was reset, then we need a new job with the latest token
context.Job = await Client.GetJobAsync(jobReference, cancellationToken: context.CancellationToken).ConfigureAwait(false);
jobActivity?.AddBigQueryTag("job_id", context.Job.Reference.JobId);
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false);
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
}

BigQueryResults results = await ExecuteWithRetriesAsync(getJobResults, activity, cancellationContext.CancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -181,7 +179,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
evaluationKind = evaluationKindString;
}

Func<Task<BigQueryResults>> getMultiJobResults = async () =>
Task<BigQueryResults> getMultiJobResults()
{
// To get the results of all statements in a multi-statement query, enumerate the child jobs. Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
// Can filter by StatementType and EvaluationKind. Related public docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2, https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
Expand All @@ -202,14 +200,15 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
}
BigQueryJob indexedJob = joblist[statementIndex - 1];
cancellationContext.Job = indexedJob;
return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) =>
return ExecuteCancellableJobAsync(cancellationContext, activity, (context, jobActivity) =>
{
return await indexedJob.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
jobActivity?.AddBigQueryTag("job_id", context.Job?.Reference.JobId);
return indexedJob.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken);
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync) + ".MultiJobResults");
}

throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
};
}

results = await ExecuteWithRetriesAsync(getMultiJobResults, activity, cancellationContext.CancellationToken).ConfigureAwait(false);
}
Expand All @@ -236,17 +235,15 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()

long totalRows = results.TotalRows == null ? -1L : (long)results.TotalRows.Value;

Func<Task<IEnumerable<IArrowReader>>> getArrowReadersFunc = async () =>
Task<IEnumerable<IArrowReader>> getArrowReadersFunc()
{
return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) =>
return ExecuteCancellableJobAsync(cancellationContext, activity, (context, jobActivity) =>
{
return await this.TraceActivityAsync(async activity =>
{
// Cancelling this step may leave the server with unread streams.
return await GetArrowReaders(clientMgr, table, results.TableReference.ProjectId, maxStreamCount, activity, context.CancellationToken).ConfigureAwait(false);
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(GetArrowReaders));
}).ConfigureAwait(false);
};
// Cancelling this step may leave the server with unread streams.
jobActivity?.AddBigQueryTag("job_id", context.Job?.Reference.JobId);
return GetArrowReaders(clientMgr, table, results.TableReference.ProjectId, maxStreamCount, jobActivity, context.CancellationToken);
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(GetArrowReaders));
}
IEnumerable<IArrowReader> readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity, cancellationContext.CancellationToken).ConfigureAwait(false);

// Note: MultiArrowReader must dispose the cancellationContext.
Expand Down Expand Up @@ -751,14 +748,15 @@ private async Task<UpdateResult> ExecuteUpdateInternalAsync()

using JobCancellationContext context = new(cancellationRegistry);
// Cannot set destination table in jobs with DDL statements, otherwise an error will be prompted
Func<Task<BigQueryResults?>> getQueryResultsAsyncFunc = async () =>
Task<BigQueryResults> getQueryResultsAsyncFunc()
{
return await ExecuteCancellableJobAsync(context, activity, async (context) =>
return ExecuteCancellableJobAsync(context, activity, async (context, jobActivity) =>
{
context.Job = await this.Client.CreateQueryJobAsync(SqlQuery, null, null, context.CancellationToken).ConfigureAwait(false);
jobActivity?.AddBigQueryTag("job_id", context.Job.Reference.JobId);
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, context.CancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
};
}, ClassName + "." + nameof(ExecuteUpdateInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
}
BigQueryResults? result = await ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity, context.CancellationToken);
long updatedRows = result?.NumDmlAffectedRows.HasValue == true ? result.NumDmlAffectedRows.Value : -1L;

Expand Down Expand Up @@ -1056,11 +1054,15 @@ private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity?
private async Task<T> ExecuteCancellableJobAsync<T>(
JobCancellationContext context,
Activity? activity,
Func<JobCancellationContext, Task<T>> func)
Func<JobCancellationContext, Activity?, Task<T>> func,
string activityName)
{
try
{
return await func(context).ConfigureAwait(false);
return await this.TraceActivityAsync(jobActivity =>
{
return func(context, jobActivity);
}, activityName).ConfigureAwait(false);
}
catch (Exception ex)
when (context.CancellationToken.IsCancellationRequested &&
Expand Down