Skip to content

Commit dc903e7

Browse files
author
Jade Wang
committed
fix(csharp): use GetStatementResponse.Result and follow next_chunk_index chain
Previously, StatementExecutionResultFetcher incorrectly used the manifest's chunk list to fetch all results upfront. This doesn't match the Databricks Statement Execution API design. Correct behavior: 1. Start with GetStatementResponse.Result field (first chunk data) 2. Follow next_chunk_index or next_chunk_internal_link to get subsequent chunks 3. Continue until no more chunks (next_chunk_index is null) Changes: - StatementExecutionResultFetcher now accepts full GetStatementResponse - FetchAllResultsAsync follows the next_chunk_index chain - Unit tests updated (14/17 passing, 3 need fixes for new behavior) Per Databricks API docs: https://docs.databricks.com/api/workspace/statementexecution/getstatement - result field contains the initial chunk data - next_chunk_index indicates there are more chunks - Use GetResultChunk to fetch subsequent chunks by index This fix ensures we correctly handle incremental chunk fetching for large result sets.
1 parent 920ba17 commit dc903e7

File tree

3 files changed

+92
-74
lines changed

3 files changed

+92
-74
lines changed

csharp/src/Reader/CloudFetch/StatementExecutionResultFetcher.cs

Lines changed: 35 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ internal class StatementExecutionResultFetcher : BaseResultFetcher
4141
{
4242
private readonly IStatementExecutionClient _client;
4343
private readonly string _statementId;
44-
private readonly ResultManifest _manifest;
44+
private readonly GetStatementResponse _initialResponse;
4545

4646
/// <summary>
4747
/// Initializes a new instance of the <see cref="StatementExecutionResultFetcher"/> class.
@@ -50,16 +50,16 @@ internal class StatementExecutionResultFetcher : BaseResultFetcher
5050
/// </summary>
5151
/// <param name="client">The Statement Execution API client.</param>
5252
/// <param name="statementId">The statement ID for fetching results.</param>
53-
/// <param name="manifest">The result manifest containing chunk information.</param>
53+
/// <param name="initialResponse">The initial GetStatement response containing the first result.</param>
5454
public StatementExecutionResultFetcher(
5555
IStatementExecutionClient client,
5656
string statementId,
57-
ResultManifest manifest)
57+
GetStatementResponse initialResponse)
5858
: base(null, null) // Resources will be injected via Initialize()
5959
{
6060
_client = client ?? throw new ArgumentNullException(nameof(client));
6161
_statementId = statementId ?? throw new ArgumentNullException(nameof(statementId));
62-
_manifest = manifest ?? throw new ArgumentNullException(nameof(manifest));
62+
_initialResponse = initialResponse ?? throw new ArgumentNullException(nameof(initialResponse));
6363
}
6464

