Skip to content

Commit 489f2e4

Browse files
msrathore-dbclaude
andcommitted
refactor(csharp): abstract statement metadata methods for SEA compatibility
Refactor GetCatalogs, GetSchemas, GetTables, GetPrimaryKeys, and GetCrossReference to use shared metadata abstractions, enabling protocol-agnostic implementations that work for both Thrift and SEA. Changes: - Add 5 virtual conversion methods in HiveServer2Connection - Add generic GetMetadataAsRecordBatch helper in HiveServer2Statement - Refactor 6 metadata methods to use the generic helper (3-6 lines each) - Remove duplicate GetQueryResult helper method Architecture separates protocol layer (ConvertTo*Records), domain layer (metadata records), and presentation layer (MetadataSchemaBuilder). SEA can now override ConvertTo*Records() methods and reuse all existing metadata infrastructure without code duplication. Verification: - Build: 0 errors, 0 warnings - Tests: 8/8 passed - Baseline: Output matches pre-refactoring - Special types (DECIMAL, INTERVAL) preserved - BASE_TYPE_NAME enhancement working Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent fd35ac8 commit 489f2e4

File tree

6 files changed

+354
-357
lines changed

6 files changed

+354
-357
lines changed

csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs

Lines changed: 175 additions & 197 deletions
Large diffs are not rendered by default.

csharp/src/Drivers/Apache/Hive2/HiveServer2ExtendedConnection.cs

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -68,54 +68,6 @@ internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, IRe
6868
response,
6969
dataTypeConversion: statement.Connection.DataTypeConversion);
7070

71-
internal override void SetPrecisionScaleAndTypeName(
72-
short colType,
73-
string typeName,
74-
TableInfo? tableInfo,
75-
int columnSize,
76-
int decimalDigits)
77-
{
78-
// Keep the original type name
79-
tableInfo?.TypeName.Add(typeName);
80-
switch (colType)
81-
{
82-
case (short)ColumnTypeId.DECIMAL:
83-
case (short)ColumnTypeId.NUMERIC:
84-
{
85-
// Precision/scale is provide in the API call.
86-
SqlDecimalParserResult result = SqlTypeNameParser<SqlDecimalParserResult>.Parse(typeName, colType);
87-
tableInfo?.Precision.Add(columnSize);
88-
tableInfo?.Scale.Add((short)decimalDigits);
89-
tableInfo?.BaseTypeName.Add(result.BaseTypeName);
90-
break;
91-
}
92-
93-
case (short)ColumnTypeId.CHAR:
94-
case (short)ColumnTypeId.NCHAR:
95-
case (short)ColumnTypeId.VARCHAR:
96-
case (short)ColumnTypeId.LONGVARCHAR:
97-
case (short)ColumnTypeId.LONGNVARCHAR:
98-
case (short)ColumnTypeId.NVARCHAR:
99-
{
100-
// Precision is provide in the API call.
101-
SqlCharVarcharParserResult result = SqlTypeNameParser<SqlCharVarcharParserResult>.Parse(typeName, colType);
102-
tableInfo?.Precision.Add(columnSize);
103-
tableInfo?.Scale.Add(null);
104-
tableInfo?.BaseTypeName.Add(result.BaseTypeName);
105-
break;
106-
}
107-
108-
default:
109-
{
110-
SqlTypeNameParserResult result = SqlTypeNameParser<SqlTypeNameParserResult>.Parse(typeName, colType);
111-
tableInfo?.Precision.Add(null);
112-
tableInfo?.Scale.Add(null);
113-
tableInfo?.BaseTypeName.Add(result.BaseTypeName);
114-
break;
115-
}
116-
}
117-
}
118-
11971
protected override ColumnsMetadataColumnNames GetColumnsMetadataColumnNames()
12072
{
12173
return new ColumnsMetadataColumnNames()

csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs

Lines changed: 80 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ protected virtual void SetStatementProperties(TExecuteStatementReq statement)
7070
}
7171

