Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions csharp/src/BigQueryConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class BigQueryConnection : TracingConnection, ITokenProtectedResource
{
readonly Dictionary<string, string> properties;
readonly HttpClient httpClient;
const string ClassName = nameof(BigQueryConnection);
const string infoDriverName = "ADBC BigQuery Driver";
const string infoVendorName = "BigQuery";
// Note: this needs to be set before the constructor runs
Expand Down Expand Up @@ -268,7 +269,7 @@ internal BigQueryClient Open(string? projectId = null)

Client = client;
return client;
});
}, ClassName + "." + nameof(Open));
}

internal void SetCredential()
Expand Down Expand Up @@ -342,7 +343,7 @@ internal void SetCredential()
{
throw new ArgumentException($"{authenticationType} is not a valid authenticationType");
}
});
}, ClassName + "." + nameof(SetCredential));
}

public override void SetOption(string key, string value)
Expand All @@ -357,7 +358,7 @@ public override void SetOption(string key, string value)
{
UpdateClientToken();
}
});
}, ClassName + "." + nameof(SetOption));
}

/// <summary>
Expand Down Expand Up @@ -505,7 +506,7 @@ public override IArrowArrayStream GetInfo(IReadOnlyList<AdbcInfoCode> codes)
StandardSchemas.GetInfoSchema.Validate(dataArrays);

return new BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays);
});
}, ClassName + "." + nameof(GetInfo));
}

public override IArrowArrayStream GetObjects(
Expand All @@ -529,7 +530,7 @@ public override IArrowArrayStream GetObjects(
{
throw new AdbcException(googleEx!.Message, AdbcStatusCode.Unauthorized, ex);
}
});
}, ClassName + "." + nameof(GetObjects));
}

/// <summary>
Expand Down Expand Up @@ -578,7 +579,7 @@ internal void UpdateClientToken()
{
throw new AdbcException(googleEx!.Message, AdbcStatusCode.Unauthorized, ex);
}
});
}, ClassName + "." + nameof(ExecuteQuery));
}

internal static bool IsUnauthorizedException(Exception ex, out GoogleApiException? googleEx)
Expand Down Expand Up @@ -648,7 +649,7 @@ private IArrowArray[] GetCatalogs(
StandardSchemas.GetObjectsSchema.Validate(dataArrays);

return dataArrays;
});
}, ClassName + "." + nameof(GetCatalogs));
}

private StructArray GetDbSchemas(
Expand Down Expand Up @@ -712,7 +713,7 @@ private StructArray GetDbSchemas(
length,
dataArrays,
nullBitmapBuffer.Build());
});
}, ClassName + "." + nameof(GetDbSchemas));
}

private StructArray GetTableSchemas(
Expand Down Expand Up @@ -806,7 +807,7 @@ private StructArray GetTableSchemas(
length,
dataArrays,
nullBitmapBuffer.Build());
});
}, ClassName + "." + nameof(GetTableSchemas));
}

private StructArray GetColumnSchema(
Expand Down Expand Up @@ -922,7 +923,7 @@ private StructArray GetColumnSchema(
length,
dataArrays,
nullBitmapBuffer.Build());
});
}, ClassName + "." + nameof(GetColumnSchema));
}

private StructArray GetConstraintSchema(
Expand Down Expand Up @@ -994,7 +995,7 @@ private StructArray GetConstraintSchema(
length,
dataArrays,
nullBitmapBuffer.Build());
});
}, ClassName + "." + nameof(GetConstraintSchema));
}

private StringArray GetConstraintColumnNames(
Expand Down Expand Up @@ -1022,7 +1023,7 @@ private StringArray GetConstraintColumnNames(
}

return constraintColumnNamesBuilder.Build();
});
}, ClassName + "." + nameof(GetConstraintColumnNames));
}

private StructArray GetConstraintsUsage(
Expand Down Expand Up @@ -1078,7 +1079,7 @@ private StructArray GetConstraintsUsage(
length,
dataArrays,
nullBitmapBuffer.Build());
});
}, ClassName + "." + nameof(GetConstraintsUsage));
}

