Skip to content

Commit e9fff87

Browse files
feat(csharp): implement EnableComplexDatatypeSupport for consistent complex type behavior (PECO-2938) (#311)
## Summary Implements JDBC-equivalent `EnableComplexDatatypeSupport` parameter that unifies complex type (ARRAY, MAP, STRUCT) behavior across Thrift and SEA protocols. ### Problem Previously, Thrift and SEA returned complex types differently: - **Thrift**: JSON strings (e.g. `[1,2,3]`, `{"a":1}`) via `ComplexTypesAsArrow=false` - **SEA**: Native Arrow types (`ListType`, `MapType`, `StructType`) This caused test divergence and required `IsSeaMode` branching. ### Solution Adds `adbc.databricks.enable_complex_datatype_support` parameter (default `false`) that makes both protocols behave consistently — matching JDBC's `EnableComplexDatatypeSupport` flag. **Default (`false`) — JSON strings from both protocols:** - Thrift: `ComplexTypesAsArrow=false` (server returns JSON strings natively) - SEA: new `ComplexTypeSerializingStream` wraps result and serializes native Arrow types to JSON strings via `ArrowComplexTypeJsonSerializer` **When enabled (`true`) — native Arrow types from both protocols:** - Thrift: `ComplexTypesAsArrow=true` (server returns native Arrow types) - SEA: native Arrow types returned as-is ### New Files - `ArrowComplexTypeJsonSerializer.cs` — serializes Arrow LIST/MAP/STRUCT values to JSON using `Utf8JsonWriter`, handles nested complex types - `ComplexTypeSerializingStream.cs` — `IArrowArrayStream` wrapper that converts complex columns to `StringArray` when `EnableComplexDatatypeSupport=false` ### Changes - `DatabricksParameters.cs` — adds `EnableComplexDatatypeSupport` constant - `DatabricksConnection.cs` — reads and exposes the parameter - `DatabricksStatement.cs` — drives `ComplexTypesAsArrow` from the parameter - `StatementExecutionStatement.cs` — wraps result stream with `ComplexTypeSerializingStream` when disabled - `ComplexTypesValueTests.cs` — simplified to assert unified JSON string output without `IsSeaMode` branching ## Test Plan - [x] All 466 unit tests pass - [ ] E2E COMPLEX-001..014 tests pass against live Databricks (Thrift and SEA) - [ ] 
Verify `[1,2,3]` JSON string output for ARRAY in both protocols - [ ] Verify `{"a":1,"b":2}` JSON string output for MAP in both protocols - [ ] Verify `{"id":1,"name":"Alice"}` JSON string output for STRUCT in both protocols 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 1d6fe13 commit e9fff87

File tree

7 files changed

+424
-6
lines changed

7 files changed

+424
-6
lines changed
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Copyright (c) 2025 ADBC Drivers Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Adbc.Extensions;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
26+
27+
namespace AdbcDrivers.Databricks
28+
{
29+
/// <summary>
/// Wraps an <see cref="IArrowArrayStream"/> and converts columns of complex Arrow types
/// (LIST, MAP represented as LIST of STRUCTs, STRUCT) into STRING columns containing
/// their JSON representation.
///
/// This is applied when EnableComplexDatatypeSupport=false (the default), so that SEA
/// results match the legacy Thrift behavior of returning JSON strings for complex types.
/// </summary>
internal sealed class ComplexTypeSerializingStream : IArrowArrayStream
{
    private readonly IArrowArrayStream _inner;
    // Rewritten schema: complex-typed fields replaced with StringType.
    private readonly Schema _schema;
    // Column indices (into the original schema) that require serialization.
    private readonly HashSet<int> _complexColumnIndices;

    /// <summary>
    /// Creates a wrapper over <paramref name="inner"/> whose complex-typed columns are
    /// exposed as JSON-encoded string columns.
    /// </summary>
    /// <param name="inner">The underlying stream; disposed when this wrapper is disposed.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="inner"/> is null.</exception>
    public ComplexTypeSerializingStream(IArrowArrayStream inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
        (_schema, _complexColumnIndices) = BuildStringSchema(inner.Schema);
    }

    /// <summary>The rewritten schema where complex fields are StringType.</summary>
    public Schema Schema => _schema;

    /// <summary>
    /// Reads the next batch from the inner stream, converting complex columns to
    /// JSON string columns. Returns null at end of stream.
    /// </summary>
    public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
    {
        RecordBatch? batch = await _inner.ReadNextRecordBatchAsync(cancellationToken).ConfigureAwait(false);
        if (batch == null)
            return null;

        // Fast path: no complex columns in the schema, pass the batch through untouched.
        if (_complexColumnIndices.Count == 0)
            return batch;

        return ConvertComplexColumns(batch);
    }

    public void Dispose() => _inner.Dispose();

    /// <summary>
    /// Builds a new batch over the rewritten schema; non-complex columns are reused
    /// as-is, complex columns are replaced with serialized string arrays.
    /// </summary>
    private RecordBatch ConvertComplexColumns(RecordBatch batch)
    {
        // NOTE(review): the incoming batch is not disposed here because its
        // non-complex arrays are reused by the returned batch; the replaced
        // complex arrays are left to the GC — confirm this matches the
        // project's ownership conventions for RecordBatch.
        IArrowArray[] arrays = new IArrowArray[batch.ColumnCount];
        for (int i = 0; i < batch.ColumnCount; i++)
        {
            arrays[i] = _complexColumnIndices.Contains(i) ? SerializeToStringArray(batch.Column(i)) : batch.Column(i);
        }
        return new RecordBatch(_schema, arrays, batch.Length);
    }

    /// <summary>Serializes every slot of a complex-typed array to its JSON text; nulls stay null.</summary>
    private static StringArray SerializeToStringArray(IArrowArray array)
    {
        StringArray.Builder builder = new StringArray.Builder();
        for (int i = 0; i < array.Length; i++)
        {
            if (array.IsNull(i))
                builder.AppendNull();
            else
                builder.Append(JsonSerializer.Serialize(ToObject(array, i)));
        }
        return builder.Build();
    }

    /// <summary>
    /// Builds a new schema where all complex-type fields are replaced with StringType,
    /// and returns the set of column indices that were converted.
    /// </summary>
    private static (Schema schema, HashSet<int> complexIndices) BuildStringSchema(Schema original)
    {
        List<Field> fields = new List<Field>(original.FieldsList.Count);
        HashSet<int> indices = new HashSet<int>();

        for (int i = 0; i < original.FieldsList.Count; i++)
        {
            Field field = original.FieldsList[i];
            if (IsComplexType(field.DataType))
            {
                // Name, nullability, and metadata are preserved; only the type changes.
                fields.Add(new Field(field.Name, StringType.Default, field.IsNullable, field.Metadata));
                indices.Add(i);
            }
            else
            {
                fields.Add(field);
            }
        }

        return (new Schema(fields, original.Metadata), indices);
    }

    private static bool IsComplexType(IArrowType type) =>
        type is ListType || type is MapType || type is StructType;

    // --- JSON serialization helpers ---

    /// <summary>
    /// Converts one slot of an Arrow array to a plain CLR object suitable for
    /// System.Text.Json serialization; recurses into nested complex values.
    /// </summary>
    private static object? ToObject(IArrowArray array, int index)
    {
        if (array.IsNull(index))
            return null;

        // Handle complex types with recursive traversal, and types needing specific
        // string formatting. All other primitives delegate to ValueAt().
        return array switch
        {
            ListArray la => ToListOrMap(la, index),
            StructArray sa => ToDict(sa, index),
            Decimal128Array dec => dec.GetString(index), // preserve precision as string
            // Invariant culture so the date is not rendered with a non-Gregorian
            // calendar (e.g. th-TH) or culture-specific digits (CA1305).
            Date32Array d32 => d32.GetDateTime(index)?.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture),
            _ => array.ValueAt(index, StructResultType.Object) // int, long, float, bool, string, timestamp, etc.
        };
    }

    /// <summary>
    /// Converts one LIST slot to a CLR list, or — when the column is logically a
    /// MAP — to a key-ordered dictionary.
    /// </summary>
    private static object ToListOrMap(ListArray listArray, int index)
    {
        IArrowArray values = listArray.Values;
        int start = listArray.ValueOffsets[index];
        int end = listArray.ValueOffsets[index + 1];

        // Arrow MAP is physically List<Struct<key, value>>. Detect maps from the
        // column's logical type rather than by sniffing the child struct's field
        // names, so a genuine STRUCT with fields named "key"/"value" inside a
        // LIST is not misrendered as a JSON object.
        if (listArray.Data.DataType is MapType && values is StructArray entries)
            return ToMapDict(entries, start, end);

        List<object?> list = new List<object?>(end - start);
        for (int i = start; i < end; i++)
            list.Add(ToObject(values, i));
        return list;
    }

    /// <summary>Converts the [start, end) range of map entries to a dictionary.</summary>
    private static SortedDictionary<string, object?> ToMapDict(StructArray entries, int start, int end)
    {
        IArrowArray keyArray = entries.Fields[0];
        IArrowArray valueArray = entries.Fields[1];
        // Ordinal sort gives deterministic key ordering in the JSON output regardless
        // of the process's current culture (the default string comparer is culture-sensitive).
        SortedDictionary<string, object?> result = new SortedDictionary<string, object?>(StringComparer.Ordinal);
        for (int i = start; i < end; i++)
        {
            // Convert any key type to its string representation; treat null keys as "null"
            string key = ToObject(keyArray, i)?.ToString() ?? "null";
            result[key] = ToObject(valueArray, i);
        }
        return result;
    }

    /// <summary>Converts one STRUCT slot to a field-name → value dictionary (declaration order).</summary>
    private static Dictionary<string, object?> ToDict(StructArray structArray, int index)
    {
        StructType type = (StructType)structArray.Data.DataType;
        Dictionary<string, object?> dict = new Dictionary<string, object?>(type.Fields.Count);
        for (int i = 0; i < type.Fields.Count; i++)
            dict[type.Fields[i].Name] = ToObject(structArray.Fields[i], index);
        return dict;
    }
}
184+
}

