Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions csharp/src/Drivers/Apache/ActivityKeys.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Apache.Arrow.Adbc.Drivers.Apache
{
internal static class ActivityKeys
{
public const string AuthType = "auth_type";
public const string Encrypted = "encrypted";
public const string TransportType = "transport_type";
public const string Host = "host";
public const string Port = "port";

internal static class Http
{
public const string Key = "http";
public const string UserAgent = Key + ".user_agent";
public const string Uri = Key + ".uri";
public const string AuthScheme = Key + ".auth_scheme";
}

internal static class Thrift
{
public const string Key = "thrift";
public const string MaxMessageSize = Key + ".max_message_size";
public const string MaxFrameSize = Key + ".max_frame_size";
}
}
}
117 changes: 79 additions & 38 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ internal abstract class HiveServer2Connection : TracingConnection
internal static readonly string s_assemblyName = ApacheUtility.GetAssemblyName(typeof(HiveServer2Connection));
internal static readonly string s_assemblyVersion = ApacheUtility.GetAssemblyVersion(typeof(HiveServer2Connection));
private const int ConnectTimeoutMillisecondsDefault = 30000;
private const string ClassName = nameof(HiveServer2Connection);
private TTransport? _transport;
private TCLIService.IAsync? _client;
private readonly Lazy<string> _vendorVersion;
Expand Down Expand Up @@ -387,7 +388,7 @@ await this.TraceActivityAsync(async activity =>
// Handle other exceptions if necessary
throw new HiveServer2Exception($"An unexpected error occurred while opening the session. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
}
});
}, ClassName + "." + nameof(OpenAsync));
}

private static bool IsUnauthorized(HttpRequestException httpEx)
Expand Down Expand Up @@ -687,7 +688,7 @@ public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? cata
{
throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
}
});
}, ClassName + "." + nameof(GetObjects));
}

public override IArrowArrayStream GetTableTypes()
Expand Down Expand Up @@ -729,28 +730,39 @@ public override IArrowArrayStream GetTableTypes()
{
throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
}
});
}, ClassName + "." + nameof(GetTableTypes));
}

internal static async Task PollForResponseAsync(TOperationHandle operationHandle, TCLIService.IAsync client, int pollTimeMilliseconds, CancellationToken cancellationToken = default)
internal async Task PollForResponseAsync(TOperationHandle operationHandle, TCLIService.IAsync client, int pollTimeMilliseconds, CancellationToken cancellationToken = default)
{
TGetOperationStatusResp? statusResponse = null;
do
{
if (statusResponse != null) { await Task.Delay(pollTimeMilliseconds, cancellationToken); }
TGetOperationStatusReq request = new(operationHandle);
statusResponse = await client.GetOperationStatus(request, cancellationToken);
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);

// Must be in the finished state to be valid. If not, typically a server error or timeout has occurred.
if (statusResponse.OperationState != TOperationState.FINISHED_STATE)
await this.TraceActivityAsync(async activity =>
{
activity?.AddEvent("hive2.thrift.poll_start");
TGetOperationStatusResp? statusResponse = null;
int attempts = 0;
do
{
if (statusResponse != null) { await Task.Delay(pollTimeMilliseconds, cancellationToken); }
TGetOperationStatusReq request = new(operationHandle);
attempts++;
statusResponse = await client.GetOperationStatus(request, cancellationToken);
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);
activity?.AddEvent("hive2.thrift.poll_end",
[
new("hive2.thrift.poll_attempts", attempts),
new("hive2.thrift.operation_state", statusResponse.OperationState.ToString()),
]);

// Must be in the finished state to be valid. If not, typically a server error or timeout has occurred.
if (statusResponse.OperationState != TOperationState.FINISHED_STATE)
{
#pragma warning disable CS0618 // Type or member is obsolete
throw new HiveServer2Exception(statusResponse.ErrorMessage, AdbcStatusCode.InvalidState)
.SetSqlState(statusResponse.SqlState)
.SetNativeError(statusResponse.ErrorCode);
throw new HiveServer2Exception(statusResponse.ErrorMessage, AdbcStatusCode.InvalidState)
.SetSqlState(statusResponse.SqlState)
.SetNativeError(statusResponse.ErrorCode);
#pragma warning restore CS0618 // Type or member is obsolete
}
}
}, ClassName + "." + nameof(PollForResponseAsync));
}

private string GetInfoTypeStringValue(TGetInfoType infoType)
Expand Down Expand Up @@ -779,7 +791,7 @@ private string GetInfoTypeStringValue(TGetInfoType infoType)
{
throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
}
});
}, nameof(HiveServer2Connection) + "." + nameof(GetInfoTypeStringValue));
}

protected override void Dispose(bool disposing)
Expand Down Expand Up @@ -813,10 +825,10 @@ private void DisposeClient()
_transport = null;
_client = null;
}
});
}, ClassName + "." + nameof(DisposeClient));
}

