Skip to content

Commit 332e145

Browse files
authored
feat(csharp): fix powerbi hang when reading cloudfetch result in Databricks driver (#2747)
### Summary

This PR fixes an issue where Power BI would hang when reading CloudFetch results, and significantly improves the logging capabilities in the CloudFetch downloader component.

### Problem

1. The CloudFetchReader was not properly disposing of the download manager after completing downloads, causing resource leaks that led to Power BI hanging.
2. The CloudFetchDownloader was using Debug.WriteLine for logging, which is inadequate for production scenarios and doesn't provide sufficient diagnostic information.

### Solution

- Fixed resource management in CloudFetchReader by properly disposing the download manager after all files are processed
- Replaced Debug.WriteLine calls with more comprehensive Trace logging
- Added detailed performance metrics and diagnostics:
  - Download start/completion timestamps
  - File sizes and throughput calculations
  - Decompression metrics
  - Overall download statistics (total files, success/failure counts)
- Added URL sanitization for secure logging
- Added proper error tracking and reporting

### Testing

- Enhanced CloudFetchE2ETest to verify that the reader properly completes after all data is consumed
- Verified that Power BI no longer hangs when reading CloudFetch results
1 parent aa6168f commit 332e145

File tree

3 files changed

+74
-7
lines changed

3 files changed

+74
-7
lines changed

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloader.cs

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
5555
/// <param name="httpClient">The HTTP client to use for downloads.</param>
5656
/// <param name="maxParallelDownloads">The maximum number of parallel downloads.</param>
5757
/// <param name="isLz4Compressed">Whether the results are LZ4 compressed.</param>
58+
/// <param name="logger">The logger instance.</param>
5859
/// <param name="maxRetries">The maximum number of retry attempts.</param>
5960
/// <param name="retryDelayMs">The delay between retry attempts in milliseconds.</param>
6061
public CloudFetchDownloader(
@@ -184,6 +185,12 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
184185
{
185186
await Task.Yield();
186187

188+
int totalFiles = 0;
189+
int successfulDownloads = 0;
190+
int failedDownloads = 0;
191+
long totalBytes = 0;
192+
var overallStopwatch = Stopwatch.StartNew();
193+
187194
try
188195
{
189196
// Keep track of active download tasks
@@ -193,6 +200,8 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
193200
// Process items from the download queue until it's completed
194201
foreach (var downloadResult in _downloadQueue.GetConsumingEnumerable(cancellationToken))
195202
{
203+
totalFiles++;
204+
196205
// Check if there's an error before processing more downloads
197206
if (HasError)
198207
{
@@ -213,7 +222,7 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
213222
}
214223
catch (Exception ex)
215224
{
216-
Debug.WriteLine($"Error waiting for downloads to complete: {ex.Message}");
225+
Trace.TraceWarning($"Error waiting for downloads to complete: {ex.Message}");
217226
// Don't set error here, as individual download tasks will handle their own errors
218227
}
219228
}
@@ -245,17 +254,23 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
245254
if (t.IsFaulted)
246255
{
247256
Exception ex = t.Exception?.InnerException ?? new Exception("Unknown error");
248-
Debug.WriteLine($"Download failed: {ex.Message}");
257+
Trace.TraceError($"Download failed for file {SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}");
249258

250259
// Set the download as failed
251260
downloadResult.SetFailed(ex);
261+
failedDownloads++;
252262

253263
// Set the error state to stop the download process
254264
SetError(ex);
255265

256266
// Signal that we should stop processing downloads
257267
downloadTaskCompletionSource.TrySetException(ex);
258268
}
269+
else if (!t.IsFaulted && !t.IsCanceled)
270+
{
271+
successfulDownloads++;
272+
totalBytes += downloadResult.Size;
273+
}
259274
}, cancellationToken);
260275

261276
// Add the task to the dictionary
@@ -274,14 +289,21 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
274289
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
275290
{
276291
// Expected when cancellation is requested
292+
Trace.TraceInformation("Download process was cancelled");
277293
}
278294
catch (Exception ex)
279295
{
280-
Debug.WriteLine($"Error in download loop: {ex.Message}");
296+
Trace.TraceError($"Error in download loop: {ex.Message}");
281297
SetError(ex);
282298
}
283299
finally
284300
{
301+
overallStopwatch.Stop();
302+
303+
Trace.TraceInformation(
304+
$"Download process completed. Total files: {totalFiles}, Successful: {successfulDownloads}, " +
305+
$"Failed: {failedDownloads}, Total size: {totalBytes / 1024.0 / 1024.0:F2} MB, Total time: {overallStopwatch.ElapsedMilliseconds / 1000.0:F2} sec");
306+
285307
// If there's an error, add the error to the result queue
286308
if (HasError)
287309
{
@@ -293,11 +315,18 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
293315
private async Task DownloadFileAsync(IDownloadResult downloadResult, CancellationToken cancellationToken)
294316
{
295317
string url = downloadResult.Link.FileLink;
318+
string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink);
296319
byte[]? fileData = null;
297320

298321
// Use the size directly from the download result
299322
long size = downloadResult.Size;
300323

324+
// Create a stopwatch to track download time
325+
var stopwatch = Stopwatch.StartNew();
326+
327+
// Log download start
328+
Trace.TraceInformation($"Starting download of file {sanitizedUrl}, expected size: {size / 1024.0:F2} KB");
329+
301330
// Acquire memory before downloading
302331
await _memoryManager.AcquireMemoryAsync(size, cancellationToken).ConfigureAwait(false);
303332

@@ -318,7 +347,7 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
318347
long? contentLength = response.Content.Headers.ContentLength;
319348
if (contentLength.HasValue && contentLength.Value > 0)
320349
{
321-
Debug.WriteLine($"Downloading file of size: {contentLength.Value / 1024.0 / 1024.0:F2} MB");
350+
Trace.TraceInformation($"Actual file size for {sanitizedUrl}: {contentLength.Value / 1024.0 / 1024.0:F2} MB");
322351
}
323352

324353
// Read the file data
@@ -328,36 +357,50 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
328357
catch (Exception ex) when (retry < _maxRetries - 1 && !cancellationToken.IsCancellationRequested)
329358
{
330359
// Log the error and retry
331-
Debug.WriteLine($"Error downloading file (attempt {retry + 1}/{_maxRetries}): {ex.Message}");
360+
Trace.TraceError($"Error downloading file {SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}");
361+
332362
await Task.Delay(_retryDelayMs * (retry + 1), cancellationToken).ConfigureAwait(false);
333363
}
334364
}
335365

336366
if (fileData == null)
337367
{
368+
stopwatch.Stop();
369+
Trace.TraceError($"Failed to download file {sanitizedUrl} after {_maxRetries} attempts. Elapsed time: {stopwatch.ElapsedMilliseconds} ms");
370+
338371
// Release the memory we acquired
339372
_memoryManager.ReleaseMemory(size);
340373
throw new InvalidOperationException($"Failed to download file from {url} after {_maxRetries} attempts.");
341374
}
342375

343376
// Process the downloaded file data
344377
MemoryStream dataStream;
378+
long actualSize = fileData.Length;
345379

346380
// If the data is LZ4 compressed, decompress it
347381
if (_isLz4Compressed)
348382
{
349383
try
350384
{
385+
var decompressStopwatch = Stopwatch.StartNew();
351386
dataStream = new MemoryStream();
352387
using (var inputStream = new MemoryStream(fileData))
353388
using (var decompressor = LZ4Stream.Decode(inputStream))
354389
{
355390
await decompressor.CopyToAsync(dataStream, 81920, cancellationToken).ConfigureAwait(false);
356391
}
357392
dataStream.Position = 0;
393+
decompressStopwatch.Stop();
394+
395+
Trace.TraceInformation($"Decompressed file {sanitizedUrl} in {decompressStopwatch.ElapsedMilliseconds} ms. Compressed size: {actualSize / 1024.0:F2} KB, Decompressed size: {dataStream.Length / 1024.0:F2} KB");
396+
397+
actualSize = dataStream.Length;
358398
}
359399
catch (Exception ex)
360400
{
401+
stopwatch.Stop();
402+
Trace.TraceError($"Error decompressing data for file {sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds} ms");
403+
361404
// Release the memory we acquired
362405
_memoryManager.ReleaseMemory(size);
363406
throw new InvalidOperationException($"Error decompressing data: {ex.Message}", ex);
@@ -368,6 +411,10 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
368411
dataStream = new MemoryStream(fileData);
369412
}
370413

414+
// Stop the stopwatch and log download completion
415+
stopwatch.Stop();
416+
Trace.TraceInformation($"Completed download of file {sanitizedUrl}. Size: {actualSize / 1024.0:F2} KB, Latency: {stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0) / (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s");
417+
371418
// Set the download as completed with the original size
372419
downloadResult.SetCompleted(dataStream, size);
373420
}
@@ -378,6 +425,7 @@ private void SetError(Exception ex)
378425
{
379426
if (_error == null)
380427
{
428+
Trace.TraceError($"Setting error state: {ex.Message}");
381429
_error = ex;
382430
}
383431
}
@@ -395,7 +443,22 @@ private void CompleteWithError()
395443
}
396444
catch (Exception ex)
397445
{
398-
Debug.WriteLine($"Error completing with error: {ex.Message}");
446+
Trace.TraceError($"Error completing with error: {ex.Message}");
447+
}
448+
}
449+
450+
// Helper method to sanitize URLs for logging (to avoid exposing sensitive information)
451+
private string SanitizeUrl(string url)
452+
{
453+
try
454+
{
455+
var uri = new Uri(url);
456+
return $"{uri.Scheme}://{uri.Host}/{Path.GetFileName(uri.LocalPath)}";
457+
}
458+
catch
459+
{
460+
// If URL parsing fails, return a generic identifier
461+
return "cloud-storage-url";
399462
}
400463
}
401464
}

