Skip to content

Commit 79bf70f

Browse files
author
Jade Wang
committed
feat(csharp): add JSON_ARRAY format support and complete GetObjects implementation for Statement Execution API
Adds comprehensive support for Statement Execution API when warehouses don't support Arrow format inline results, and implements full metadata operations. **JSON_ARRAY Format Support:** - Created JsonArrayReader to convert JSON data to Arrow format - Supports multiple Arrow types: Int32, Int64, Double, String, Boolean, Date32, Timestamp - Handles null values and type conversion with proper error handling - Automatically detects format from response manifest **GetObjects Metadata Implementation:** - Implemented proper nested structure builders for hierarchical catalog/schema/table/column data - Added BuildListArray() to manually construct ListArrays with struct types - Added helper methods: BuildDbSchemasStruct(), BuildTablesStruct(), BuildColumnsStruct() - Includes CreateEmptyArray() for empty array creation across various types - Returns complete metadata per ADBC standard schema **E2E Test Infrastructure Improvements:** - Rewrote StatementExecutionConnectionE2ETests to use proper driver flow (Driver → Database → Connection) - Removed complex mocking infrastructure (527 lines → 168 lines) - Added DatabricksTestHelpers.GetPropertiesWithStatementExecutionEnabled for REST API config - Created StatementExecutionFeatureParityTests with 8 comprehensive E2E tests **Authentication & Connection Fixes:** - Added Bearer token authentication support in DatabricksConnection - Fixed null reference in TracingDelegatingHandler for W3C trace context - Always send warehouse_id in requests (required by Databricks API) **Test Results (all passing):** ✓ GetTableTypes_ReturnsStandardTypes ✓ GetObjects_CatalogDepth_ReturnsCatalogs ✓ GetTableSchema_KnownTable_ReturnsSchema ✓ ExecuteQuery_InlineResults_ReturnsData ✓ ExecuteQuery_InlineResults_MultipleRows_ReturnsAllData ✓ ExecuteQuery_HybridDisposition_ReturnsData ✓ ExecuteQuery_ExternalLinks_LargeResult_UsesCloudFetch ✓ ExecuteUpdate_DDLStatements_WorksCorrectly
1 parent 3dc875b commit 79bf70f

File tree

9 files changed

+1764
-500
lines changed

9 files changed

+1764
-500
lines changed

csharp/src/DatabricksConnection.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -722,10 +722,10 @@ internal static (HttpClient httpClient, string host) CreateHttpClientForRestApi(
722722

723723
// Create base HTTP handler with TLS configuration
724724
TlsProperties tlsOptions = HiveServer2TlsImpl.GetHttpTlsOptions(properties);
725-
// Note: Proxy support not yet implemented for REST API connections
726-
// TODO: Add proxy configurator support if needed
727-
HttpMessageHandler baseHandler = HiveServer2TlsImpl.NewHttpClientHandler(tlsOptions, null);
728-
HttpMessageHandler baseAuthHandler = HiveServer2TlsImpl.NewHttpClientHandler(tlsOptions, null);
725+
// Create no-op proxy configurator (proxy support not yet fully implemented for REST API)
726+
var proxyConfigurator = new HiveServer2ProxyConfigurator(useProxy: false);
727+
HttpMessageHandler baseHandler = HiveServer2TlsImpl.NewHttpClientHandler(tlsOptions, proxyConfigurator);
728+
HttpMessageHandler baseAuthHandler = HiveServer2TlsImpl.NewHttpClientHandler(tlsOptions, proxyConfigurator);
729729

730730
// Build handler chain (same order as CreateHttpHandler)
731731
// Order: Tracing (innermost) → Retry → ThriftErrorMessage → OAuth (outermost)
@@ -807,8 +807,16 @@ internal static (HttpClient httpClient, string host) CreateHttpClientForRestApi(
807807
}
808808
}
809809

