diff --git a/csharp/src/Drivers/Apache/ActivityKeys.cs b/csharp/src/Drivers/Apache/ActivityKeys.cs new file mode 100644 index 0000000000..849d3d3dad --- /dev/null +++ b/csharp/src/Drivers/Apache/ActivityKeys.cs @@ -0,0 +1,43 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +namespace Apache.Arrow.Adbc.Drivers.Apache +{ + internal static class ActivityKeys + { + public const string AuthType = "auth_type"; + public const string Encrypted = "encrypted"; + public const string TransportType = "transport_type"; + public const string Host = "host"; + public const string Port = "port"; + + internal static class Http + { + public const string Key = "http"; + public const string UserAgent = Key + ".user_agent"; + public const string Uri = Key + ".uri"; + public const string AuthScheme = Key + ".auth_scheme"; + } + + internal static class Thrift + { + public const string Key = "thrift"; + public const string MaxMessageSize = Key + ".max_message_size"; + public const string MaxFrameSize = Key + ".max_frame_size"; + } + } +} diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs index f448d66835..5df4fdb551 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs @@ -50,6 +50,7 @@ internal abstract class HiveServer2Connection : TracingConnection internal static readonly string s_assemblyName = ApacheUtility.GetAssemblyName(typeof(HiveServer2Connection)); internal static readonly string s_assemblyVersion = ApacheUtility.GetAssemblyVersion(typeof(HiveServer2Connection)); private const int ConnectTimeoutMillisecondsDefault = 30000; + private const string ClassName = nameof(HiveServer2Connection); private TTransport? _transport; private TCLIService.IAsync? _client; private readonly Lazy _vendorVersion; @@ -387,7 +388,7 @@ await this.TraceActivityAsync(async activity => // Handle other exceptions if necessary throw new HiveServer2Exception($"An unexpected error occurred while opening the session. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex); } - }); + }, ClassName + "." + nameof(OpenAsync)); } private static bool IsUnauthorized(HttpRequestException httpEx) @@ -687,7 +688,7 @@ public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? cata { throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex); } - }); + }, ClassName + "." + nameof(GetObjects)); } public override IArrowArrayStream GetTableTypes() @@ -729,28 +730,39 @@ public override IArrowArrayStream GetTableTypes() { throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex); } - }); + }, ClassName + "." + nameof(GetTableTypes)); } - internal static async Task PollForResponseAsync(TOperationHandle operationHandle, TCLIService.IAsync client, int pollTimeMilliseconds, CancellationToken cancellationToken = default) + internal async Task PollForResponseAsync(TOperationHandle operationHandle, TCLIService.IAsync client, int pollTimeMilliseconds, CancellationToken cancellationToken = default) { - TGetOperationStatusResp? statusResponse = null; - do - { - if (statusResponse != null) { await Task.Delay(pollTimeMilliseconds, cancellationToken); } - TGetOperationStatusReq request = new(operationHandle); - statusResponse = await client.GetOperationStatus(request, cancellationToken); - } while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE); - - // Must be in the finished state to be valid. If not, typically a server error or timeout has occurred. - if (statusResponse.OperationState != TOperationState.FINISHED_STATE) + await this.TraceActivityAsync(async activity => { + activity?.AddEvent("hive2.thrift.poll_start"); + TGetOperationStatusResp? statusResponse = null; + int attempts = 0; + do + { + if (statusResponse != null) { await Task.Delay(pollTimeMilliseconds, cancellationToken); } + TGetOperationStatusReq request = new(operationHandle); + attempts++; + statusResponse = await client.GetOperationStatus(request, cancellationToken); + } while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE); + activity?.AddEvent("hive2.thrift.poll_end", + [ + new("hive2.thrift.poll_attempts", attempts), + new("hive2.thrift.operation_state", statusResponse.OperationState.ToString()), + ]); + + // Must be in the finished state to be valid. If not, typically a server error or timeout has occurred. + if (statusResponse.OperationState != TOperationState.FINISHED_STATE) + { #pragma warning disable CS0618 // Type or member is obsolete - throw new HiveServer2Exception(statusResponse.ErrorMessage, AdbcStatusCode.InvalidState) - .SetSqlState(statusResponse.SqlState) - .SetNativeError(statusResponse.ErrorCode); + throw new HiveServer2Exception(statusResponse.ErrorMessage, AdbcStatusCode.InvalidState) + .SetSqlState(statusResponse.SqlState) + .SetNativeError(statusResponse.ErrorCode); #pragma warning restore CS0618 // Type or member is obsolete - } + } + }, ClassName + "." + nameof(PollForResponseAsync)); } private string GetInfoTypeStringValue(TGetInfoType infoType) @@ -779,7 +791,7 @@ private string GetInfoTypeStringValue(TGetInfoType infoType) { throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex); } - }); + }, nameof(HiveServer2Connection) + "." + nameof(GetInfoTypeStringValue)); } protected override void Dispose(bool disposing) @@ -813,10 +825,10 @@ private void DisposeClient() _transport = null; _client = null; } - }); + }, ClassName + "." + nameof(DisposeClient)); } - internal static async Task GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default) + internal async Task GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default) { TGetResultSetMetadataReq request = new(operationHandle); TGetResultSetMetadataResp response = await client.GetResultSetMetadata(request, cancellationToken); @@ -1062,7 +1074,7 @@ internal async Task GetCatalogsAsync(CancellationToken cancell HandleThriftResponse(resp.Status, activity); return resp; - }); + }, ClassName + "." + nameof(GetCatalogsAsync)); } internal async Task GetSchemasAsync( @@ -1092,7 +1104,7 @@ internal async Task GetSchemasAsync( HandleThriftResponse(resp.Status, activity); return resp; - }); + }, ClassName + "." + nameof(GetSchemasAsync)); } internal async Task GetTablesAsync( @@ -1132,7 +1144,7 @@ internal async Task GetTablesAsync( HandleThriftResponse(resp.Status, activity); return resp; - }); + }, ClassName + "." + nameof(GetTablesAsync)); } internal async Task GetColumnsAsync( @@ -1172,7 +1184,7 @@ internal async Task GetColumnsAsync( HandleThriftResponse(resp.Status, activity); return resp; - }); + }, ClassName + "." + nameof(GetColumnsAsync)); } internal async Task GetPrimaryKeysAsync( @@ -1207,7 +1219,7 @@ internal async Task GetPrimaryKeysAsync( HandleThriftResponse(resp.Status, activity); return resp; - }); + }, ClassName + "." + nameof(GetPrimaryKeysAsync)); } internal async Task GetCrossReferenceAsync( @@ -1256,7 +1268,7 @@ internal async Task GetCrossReferenceAsync( TGetCrossReferenceResp resp = await Client.GetCrossReference(req, cancellationToken); HandleThriftResponse(resp.Status, activity); return resp; - }); + }, ClassName + "." + nameof(GetCrossReferenceAsync)); } private static StructArray GetColumnSchema(TableInfo tableInfo) @@ -1394,7 +1406,7 @@ public override Schema GetTableSchema(string? catalog, string? dbSchema, string? { throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex); } - }); + }, ClassName + "." + nameof(GetTableSchema)); } private static IArrowType GetArrowType(int columnTypeId, string typeName, bool isColumnSizeValid, int? columnSize, int? decimalDigits) @@ -1459,16 +1471,20 @@ private static IArrowType GetArrowType(int columnTypeId, string typeName, bool i internal async Task FetchResultsAsync(TOperationHandle operationHandle, long batchSize = BatchSizeDefault, CancellationToken cancellationToken = default) { - await PollForResponseAsync(operationHandle, Client, PollTimeMillisecondsDefault, cancellationToken); - - TFetchResultsResp fetchResp = await FetchNextAsync(operationHandle, Client, batchSize, cancellationToken); - if (fetchResp.Status.StatusCode == TStatusCode.ERROR_STATUS) + return await this.TraceActivityAsync(async activity => { - throw new HiveServer2Exception(fetchResp.Status.ErrorMessage) - .SetNativeError(fetchResp.Status.ErrorCode) - .SetSqlState(fetchResp.Status.SqlState); - } - return fetchResp.Results; + await PollForResponseAsync(operationHandle, Client, PollTimeMillisecondsDefault, cancellationToken); + + TFetchResultsResp fetchResp = await FetchNextAsync(operationHandle, Client, batchSize, cancellationToken); + if (fetchResp.Status.StatusCode == TStatusCode.ERROR_STATUS) + { + throw new HiveServer2Exception(fetchResp.Status.ErrorMessage) + .SetNativeError(fetchResp.Status.ErrorCode) + .SetSqlState(fetchResp.Status.SqlState); + } + activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, HiveServer2Reader.GetRowCount(fetchResp.Results, fetchResp.Results.Columns.Count)); + return fetchResp.Results; + }, ClassName + "." + nameof(FetchResultsAsync)); } private static async Task FetchNextAsync(TOperationHandle operationHandle, TCLIService.IAsync client, long batchSize, CancellationToken cancellationToken = default) @@ -1628,7 +1644,7 @@ public override IArrowArrayStream GetInfo(IReadOnlyList codes) StandardSchemas.GetInfoSchema.Validate(dataArrays); return new HiveInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays); - }); + }, ClassName + "." + nameof(GetInfo)); } internal struct TableInfo(string type) @@ -1723,19 +1739,44 @@ private static void ThrowErrorResponse(TStatus status, AdbcStatusCode adbcStatus protected TConfiguration GetTconfiguration() { var thriftConfig = new TConfiguration(); + Activity? activity = Activity.Current; Properties.TryGetValue(ThriftTransportSizeConstants.MaxMessageSize, out string? maxMessageSize); if (int.TryParse(maxMessageSize, out int maxMessageSizeValue) && maxMessageSizeValue > 0) { + activity?.AddTag(ActivityKeys.Thrift.MaxMessageSize, maxMessageSizeValue); thriftConfig.MaxMessageSize = maxMessageSizeValue; } Properties.TryGetValue(ThriftTransportSizeConstants.MaxFrameSize, out string? maxFrameSize); if (int.TryParse(maxFrameSize, out int maxFrameSizeValue) && maxFrameSizeValue > 0) { + activity?.AddTag(ActivityKeys.Thrift.MaxFrameSize, maxFrameSizeValue); thriftConfig.MaxFrameSize = maxFrameSizeValue; } return thriftConfig; } + + protected static class ActivityKeys + { + private const string Prefix = "hive2."; + + public const string Encrypted = Prefix + Apache.ActivityKeys.Encrypted; + public const string TransportType = Prefix + Apache.ActivityKeys.TransportType; + public const string AuthType = Prefix + Apache.ActivityKeys.AuthType; + + public static class Http + { + public const string AuthScheme = Prefix + Apache.ActivityKeys.Http.AuthScheme; + public const string UserAgent = Prefix + Apache.ActivityKeys.Http.UserAgent; + public const string Uri = Prefix + Apache.ActivityKeys.Http.Uri; + } + + public static class Thrift + { + public const string MaxMessageSize = Prefix + Apache.ActivityKeys.Thrift.MaxMessageSize; + public const string MaxFrameSize = Prefix + Apache.ActivityKeys.Thrift.MaxFrameSize; + } + } } } diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs index 6cc80c51b2..530066a376 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Globalization; using System.Net; using System.Net.Http; @@ -35,6 +36,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2 internal class HiveServer2HttpConnection : HiveServer2ExtendedConnection { private const string BasicAuthenticationScheme = "Basic"; + private const string AnonymousAuthenticationScheme = "Anonymous"; private readonly HiveServer2ProxyConfigurator _proxyConfigurator; @@ -125,6 +127,8 @@ protected override void ValidateOptions() protected override TTransport CreateTransport() { + Activity? activity = Activity.Current; + // Assumption: parameters have already been validated. Properties.TryGetValue(HiveServer2Parameters.HostName, out string? hostName); Properties.TryGetValue(HiveServer2Parameters.Path, out string? path); @@ -150,6 +154,12 @@ protected override TTransport CreateTransport() httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity")); httpClient.DefaultRequestHeaders.ExpectContinue = false; + activity?.AddTag(ActivityKeys.Encrypted, TlsOptions.IsTlsEnabled); + activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme); + activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString()); + activity?.AddTag(ActivityKeys.Http.UserAgent, s_userAgent); + activity?.AddTag(ActivityKeys.Http.Uri, baseAddress); + TConfiguration config = GetTconfiguration(); THttpTransport transport = new(httpClient, config) { @@ -163,16 +173,21 @@ protected override TTransport CreateTransport() private static AuthenticationHeaderValue? GetAuthenticationHeaderValue(HiveServer2AuthType authType, string? username, string? password) { + Activity? activity = Activity.Current; + if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password) && (authType == HiveServer2AuthType.Empty || authType == HiveServer2AuthType.Basic)) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme); return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"))); } else if (!string.IsNullOrEmpty(username) && (authType == HiveServer2AuthType.Empty || authType == HiveServer2AuthType.UsernameOnly)) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme); return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:"))); } else if (authType == HiveServer2AuthType.None) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme); return null; } else diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs index 8adf7ee386..2c6ce3a9fd 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs @@ -55,6 +55,7 @@ internal class HiveServer2Reader : TracingReader private const int SecondSubsecondSepIndex = 19; private const int SubsecondIndex = 20; private const int MillisecondDecimalPlaces = 3; + private const string ClassName = nameof(HiveServer2Reader); private readonly IHiveServer2Statement _statement; private readonly IResponse _response; private bool _disposed; @@ -129,7 +130,7 @@ public HiveServer2Reader( { throw new HiveServer2Exception($"An unexpected error occurred while fetching results. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex); } - }); + }, ClassName + "." + nameof(ReadNextRecordBatchAsync)); } private RecordBatch CreateBatch(TFetchResultsResp response, int columnCount, int rowCount) diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs index 974c07a1fd..79331a4b79 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Net; using System.Net.Security; using System.Security.Cryptography.X509Certificates; @@ -95,6 +96,8 @@ protected override void ValidateOptions() protected override TTransport CreateTransport() { + Activity? activity = Activity.Current; + // Required properties (validated previously) Properties.TryGetValue(HiveServer2Parameters.HostName, out string? hostName); Properties.TryGetValue(HiveServer2Parameters.Port, out string? port); @@ -134,11 +137,13 @@ protected override TTransport CreateTransport() { baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig); } + activity?.AddTag(ActivityKeys.Encrypted, TlsOptions.IsTlsEnabled); TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport); switch (authTypeValue) { case HiveServer2AuthType.None: + activity?.AddTag(ActivityKeys.TransportType, "buffered_socket"); return bufferedTransport; case HiveServer2AuthType.Basic: @@ -152,6 +157,7 @@ protected override TTransport CreateTransport() PlainSaslMechanism saslMechanism = new(username, password); TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig); + activity?.AddTag(ActivityKeys.TransportType, "sasl_buffered_socket"); return new TFramedTransport(saslTransport); default: diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs index 25e14f1cc4..28461fbd2a 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs @@ -52,6 +52,7 @@ internal class HiveServer2Statement : TracingStatement, IHiveServer2Statement protected static readonly string[] ForeignKeyFields = new[] { "PKCOLUMN_NAME", "PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "FKCOLUMN_NAME", "FK_NAME", "KEQ_SEQ" }; protected const string PrimaryKeyPrefix = "PK_"; protected const string ForeignKeyPrefix = "FK_"; + private const string ClassName = nameof(HiveServer2Statement); // Lock to ensure consistent access to TokenSource private readonly object _tokenSourceLock = new(); @@ -147,8 +148,8 @@ private async Task ExecuteQueryAsyncInternal(CancellationToken canc { try { - await HiveServer2Connection.PollForResponseAsync(response.OperationHandle!, Connection.Client, PollTimeMilliseconds, cancellationToken); // + poll, up to QueryTimeout - metadata = await HiveServer2Connection.GetResultSetMetadataAsync(response.OperationHandle!, Connection.Client, cancellationToken); + await Connection.PollForResponseAsync(response.OperationHandle!, Connection.Client, PollTimeMilliseconds, cancellationToken); // + poll, up to QueryTimeout + metadata = await Connection.GetResultSetMetadataAsync(response.OperationHandle!, Connection.Client, cancellationToken); } catch (Exception ex) when (IsCancellation(ex, cancellationToken)) { @@ -159,7 +160,7 @@ private async Task ExecuteQueryAsyncInternal(CancellationToken canc } Schema schema = GetSchemaFromMetadata(metadata); return new QueryResult(-1, Connection.NewReader(this, schema, response, metadata)); - }); + }, ClassName + "." + nameof(ExecuteQueryAsyncInternal)); } public override async ValueTask ExecuteQueryAsync() @@ -234,7 +235,7 @@ private async Task ExecuteUpdateAsyncInternal(CancellationToken ca { activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, affectedRows ?? -1); } - }); + }, ClassName + "." + nameof(ExecuteUpdateAsyncInternal)); } public override async Task ExecuteUpdateAsync() @@ -258,7 +259,7 @@ public override async Task ExecuteUpdateAsync() { DisposeTokenSource(); } - }); + }, ClassName + "." + nameof(ExecuteUpdateAsync)); } public override void SetOption(string key, string value) @@ -351,7 +352,7 @@ protected async Task ExecuteStatementAsync(CancellationToken cancella } } return response; - }); + }, ClassName + "." + nameof(ExecuteStatementAsync)); } protected internal int PollTimeMilliseconds { get; private set; } = HiveServer2Connection.PollTimeMillisecondsDefault; @@ -422,18 +423,22 @@ protected void ValidateOptions(IReadOnlyDictionary properties) private async Task ExecuteMetadataCommandQuery(CancellationToken cancellationToken) { - return SqlQuery?.ToLowerInvariant() switch + return await this.TraceActivityAsync(async activity => { - GetCatalogsCommandName => await GetCatalogsAsync(cancellationToken), - GetSchemasCommandName => await GetSchemasAsync(cancellationToken), - GetTablesCommandName => await GetTablesAsync(cancellationToken), - GetColumnsCommandName => await GetColumnsAsync(cancellationToken), - GetPrimaryKeysCommandName => await GetPrimaryKeysAsync(cancellationToken), - GetCrossReferenceCommandName => await GetCrossReferenceAsync(cancellationToken), - GetColumnsExtendedCommandName => await GetColumnsExtendedAsync(cancellationToken), - null or "" => throw new ArgumentNullException(nameof(SqlQuery), $"Metadata command for property 'SqlQuery' must not be empty or null. Supported metadata commands: {SupportedMetadataCommands}"), - _ => throw new NotSupportedException($"Metadata command '{SqlQuery}' is not supported. Supported metadata commands: {SupportedMetadataCommands}"), - }; + activity?.AddTag(SemanticConventions.Db.Query.Text, SqlQuery ?? ""); + return SqlQuery?.ToLowerInvariant() switch + { + GetCatalogsCommandName => await GetCatalogsAsync(cancellationToken), + GetSchemasCommandName => await GetSchemasAsync(cancellationToken), + GetTablesCommandName => await GetTablesAsync(cancellationToken), + GetColumnsCommandName => await GetColumnsAsync(cancellationToken), + GetPrimaryKeysCommandName => await GetPrimaryKeysAsync(cancellationToken), + GetCrossReferenceCommandName => await GetCrossReferenceAsync(cancellationToken), + GetColumnsExtendedCommandName => await GetColumnsExtendedAsync(cancellationToken), + null or "" => throw new ArgumentNullException(nameof(SqlQuery), $"Metadata command for property 'SqlQuery' must not be empty or null. Supported metadata commands: {SupportedMetadataCommands}"), + _ => throw new NotSupportedException($"Metadata command '{SqlQuery}' is not supported. Supported metadata commands: {SupportedMetadataCommands}"), + }; + }, ClassName + "." + nameof(ExecuteMetadataCommandQuery)); } // This method is for internal use only and is not available for external use. // It retrieves cross-reference data where the current table is treated as a foreign table. @@ -532,10 +537,10 @@ protected virtual async Task GetColumnsAsync(CancellationToken canc if (!Connection.TryGetDirectResults(response.DirectResults, out TGetResultSetMetadataResp? metadata, out TRowSet? rowSet)) { // Poll and fetch results - await HiveServer2Connection.PollForResponseAsync(response.OperationHandle!, Connection.Client, PollTimeMilliseconds, cancellationToken); + await Connection.PollForResponseAsync(response.OperationHandle!, Connection.Client, PollTimeMilliseconds, cancellationToken); // Get metadata - metadata = await HiveServer2Connection.GetResultSetMetadataAsync(response.OperationHandle!, Connection.Client, cancellationToken); + metadata = await Connection.GetResultSetMetadataAsync(response.OperationHandle!, Connection.Client, cancellationToken); // Fetch the results rowSet = await Connection.FetchResultsAsync(response.OperationHandle!, BatchSize, cancellationToken); @@ -553,21 +558,25 @@ protected virtual async Task GetColumnsAsync(CancellationToken canc private async Task GetResultSetSchemaAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default) { - TGetResultSetMetadataResp response = await HiveServer2Connection.GetResultSetMetadataAsync(operationHandle, client, cancellationToken); + TGetResultSetMetadataResp response = await Connection.GetResultSetMetadataAsync(operationHandle, client, cancellationToken); return GetSchemaFromMetadata(response); } private async Task GetQueryResult(IResponse response, CancellationToken cancellationToken) { - if (Connection.TryGetDirectResults(response.DirectResults, out QueryResult? result)) + return await this.TraceActivityAsync(async activity => { - return result!; - } + HiveServer2Connection.HandleThriftResponse(response.Status!, activity); + if (Connection.TryGetDirectResults(response.DirectResults, out QueryResult? result)) + { + return result!; + } - await HiveServer2Connection.PollForResponseAsync(response.OperationHandle!, Connection.Client, PollTimeMilliseconds, cancellationToken); - Schema schema = await GetResultSetSchemaAsync(response.OperationHandle!, Connection.Client, cancellationToken); + await Connection.PollForResponseAsync(response.OperationHandle!, Connection.Client, PollTimeMilliseconds, cancellationToken); + Schema schema = await GetResultSetSchemaAsync(response.OperationHandle!, Connection.Client, cancellationToken); - return new QueryResult(-1, Connection.NewReader(this, schema, response)); + return new QueryResult(-1, Connection.NewReader(this, schema, response)); + }, ClassName + "." + nameof(GetQueryResult)); } protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IReadOnlyList originalData, @@ -1044,7 +1053,7 @@ public override void Cancel() { // This will cancel any operation using the current token source CancelTokenSource(); - }); + }, ClassName + "." + nameof(Cancel)); } private async Task CancelOperationAsync(Activity? activity, TOperationHandle? operationHandle) diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs b/csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs index a44132933e..4da1bc6192 100644 --- a/csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs +++ b/csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs @@ -120,5 +120,22 @@ protected override ColumnsMetadataColumnNames GetColumnsMetadataColumnNames() DecimalDigits = DecimalDigits, }; } + + protected static new class ActivityKeys + { + private const string Prefix = "impala."; + + public const string AuthType = Prefix + Apache.ActivityKeys.AuthType; + public const string Encrypted = Prefix + Apache.ActivityKeys.Encrypted; + public const string TransportType = Prefix + Apache.ActivityKeys.TransportType; + public const string Host = Prefix + Apache.ActivityKeys.Host; + public const string Port = Prefix + Apache.ActivityKeys.Port; + public static class Http + { + public const string AuthScheme = Prefix + Apache.ActivityKeys.Http.AuthScheme; + public const string UserAgent = Prefix + Apache.ActivityKeys.Http.UserAgent; + public const string Uri = Prefix + Apache.ActivityKeys.Http.Uri; + } + } } } diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs index 8458cd50f0..1b73da8b64 100644 --- a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs +++ b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Globalization; using System.Net; using System.Net.Http; @@ -37,6 +38,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Impala internal class ImpalaHttpConnection : ImpalaConnection { private const string BasicAuthenticationScheme = "Basic"; + private const string AnonymousAuthenticationScheme = "Anonymous"; private readonly HiveServer2ProxyConfigurator _proxyConfigurator; @@ -130,6 +132,8 @@ internal override IArrowArrayStream NewReader(T statement, Schema schema, IRe protected override TTransport CreateTransport() { + Activity? activity = Activity.Current; + // Assumption: parameters have already been validated. Properties.TryGetValue(ImpalaParameters.HostName, out string? hostName); Properties.TryGetValue(ImpalaParameters.Path, out string? path); @@ -155,6 +159,12 @@ protected override TTransport CreateTransport() httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity")); httpClient.DefaultRequestHeaders.ExpectContinue = false; + activity?.AddTag(ActivityKeys.Encrypted, baseAddress.Scheme == Uri.UriSchemeHttps); + activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme); + activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString()); + activity?.AddTag(ActivityKeys.Http.UserAgent, s_userAgent); + activity?.AddTag(ActivityKeys.Http.Uri, baseAddress); + TConfiguration config = GetTconfiguration(); THttpTransport transport = new(httpClient, config) { @@ -168,16 +178,21 @@ protected override TTransport CreateTransport() private static AuthenticationHeaderValue? GetAuthenticationHeaderValue(ImpalaAuthType authType, string? username, string? password) { + Activity? activity = Activity.Current; + if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password) && (authType == ImpalaAuthType.Empty || authType == ImpalaAuthType.Basic)) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme); return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"))); } else if (!string.IsNullOrEmpty(username) && (authType == ImpalaAuthType.Empty || authType == ImpalaAuthType.UsernameOnly)) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme); return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:"))); } else if (authType == ImpalaAuthType.None) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme); return null; } else diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs index 457915d03a..c468bdbfe9 100644 --- a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs +++ b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs @@ -17,9 +17,9 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Net; using System.Net.Security; -using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; using Apache.Arrow.Adbc.Drivers.Apache.Hive2; @@ -103,6 +103,8 @@ protected override void ValidateOptions() protected override TTransport CreateTransport() { + Activity? activity = Activity.Current; + // Assumption: hostName and port have already been validated. Properties.TryGetValue(ImpalaParameters.HostName, out string? hostName); Properties.TryGetValue(ImpalaParameters.Port, out string? port); @@ -127,16 +129,21 @@ protected override TTransport CreateTransport() { transport = new TTlsSocketTransport(hostName!, int.Parse(port!), config: thriftConfig, 0, null, certValidator: certValidator); } + activity?.AddTag(ActivityKeys.Encrypted, true); } else { transport = new TSocketTransport(hostName!, int.Parse(port!), connectClient, config: thriftConfig); + activity?.AddTag(ActivityKeys.Encrypted, false); } + activity?.AddTag(ActivityKeys.Host, hostName); + activity?.AddTag(ActivityKeys.Port, port); TBufferedTransport bufferedTransport = new(transport); switch (authTypeValue) { case ImpalaAuthType.None: + activity?.AddTag(ActivityKeys.TransportType, "buffered_socket"); return bufferedTransport; case ImpalaAuthType.Basic: @@ -149,6 +156,7 @@ protected override TTransport CreateTransport() PlainSaslMechanism saslMechanism = new(username, password); TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig); + activity?.AddTag(ActivityKeys.TransportType, "sasl_buffered_socket"); return new TFramedTransport(saslTransport); default: @@ -164,6 +172,8 @@ protected override async Task CreateProtocolAsync(TTransport transpor protected override TOpenSessionReq CreateSessionRequest() { + Activity? activity = Activity.Current; + // Assumption: user name and password have already been validated. Properties.TryGetValue(AdbcOptions.Username, out string? username); Properties.TryGetValue(AdbcOptions.Password, out string? password); @@ -192,6 +202,13 @@ protected override TOpenSessionReq CreateSessionRequest() request.Password = password!; break; } + + authTypeValue = authTypeValue == ImpalaAuthType.Empty && !string.IsNullOrEmpty(username) + ? !string.IsNullOrEmpty(password) + ? ImpalaAuthType.Basic + : ImpalaAuthType.UsernameOnly + : authTypeValue; + activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString()); return request; } diff --git a/csharp/src/Drivers/Apache/Spark/SparkConnection.cs b/csharp/src/Drivers/Apache/Spark/SparkConnection.cs index 3b0fb634b8..522b6f3d46 100644 --- a/csharp/src/Drivers/Apache/Spark/SparkConnection.cs +++ b/csharp/src/Drivers/Apache/Spark/SparkConnection.cs @@ -190,5 +190,23 @@ protected override ColumnsMetadataColumnNames GetColumnsMetadataColumnNames() DecimalDigits = DecimalDigits, }; } + + protected static new class ActivityKeys + { + private const string Prefix = "spark."; + + public const string Encrypted = Prefix + Apache.ActivityKeys.Encrypted; + public const string TransportType = Prefix + Apache.ActivityKeys.TransportType; + public const string Host = Prefix + Apache.ActivityKeys.Host; + public const string Port = Prefix + Apache.ActivityKeys.Port; + public const string AuthType = Prefix + Apache.ActivityKeys.AuthType; + + public static class Http + { + public const string UserAgent = Prefix + Apache.ActivityKeys.Http.UserAgent; + public const string Uri = Prefix + Apache.ActivityKeys.Http.Uri; + public const string AuthScheme = Prefix + Apache.ActivityKeys.Http.AuthScheme; + } + } } } diff --git a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs index 1aff9cc70b..d19cc3c282 100644 --- a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs +++ b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Globalization; using System.Net; using System.Net.Http; @@ -38,6 +39,7 @@ internal class SparkHttpConnection : SparkConnection { private const string BasicAuthenticationScheme = "Basic"; private const string BearerAuthenticationScheme = "Bearer"; + private const string AnonymousAuthenticationScheme = "Anonymous"; protected readonly HiveServer2ProxyConfigurator _proxyConfigurator; @@ -160,6 +162,8 @@ protected virtual HttpMessageHandler CreateHttpHandler() protected override TTransport CreateTransport() { + Activity? activity = Activity.Current; + // Assumption: parameters have already been validated. Properties.TryGetValue(SparkParameters.HostName, out string? hostName); Properties.TryGetValue(SparkParameters.Path, out string? path); @@ -177,11 +181,18 @@ protected override TTransport CreateTransport() HttpClient httpClient = new(CreateHttpHandler()); httpClient.BaseAddress = baseAddress; httpClient.DefaultRequestHeaders.Authorization = authenticationHeaderValue; - httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(GetUserAgent()); + string userAgent = GetUserAgent(); + httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(userAgent); httpClient.DefaultRequestHeaders.AcceptEncoding.Clear(); httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity")); httpClient.DefaultRequestHeaders.ExpectContinue = false; + activity?.AddTag(ActivityKeys.Encrypted, baseAddress.Scheme == Uri.UriSchemeHttps); + activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme); + activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString()); + activity?.AddTag(ActivityKeys.Http.UserAgent, userAgent); + activity?.AddTag(ActivityKeys.Http.Uri, baseAddress); + TConfiguration config = GetTconfiguration(); THttpTransport transport = new(httpClient, config) { @@ -195,28 +206,35 @@ protected override TTransport CreateTransport() protected virtual AuthenticationHeaderValue? GetAuthenticationHeaderValue(SparkAuthType authType) { + Activity? activity = Activity.Current; + Properties.TryGetValue(SparkParameters.Token, out string? token); Properties.TryGetValue(SparkParameters.AccessToken, out string? access_token); Properties.TryGetValue(AdbcOptions.Username, out string? username); Properties.TryGetValue(AdbcOptions.Password, out string? password); if (!string.IsNullOrEmpty(token) && (authType == SparkAuthType.Empty || authType == SparkAuthType.Token)) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, BearerAuthenticationScheme); return new AuthenticationHeaderValue(BearerAuthenticationScheme, token); } else if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password) && (authType == SparkAuthType.Empty || authType == SparkAuthType.Basic)) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme); return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"))); } else if (!string.IsNullOrEmpty(username) && (authType == SparkAuthType.Empty || authType == SparkAuthType.UsernameOnly)) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme); return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:"))); } else if (!string.IsNullOrEmpty(access_token) && authType == SparkAuthType.OAuth) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, BearerAuthenticationScheme); return new AuthenticationHeaderValue(BearerAuthenticationScheme, access_token); } else if (authType == SparkAuthType.None) { + activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme); return null; } else diff --git a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs index d7c61ead9d..4ca8ce7d8b 100644 --- a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs +++ b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Net; using System.Net.Security; using System.Security.Cryptography.X509Certificates; @@ -104,6 +105,8 @@ protected override void ValidateConnection() protected override TTransport CreateTransport() { + Activity? activity = Activity.Current; + // Assumption: hostName and port have already been validated. Properties.TryGetValue(SparkParameters.HostName, out string? hostName); Properties.TryGetValue(SparkParameters.Port, out string? port); @@ -137,15 +140,21 @@ protected override TTransport CreateTransport() { baseTransport = new TTlsSocketTransport(hostName!, portValue, config: thriftConfig, 0, trustedCert, certValidator); } + activity?.AddTag(ActivityKeys.Encrypted, trustedCert != null); } else { baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig); + activity?.AddTag(ActivityKeys.Encrypted, false); } + activity?.AddTag(ActivityKeys.Host, hostName); + activity?.AddTag(ActivityKeys.Port, port); + TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport); switch (authTypeValue) { case SparkAuthType.None: + activity?.AddTag(ActivityKeys.TransportType, "buffered_socket"); return bufferedTransport; case SparkAuthType.Basic: @@ -159,6 +168,7 @@ protected override TTransport CreateTransport() PlainSaslMechanism saslMechanism = new(username, password); TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig); + activity?.AddTag(ActivityKeys.TransportType, "sasl_buffered_socket"); return new TFramedTransport(saslTransport); default: @@ -174,6 +184,8 @@ protected override async Task CreateProtocolAsync(TTransport transpor protected override TOpenSessionReq CreateSessionRequest() { + Activity? activity = Activity.Current; + // Assumption: user name and password have already been validated. Properties.TryGetValue(AdbcOptions.Username, out string? username); Properties.TryGetValue(AdbcOptions.Password, out string? password); @@ -202,6 +214,13 @@ protected override TOpenSessionReq CreateSessionRequest() request.Password = password!; break; } + + authTypeValue = authTypeValue == SparkAuthType.Empty && !string.IsNullOrEmpty(username) + ? !string.IsNullOrEmpty(password) + ? SparkAuthType.Basic + : SparkAuthType.UsernameOnly + : authTypeValue; + activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString()); return request; } diff --git a/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs b/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs index ad33ee809a..f5bf6c1aa5 100644 --- a/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs +++ b/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs @@ -124,7 +124,7 @@ public async Task EmulateWorkAsync(string key, string value) { await this.TraceActivityAsync(async (activity) => { - activity?.SetTag(key, value); + activity?.AddTag(key, value); // Simulate some work await Task.Yield(); });