-
Notifications
You must be signed in to change notification settings - Fork 11
feat(csharp): implement EnableComplexDatatypeSupport for consistent complex type behavior (PECO-2938) #311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
97e1984
5030e3b
26235c8
373b75d
b0018c8
aff5b05
fb4af79
6d20bca
8613c52
4b9bc30
e884fdc
2f94c5c
93427d0
f5b9227
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,182 @@ | ||||||||||||||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||||||||||||||
| * Copyright (c) 2025 ADBC Drivers Contributors | ||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||||||||||||||||||||
| * you may not use this file except in compliance with the License. | ||||||||||||||||||||||||||||||||||||||||||
| * You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| using System; | ||||||||||||||||||||||||||||||||||||||||||
| using System.Collections.Generic; | ||||||||||||||||||||||||||||||||||||||||||
| using System.Text.Json; | ||||||||||||||||||||||||||||||||||||||||||
| using System.Threading; | ||||||||||||||||||||||||||||||||||||||||||
| using System.Threading.Tasks; | ||||||||||||||||||||||||||||||||||||||||||
| using Apache.Arrow; | ||||||||||||||||||||||||||||||||||||||||||
| using Apache.Arrow.Adbc.Extensions; | ||||||||||||||||||||||||||||||||||||||||||
| using Apache.Arrow.Ipc; | ||||||||||||||||||||||||||||||||||||||||||
| using Apache.Arrow.Types; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| namespace AdbcDrivers.Databricks | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||
| /// Wraps an <see cref="IArrowArrayStream"/> and converts columns of complex Arrow types | ||||||||||||||||||||||||||||||||||||||||||
| /// (LIST, MAP represented as LIST of STRUCTs, STRUCT) into STRING columns containing | ||||||||||||||||||||||||||||||||||||||||||
| /// their JSON representation. | ||||||||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||||||||
| /// This is applied when EnableComplexDatatypeSupport=false (the default), so that SEA | ||||||||||||||||||||||||||||||||||||||||||
| /// results match the legacy Thrift behavior of returning JSON strings for complex types. | ||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||
| internal sealed class ComplexTypeSerializingStream : IArrowArrayStream | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| private readonly IArrowArrayStream _inner; | ||||||||||||||||||||||||||||||||||||||||||
| private readonly Schema _schema; | ||||||||||||||||||||||||||||||||||||||||||
| private readonly HashSet<int> _complexColumnIndices; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| public ComplexTypeSerializingStream(IArrowArrayStream inner) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| _inner = inner ?? throw new ArgumentNullException(nameof(inner)); | ||||||||||||||||||||||||||||||||||||||||||
| (_schema, _complexColumnIndices) = BuildStringSchema(inner.Schema); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| public Schema Schema => _schema; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| var batch = await _inner.ReadNextRecordBatchAsync(cancellationToken).ConfigureAwait(false); | ||||||||||||||||||||||||||||||||||||||||||
| if (batch == null) | ||||||||||||||||||||||||||||||||||||||||||
| return null; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+51
to
+56
|
||||||||||||||||||||||||||||||||||||||||||
| if (_complexColumnIndices.Count == 0) | ||||||||||||||||||||||||||||||||||||||||||
| return batch; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| return ConvertComplexColumns(batch); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| public void Dispose() => _inner.Dispose(); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| private RecordBatch ConvertComplexColumns(RecordBatch batch) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| var arrays = new IArrowArray[batch.ColumnCount]; | ||||||||||||||||||||||||||||||||||||||||||
| for (int i = 0; i < batch.ColumnCount; i++) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| arrays[i] = _complexColumnIndices.Contains(i) ? SerializeToStringArray(batch.Column(i)) : batch.Column(i); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
| return new RecordBatch(_schema, arrays, batch.Length); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| private static StringArray SerializeToStringArray(IArrowArray array) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| var builder = new StringArray.Builder(); | ||||||||||||||||||||||||||||||||||||||||||
| for (int i = 0; i < array.Length; i++) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| if (array.IsNull(i)) | ||||||||||||||||||||||||||||||||||||||||||
| builder.AppendNull(); | ||||||||||||||||||||||||||||||||||||||||||
| else | ||||||||||||||||||||||||||||||||||||||||||
| builder.Append(JsonSerializer.Serialize(ToObject(array, i))); | ||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+81
to
+83
|
||||||||||||||||||||||||||||||||||||||||||
| builder.AppendNull(); | |
| else | |
| builder.Append(JsonSerializer.Serialize(ToObject(array, i))); | |
| { | |
| builder.AppendNull(); | |
| } | |
| else | |
| { | |
| var value = ToObject(array, i); | |
| if (value is IDictionary<string, object?> dict) | |
| { | |
| // Wrap in a SortedDictionary to ensure deterministic key ordering | |
| var ordered = new SortedDictionary<string, object?>(dict); | |
| builder.Append(JsonSerializer.Serialize(ordered)); | |
| } | |
| else | |
| { | |
| builder.Append(JsonSerializer.Serialize(value)); | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ Done — changed ToMapDict to return SortedDictionary<string, object?> so MAP keys are always emitted in sorted order, giving deterministic JSON output across all target frameworks.
This comment was generated with GitHub MCP.
Outdated
Copilot
AI
Mar 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When converting a complex field to StringType, the new field is always created with nullable: true, which can change schema nullability semantics compared to the original field. Consider preserving the original field’s nullability (e.g., use the original field’s nullable flag) while only changing the DataType.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ Done — changed nullable: true to field.IsNullable to preserve the original field's nullability.
This comment was generated with GitHub MCP.
Outdated
Copilot
AI
Mar 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MAP key handling currently falls back to the literal string "null" whenever the key column isn’t a StringArray (or contains null), which silently corrupts maps with non-string keys (e.g., MAP<INT, ...>) and can also collide with a real "null" key. Consider converting key values to a string representation (e.g., via ValueAt(...).ToString() with invariant culture) and decide explicitly how to handle null keys (reject/throw vs. serialize).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ Done — fixed to ToObject(keyArray, i)?.ToString() ?? "null". This correctly handles all Arrow primitive key types (INT, BIGINT, DATE, BOOLEAN, DECIMAL, etc.) using the existing ToObject helper, eliminating the data loss bug with non-string keys.
This comment was generated with GitHub MCP.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,7 @@ internal class DatabricksStatement : SparkStatement, IHiveServer2Statement | |
| private bool enableMultipleCatalogSupport; | ||
| private bool enablePKFK; | ||
| private bool runAsyncInThrift; | ||
| private bool enableComplexDatatypeSupport; | ||
| private Dictionary<string, string>? confOverlay; | ||
|
|
||
| public override long BatchSize { get; protected set; } = DatabricksBatchSizeDefault; | ||
|
|
@@ -84,6 +85,7 @@ public DatabricksStatement(DatabricksConnection connection) | |
| enablePKFK = connection.EnablePKFK; | ||
|
|
||
| runAsyncInThrift = connection.RunAsyncInThrift; | ||
| enableComplexDatatypeSupport = connection.EnableComplexDatatypeSupport; | ||
|
|
||
| // Override the Apache base default (500ms) with Databricks-specific poll interval (100ms) | ||
| if (!connection.Properties.ContainsKey(ApacheParameters.PollTimeMilliseconds)) | ||
|
|
@@ -136,11 +138,7 @@ protected override void SetStatementProperties(TExecuteStatementReq statement) | |
| { | ||
| TimestampAsArrow = true, | ||
| DecimalAsArrow = true, | ||
|
|
||
| // set to false so they return as string | ||
| // otherwise, they return as ARRAY_TYPE but you can't determine | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Somehow keep this comment
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ✅ Done — restored an expanded comment on
This comment was generated with GitHub MCP. |
||
| // the object type of the items in the array | ||
| ComplexTypesAsArrow = false, | ||
| ComplexTypesAsArrow = enableComplexDatatypeSupport, | ||
| IntervalTypesAsArrow = false, | ||
| }; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using Apache.Arrow.Ipc;appears unused in this file and will trigger CS8019; the repo treats warnings as errors, so this can break the build. Remove the unused using (or reference an Ipc type if it’s intended).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ Done — removed
using Apache.Arrow.Ipc;from bothComplexTypeSerializingStream.csandComplexTypesValueTests.cs.This comment was generated with GitHub MCP.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverting this —
IArrowArrayStreamis defined inApache.Arrow.Ipc, so the using is required. Removing it caused a build failure (CS0246). Restored.This comment was generated with GitHub MCP.