Skip to content

Commit 778f780

Browse files
authored
feat(csharp/src/Drivers/Apache): increase telemetry instrumentation for Apache drivers (#3794)
Adds more telemetry information for Apache drivers - increase connection information. - add activities for more methods to track durations. - now using explicit activity names to improve context - using `Activity.AddTag` instead of `Activity.SetTag` as `SetTag` is more expensive.
1 parent b99a179 commit 778f780

13 files changed

+288
-69
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
namespace Apache.Arrow.Adbc.Drivers.Apache
19+
{
20+
internal static class ActivityKeys
21+
{
22+
public const string AuthType = "auth_type";
23+
public const string Encrypted = "encrypted";
24+
public const string TransportType = "transport_type";
25+
public const string Host = "host";
26+
public const string Port = "port";
27+
28+
internal static class Http
29+
{
30+
public const string Key = "http";
31+
public const string UserAgent = Key + ".user_agent";
32+
public const string Uri = Key + ".uri";
33+
public const string AuthScheme = Key + ".auth_scheme";
34+
}
35+
36+
internal static class Thrift
37+
{
38+
public const string Key = "thrift";
39+
public const string MaxMessageSize = Key + ".max_message_size";
40+
public const string MaxFrameSize = Key + ".max_frame_size";
41+
}
42+
}
43+
}

csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs

Lines changed: 79 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ internal abstract class HiveServer2Connection : TracingConnection
5050
internal static readonly string s_assemblyName = ApacheUtility.GetAssemblyName(typeof(HiveServer2Connection));
5151
internal static readonly string s_assemblyVersion = ApacheUtility.GetAssemblyVersion(typeof(HiveServer2Connection));
5252
private const int ConnectTimeoutMillisecondsDefault = 30000;
53+
private const string ClassName = nameof(HiveServer2Connection);
5354
private TTransport? _transport;
5455
private TCLIService.IAsync? _client;
5556
private readonly Lazy<string> _vendorVersion;
@@ -387,7 +388,7 @@ await this.TraceActivityAsync(async activity =>
387388
// Handle other exceptions if necessary
388389
throw new HiveServer2Exception($"An unexpected error occurred while opening the session. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
389390
}
390-
});
391+
}, ClassName + "." + nameof(OpenAsync));
391392
}
392393

393394
private static bool IsUnauthorized(HttpRequestException httpEx)
@@ -687,7 +688,7 @@ public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? cata
687688
{
688689
throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
689690
}
690-
});
691+
}, ClassName + "." + nameof(GetObjects));
691692
}
692693

693694
public override IArrowArrayStream GetTableTypes()
@@ -729,28 +730,39 @@ public override IArrowArrayStream GetTableTypes()
729730
{
730731
throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
731732
}
732-
});
733+
}, ClassName + "." + nameof(GetTableTypes));
733734
}
734735

735-
internal static async Task PollForResponseAsync(TOperationHandle operationHandle, TCLIService.IAsync client, int pollTimeMilliseconds, CancellationToken cancellationToken = default)
736+
internal async Task PollForResponseAsync(TOperationHandle operationHandle, TCLIService.IAsync client, int pollTimeMilliseconds, CancellationToken cancellationToken = default)
736737
{
737-
TGetOperationStatusResp? statusResponse = null;
738-
do
739-
{
740-
if (statusResponse != null) { await Task.Delay(pollTimeMilliseconds, cancellationToken); }
741-
TGetOperationStatusReq request = new(operationHandle);
742-
statusResponse = await client.GetOperationStatus(request, cancellationToken);
743-
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);
744-
745-
// Must be in the finished state to be valid. If not, typically a server error or timeout has occurred.
746-
if (statusResponse.OperationState != TOperationState.FINISHED_STATE)
738+
await this.TraceActivityAsync(async activity =>
747739
{
740+
activity?.AddEvent("hive2.thrift.poll_start");
741+
TGetOperationStatusResp? statusResponse = null;
742+
int attempts = 0;
743+
do
744+
{
745+
if (statusResponse != null) { await Task.Delay(pollTimeMilliseconds, cancellationToken); }
746+
TGetOperationStatusReq request = new(operationHandle);
747+
attempts++;
748+
statusResponse = await client.GetOperationStatus(request, cancellationToken);
749+
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);
750+
activity?.AddEvent("hive2.thrift.poll_end",
751+
[
752+
new("hive2.thrift.poll_attempts", attempts),
753+
new("hive2.thrift.operation_state", statusResponse.OperationState.ToString()),
754+
]);
755+
756+
// Must be in the finished state to be valid. If not, typically a server error or timeout has occurred.
757+
if (statusResponse.OperationState != TOperationState.FINISHED_STATE)
758+
{
748759
#pragma warning disable CS0618 // Type or member is obsolete
749-
throw new HiveServer2Exception(statusResponse.ErrorMessage, AdbcStatusCode.InvalidState)
750-
.SetSqlState(statusResponse.SqlState)
751-
.SetNativeError(statusResponse.ErrorCode);
760+
throw new HiveServer2Exception(statusResponse.ErrorMessage, AdbcStatusCode.InvalidState)
761+
.SetSqlState(statusResponse.SqlState)
762+
.SetNativeError(statusResponse.ErrorCode);
752763
#pragma warning restore CS0618 // Type or member is obsolete
753-
}
764+
}
765+
}, ClassName + "." + nameof(PollForResponseAsync));
754766
}
755767