csharp/src/DatabricksConnection.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ internal class DatabricksConnection : SparkHttpConnection
7070
private bool _enableMultipleCatalogSupport = true;
7171
private bool _enablePKFK = true;
7272
private bool _runAsyncInThrift = true;
73+
private bool _enableComplexDatatypeSupport = false;
7374

7475
// DirectQuery configuration
7576
private const long DefaultDirectResultMaxBytes = 10 * 1024 * 1024; // 10MB for direct query results size limit
@@ -195,6 +196,7 @@ private void ValidateProperties()
195196
_canDecompressLz4 = PropertyHelper.GetBooleanPropertyWithValidation(Properties, DatabricksParameters.CanDecompressLz4, _canDecompressLz4);
196197
_useDescTableExtended = PropertyHelper.GetBooleanPropertyWithValidation(Properties, DatabricksParameters.UseDescTableExtended, _useDescTableExtended);
197198
_runAsyncInThrift = PropertyHelper.GetBooleanPropertyWithValidation(Properties, DatabricksParameters.EnableRunAsyncInThriftOp, _runAsyncInThrift);
199+
_enableComplexDatatypeSupport = PropertyHelper.GetBooleanPropertyWithValidation(Properties, DatabricksParameters.EnableComplexDatatypeSupport, _enableComplexDatatypeSupport);
198200

