Skip to content

Commit 3a7d15d

Browse files
Jade Wang and claude committed
fix(csharp): update StatementExecutionStatement to use protocol-agnostic CloudFetch pattern
Update CreateExternalLinksReader to use the protocol-agnostic CloudFetch pattern:

- Use StatementExecutionResultFetcher with the 3-parameter constructor (no manager resources)
- Use CloudFetchConfiguration.FromProperties to parse config
- Use CloudFetchDownloadManager with config (calls Initialize() on fetcher)
- Use CloudFetchReader with ITracingStatement and nullable IResponse

Also fix the StatementExecutionResultFetcher constructor that was reverted during conflict resolution:

- Remove memoryManager and downloadQueue parameters
- Pass null to base constructor for late initialization
- Resources will be injected via Initialize() by CloudFetchDownloadManager

This completes the CloudFetch REST API integration using the unified protocol-agnostic pattern.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 3220adb commit 3a7d15d

File tree

2 files changed

+27
-77
lines changed

2 files changed

+27
-77
lines changed

csharp/src/Reader/CloudFetch/StatementExecutionResultFetcher.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,17 @@ internal class StatementExecutionResultFetcher : BaseResultFetcher
4545

4646
/// <summary>
4747
/// Initializes a new instance of the <see cref="StatementExecutionResultFetcher"/> class.
48+
/// Resources (memoryManager, downloadQueue) will be initialized by CloudFetchDownloadManager
49+
/// via the Initialize() method.
4850
/// </summary>
4951
/// <param name="client">The Statement Execution API client.</param>
5052
/// <param name="statementId">The statement ID for fetching results.</param>
5153
/// <param name="manifest">The result manifest containing chunk information.</param>
52-
/// <param name="memoryManager">The memory buffer manager.</param>
53-
/// <param name="downloadQueue">The queue to add download tasks to.</param>
5454
public StatementExecutionResultFetcher(
5555
IStatementExecutionClient client,
5656
string statementId,
57-
ResultManifest manifest,
58-
ICloudFetchMemoryBufferManager memoryManager,
59-
BlockingCollection<IDownloadResult> downloadQueue)
60-
: base(memoryManager, downloadQueue)
57+
ResultManifest manifest)
58+
: base(null, null) // Resources will be injected via Initialize()
6159
{
6260
_client = client ?? throw new ArgumentNullException(nameof(client));
6361
_statementId = statementId ?? throw new ArgumentNullException(nameof(statementId));

csharp/src/StatementExecution/StatementExecutionStatement.cs

Lines changed: 23 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -365,84 +365,36 @@ private IArrowArrayStream CreateExternalLinksReader(GetStatementResponse respons
365365
// Determine compression
366366
bool isLz4Compressed = response.Manifest.ResultCompression?.Equals("lz4", StringComparison.OrdinalIgnoreCase) == true;
367367

368-
// Create memory manager
369-
int memoryBufferSizeMB = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchMemoryBufferSize, "200"));
370-
var memoryManager = new CloudFetchMemoryBufferManager(memoryBufferSizeMB);
371-
372-
// Create download and result queues
373-
var downloadQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), 10);
374-
var resultQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), 10);
375-
376-
// If Result field has external links, add them to the download queue first
377-
// (Result contains the first chunk, Manifest may not include it for large results)
378-
if (response.Result?.ExternalLinks != null && response.Result.ExternalLinks.Any())
379-
{
380-
foreach (var link in response.Result.ExternalLinks)
381-
{
382-
var expirationTime = DateTime.UtcNow.AddHours(1);
383-
if (!string.IsNullOrEmpty(link.Expiration))
384-
{
385-
try
386-
{
387-
expirationTime = DateTime.Parse(link.Expiration, System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.RoundtripKind);
388-
}
389-
catch (FormatException) { }
390-
}
391-
392-
var downloadResult = new DownloadResult(
393-
chunkIndex: link.ChunkIndex,
394-
fileUrl: link.ExternalLinkUrl,
395-
startRowOffset: link.RowOffset,
396-
rowCount: link.RowCount,
397-
byteCount: link.ByteCount,
398-
expirationTime: expirationTime,
399-
memoryManager: memoryManager,
400-
httpHeaders: link.HttpHeaders);
401-
402-
downloadQueue.Add(downloadResult);
403-
}
404-
}
405-
406-
// Create result fetcher
368+
// 1. Create REST-specific result fetcher
369+
// Resources (memory manager, download queue) will be initialized by CloudFetchDownloadManager
407370
var resultFetcher = new StatementExecutionResultFetcher(
408371
_client,
409372
response.StatementId,
410-
response.Manifest,
411-
memoryManager,
412-
downloadQueue);
413-
414-
// Create downloader with correct parameters
415-
int parallelDownloads = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchParallelDownloads, "3"));
416-
int maxRetries = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchMaxRetries, "3"));
417-
int retryDelayMs = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchRetryDelayMs, "500"));
418-
int urlExpirationBufferSeconds = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, "60"));
419-
int maxUrlRefreshAttempts = int.Parse(GetPropertyOrDefault(DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, "3"));
420-
421-
var downloader = new CloudFetchDownloader(
422-
this, // Pass this as ITracingStatement
423-
downloadQueue,
424-
resultQueue,
425-
memoryManager,
426-
_httpClient,
427-
resultFetcher,
428-
parallelDownloads,
429-
isLz4Compressed,
430-
maxRetries,
431-
retryDelayMs,
432-
maxUrlRefreshAttempts,
433-
urlExpirationBufferSeconds);
434-
435-
// Create download manager using test constructor (for REST API)
436-
var downloadManager = new CloudFetchDownloadManager(
373+
response.Manifest);
374+
375+
// 2. Parse configuration from REST properties (unified properties work for both Thrift and REST)
376+
var config = CloudFetchConfiguration.FromProperties(
377+
_properties,
437378
schema,
438-
resultFetcher,
439-
downloader);
379+
isLz4Compressed);
440380

441-
// Start the download manager
381+
// 3. Create protocol-agnostic download manager
382+
// Manager creates shared resources and calls Initialize() on the fetcher
383+
var downloadManager = new CloudFetchDownloadManager(
384+
resultFetcher, // Protocol-specific fetcher
385+
_httpClient,
386+
config,
387+
this); // ITracingStatement for tracing
388+
389+
// 4. Start the manager
442390
downloadManager.StartAsync().GetAwaiter().GetResult();
443391

444-
// Create and return a simple reader that uses the download manager
445-
return new SimpleCloudFetchReader(downloadManager, response.Manifest.ResultCompression, schema);
392+
// 5. Create protocol-agnostic reader
393+
return new CloudFetchReader(
394+
this, // ITracingStatement (both Thrift and REST implement this)
395+
schema,
396+
null, // IResponse (REST doesn't use IResponse)
397+
downloadManager);
446398
}
447399

448400
/// <summary>

0 commit comments

Comments
 (0)