756768
private string GetInfoTypeStringValue(TGetInfoType infoType)
@@ -779,7 +791,7 @@ private string GetInfoTypeStringValue(TGetInfoType infoType)
779791
{
780792
throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
781793
}
782-
});
794+
}, nameof(HiveServer2Connection) + "." + nameof(GetInfoTypeStringValue));
783795
}
784796

785797
protected override void Dispose(bool disposing)
@@ -813,10 +825,10 @@ private void DisposeClient()
813825
_transport = null;
814826
_client = null;
815827
}
816-
});
828+
}, ClassName + "." + nameof(DisposeClient));
817829
}
818830

819-
internal static async Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default)
831+
internal async Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default)
820832
{
821833
TGetResultSetMetadataReq request = new(operationHandle);
822834
TGetResultSetMetadataResp response = await client.GetResultSetMetadata(request, cancellationToken);
@@ -1062,7 +1074,7 @@ internal async Task<TGetCatalogsResp> GetCatalogsAsync(CancellationToken cancell
10621074
HandleThriftResponse(resp.Status, activity);
10631075

10641076
return resp;
1065-
});
1077+
}, ClassName + "." + nameof(GetCatalogsAsync));
10661078
}
10671079

10681080
internal async Task<TGetSchemasResp> GetSchemasAsync(
@@ -1092,7 +1104,7 @@ internal async Task<TGetSchemasResp> GetSchemasAsync(
10921104
HandleThriftResponse(resp.Status, activity);
10931105

10941106
return resp;
1095-
});
1107+
}, ClassName + "." + nameof(GetSchemasAsync));
10961108
}
10971109

10981110
internal async Task<TGetTablesResp> GetTablesAsync(
@@ -1132,7 +1144,7 @@ internal async Task<TGetTablesResp> GetTablesAsync(
11321144
HandleThriftResponse(resp.Status, activity);
11331145

11341146
return resp;
1135-
});
1147+
}, ClassName + "." + nameof(GetTablesAsync));
11361148
}
11371149

11381150
internal async Task<TGetColumnsResp> GetColumnsAsync(
@@ -1172,7 +1184,7 @@ internal async Task<TGetColumnsResp> GetColumnsAsync(
11721184
HandleThriftResponse(resp.Status, activity);
11731185

11741186
return resp;
1175-
});
1187+
}, ClassName + "." + nameof(GetColumnsAsync));
11761188
}
11771189

11781190
internal async Task<TGetPrimaryKeysResp> GetPrimaryKeysAsync(
@@ -1207,7 +1219,7 @@ internal async Task<TGetPrimaryKeysResp> GetPrimaryKeysAsync(
12071219
HandleThriftResponse(resp.Status, activity);
12081220

12091221
return resp;
1210-
});
1222+
}, ClassName + "." + nameof(GetPrimaryKeysAsync));
12111223
}
12121224

12131225
internal async Task<TGetCrossReferenceResp> GetCrossReferenceAsync(
@@ -1256,7 +1268,7 @@ internal async Task<TGetCrossReferenceResp> GetCrossReferenceAsync(
12561268
TGetCrossReferenceResp resp = await Client.GetCrossReference(req, cancellationToken);
12571269
HandleThriftResponse(resp.Status, activity);
12581270
return resp;
1259-
});
1271+
}, ClassName + "." + nameof(GetCrossReferenceAsync));
12601272
}
12611273