199201
if (Properties.ContainsKey(DatabricksParameters.MaxBytesPerFile))
200202
{
@@ -370,6 +372,11 @@ protected internal override bool TrySetGetDirectResults(IRequest request)
370372
/// </summary>
371373
public bool RunAsyncInThrift => _runAsyncInThrift;
372374

375+
/// <summary>
376+
/// Whether to return complex types as native Arrow types (true) or JSON strings (false).
377+
/// </summary>
378+
internal bool EnableComplexDatatypeSupport => _enableComplexDatatypeSupport;
379+
373380
/// <summary>
374381
/// Gets a value indicating whether to retry requests that receive retryable responses (408, 502, 503, 504) .
375382
/// </summary>

csharp/src/DatabricksParameters.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,15 @@ public class DatabricksParameters : SparkParameters
376376
/// Default value is 900 seconds (15 minutes) if not specified.
377377
/// </summary>
378378
public const string FeatureFlagCacheTtlSeconds = "adbc.databricks.feature_flag_cache_ttl_seconds";
379+
380+
/// <summary>
381+
/// Whether to return complex types (ARRAY, MAP, STRUCT) as native Arrow types.
382+
/// When false (default): complex types are serialized to JSON strings, matching legacy behavior.
383+
/// When true: complex types are returned as native Arrow types (ListType, MapType, StructType).
384+
/// This applies to both Thrift and SEA protocols, providing consistent behavior across protocols.
385+
/// Default value is false if not specified.
386+
/// </summary>
387+
public const string EnableComplexDatatypeSupport = "adbc.databricks.enable_complex_datatype_support";
379388
}
380389

381390
/// <summary>

csharp/src/DatabricksStatement.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ internal class DatabricksStatement : SparkStatement, IHiveServer2Statement
6262
private bool enableMultipleCatalogSupport;
6363
private bool enablePKFK;
6464
private bool runAsyncInThrift;
65+
private bool enableComplexDatatypeSupport;
6566
private Dictionary<string, string>? confOverlay;
6667
internal string? StatementId { get; set; }
6768

@@ -90,6 +91,7 @@ public DatabricksStatement(DatabricksConnection connection)
9091
enablePKFK = connection.EnablePKFK;
9192

9293
runAsyncInThrift = connection.RunAsyncInThrift;
94+
enableComplexDatatypeSupport = connection.EnableComplexDatatypeSupport;
9395