csharp/src/Drivers/Databricks/CloudFetch/CloudFetchReader.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ internal sealed class CloudFetchReader : IArrowArrayStream
3636
{
3737
private readonly Schema schema;
3838
private readonly bool isLz4Compressed;
39-
private readonly ICloudFetchDownloadManager downloadManager;
39+
private ICloudFetchDownloadManager? downloadManager;
4040
private ArrowStreamReader? currentReader;
4141
private IDownloadResult? currentDownloadResult;
4242
private bool isPrefetchEnabled;
@@ -136,6 +136,8 @@ public CloudFetchReader(DatabricksStatement statement, Schema schema, bool isLz4
136136
this.currentDownloadResult = await this.downloadManager.GetNextDownloadedFileAsync(cancellationToken);
137137
if (this.currentDownloadResult == null)
138138
{
139+
this.downloadManager.Dispose();
140+
this.downloadManager = null;
139141
// No more files
140142
return null;
141143
}

csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ private async Task TestRealDatabricksCloudFetch(string query, int rowCount, bool
9090

9191
Assert.True(totalRows >= rowCount);
9292

93+
Assert.Null(await result.Stream.ReadNextRecordBatchAsync());
94+
9395
// Also log to the test output helper if available
9496
OutputHelper?.WriteLine($"Read {totalRows} rows from range function");
9597
}

0 commit comments

Comments (0)