810-
// Create and return the HTTP client
810+
// Create the HTTP client
811811
HttpClient httpClient = new HttpClient(baseHandler);
812+
813+
// Set Authorization header for simple token authentication
814+
// Note: This is separate from OAuth which uses delegating handlers
815+
if (properties.TryGetValue(SparkParameters.Token, out string? token) && !string.IsNullOrEmpty(token))
816+
{
817+
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
818+
}
819+
812820
return (httpClient, host);
813821
}
814822

Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
/*
2+
* Copyright (c) 2025 ADBC Drivers Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Globalization;
20+
using System.Linq;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Apache.Arrow;
24+
using Apache.Arrow.Ipc;
25+
using Apache.Arrow.Types;
26+
using Apache.Arrow.Adbc.Drivers.Databricks.StatementExecution;
27+
28+
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
29+
{
30+
/// <summary>
31+
/// Reader for JSON_ARRAY format results from Statement Execution API.
32+
/// Converts JSON data to Arrow format.
33+
/// </summary>
34+
internal class JsonArrayReader : IArrowArrayStream
35+
{
36+
private readonly Schema _schema;
37+
private readonly List<List<string>> _data;
38+
private bool _hasReadBatch;
39+
private bool _disposed;
40+
41+
public JsonArrayReader(ResultManifest manifest, List<List<string>> data)
42+
{
43+
if (manifest?.Schema == null)
44+
{
45+
throw new ArgumentException("Manifest must contain schema", nameof(manifest));
46+
}
47+
48+
_schema = ConvertSchema(manifest.Schema);
49+
_data = data ?? new List<List<string>>();
50+
_hasReadBatch = false;
51+
_disposed = false;
52+
}
53+
54+
public Schema Schema => _schema;
55+
56+
public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
57+
{
58+
if (_disposed)
59+
{
60+
throw new ObjectDisposedException(nameof(JsonArrayReader));
61+
}
62+
63+
// JSON_ARRAY format returns all data in a single batch
64+
if (_hasReadBatch || _data.Count == 0)
65+
{
66+
return null;
67+
}
68+
69+
_hasReadBatch = true;
70+
71+
return await Task.Run(() => ConvertToRecordBatch(), cancellationToken);
72+
}
73+
74+
private RecordBatch ConvertToRecordBatch()
75+
{
76+
int rowCount = _data.Count;
77+
var arrays = new IArrowArray[_schema.FieldsList.Count];
78+
79+
for (int colIndex = 0; colIndex < _schema.FieldsList.Count; colIndex++)
80+
{
81+
var field = _schema.FieldsList[colIndex];
82+
arrays[colIndex] = ConvertColumnToArrowArray(field, colIndex, rowCount);
83+
}
84+
85+
return new RecordBatch(_schema, arrays, rowCount);
86+
}
87+
88+
private IArrowArray ConvertColumnToArrowArray(Field field, int columnIndex, int rowCount)
89+
{
90+
var dataType = field.DataType;
91+
92+
// Handle different Arrow types
93+
switch (dataType.TypeId)
94+
{
95+
case ArrowTypeId.Int32:
96+
return ConvertToInt32Array(columnIndex, rowCount);
97+
case ArrowTypeId.Int64:
98+
return ConvertToInt64Array(columnIndex, rowCount);
99+
case ArrowTypeId.Double:
100+
return ConvertToDoubleArray(columnIndex, rowCount);
101+
case ArrowTypeId.String:
102+
return ConvertToStringArray(columnIndex, rowCount);
103+
case ArrowTypeId.Boolean:
104+
return ConvertToBooleanArray(columnIndex, rowCount);
105+
case ArrowTypeId.Date32:
106+
return ConvertToDate32Array(columnIndex, rowCount);
107+
case ArrowTypeId.Timestamp:
108+
return ConvertToTimestampArray(columnIndex, rowCount, (TimestampType)dataType);
109+
default:
110+
// Default to string for unknown types
111+
return ConvertToStringArray(columnIndex, rowCount);
112+
}
113+
}
114+
115+
private IArrowArray ConvertToInt32Array(int columnIndex, int rowCount)
116+
{
117+
var builder = new Int32Array.Builder();
118+
for (int i = 0; i < rowCount; i++)
119+
{
120+
var value = GetCellValue(i, columnIndex);
121+
if (string.IsNullOrEmpty(value) || value == "null")
122+
{
123+
builder.AppendNull();
124+
}
125+
else if (int.TryParse(value, out int result))
126+
{
127+
builder.Append(result);
128+
}
129+
else
130+
{
131+
builder.AppendNull();
132+
}
133+
}
134+
return builder.Build();
135+
}
136+
137+
private IArrowArray ConvertToInt64Array(int columnIndex, int rowCount)
138+
{
139+
var builder = new Int64Array.Builder();
140+
for (int i = 0; i < rowCount; i++)
141+
{
142+
var value = GetCellValue(i, columnIndex);
143+
if (string.IsNullOrEmpty(value) || value == "null")
144+
{
145+
builder.AppendNull();
146+
}
147+
else if (long.TryParse(value, out long result))
148+
{
149+
builder.Append(result);
150+
}
151+
else
152+
{
153+
builder.AppendNull();
154+
}
155+
}
156+
return builder.Build();
157+
}
158+
159+
private IArrowArray ConvertToDoubleArray(int columnIndex, int rowCount)
160+
{
161+
var builder = new DoubleArray.Builder();
162+
for (int i = 0; i < rowCount; i++)
163+
{
164+
var value = GetCellValue(i, columnIndex);
165+
if (string.IsNullOrEmpty(value) || value == "null")
166+
{
167+
builder.AppendNull();
168+
}
169+
else if (double.TryParse(value, NumberStyles.Any, CultureInfo.InvariantCulture, out double result))
170+
{
171+
builder.Append(result);
172+
}
173+
else
174+
{
175+
builder.AppendNull();
176+
}
177+
}
178+
return builder.Build();
179+
}
180+
181+
private IArrowArray ConvertToStringArray(int columnIndex, int rowCount)
182+
{
183+
var builder = new StringArray.Builder();
184+
for (int i = 0; i < rowCount; i++)
185+
{
186+
var value = GetCellValue(i, columnIndex);
187+
if (value == "null")
188+
{
189+
builder.AppendNull();
190+
}
191+
else
192+
{
193+
builder.Append(value ?? string.Empty);
194+
}
195+
}
196+
return builder.Build();
197+
}
198+
199+
private IArrowArray ConvertToBooleanArray(int columnIndex, int rowCount)
200+
{
201+
var builder = new BooleanArray.Builder();
202+
for (int i = 0; i < rowCount; i++)
203+
{
204+
var value = GetCellValue(i, columnIndex);
205+
if (string.IsNullOrEmpty(value) || value == "null")
206+
{
207+
builder.AppendNull();
208+
}
209+
else if (bool.TryParse(value, out bool result))
210+
{
211+
builder.Append(result);
212+
}
213+
else
214+
{
215+
builder.AppendNull();
216+
}
217+
}
218+
return builder.Build();
219+
}
220+
221+
private IArrowArray ConvertToDate32Array(int columnIndex, int rowCount)
222+
{
223+
var builder = new Date32Array.Builder();
224+
var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
225+
226+
for (int i = 0; i < rowCount; i++)
227+
{
228+
var value = GetCellValue(i, columnIndex);
229+
if (string.IsNullOrEmpty(value) || value == "null")
230+
{
231+
builder.AppendNull();
232+
}
233+
else if (DateTime.TryParse(value, out DateTime date))
234+
{
235+
builder.Append(date.Date);
236+
}
237+
else
238+
{
239+
builder.AppendNull();
240+
}
241+
}
242+
return builder.Build();
243+
}
244+
245+
private IArrowArray ConvertToTimestampArray(int columnIndex, int rowCount, TimestampType timestampType)
246+
{
247+
var builder = new TimestampArray.Builder(timestampType);
248+
249+
for (int i = 0; i < rowCount; i++)
250+
{
251+
var value = GetCellValue(i, columnIndex);
252+
if (string.IsNullOrEmpty(value) || value == "null")
253+
{
254+
builder.AppendNull();
255+
}
256+
else if (DateTimeOffset.TryParse(value, out DateTimeOffset timestamp))
257+
{
258+
builder.Append(timestamp);
259+
}
260+
else
261+
{
262+
builder.AppendNull();
263+
}
264+
}
265+
return builder.Build();
266+
}
267+
268+
private string? GetCellValue(int rowIndex, int columnIndex)
269+
{
270+
if (rowIndex >= _data.Count)
271+
{
272+
return null;
273+
}
274+
275+
var row = _data[rowIndex];
276+
if (columnIndex >= row.Count)
277+
{
278+
return null;
279+
}
280+
281+
return row[columnIndex];
282+
}
283+
284+
private static Schema ConvertSchema(ResultSchema schema)
285+
{
286+
if (schema.Columns == null || schema.Columns.Count == 0)
287+
{
288+
return new Schema.Builder().Build();
289+
}
290+
291+
var fields = new List<Field>();
292+
foreach (var column in schema.Columns.OrderBy(c => c.Position))
293+
{
294+
var arrowType = ConvertType(column.TypeName, column.TypeText);
295+
fields.Add(new Field(column.Name, arrowType, nullable: true));
296+
}
297+
298+
return new Schema(fields, null);
299+
}
300+
301+
private static IArrowType ConvertType(string? typeName, string? typeText)
302+
{
303+
// Use typeText if available, fall back to typeName
304+
string type = (typeText ?? typeName ?? "STRING").ToUpperInvariant();
305+
306+
// Map Databricks types to Arrow types
307+
if (type.Contains("INT") || type == "INTEGER")
308+
{
309+
return Int32Type.Default;
310+
}
311+
else if (type.Contains("BIGINT") || type == "LONG")
312+
{
313+
return Int64Type.Default;
314+
}
315+
else if (type.Contains("DOUBLE") || type == "FLOAT")
316+
{
317+
return DoubleType.Default;
318+
}
319+
else if (type.Contains("BOOLEAN") || type == "BOOL")
320+
{
321+
return BooleanType.Default;
322+
}
323+
else if (type.Contains("DATE"))
324+
{
325+
return Date32Type.Default;
326+
}
327+
else if (type.Contains("TIMESTAMP"))
328+
{
329+
return new TimestampType(TimeUnit.Microsecond, timezone: "UTC");
330+
}
331+
else
332+
{
333+
// Default to string for all other types
334+
return StringType.Default;
335+
}
336+
}
337+
338+
public void Dispose()
339+
{
340+
if (!_disposed)
341+
{
342+
_disposed = true;
343+
}
344+
}
345+
}
346+
}

0 commit comments

Comments
 (0)