7272
/// <summary>
73-
/// Gets the schema from metadata response. Base implementation uses the standard schema parser.
74-
/// Subclasses can override to support alternative schema parsing strategies.
73+
/// Gets the schema from metadata response. Base implementation uses traditional Thrift schema.
74+
/// Subclasses can override to support Arrow schema parsing.
7575
/// </summary>
7676
/// <param name="metadata">The metadata response containing schema information</param>
7777
/// <returns>The Arrow schema</returns>
@@ -442,16 +442,15 @@ private async Task<QueryResult> ExecuteMetadataCommandQuery(CancellationToken ca
442442
/// since the backend treats these as exact match queries rather than pattern matches.
443443
protected virtual async Task<QueryResult> GetCrossReferenceAsForeignTableAsync(CancellationToken cancellationToken = default)
444444
{
445-
IResponse response = await Connection.GetCrossReferenceAsync(
446-
null,
447-
null,
448-
null,
449-
CatalogName,
450-
SchemaName,
451-
TableName,
445+
var response = await Connection.GetCrossReferenceAsync(
446+
null, null, null,
447+
CatalogName, SchemaName, TableName,
448+
cancellationToken);
449+
return await GetMetadataAsRecordBatch(
450+
response,
451+
Connection.ConvertToForeignKeyRecords,
452+
Metadata.MetadataSchemaBuilder.BuildFlatForeignKeysSchema,
452453
cancellationToken);
453-
454-
return await GetQueryResult(response, cancellationToken);
455454
}
456455

457456
/// <summary>
@@ -461,16 +460,15 @@ protected virtual async Task<QueryResult> GetCrossReferenceAsForeignTableAsync(C
461460
/// </summary>
462461
protected virtual async Task<QueryResult> GetCrossReferenceAsync(CancellationToken cancellationToken = default)
463462
{
464-
IResponse response = await Connection.GetCrossReferenceAsync(
465-
CatalogName,
466-
SchemaName,
467-
TableName,
468-
ForeignCatalogName,
469-
ForeignSchemaName,
470-
ForeignTableName,
463+
var response = await Connection.GetCrossReferenceAsync(
464+
CatalogName, SchemaName, TableName,
465+
ForeignCatalogName, ForeignSchemaName, ForeignTableName,
466+
cancellationToken);
467+
return await GetMetadataAsRecordBatch(
468+
response,
469+
Connection.ConvertToForeignKeyRecords,
470+
Metadata.MetadataSchemaBuilder.BuildFlatForeignKeysSchema,
471471
cancellationToken);
472-
473-
return await GetQueryResult(response, cancellationToken);
474472
}
475473

476474
/// <summary>
@@ -480,43 +478,52 @@ protected virtual async Task<QueryResult> GetCrossReferenceAsync(CancellationTok
480478
/// </summary>
481479
protected virtual async Task<QueryResult> GetPrimaryKeysAsync(CancellationToken cancellationToken = default)
482480
{
483-
IResponse response = await Connection.GetPrimaryKeysAsync(
484-
CatalogName,
485-
SchemaName,
486-
TableName,
481+
var response = await Connection.GetPrimaryKeysAsync(
482+
CatalogName, SchemaName, TableName, cancellationToken);
483+
return await GetMetadataAsRecordBatch(
484+
response,
485+
Connection.ConvertToPrimaryKeyRecords,
486+
Metadata.MetadataSchemaBuilder.BuildFlatPrimaryKeysSchema,
487487
cancellationToken);
488-
489-
return await GetQueryResult(response, cancellationToken);
490488
}
491489

492490
protected virtual async Task<QueryResult> GetCatalogsAsync(CancellationToken cancellationToken = default)
493491
{
494-
IResponse response = await Connection.GetCatalogsAsync(cancellationToken);
495-
496-
return await GetQueryResult(response, cancellationToken);
492+
var response = await Connection.GetCatalogsAsync(cancellationToken);
493+
return await GetMetadataAsRecordBatch(
494+
response,
495+
Connection.ConvertToCatalogRecords,
496+
Metadata.MetadataSchemaBuilder.BuildFlatCatalogsSchema,
497+
cancellationToken);
497498
}
498499

499500
protected virtual async Task<QueryResult> GetSchemasAsync(CancellationToken cancellationToken = default)
500501
{
501-
IResponse response = await Connection.GetSchemasAsync(
502+
var response = await Connection.GetSchemasAsync(
502503
EscapePatternWildcardsInName(CatalogName),
503504
EscapePatternWildcardsInName(SchemaName),
504505
cancellationToken);
505-
506-
return await GetQueryResult(response, cancellationToken);
506+
return await GetMetadataAsRecordBatch(
507+
response,
508+
Connection.ConvertToSchemaRecords,
509+
Metadata.MetadataSchemaBuilder.BuildFlatSchemasSchema,
510+
cancellationToken);
507511
}
508512

509513
protected virtual async Task<QueryResult> GetTablesAsync(CancellationToken cancellationToken = default)
510514
{
511-
List<string>? tableTypesList = this.TableTypes?.Split(',').ToList();
512-
IResponse response = await Connection.GetTablesAsync(
515+
var tableTypesList = this.TableTypes?.Split(',').ToList();
516+
var response = await Connection.GetTablesAsync(
513517
EscapePatternWildcardsInName(CatalogName),
514518
EscapePatternWildcardsInName(SchemaName),
515519
EscapePatternWildcardsInName(TableName),
516520
tableTypesList,
517521
cancellationToken);
518-
519-
return await GetQueryResult(response, cancellationToken);
522+
return await GetMetadataAsRecordBatch(
523+
response,
524+
Connection.ConvertToTableRecords,
525+
Metadata.MetadataSchemaBuilder.BuildFlatTablesSchema,
526+
cancellationToken);
520527
}
521528

522529
protected virtual async Task<QueryResult> GetColumnsAsync(CancellationToken cancellationToken = default)
@@ -557,17 +564,45 @@ private async Task<Schema> GetResultSetSchemaAsync(TOperationHandle operationHan
557564
return GetSchemaFromMetadata(response);
558565
}
559566

560-
private async Task<QueryResult> GetQueryResult(IResponse response, CancellationToken cancellationToken)
567+
/// <summary>
568+
/// Generic helper for metadata methods that convert TRowSet to RecordBatch using metadata records.
569+
/// Handles both direct results and poll-fetch paths.
570+
/// Virtual to allow protocol-specific customization (e.g., SEA).
571+
/// </summary>
572+
/// <typeparam name="TRecord">The metadata record type (e.g., CatalogMetadataRecord)</typeparam>
573+
/// <param name="response">The Thrift response from the metadata RPC call</param>
574+
/// <param name="converter">Function to convert TRowSet to list of metadata records</param>
575+
/// <param name="builder">Function to build RecordBatch from metadata records</param>
576+
/// <param name="cancellationToken">Cancellation token</param>
577+
/// <returns>QueryResult containing the metadata as RecordBatch</returns>
578+
protected virtual async Task<QueryResult> GetMetadataAsRecordBatch<TRecord>(
579+
IResponse response,
580+
Func<TRowSet, List<TRecord>> converter,
581+
Func<IEnumerable<TRecord>, RecordBatch> builder,
582+
CancellationToken cancellationToken)
561583
{
562-
if (Connection.TryGetDirectResults(response.DirectResults, out QueryResult? result))
584+
TRowSet? rowSet;
585+
586+
// Try direct results first (fast path)
587+
if (Connection.TryGetDirectResults(response.DirectResults, out TGetResultSetMetadataResp? metadata, out rowSet))
563588
{
564-
return result!;
589+
var records = converter(rowSet!);
590+
var batch = builder(records);
591+
// Extract column arrays from RecordBatch for HiveInfoArrowStream
592+
var arrays = Enumerable.Range(0, batch.ColumnCount).Select(i => batch.Column(i)).ToList();
593+
return new QueryResult(batch.Length, new HiveServer2Connection.HiveInfoArrowStream(batch.Schema, arrays));
565594
}
566595

596+
// Poll and fetch path (slow path)
567597
await HiveServer2Connection.PollForResponseAsync(response.OperationHandle!, Connection.Client, PollTimeMilliseconds, cancellationToken);
568-
Schema schema = await GetResultSetSchemaAsync(response.OperationHandle!, Connection.Client, cancellationToken);
569-
570-
return new QueryResult(-1, Connection.NewReader(this, schema, response));
598+
metadata = await HiveServer2Connection.GetResultSetMetadataAsync(response.OperationHandle!, Connection.Client, cancellationToken);
599+
rowSet = await Connection.FetchResultsAsync(response.OperationHandle!, BatchSize, cancellationToken);
600+
601+
var recordList = converter(rowSet);
602+
var recordBatch = builder(recordList);
603+
// Extract column arrays from RecordBatch for HiveInfoArrowStream
604+
var columnArrays = Enumerable.Range(0, recordBatch.ColumnCount).Select(i => recordBatch.Column(i)).ToList();
605+
return new QueryResult(recordBatch.Length, new HiveServer2Connection.HiveInfoArrowStream(recordBatch.Schema, columnArrays));
571606
}
572607

573608
protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IReadOnlyList<IArrowArray> originalData,
@@ -624,7 +659,9 @@ protected internal QueryResult EnhanceGetColumnsResult(Schema originalSchema, IR
624659
customData: null
625660
);
626661

627-
// Extract values with fallback to protocol-provided values
662+
// Extract values with fallback to Thrift-provided values
663+
// This preserves the existing behavior where parsed values take precedence,
664+
// but Thrift values are used if parsing fails or returns null
628665
string baseTypeName = record.BaseTypeName ?? typeName ?? string.Empty;
629666
int finalColumnSize = record.XdbcColumnSize ?? columnSize;
630667
int finalDecimalDigits = record.XdbcDecimalDigits ?? decimalDigits;

csharp/src/Drivers/Apache/Hive2/Metadata/MetadataSchemaBuilder.cs

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,103 @@ public static StructArray BuildColumnsStructArray(IEnumerable<ColumnMetadataReco
509509

510510
#endregion
511511

512+
#region Schema Building Methods
513+
514+
/// <summary>
515+
/// Converts XDBC type metadata to Arrow type.
516+
/// Handles all standard SQL types including DECIMAL with precision/scale.
517+
/// </summary>
518+
/// <param name="columnTypeId">JDBC/ODBC type code</param>
519+
/// <param name="typeName">Type name string (e.g., "DECIMAL(10,2)")</param>
520+
/// <param name="isColumnSizeValid">Whether column size can be trusted for DECIMAL types</param>
521+
/// <param name="columnSize">Column size/precision value</param>
522+
/// <param name="decimalDigits">Decimal digits/scale value</param>
523+
/// <returns>Arrow type corresponding to the XDBC type</returns>
524+
public static IArrowType GetArrowType(int columnTypeId, string typeName, bool isColumnSizeValid, int? columnSize, int? decimalDigits)
525+
{
526+
switch (columnTypeId)
527+
{
528+
case (int)HiveServer2Connection.ColumnTypeId.BOOLEAN:
529+
return BooleanType.Default;
530+
case (int)HiveServer2Connection.ColumnTypeId.TINYINT:
531+
return Int8Type.Default;
532+
case (int)HiveServer2Connection.ColumnTypeId.SMALLINT:
533+
return Int16Type.Default;
534+
case (int)HiveServer2Connection.ColumnTypeId.INTEGER:
535+
return Int32Type.Default;
536+
case (int)HiveServer2Connection.ColumnTypeId.BIGINT:
537+
return Int64Type.Default;
538+
case (int)HiveServer2Connection.ColumnTypeId.FLOAT:
539+
case (int)HiveServer2Connection.ColumnTypeId.REAL:
540+
return FloatType.Default;
541+
case (int)HiveServer2Connection.ColumnTypeId.DOUBLE:
542+
return DoubleType.Default;
543+
case (int)HiveServer2Connection.ColumnTypeId.VARCHAR:
544+
case (int)HiveServer2Connection.ColumnTypeId.NVARCHAR:
545+
case (int)HiveServer2Connection.ColumnTypeId.LONGVARCHAR:
546+
case (int)HiveServer2Connection.ColumnTypeId.LONGNVARCHAR:
547+
return StringType.Default;
548+
case (int)HiveServer2Connection.ColumnTypeId.TIMESTAMP:
549+
return new TimestampType(TimeUnit.Microsecond, timezone: (string?)null);
550+
case (int)HiveServer2Connection.ColumnTypeId.BINARY:
551+
case (int)HiveServer2Connection.ColumnTypeId.VARBINARY:
552+
case (int)HiveServer2Connection.ColumnTypeId.LONGVARBINARY:
553+
return BinaryType.Default;
554+
case (int)HiveServer2Connection.ColumnTypeId.DATE:
555+
return Date32Type.Default;
556+
case (int)HiveServer2Connection.ColumnTypeId.CHAR:
557+
case (int)HiveServer2Connection.ColumnTypeId.NCHAR:
558+
return StringType.Default;
559+
case (int)HiveServer2Connection.ColumnTypeId.DECIMAL:
560+
case (int)HiveServer2Connection.ColumnTypeId.NUMERIC:
561+
if (isColumnSizeValid && columnSize.HasValue && decimalDigits.HasValue)
562+
{
563+
return new Decimal128Type(columnSize.Value, decimalDigits.Value);
564+
}
565+
else
566+
{
567+
// Parse type name for precision/scale when not provided
568+
return SqlTypeNameParser<SqlDecimalParserResult>
569+
.Parse(typeName, columnTypeId)
570+
.Decimal128Type;
571+
}
572+
case (int)HiveServer2Connection.ColumnTypeId.NULL:
573+
return NullType.Default;
574+
case (int)HiveServer2Connection.ColumnTypeId.ARRAY:
575+
case (int)HiveServer2Connection.ColumnTypeId.JAVA_OBJECT:
576+
case (int)HiveServer2Connection.ColumnTypeId.STRUCT:
577+
return StringType.Default;
578+
default:
579+
throw new NotImplementedException($"Column type id: {columnTypeId} is not supported.");
580+
}
581+
}
582+
583+
/// <summary>
584+
/// Builds an Arrow Schema from column metadata records.
585+
/// Used by GetTableSchema to convert metadata into Arrow schema format.
586+
/// </summary>
587+
/// <param name="records">Collection of column metadata records</param>
588+
/// <param name="isColumnSizeValidForDecimal">Whether column size can be trusted for DECIMAL types</param>
589+
/// <returns>Arrow Schema with fields for each column</returns>
590+
public static Schema BuildSchemaFromColumnMetadata(IEnumerable<ColumnMetadataRecord> records, bool isColumnSizeValidForDecimal)
591+
{
592+
var fields = new List<Field>();
593+
foreach (var record in records)
594+
{
595+
IArrowType arrowType = GetArrowType(
596+
record.XdbcDataType ?? 0,
597+
record.TypeName ?? "",
598+
isColumnSizeValidForDecimal,
599+
record.XdbcColumnSize,
600+
record.XdbcDecimalDigits
601+
);
602+
fields.Add(new Field(record.ColumnName, arrowType, record.Nullable == 1));
603+
}
604+
return new Schema(fields.ToArray(), null);
605+
}
606+
607+
#endregion
608+
512609
#region Helper Methods
513610

514611
/// <summary>
@@ -547,10 +644,10 @@ private static void AppendOrNull(Int16Array.Builder builder, short? value)
547644
/// <summary>
548645
/// Appends a value or null to an Int8Array.Builder.
549646
/// </summary>
550-
private static void AppendOrNull(Int8Array.Builder builder, sbyte? value)
647+
private static void AppendOrNull(Int8Array.Builder builder, byte? value)
551648
{
552649
if (value.HasValue)
553-
builder.Append(value.Value);
650+
builder.Append((sbyte)value.Value);
554651
else
555652
builder.AppendNull();
556653
}

csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,6 @@ protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(IRe
6868
protected override Task<TRowSet> GetRowSetAsync(IResponse response, CancellationToken cancellationToken = default) =>
6969
FetchResultsAsync(response.OperationHandle!, cancellationToken: cancellationToken);
7070

71-
internal override void SetPrecisionScaleAndTypeName(
72-
short colType,
73-
string typeName,
74-
TableInfo? tableInfo,
75-
int columnSize,
76-
int decimalDigits)
77-
{
78-
tableInfo?.TypeName.Add(typeName);
79-
tableInfo?.Precision.Add(columnSize);
80-
tableInfo?.Scale.Add((short)decimalDigits);
81-
tableInfo?.BaseTypeName.Add(typeName);
82-
}
83-
8471
protected override string InfoDriverName => DriverName;
8572

8673
protected override string InfoDriverArrowVersion => ArrowVersion;

0 commit comments

Comments
 (0)