internal static async Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default)
internal async Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default)
{
TGetResultSetMetadataReq request = new(operationHandle);
TGetResultSetMetadataResp response = await client.GetResultSetMetadata(request, cancellationToken);
Expand Down Expand Up @@ -1062,7 +1074,7 @@ internal async Task<TGetCatalogsResp> GetCatalogsAsync(CancellationToken cancell
HandleThriftResponse(resp.Status, activity);

return resp;
});
}, ClassName + "." + nameof(GetCatalogsAsync));
}

internal async Task<TGetSchemasResp> GetSchemasAsync(
Expand Down Expand Up @@ -1092,7 +1104,7 @@ internal async Task<TGetSchemasResp> GetSchemasAsync(
HandleThriftResponse(resp.Status, activity);

return resp;
});
}, ClassName + "." + nameof(GetSchemasAsync));
}

internal async Task<TGetTablesResp> GetTablesAsync(
Expand Down Expand Up @@ -1132,7 +1144,7 @@ internal async Task<TGetTablesResp> GetTablesAsync(
HandleThriftResponse(resp.Status, activity);

return resp;
});
}, ClassName + "." + nameof(GetTablesAsync));
}

internal async Task<TGetColumnsResp> GetColumnsAsync(
Expand Down Expand Up @@ -1172,7 +1184,7 @@ internal async Task<TGetColumnsResp> GetColumnsAsync(
HandleThriftResponse(resp.Status, activity);

return resp;
});
}, ClassName + "." + nameof(GetColumnsAsync));
}

internal async Task<TGetPrimaryKeysResp> GetPrimaryKeysAsync(
Expand Down Expand Up @@ -1207,7 +1219,7 @@ internal async Task<TGetPrimaryKeysResp> GetPrimaryKeysAsync(
HandleThriftResponse(resp.Status, activity);

return resp;
});
}, ClassName + "." + nameof(GetPrimaryKeysAsync));
}

internal async Task<TGetCrossReferenceResp> GetCrossReferenceAsync(
Expand Down Expand Up @@ -1256,7 +1268,7 @@ internal async Task<TGetCrossReferenceResp> GetCrossReferenceAsync(
TGetCrossReferenceResp resp = await Client.GetCrossReference(req, cancellationToken);
HandleThriftResponse(resp.Status, activity);
return resp;
});
}, ClassName + "." + nameof(GetCrossReferenceAsync));
}

private static StructArray GetColumnSchema(TableInfo tableInfo)
Expand Down Expand Up @@ -1394,7 +1406,7 @@ public override Schema GetTableSchema(string? catalog, string? dbSchema, string?
{
throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
}
});
}, ClassName + "." + nameof(GetTableSchema));
}

private static IArrowType GetArrowType(int columnTypeId, string typeName, bool isColumnSizeValid, int? columnSize, int? decimalDigits)
Expand Down Expand Up @@ -1459,16 +1471,20 @@ private static IArrowType GetArrowType(int columnTypeId, string typeName, bool i

internal async Task<TRowSet> FetchResultsAsync(TOperationHandle operationHandle, long batchSize = BatchSizeDefault, CancellationToken cancellationToken = default)
{
await PollForResponseAsync(operationHandle, Client, PollTimeMillisecondsDefault, cancellationToken);

TFetchResultsResp fetchResp = await FetchNextAsync(operationHandle, Client, batchSize, cancellationToken);
if (fetchResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
return await this.TraceActivityAsync(async activity =>
{
throw new HiveServer2Exception(fetchResp.Status.ErrorMessage)
.SetNativeError(fetchResp.Status.ErrorCode)
.SetSqlState(fetchResp.Status.SqlState);
}
return fetchResp.Results;
await PollForResponseAsync(operationHandle, Client, PollTimeMillisecondsDefault, cancellationToken);

TFetchResultsResp fetchResp = await FetchNextAsync(operationHandle, Client, batchSize, cancellationToken);
if (fetchResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
{
throw new HiveServer2Exception(fetchResp.Status.ErrorMessage)
.SetNativeError(fetchResp.Status.ErrorCode)
.SetSqlState(fetchResp.Status.SqlState);
}
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, HiveServer2Reader.GetRowCount(fetchResp.Results, fetchResp.Results.Columns.Count));
return fetchResp.Results;
}, ClassName + "." + nameof(FetchResultsAsync));
}

private static async Task<TFetchResultsResp> FetchNextAsync(TOperationHandle operationHandle, TCLIService.IAsync client, long batchSize, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -1628,7 +1644,7 @@ public override IArrowArrayStream GetInfo(IReadOnlyList<AdbcInfoCode> codes)
StandardSchemas.GetInfoSchema.Validate(dataArrays);

return new HiveInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays);
});
}, ClassName + "." + nameof(GetInfo));
}