12621274
private static StructArray GetColumnSchema(TableInfo tableInfo)
@@ -1394,7 +1406,7 @@ public override Schema GetTableSchema(string? catalog, string? dbSchema, string?
13941406
{
13951407
throw new HiveServer2Exception($"An unexpected error occurred while running metadata query. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
13961408
}
1397-
});
1409+
}, ClassName + "." + nameof(GetTableSchema));
13981410
}
13991411

14001412
private static IArrowType GetArrowType(int columnTypeId, string typeName, bool isColumnSizeValid, int? columnSize, int? decimalDigits)
@@ -1459,16 +1471,20 @@ private static IArrowType GetArrowType(int columnTypeId, string typeName, bool i
14591471

14601472
internal async Task<TRowSet> FetchResultsAsync(TOperationHandle operationHandle, long batchSize = BatchSizeDefault, CancellationToken cancellationToken = default)
14611473
{
1462-
await PollForResponseAsync(operationHandle, Client, PollTimeMillisecondsDefault, cancellationToken);
1463-
1464-
TFetchResultsResp fetchResp = await FetchNextAsync(operationHandle, Client, batchSize, cancellationToken);
1465-
if (fetchResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
1474+
return await this.TraceActivityAsync(async activity =>
14661475
{
1467-
throw new HiveServer2Exception(fetchResp.Status.ErrorMessage)
1468-
.SetNativeError(fetchResp.Status.ErrorCode)
1469-
.SetSqlState(fetchResp.Status.SqlState);
1470-
}
1471-
return fetchResp.Results;
1476+
await PollForResponseAsync(operationHandle, Client, PollTimeMillisecondsDefault, cancellationToken);
1477+
1478+
TFetchResultsResp fetchResp = await FetchNextAsync(operationHandle, Client, batchSize, cancellationToken);
1479+
if (fetchResp.Status.StatusCode == TStatusCode.ERROR_STATUS)
1480+
{
1481+
throw new HiveServer2Exception(fetchResp.Status.ErrorMessage)
1482+
.SetNativeError(fetchResp.Status.ErrorCode)
1483+
.SetSqlState(fetchResp.Status.SqlState);
1484+
}
1485+
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, HiveServer2Reader.GetRowCount(fetchResp.Results, fetchResp.Results.Columns.Count));
1486+
return fetchResp.Results;
1487+
}, ClassName + "." + nameof(FetchResultsAsync));
14721488
}
14731489

14741490
private static async Task<TFetchResultsResp> FetchNextAsync(TOperationHandle operationHandle, TCLIService.IAsync client, long batchSize, CancellationToken cancellationToken = default)
@@ -1628,7 +1644,7 @@ public override IArrowArrayStream GetInfo(IReadOnlyList<AdbcInfoCode> codes)
16281644
StandardSchemas.GetInfoSchema.Validate(dataArrays);
16291645

16301646
return new HiveInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays);
1631-
});
1647+
}, ClassName + "." + nameof(GetInfo));
16321648
}
16331649

16341650
internal struct TableInfo(string type)
@@ -1723,19 +1739,44 @@ private static void ThrowErrorResponse(TStatus status, AdbcStatusCode adbcStatus
17231739
protected TConfiguration GetTconfiguration()
17241740
{
17251741
var thriftConfig = new TConfiguration();
1742+
Activity? activity = Activity.Current;
17261743

17271744
Properties.TryGetValue(ThriftTransportSizeConstants.MaxMessageSize, out string? maxMessageSize);
17281745
if (int.TryParse(maxMessageSize, out int maxMessageSizeValue) && maxMessageSizeValue > 0)
17291746
{
1747+
activity?.AddTag(ActivityKeys.Thrift.MaxMessageSize, maxMessageSizeValue);
17301748
thriftConfig.MaxMessageSize = maxMessageSizeValue;
17311749
}
17321750

17331751
Properties.TryGetValue(ThriftTransportSizeConstants.MaxFrameSize, out string? maxFrameSize);
17341752
if (int.TryParse(maxFrameSize, out int maxFrameSizeValue) && maxFrameSizeValue > 0)
17351753
{
1754+
activity?.AddTag(ActivityKeys.Thrift.MaxFrameSize, maxFrameSizeValue);
17361755
thriftConfig.MaxFrameSize = maxFrameSizeValue;
17371756
}
17381757
return thriftConfig;
17391758
}
1759+
1760+
protected static class ActivityKeys
1761+
{
1762+
private const string Prefix = "hive2.";
1763+
1764+
public const string Encrypted = Prefix + Apache.ActivityKeys.Encrypted;
1765+
public const string TransportType = Prefix + Apache.ActivityKeys.TransportType;
1766+
public const string AuthType = Prefix + Apache.ActivityKeys.AuthType;
1767+
1768+
public static class Http
1769+
{
1770+
public const string AuthScheme = Prefix + Apache.ActivityKeys.Http.AuthScheme;
1771+
public const string UserAgent = Prefix + Apache.ActivityKeys.Http.UserAgent;
1772+
public const string Uri = Prefix + Apache.ActivityKeys.Http.Uri;
1773+
}
1774+
1775+
public static class Thrift
1776+
{
1777+
public const string MaxMessageSize = Prefix + Apache.ActivityKeys.Thrift.MaxMessageSize;
1778+
public const string MaxFrameSize = Prefix + Apache.ActivityKeys.Thrift.MaxFrameSize;
1779+
}
1780+
}
17401781
}
17411782
}

csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
using System;
1919
using System.Collections.Generic;
20+
using System.Diagnostics;
2021
using System.Globalization;
2122
using System.Net;
2223
using System.Net.Http;
@@ -35,6 +36,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
3536
internal class HiveServer2HttpConnection : HiveServer2ExtendedConnection
3637
{
3738
private const string BasicAuthenticationScheme = "Basic";
39+
private const string AnonymousAuthenticationScheme = "Anonymous";
3840

3941
private readonly HiveServer2ProxyConfigurator _proxyConfigurator;
4042

@@ -125,6 +127,8 @@ protected override void ValidateOptions()
125127

126128
protected override TTransport CreateTransport()
127129
{
130+
Activity? activity = Activity.Current;
131+
128132
// Assumption: parameters have already been validated.
129133
Properties.TryGetValue(HiveServer2Parameters.HostName, out string? hostName);
130134
Properties.TryGetValue(HiveServer2Parameters.Path, out string? path);
@@ -150,6 +154,12 @@ protected override TTransport CreateTransport()
150154
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
151155
httpClient.DefaultRequestHeaders.ExpectContinue = false;
152156

157+
activity?.AddTag(ActivityKeys.Encrypted, TlsOptions.IsTlsEnabled);
158+
activity?.AddTag(ActivityKeys.TransportType, baseAddress.Scheme);
159+
activity?.AddTag(ActivityKeys.AuthType, authTypeValue.ToString());
160+
activity?.AddTag(ActivityKeys.Http.UserAgent, s_userAgent);
161+
activity?.AddTag(ActivityKeys.Http.Uri, baseAddress);
162+
153163
TConfiguration config = GetTconfiguration();
154164
THttpTransport transport = new(httpClient, config)
155165
{
@@ -163,16 +173,21 @@ protected override TTransport CreateTransport()
163173

164174
private static AuthenticationHeaderValue? GetAuthenticationHeaderValue(HiveServer2AuthType authType, string? username, string? password)
165175
{
176+
Activity? activity = Activity.Current;
177+
166178
if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password) && (authType == HiveServer2AuthType.Empty || authType == HiveServer2AuthType.Basic))
167179
{
180+
activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme);
168181
return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")));
169182
}
170183
else if (!string.IsNullOrEmpty(username) && (authType == HiveServer2AuthType.Empty || authType == HiveServer2AuthType.UsernameOnly))
171184
{
185+
activity?.AddTag(ActivityKeys.Http.AuthScheme, BasicAuthenticationScheme);
172186
return new AuthenticationHeaderValue(BasicAuthenticationScheme, Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:")));
173187
}
174188
else if (authType == HiveServer2AuthType.None)
175189
{
190+
activity?.AddTag(ActivityKeys.Http.AuthScheme, AnonymousAuthenticationScheme);
176191
return null;
177192
}
178193
else

csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ internal class HiveServer2Reader : TracingReader
5555
private const int SecondSubsecondSepIndex = 19;
5656
private const int SubsecondIndex = 20;
5757
private const int MillisecondDecimalPlaces = 3;
58+
private const string ClassName = nameof(HiveServer2Reader);
5859
private readonly IHiveServer2Statement _statement;
5960
private readonly IResponse _response;
6061
private bool _disposed;
@@ -129,7 +130,7 @@ public HiveServer2Reader(
129130
{
130131
throw new HiveServer2Exception($"An unexpected error occurred while fetching results. '{ApacheUtility.FormatExceptionMessage(ex)}'", ex);
131132
}
132-
});
133+
}, ClassName + "." + nameof(ReadNextRecordBatchAsync));
133134
}
134135

135136
private RecordBatch CreateBatch(TFetchResultsResp response, int columnCount, int rowCount)

0 commit comments

Comments
 (0)