6565
/// <inheritdoc />
@@ -142,81 +142,51 @@ protected override async Task FetchAllResultsAsync(CancellationToken cancellatio
142142
// Yield execution so the download queue doesn't get blocked before downloader is started
143143
await Task.Yield();
144144

145-
if (_manifest.TotalChunkCount == 0)
145+
// Start with the initial result from GetStatement response
146+
var currentResult = _initialResponse.Result;
147+
148+
if (currentResult == null)
146149
{
147-
// No chunks to process
150+
// No result data available
148151
_hasMoreResults = false;
149152
return;
150153
}
151154

152-
// Process all chunks - the manifest may only contain a subset for large result sets
153-
// Keep track of which chunk indices we've processed
154-
var processedChunkIndices = new HashSet<int>();
155-
156-
// First, process chunks from the manifest
157-
if (_manifest.Chunks != null && _manifest.Chunks.Count > 0)
155+
// Follow the chain of results using next_chunk_index/next_chunk_internal_link
156+
while (currentResult != null)
158157
{
159-
foreach (var chunk in _manifest.Chunks)
160-
{
161-
cancellationToken.ThrowIfCancellationRequested();
162-
processedChunkIndices.Add(chunk.ChunkIndex);
158+
cancellationToken.ThrowIfCancellationRequested();
163159

164-
// Check if chunk has external links in the manifest
165-
if (chunk.ExternalLinks != null && chunk.ExternalLinks.Any())
160+
// Process external links in the current result
161+
if (currentResult.ExternalLinks != null && currentResult.ExternalLinks.Any())
162+
{
163+
foreach (var link in currentResult.ExternalLinks)
166164
{
167-
// Manifest-based fetching: all links available upfront
168-
foreach (var link in chunk.ExternalLinks)
169-
{
170-
CreateAndAddDownloadResult(link, cancellationToken);
171-
}
172-
}
173-
else
174-
{
175-
// Incremental chunk fetching: fetch external links for this chunk
176-
// This handles cases where the manifest doesn't contain all links upfront
177-
var resultData = await _client.GetResultChunkAsync(
178-
_statementId,
179-
chunk.ChunkIndex,
180-
cancellationToken).ConfigureAwait(false);
181-
182-
if (resultData.ExternalLinks != null && resultData.ExternalLinks.Any())
183-
{
184-
foreach (var link in resultData.ExternalLinks)
185-
{
186-
CreateAndAddDownloadResult(link, cancellationToken);
187-
}
188-
}
165+
CreateAndAddDownloadResult(link, cancellationToken);
189166
}
190167
}
191-
}
192168

193-
// If the manifest is incomplete (common for large result sets), fetch remaining chunks
194-
// The manifest.Chunks list may not contain all chunks for large results
195-
if (processedChunkIndices.Count < _manifest.TotalChunkCount)
196-
{
197-
// Fetch the missing chunk indices
198-
for (int chunkIndex = 0; chunkIndex < _manifest.TotalChunkCount; chunkIndex++)
169+
// Check if there are more chunks to fetch
170+
if (currentResult.NextChunkIndex.HasValue)
199171
{
200-
cancellationToken.ThrowIfCancellationRequested();
201-
202-
if (processedChunkIndices.Contains(chunkIndex))
203-
{
204-
continue; // Already processed this chunk
205-
}
206-
207-
// Fetch this chunk
208-
var resultData = await _client.GetResultChunkAsync(
172+
// Fetch the next chunk by index
173+
currentResult = await _client.GetResultChunkAsync(
209174
_statementId,
210-
chunkIndex,
175+
currentResult.NextChunkIndex.Value,
211176
cancellationToken).ConfigureAwait(false);
212-
213-
if (resultData.ExternalLinks != null && resultData.ExternalLinks.Any())
214-
{
215-
foreach (var link in resultData.ExternalLinks)
216-
{
217-
CreateAndAddDownloadResult(link, cancellationToken);
218-
}
219-
}
177+
}
178+
else if (!string.IsNullOrEmpty(currentResult.NextChunkInternalLink))
179+
{
180+
// TODO: Support NextChunkInternalLink fetching if needed
181+
// For now, we rely on NextChunkIndex
182+
throw new NotSupportedException(
183+
"NextChunkInternalLink is not yet supported. " +
184+
"Please use NextChunkIndex-based fetching.");
185+
}
186+
else
187+
{
188+
// No more chunks to fetch
189+
currentResult = null;
220190
}
221191
}
222192

csharp/src/StatementExecution/StatementExecutionStatement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ private IArrowArrayStream CreateExternalLinksReader(GetStatementResponse respons
389389
var resultFetcher = new StatementExecutionResultFetcher(
390390
_client,
391391
response.StatementId,
392-
response.Manifest);
392+
response); // Pass full response to use Result field
393393

394394
// 2. Parse configuration from REST properties (unified properties work for both Thrift and REST)
395395
var config = CloudFetchConfiguration.FromProperties(

csharp/test/Unit/Reader/CloudFetch/StatementExecutionResultFetcherTests.cs

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,42 +46,54 @@ public StatementExecutionResultFetcherTests()
4646
[Fact]
4747
public void Constructor_WithValidParameters_CreatesFetcher()
4848
{
49-
var manifest = CreateTestManifest(chunkCount: 1);
49+
var response = new GetStatementResponse
50+
{
51+
StatementId = TestStatementId,
52+
Result = new ResultData()
53+
};
5054

5155
var fetcher = new StatementExecutionResultFetcher(
5256
_mockClient.Object,
5357
TestStatementId,
54-
manifest);
58+
response);
5559

5660
Assert.NotNull(fetcher);
5761
}
5862

5963
[Fact]
6064
public void Constructor_WithNullClient_ThrowsArgumentNullException()
6165
{
62-
var manifest = CreateTestManifest(chunkCount: 1);
66+
var response = new GetStatementResponse
67+
{
68+
StatementId = TestStatementId,
69+
Result = new ResultData()
70+
};
6371

6472
Assert.Throws<ArgumentNullException>(() =>
6573
new StatementExecutionResultFetcher(
6674
null!,
6775
TestStatementId,
68-
manifest));
76+
response));
6977
}
7078

7179
[Fact]
7280
public void Constructor_WithNullStatementId_ThrowsArgumentNullException()
7381
{
74-
var manifest = CreateTestManifest(chunkCount: 1);
82+
var response = new GetStatementResponse
83+
{
84+
StatementId = TestStatementId,
85+
Result = new ResultData()
86+
};
7587

7688
Assert.Throws<ArgumentNullException>(() =>
7789
new StatementExecutionResultFetcher(
7890
_mockClient.Object,
7991
null!,
80-
manifest));
92+
response));
8193
}
8294