9496
// Override the Apache base default (500ms) with Databricks-specific poll interval (100ms)
9597
if (!connection.Properties.ContainsKey(ApacheParameters.PollTimeMilliseconds))
@@ -260,11 +262,11 @@ protected override void SetStatementProperties(TExecuteStatementReq statement)
260262
{
261263
TimestampAsArrow = true,
262264
DecimalAsArrow = true,
263-
264-
// set to false so they return as string
265-
// otherwise, they return as ARRAY_TYPE but you can't determine
266-
// the object type of the items in the array
267-
ComplexTypesAsArrow = false,
265+
// When false (default), complex types (ARRAY, MAP, STRUCT) are returned as JSON-encoded
266+
// strings by the Thrift server. When true, the server returns native Arrow types.
267+
// Note: Thrift ARRAY_TYPE responses do not embed element type info, so callers cannot
268+
// reliably determine element types; returning strings is the safe default.
269+
ComplexTypesAsArrow = enableComplexDatatypeSupport,
268270
IntervalTypesAsArrow = false,
269271
};
270272

csharp/src/StatementExecution/StatementExecutionConnection.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ internal class StatementExecutionConnection : TracingConnection, IGetObjectsData
7171
private readonly bool _tracePropagationEnabled;
7272
private readonly string _traceParentHeaderName;
7373
private readonly bool _traceStateEnabled;
74+
private readonly bool _enableComplexDatatypeSupport;
7475

7576
// Authentication support
7677
private readonly string? _identityFederationClientId;
@@ -229,6 +230,7 @@ private StatementExecutionConnection(
229230
_tracePropagationEnabled = PropertyHelper.GetBooleanPropertyWithValidation(properties, DatabricksParameters.TracePropagationEnabled, true);
230231
_traceParentHeaderName = PropertyHelper.GetStringProperty(properties, DatabricksParameters.TraceParentHeaderName, "traceparent");
231232
_traceStateEnabled = PropertyHelper.GetBooleanPropertyWithValidation(properties, DatabricksParameters.TraceStateEnabled, false);
233+
_enableComplexDatatypeSupport = PropertyHelper.GetBooleanPropertyWithValidation(properties, DatabricksParameters.EnableComplexDatatypeSupport, false);
232234

233235
// Authentication configuration
234236
if (properties.TryGetValue(DatabricksParameters.IdentityFederationClientId, out string? identityFederationClientId))
@@ -851,6 +853,8 @@ public override void Dispose()
851853
}
852854

853855
// TracingConnection provides IActivityTracer implementation
856+
internal bool EnableComplexDatatypeSupport => _enableComplexDatatypeSupport;
857+
854858
public override string AssemblyVersion => GetType().Assembly.GetName().Version?.ToString() ?? "1.0.0";
855859
public override string AssemblyName => "AdbcDrivers.Databricks";
856860
}

csharp/src/StatementExecution/StatementExecutionStatement.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using System.Net.Http;
2222
using System.Threading;
2323
using System.Threading.Tasks;
24+
using AdbcDrivers.Databricks;
2425
using AdbcDrivers.Databricks.Reader.CloudFetch;
2526
using AdbcDrivers.Databricks.StatementExecution.MetadataCommands;
2627
using AdbcDrivers.Databricks.Result;
@@ -65,6 +66,9 @@ internal class StatementExecutionStatement : TracingStatement
6566
// HTTP client for CloudFetch downloads
6667
private readonly HttpClient _httpClient;
6768

69+
// Complex type configuration
70+
private readonly bool _enableComplexDatatypeSupport;
71+
6872
// Connection reference for metadata queries
6973
private readonly StatementExecutionConnection _connection;
7074

@@ -119,6 +123,7 @@ public StatementExecutionStatement(
119123
_recyclableMemoryStreamManager = recyclableMemoryStreamManager ?? throw new ArgumentNullException(nameof(recyclableMemoryStreamManager));
120124
_lz4BufferPool = lz4BufferPool ?? throw new ArgumentNullException(nameof(lz4BufferPool));
121125
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
126+
_enableComplexDatatypeSupport = connection.EnableComplexDatatypeSupport;
122127
}
123128

124129
/// <summary>
@@ -280,6 +285,13 @@ public async Task<QueryResult> ExecuteQueryAsync(
280285
// Create appropriate reader based on result disposition
281286
IArrowArrayStream reader = CreateReader(response, cancellationToken);
282287

288+
// When EnableComplexDatatypeSupport=false (default), serialize complex Arrow types to JSON strings
289+
// so that SEA behavior matches Thrift (which sets ComplexTypesAsArrow=false).
290+
if (!_enableComplexDatatypeSupport)
291+
{
292+
reader = new ComplexTypeSerializingStream(reader);
293+
}
294+
283295
// Get schema from reader
284296
var schema = reader.Schema;
285297

0 commit comments

Comments
 (0)