Skip to content

Commit d419680

Browse files
Jade Wangclaude
andcommitted
fix(csharp): update StatementExecutionStatement to use protocol-agnostic CloudFetch pattern
Update CreateExternalLinksReader to use the protocol-agnostic CloudFetch pattern: - Use StatementExecutionResultFetcher with 3-parameter constructor (no manager resources) - Use CloudFetchConfiguration.FromProperties to parse config - Use CloudFetchDownloadManager with config (calls Initialize() on fetcher) - Use CloudFetchReader with ITracingStatement and nullable IResponse Also fix StatementExecutionResultFetcher constructor that was reverted during conflict resolution: - Remove memoryManager and downloadQueue parameters - Pass null to base constructor for late initialization - Resources will be injected via Initialize() by CloudFetchDownloadManager This completes the CloudFetch REST API integration using the unified protocol-agnostic pattern. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 0d5c6cf commit d419680

File tree

2 files changed

+26
-69
lines changed

2 files changed

+26
-69
lines changed

csharp/src/Reader/CloudFetch/StatementExecutionResultFetcher.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ internal class StatementExecutionResultFetcher : BaseResultFetcher
4545

4646
/// <summary>
4747
/// Initializes a new instance of the <see cref="StatementExecutionResultFetcher"/> class.
48+
/// Resources (memoryManager, downloadQueue) will be initialized by CloudFetchDownloadManager
49+
/// via the Initialize() method.
4850
/// </summary>
4951
/// <param name="client">The Statement Execution API client.</param>
5052
/// <param name="statementId">The statement ID for fetching results.</param>
@@ -54,13 +56,12 @@ public StatementExecutionResultFetcher(
5456
string statementId,
5557
GetStatementResponse initialResponse)
5658
: base(null, null) // Resources will be injected via Initialize()
59+
/// <param name="manifest">The result manifest containing chunk information.</param>
5760
public StatementExecutionResultFetcher(
5861
IStatementExecutionClient client,
5962
string statementId,
60-
ResultManifest manifest,
61-
ICloudFetchMemoryBufferManager memoryManager,
62-
BlockingCollection<IDownloadResult> downloadQueue)
63-
: base(memoryManager, downloadQueue)
63+
ResultManifest manifest)
64+
: base(null, null) // Resources will be injected via Initialize()
6465
{
6566
_client = client ?? throw new ArgumentNullException(nameof(client));
6667
_statementId = statementId ?? throw new ArgumentNullException(nameof(statementId));

csharp/src/StatementExecution/StatementExecutionStatement.cs

Lines changed: 21 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -366,45 +366,8 @@ private IArrowArrayStream CreateExternalLinksReader(GetStatementResponse respons
366366
// Determine compression
367367
bool isLz4Compressed = response.Manifest.ResultCompression?.Equals("lz4", StringComparison.OrdinalIgnoreCase) == true;
368368

369-
// Create memory manager
370-
int memoryBufferSizeMB = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchMemoryBufferSize, "200"));
371-
var memoryManager = new CloudFetchMemoryBufferManager(memoryBufferSizeMB);
372-
373-
// Create download and result queues
374-
var downloadQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), 10);
375-
var resultQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), 10);
376-
377-
// If Result field has external links, add them to the download queue first
378-
// (Result contains the first chunk, Manifest may not include it for large results)
379-
if (response.Result?.ExternalLinks != null && response.Result.ExternalLinks.Any())
380-
{
381-
foreach (var link in response.Result.ExternalLinks)
382-
{
383-
var expirationTime = DateTime.UtcNow.AddHours(1);
384-
if (!string.IsNullOrEmpty(link.Expiration))
385-
{
386-
try
387-
{
388-
expirationTime = DateTime.Parse(link.Expiration, System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.RoundtripKind);
389-
}
390-
catch (FormatException) { }
391-
}
392-
393-
var downloadResult = new DownloadResult(
394-
chunkIndex: link.ChunkIndex,
395-
fileUrl: link.ExternalLinkUrl,
396-
startRowOffset: link.RowOffset,
397-
rowCount: link.RowCount,
398-
byteCount: link.ByteCount,
399-
expirationTime: expirationTime,
400-
memoryManager: memoryManager,
401-
httpHeaders: link.HttpHeaders);
402-
403-
downloadQueue.Add(downloadResult);
404-
}
405-
}
406-
407-
// Create result fetcher
369+
// 1. Create REST-specific result fetcher
370+
// Resources (memory manager, download queue) will be initialized by CloudFetchDownloadManager
408371
var resultFetcher = new StatementExecutionResultFetcher(
409372
_client,
410373
response.StatementId,
@@ -467,37 +430,30 @@ private IArrowArrayStream CreateExternalLinksReader(GetStatementResponse respons
467430
memoryManager,
468431
downloadQueue);
469432

470-
// Create downloader with correct parameters
471-
int parallelDownloads = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchParallelDownloads, "3"));
472-
int maxRetries = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchMaxRetries, "3"));
473-
int retryDelayMs = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchRetryDelayMs, "500"));
474-
int urlExpirationBufferSeconds = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, "60"));
475-
int maxUrlRefreshAttempts = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, "3"));
476-
477-
var downloader = new CloudFetchDownloader(
478-
this, // Pass this as ITracingStatement
479-
downloadQueue,
480-
resultQueue,
481-
memoryManager,
482-
_httpClient,
483-
resultFetcher,
484-
parallelDownloads,
485-
isLz4Compressed,
486-
maxRetries,
487-
retryDelayMs,
488-
maxUrlRefreshAttempts,
489-
urlExpirationBufferSeconds);
490-
491-
// Create download manager using test constructor (for REST API)
492-
var downloadManager = new CloudFetchDownloadManager(
433+
// 2. Parse configuration from REST properties (unified properties work for both Thrift and REST)
434+
var config = CloudFetchConfiguration.FromProperties(
435+
_properties,
493436
schema,
494-
resultFetcher,
495-
downloader);
437+
isLz4Compressed);
438+
439+
// 3. Create protocol-agnostic download manager
440+
// Manager creates shared resources and calls Initialize() on the fetcher
441+
var downloadManager = new CloudFetchDownloadManager(
442+
resultFetcher, // Protocol-specific fetcher
443+
_httpClient,
444+
config,
445+
this); // ITracingStatement for tracing
496446

497-
// Start the download manager
447+
// 4. Start the manager
498448
downloadManager.StartAsync().GetAwaiter().GetResult();
499449

500450
// Create and return a simple reader that uses the download manager
451+
// 5. Create protocol-agnostic reader
452+
return new CloudFetchReader(
453+
this, // ITracingStatement (both Thrift and REST implement this)
454+
schema,
455+
null, // IResponse (REST doesn't use IResponse)
456+
downloadManager);
501457
}
502458

503459
/// <summary>

0 commit comments

Comments
 (0)