-
Notifications
You must be signed in to change notification settings - Fork 11
feat(csharp): implement EnableComplexDatatypeSupport for consistent complex type behavior (PECO-2938) #311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
97e1984
5030e3b
26235c8
373b75d
b0018c8
aff5b05
fb4af79
6d20bca
8613c52
4b9bc30
e884fdc
2f94c5c
93427d0
f5b9227
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,184 @@ | ||||||||||||||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||||||||||||||
| * Copyright (c) 2025 ADBC Drivers Contributors | ||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||||||||||||||||||||
| * you may not use this file except in compliance with the License. | ||||||||||||||||||||||||||||||||||||||||||
| * You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| using System; | ||||||||||||||||||||||||||||||||||||||||||
| using System.Collections.Generic; | ||||||||||||||||||||||||||||||||||||||||||
| using System.Text.Json; | ||||||||||||||||||||||||||||||||||||||||||
| using System.Threading; | ||||||||||||||||||||||||||||||||||||||||||
| using System.Threading.Tasks; | ||||||||||||||||||||||||||||||||||||||||||
| using Apache.Arrow; | ||||||||||||||||||||||||||||||||||||||||||
| using Apache.Arrow.Adbc.Extensions; | ||||||||||||||||||||||||||||||||||||||||||
| using Apache.Arrow.Ipc; | ||||||||||||||||||||||||||||||||||||||||||
| using Apache.Arrow.Types; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| namespace AdbcDrivers.Databricks | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||
| /// Wraps an <see cref="IArrowArrayStream"/> and converts columns of complex Arrow types | ||||||||||||||||||||||||||||||||||||||||||
| /// (LIST, MAP represented as LIST of STRUCTs, STRUCT) into STRING columns containing | ||||||||||||||||||||||||||||||||||||||||||
| /// their JSON representation. | ||||||||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||||||||
| /// This is applied when EnableComplexDatatypeSupport=false (the default), so that SEA | ||||||||||||||||||||||||||||||||||||||||||
| /// results match the legacy Thrift behavior of returning JSON strings for complex types. | ||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||
| internal sealed class ComplexTypeSerializingStream : IArrowArrayStream | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| private readonly IArrowArrayStream _inner; | ||||||||||||||||||||||||||||||||||||||||||
| private readonly Schema _schema; | ||||||||||||||||||||||||||||||||||||||||||
| private readonly HashSet<int> _complexColumnIndices; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| public ComplexTypeSerializingStream(IArrowArrayStream inner) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| _inner = inner ?? throw new ArgumentNullException(nameof(inner)); | ||||||||||||||||||||||||||||||||||||||||||
| (_schema, _complexColumnIndices) = BuildStringSchema(inner.Schema); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| public Schema Schema => _schema; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| RecordBatch? batch = await _inner.ReadNextRecordBatchAsync(cancellationToken).ConfigureAwait(false); | ||||||||||||||||||||||||||||||||||||||||||
| if (batch == null) | ||||||||||||||||||||||||||||||||||||||||||
| return null; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+51
to
+56
|
||||||||||||||||||||||||||||||||||||||||||
| if (_complexColumnIndices.Count == 0) | ||||||||||||||||||||||||||||||||||||||||||
| return batch; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| return ConvertComplexColumns(batch); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| public void Dispose() => _inner.Dispose(); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| private RecordBatch ConvertComplexColumns(RecordBatch batch) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| IArrowArray[] arrays = new IArrowArray[batch.ColumnCount]; | ||||||||||||||||||||||||||||||||||||||||||
| for (int i = 0; i < batch.ColumnCount; i++) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| arrays[i] = _complexColumnIndices.Contains(i) ? SerializeToStringArray(batch.Column(i)) : batch.Column(i); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
| return new RecordBatch(_schema, arrays, batch.Length); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| private static StringArray SerializeToStringArray(IArrowArray array) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| StringArray.Builder builder = new StringArray.Builder(); | ||||||||||||||||||||||||||||||||||||||||||
| for (int i = 0; i < array.Length; i++) | ||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||
| if (array.IsNull(i)) | ||||||||||||||||||||||||||||||||||||||||||
| builder.AppendNull(); | ||||||||||||||||||||||||||||||||||||||||||
| else | ||||||||||||||||||||||||||||||||||||||||||
| builder.Append(JsonSerializer.Serialize(ToObject(array, i))); | ||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+81
to
+83
|
||||||||||||||||||||||||||||||||||||||||||
| builder.AppendNull(); | |
| else | |
| builder.Append(JsonSerializer.Serialize(ToObject(array, i))); | |
| { | |
| builder.AppendNull(); | |
| } | |
| else | |
| { | |
| var value = ToObject(array, i); | |
| if (value is IDictionary<string, object?> dict) | |
| { | |
| // Wrap in a SortedDictionary to ensure deterministic key ordering | |
| var ordered = new SortedDictionary<string, object?>(dict); | |
| builder.Append(JsonSerializer.Serialize(ordered)); | |
| } | |
| else | |
| { | |
| builder.Append(JsonSerializer.Serialize(value)); | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ Done — changed ToMapDict to return SortedDictionary<string, object?> so MAP keys are always emitted in sorted order, giving deterministic JSON output across all target frameworks.
This comment was generated with GitHub MCP.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,6 +62,7 @@ internal class DatabricksStatement : SparkStatement, IHiveServer2Statement | |
| private bool enableMultipleCatalogSupport; | ||
| private bool enablePKFK; | ||
| private bool runAsyncInThrift; | ||
| private bool enableComplexDatatypeSupport; | ||
| private Dictionary<string, string>? confOverlay; | ||
| internal string? StatementId { get; set; } | ||
|
|
||
|
|
@@ -90,6 +91,7 @@ public DatabricksStatement(DatabricksConnection connection) | |
| enablePKFK = connection.EnablePKFK; | ||
|
|
||
| runAsyncInThrift = connection.RunAsyncInThrift; | ||
| enableComplexDatatypeSupport = connection.EnableComplexDatatypeSupport; | ||
|
|
||
| // Override the Apache base default (500ms) with Databricks-specific poll interval (100ms) | ||
| if (!connection.Properties.ContainsKey(ApacheParameters.PollTimeMilliseconds)) | ||
|
|
@@ -260,11 +262,7 @@ protected override void SetStatementProperties(TExecuteStatementReq statement) | |
| { | ||
| TimestampAsArrow = true, | ||
| DecimalAsArrow = true, | ||
|
|
||
| // set to false so they return as string | ||
| // otherwise, they return as ARRAY_TYPE but you can't determine | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Somehow keep this comment
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ✅ Done — restored an expanded comment on
This comment was generated with GitHub MCP. |
||
| // the object type of the items in the array | ||
| ComplexTypesAsArrow = false, | ||
| ComplexTypesAsArrow = enableComplexDatatypeSupport, | ||
| IntervalTypesAsArrow = false, | ||
| }; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using Apache.Arrow.Ipc;appears unused in this file and will trigger CS8019; the repo treats warnings as errors, so this can break the build. Remove the unused using (or reference an Ipc type if it’s intended).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ Done — removed
using Apache.Arrow.Ipc;from bothComplexTypeSerializingStream.csandComplexTypesValueTests.cs.This comment was generated with GitHub MCP.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverting this —
IArrowArrayStreamis defined inApache.Arrow.Ipc, so the using is required. Removing it caused a build failure (CS0246). Restored.This comment was generated with GitHub MCP.