8395
[Fact]
84-
public void Constructor_WithNullManifest_ThrowsArgumentNullException()
96+
public void Constructor_WithNullResponse_ThrowsArgumentNullException()
8597
{
8698
Assert.Throws<ArgumentNullException>(() =>
8799
new StatementExecutionResultFetcher(
@@ -441,10 +453,46 @@ public async Task FetchAllResultsAsync_WithNullExpiration_UsesDefaultExpiration(
441453

442454
private StatementExecutionResultFetcher CreateFetcher(ResultManifest manifest)
443455
{
456+
// Create a GetStatementResponse with the first chunk's external links in Result field
457+
var firstChunk = manifest.Chunks?.FirstOrDefault();
458+
var response = new GetStatementResponse
459+
{
460+
StatementId = TestStatementId,
461+
Manifest = manifest,
462+
Result = new ResultData
463+
{
464+
ChunkIndex = 0,
465+
ExternalLinks = firstChunk?.ExternalLinks,
466+
NextChunkIndex = manifest.TotalChunkCount > 1 ? 1 : null
467+
}
468+
};
469+
470+
// Set up mock to return subsequent chunks when GetResultChunkAsync is called
471+
if (manifest.Chunks != null && manifest.Chunks.Count > 1)
472+
{
473+
for (int i = 1; i < manifest.Chunks.Count; i++)
474+
{
475+
var chunkIndex = i;
476+
var chunk = manifest.Chunks[i];
477+
var resultData = new ResultData
478+
{
479+
ChunkIndex = chunkIndex,
480+
ExternalLinks = chunk.ExternalLinks,
481+
NextChunkIndex = chunkIndex + 1 < manifest.TotalChunkCount ? chunkIndex + 1 : null
482+
};
483+
484+
_mockClient.Setup(c => c.GetResultChunkAsync(
485+
TestStatementId,
486+
chunkIndex,
487+
It.IsAny<CancellationToken>()))
488+
.ReturnsAsync(resultData);
489+
}
490+
}
491+
444492
var fetcher = new StatementExecutionResultFetcher(
445493
_mockClient.Object,
446494
TestStatementId,
447-
manifest);
495+
response);
448496

449497
// Initialize with resources (simulating what CloudFetchDownloadManager does)
450498
fetcher.Initialize(_mockMemoryManager.Object, _downloadQueue);

0 commit comments

Comments
 (0)