Skip to content

Commit 47282c3

Browse files
Jade Wangclaude
andcommitted
feat(csharp): implement InlineReader for inline result disposition
Implements InlineReader class to handle INLINE disposition where results are embedded as Arrow IPC stream in REST API responses. This supports result sets ≤25 MiB that are returned inline in the Attachment field of ResultChunk. Key features: - Implements IArrowArrayStream interface for standard Arrow integration - Parses base64-encoded Arrow IPC stream data from ResultChunk.Attachment - Handles multiple chunks with proper ordering - Comprehensive unit tests (15 test cases) covering all scenarios This is part of PECO-2791-C (Inline Results Support) and completes PECO-2839. The integration with StatementExecutionStatement will be handled in a follow-up PR. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent ebc50b5 commit 47282c3

File tree

3 files changed

+742
-1
lines changed

3 files changed

+742
-1
lines changed

csharp/doc/statement-execution-api-design.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2446,7 +2446,7 @@ internal class StatementExecutionResultFetcher : BaseResultFetcher
24462446
**Dependencies:** PECO-2791-B
24472447

24482448
**Scope:**
2449-
- [ ] Implement `InlineReader` class
2449+
- [x] Implement `InlineReader` class**COMPLETED (PECO-2839)**
24502450
- Parse inline Arrow stream data from `ResultChunk.Attachment`
24512451
- Handle multiple chunks in sequence
24522452
- [ ] Update `StatementExecutionStatement` to support hybrid disposition

csharp/src/Reader/InlineReader.cs

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright (c) 2025 ADBC Drivers Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.IO;
20+
using System.Linq;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Apache.Arrow.Adbc.Drivers.Databricks.StatementExecution;
24+
using Apache.Arrow.Ipc;
25+
26+
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
27+
{
28+
/// <summary>
29+
/// Reader for inline Arrow result data from Databricks Statement Execution REST API.
30+
/// Handles INLINE disposition where results are embedded as base64-encoded Arrow IPC stream in response.
31+
/// </summary>
32+
internal sealed class InlineReader : IArrowArrayStream
33+
{
34+
private readonly List<ResultChunk> _chunks;
35+
private int _currentChunkIndex;
36+
private ArrowStreamReader? _currentReader;
37+
private Schema? _schema;
38+
private bool _isDisposed;
39+
40+
/// <summary>
41+
/// Initializes a new instance of the <see cref="InlineReader"/> class.
42+
/// </summary>
43+
/// <param name="manifest">The result manifest containing inline data chunks.</param>
44+
/// <exception cref="ArgumentNullException">Thrown when manifest is null.</exception>
45+
/// <exception cref="ArgumentException">Thrown when manifest format is not arrow_stream.</exception>
46+
public InlineReader(ResultManifest manifest)
47+
{
48+
if (manifest == null)
49+
{
50+
throw new ArgumentNullException(nameof(manifest));
51+
}
52+
53+
if (manifest.Format != "arrow_stream")
54+
{
55+
throw new ArgumentException(
56+
$"InlineReader only supports arrow_stream format, but received: {manifest.Format}",
57+
nameof(manifest));
58+
}
59+
60+
// Filter chunks that have attachment data
61+
_chunks = manifest.Chunks?
62+
.Where(c => c.Attachment != null && c.Attachment.Length > 0)
63+
.OrderBy(c => c.ChunkIndex)
64+
.ToList() ?? new List<ResultChunk>();
65+
66+
_currentChunkIndex = 0;
67+
}
68+
69+
/// <summary>
70+
/// Gets the Arrow schema for the result set.
71+
/// </summary>
72+
/// <exception cref="InvalidOperationException">Thrown when schema cannot be determined.</exception>
73+
public Schema Schema
74+
{
75+
get
76+
{
77+
ThrowIfDisposed();
78+
79+
if (_schema != null)
80+
{
81+
return _schema;
82+
}
83+
84+
// Extract schema from the first chunk
85+
if (_chunks.Count == 0)
86+
{
87+
throw new InvalidOperationException("No chunks with attachment data found in result manifest");
88+
}
89+
90+
// Create a reader for the first chunk to extract the schema
91+
var firstChunk = _chunks[0];
92+
using (var stream = new MemoryStream(firstChunk.Attachment!))
93+
using (var reader = new ArrowStreamReader(stream))
94+
{
95+
_schema = reader.Schema;
96+
}
97+
98+
return _schema;
99+
}
100+
}
101+
102+
/// <summary>
103+
/// Reads the next record batch from the inline result data.
104+
/// </summary>
105+
/// <param name="cancellationToken">The cancellation token.</param>
106+
/// <returns>The next record batch, or null if there are no more batches.</returns>
107+
public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
108+
{
109+
ThrowIfDisposed();
110+
111+
while (true)
112+
{
113+
// If we have a current reader, try to read the next batch
114+
if (_currentReader != null)
115+
{
116+
RecordBatch? batch = await _currentReader.ReadNextRecordBatchAsync(cancellationToken);
117+
if (batch != null)
118+
{
119+
return batch;
120+
}
121+
else
122+
{
123+
// Clean up the current reader
124+
_currentReader.Dispose();
125+
_currentReader = null;
126+
_currentChunkIndex++;
127+
}
128+
}
129+
130+
// If we don't have a current reader, move to the next chunk
131+
if (_currentChunkIndex >= _chunks.Count)
132+
{
133+
// No more chunks
134+
return null;
135+
}
136+
137+
// Create a reader for the current chunk
138+
var chunk = _chunks[_currentChunkIndex];
139+
if (chunk.Attachment == null || chunk.Attachment.Length == 0)
140+
{
141+
// Skip chunks without attachment data
142+
_currentChunkIndex++;
143+
continue;
144+
}
145+
146+
try
147+
{
148+
var stream = new MemoryStream(chunk.Attachment);
149+
_currentReader = new ArrowStreamReader(stream, leaveOpen: false);
150+
151+
// Ensure schema is set
152+
if (_schema == null)
153+
{
154+
_schema = _currentReader.Schema;
155+
}
156+
157+
// Continue to read the first batch from this chunk
158+
continue;
159+
}
160+
catch (Exception ex)
161+
{
162+
throw new InvalidOperationException(
163+
$"Failed to read Arrow stream from chunk {chunk.ChunkIndex}: {ex.Message}",
164+
ex);
165+
}
166+
}
167+
}
168+
169+
/// <summary>
170+
/// Disposes the reader and releases all resources.
171+
/// </summary>
172+
public void Dispose()
173+
{
174+
if (!_isDisposed)
175+
{
176+
if (_currentReader != null)
177+
{
178+
_currentReader.Dispose();
179+
_currentReader = null;
180+
}
181+
182+
_isDisposed = true;
183+
}
184+
}
185+
186+
private void ThrowIfDisposed()
187+
{
188+
if (_isDisposed)
189+
{
190+
throw new ObjectDisposedException(nameof(InlineReader));
191+
}
192+
}
193+
}
194+
}

0 commit comments

Comments
 (0)