private string PatternToRegEx(string? pattern)
Expand Down Expand Up @@ -1196,7 +1197,7 @@ public override Schema GetTableSchema(string? catalog, string? dbSchema, string
}

return new Schema(fields, null);
});
}, ClassName + "." + nameof(GetTableSchema));
}

private Field DescToField(BigQueryRow row)
Expand Down
27 changes: 18 additions & 9 deletions csharp/src/BigQueryStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace AdbcDrivers.BigQuery
/// </summary>
class BigQueryStatement : TracingStatement, ITokenProtectedResource, IDisposable
{
private const string ClassName = nameof(BigQueryStatement);
readonly BigQueryConnection bigQueryConnection;
readonly CancellationRegistry cancellationRegistry;

Expand Down Expand Up @@ -131,6 +132,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds);
}
activity?.AddBigQueryParameterTag(BigQueryParameters.ClientTimeout, Client.Service.HttpClient.Timeout.Seconds);

using JobCancellationContext cancellationContext = new JobCancellationContext(cancellationRegistry, job);

Expand All @@ -140,9 +142,12 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
{
return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) =>
{
// if the authentication token was reset, then we need a new job with the latest token
context.Job = await Client.GetJobAsync(jobReference, cancellationToken: context.CancellationToken).ConfigureAwait(false);
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false);
return await this.TraceActivityAsync(async activity =>
{
// if the authentication token was reset, then we need a new job with the latest token
context.Job = await Client.GetJobAsync(jobReference, cancellationToken: context.CancellationToken).ConfigureAwait(false);
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false);
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(BigQueryJob.GetQueryResultsAsync));
}).ConfigureAwait(false);
};

Expand Down Expand Up @@ -235,8 +240,11 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
{
return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) =>
{
// Cancelling this step may leave the server with unread streams.
return await GetArrowReaders(clientMgr, table, results.TableReference.ProjectId, maxStreamCount, activity, context.CancellationToken).ConfigureAwait(false);
return await this.TraceActivityAsync(async activity =>
{
// Cancelling this step may leave the server with unread streams.
return await GetArrowReaders(clientMgr, table, results.TableReference.ProjectId, maxStreamCount, activity, context.CancellationToken).ConfigureAwait(false);
}, ClassName + "." + nameof(ExecuteQueryInternalAsync) + "." + nameof(GetArrowReaders));
}).ConfigureAwait(false);
};
IEnumerable<IArrowReader> readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity, cancellationContext.CancellationToken).ConfigureAwait(false);
Expand All @@ -245,7 +253,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, new CancellationContext(cancellationRegistry));
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, totalRows);
return new QueryResult(totalRows, stream);
});
}, ClassName + "." + nameof(ExecuteQueryInternalAsync));
}

private Task<QueryResult> ExecuteMetadataCommandQuery(Activity? activity)
Expand Down Expand Up @@ -706,7 +714,7 @@ public override void Cancel()
this.TraceActivity(_ =>
{
this.cancellationRegistry.CancelAll();
});
}, ClassName + "." + nameof(Cancel));
}

public override void Dispose()
Expand Down Expand Up @@ -756,7 +764,7 @@ private async Task<UpdateResult> ExecuteUpdateInternalAsync()

activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, updatedRows);
return new UpdateResult(updatedRows);
});
}, ClassName + "." + nameof(ExecuteUpdateInternalAsync));
}

private Schema TranslateSchema(TableSchema schema)
Expand Down Expand Up @@ -1166,6 +1174,7 @@ public void Dispose()

private class MultiArrowReader : TracingReader
{
private const string ClassName = BigQueryStatement.ClassName + ".MultiArrowReader";
private static readonly string s_assemblyName = BigQueryUtils.GetAssemblyName(typeof(BigQueryStatement));
private static readonly string s_assemblyVersion = BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement));

Expand Down Expand Up @@ -1222,7 +1231,7 @@ public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<

this.reader = null;
}
});
}, ClassName + "." + nameof(ReadNextRecordBatchAsync));
}

protected override void Dispose(bool disposing)
Expand Down