internal struct TableInfo(string type)
Expand Down Expand Up @@ -1723,19 +1739,44 @@ private static void ThrowErrorResponse(TStatus status, AdbcStatusCode adbcStatus
protected TConfiguration GetTconfiguration()
{
var thriftConfig = new TConfiguration();
Activity? activity = Activity.Current;

Properties.TryGetValue(ThriftTransportSizeConstants.MaxMessageSize, out string? maxMessageSize);
if (int.TryParse(maxMessageSize, out int maxMessageSizeValue) && maxMessageSizeValue > 0)
{
activity?.AddTag(ActivityKeys.Thrift.MaxMessageSize, maxMessageSizeValue);
thriftConfig.MaxMessageSize = maxMessageSizeValue;
}

Properties.TryGetValue(ThriftTransportSizeConstants.MaxFrameSize, out string? maxFrameSize);
if (int.TryParse(maxFrameSize, out int maxFrameSizeValue) && maxFrameSizeValue > 0)
{
activity?.AddTag(ActivityKeys.Thrift.MaxFrameSize, maxFrameSizeValue);
thriftConfig.MaxFrameSize = maxFrameSizeValue;
}
return thriftConfig;
}

protected static class ActivityKeys
{
private const string Prefix = "hive2.";

public const string Encrypted = Prefix + Apache.ActivityKeys.Encrypted;
public const string TransportType = Prefix + Apache.ActivityKeys.TransportType;
public const string AuthType = Prefix + Apache.ActivityKeys.AuthType;

public static class Http
{
public const string AuthScheme = Prefix + Apache.ActivityKeys.Http.AuthScheme;
public const string UserAgent = Prefix + Apache.ActivityKeys.Http.UserAgent;
public const string Uri = Prefix + Apache.ActivityKeys.Http.Uri;
}

public static class Thrift
{
public const string MaxMessageSize = Prefix + Apache.ActivityKeys.Thrift.MaxMessageSize;
public const string MaxFrameSize = Prefix + Apache.ActivityKeys.Thrift.MaxFrameSize;
}
}
}
}
15 changes: 15 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Net;
using System.Net.Http;
Expand All @@ -35,6 +36,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
internal class HiveServer2HttpConnection : HiveServer2ExtendedConnection
{
private const string BasicAuthenticationScheme = "Basic";
private const string AnonymousAuthenticationScheme = "Anonymous";

private readonly HiveServer2ProxyConfigurator _proxyConfigurator;

Expand Down Expand Up @@ -125,6 +127,8 @@ protected override void ValidateOptions()

protected override TTransport CreateTransport()
{
Activity? activity = Activity.Current;

// Assumption: parameters have already been validated.
Properties.TryGetValue(HiveServer2Parameters.HostName, out string? hostName);
Properties.TryGetValue(HiveServer2Parameters.Path, out string? path);
Expand All @@ -150,6 +154,12 @@ protected override TTransport CreateTransport()
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
httpClient.DefaultRequestHeaders.ExpectContinue = false;

activity?.AddTag(ActivityKeys.Encrypted, TlsOptions.IsTlsEnabled);
activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme);
activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString());
activity?.AddTag(ActivityKeys.Http.UserAgent, s_userAgent);
activity?.AddTag(ActivityKeys.Http.Uri, baseAddress);

TConfiguration config = GetTconfiguration();
THttpTransport transport = new(httpClient, config)
{
Expand All @@ -163,16 +173,21 @@ protected override TTransport CreateTransport()

private static AuthenticationHeaderValue? GetAuthenticationHeaderValue(HiveServer2AuthType authType, string? username, string? password)
{
Activity? activity = Activity.Current;

if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password) && (authType == HiveServer2AuthType.Empty || authType == HiveServer2AuthType.Basic))
{
activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme);
return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")));
}
else if (!string.IsNullOrEmpty(username) && (authType == HiveServer2AuthType.Empty || authType == HiveServer2AuthType.UsernameOnly))
{
activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme);
return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:")));
}
else if (authType == HiveServer2AuthType.None)
{
activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme);
return null;
}
else
Expand Down
3 changes: 2 additions & 1 deletion csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ internal class HiveServer2Reader : TracingReader
private const int SecondSubsecondSepIndex = 19;
private const int SubsecondIndex = 20;
private const int MillisecondDecimalPlaces = 3;
private const string ClassName = nameof(HiveServer2Reader);
private readonly IHiveServer2Statement _statement;
private readonly IResponse _response;
private bool _disposed;
Expand Down Expand Up @@ -129,7 +130,7 @@ public HiveServer2Reader(
{
throw new HiveServer2Exception($"An unexpected error occurred while fetching results. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
}
});
}, ClassName + "." + nameof(ReadNextRecordBatchAsync));
}

private RecordBatch CreateBatch(TFetchResultsResp response, int columnCount, int rowCount)
Expand Down
Loading
Loading