Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 93 additions & 14 deletions csharp/src/Reader/DatabricksCompositeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ internal class DatabricksCompositeReader : TracingReader

private IOperationStatusPoller? operationStatusPoller;
private bool _disposed;
private bool _operationClosed;
private readonly HttpClient _httpClient;

/// <summary>
Expand Down Expand Up @@ -196,6 +197,13 @@ protected virtual BaseDatabricksReader CreateDatabricksReader(TFetchResultsResp

public override async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
// If we already closed the operation (previous call returned null), return null
// immediately to avoid re-initializing a reader on a closed operation.
if (_operationClosed)
{
return null;
}

return await this.TraceActivityAsync(async activity =>
{
if (_activeReader != null)
Expand All @@ -204,11 +212,17 @@ protected virtual BaseDatabricksReader CreateDatabricksReader(TFetchResultsResp
}

var result = await ReadNextRecordBatchInternalAsync(cancellationToken, activity);
// Stop the poller when we've reached the end of results
if (result == null)
{
activity?.AddEvent("composite_reader.end_of_results");
StopOperationStatusPoller();

// Close the server-side operation immediately when results are exhausted.
// Without this, the server command stays open until Dispose() is called.
// Power BI's M engine has no deterministic disposal — it relies on GC,
// which leaves commands open for ~22 min (CommandInactivityTimeout),
// blocking warehouse autostop.
CloseOperationBestEffort(activity);
}
else
{
Expand All @@ -220,6 +234,16 @@ protected virtual BaseDatabricksReader CreateDatabricksReader(TFetchResultsResp
});
}

/// <summary>
/// Destructor ensures server-side operations are cleaned up if Dispose() is never called.
/// This is a safety net for environments like Power BI's M engine where IDisposable
/// resources may be garbage-collected without explicit disposal.
/// </summary>
~DatabricksCompositeReader()
{
Dispose(disposing: false);
}

protected override void Dispose(bool disposing)
{
this.TraceActivity(activity =>
Expand All @@ -237,23 +261,37 @@ protected override void Dispose(bool disposing)
{
activity?.AddEvent("composite_reader.disposing");
StopOperationStatusPoller();
if (_activeReader == null)
{
activity?.AddEvent("composite_reader.close_operation_no_reader");
_ = HiveServer2Reader.CloseOperationAsync(_statement, _response)
.ConfigureAwait(false).GetAwaiter().GetResult();
}
else

// If the operation was already closed (e.g., on result exhaustion),
// skip the close to avoid sending a duplicate TCloseOperationReq.
if (!_operationClosed)
{
activity?.AddEvent("composite_reader.disposing_active_reader", [
new("reader_type", _activeReader.GetType().Name)
]);
// Note: Have the contained reader close the operation to avoid duplicate calls.
_activeReader.Dispose();
_activeReader = null;
if (_activeReader == null)
{
activity?.AddEvent("composite_reader.close_operation_no_reader");
_ = HiveServer2Reader.CloseOperationAsync(_statement, _response)
.ConfigureAwait(false).GetAwaiter().GetResult();
}
else
{
activity?.AddEvent("composite_reader.disposing_active_reader", [
new("reader_type", _activeReader.GetType().Name)
]);
// Note: Have the contained reader close the operation to avoid duplicate calls.
_activeReader.Dispose();
_activeReader = null;
}
_operationClosed = true;
}
activity?.AddEvent("composite_reader.disposed");
}
else
{
// Called from finalizer (disposing: false).
// Best-effort cleanup: stop the poller and attempt to close the operation.
// Managed objects may already be finalized, so catch all exceptions.
CloseOperationBestEffort(activity: null);
}
}
}
finally
Expand All @@ -264,6 +302,47 @@ protected override void Dispose(bool disposing)
}, activityName: nameof(DatabricksCompositeReader) + "." + nameof(Dispose));
}

/// <summary>
/// Closes the server-side operation on a best-effort basis, catching all exceptions.
/// Used when results are exhausted or from the finalizer — must never throw.
/// </summary>
private void CloseOperationBestEffort(Activity? activity)
{
if (_operationClosed)
{
return;
}

try
{
StopOperationStatusPoller();

if (_activeReader != null)
{
activity?.AddEvent("composite_reader.auto_close_active_reader", [
new("reader_type", _activeReader.GetType().Name)
]);
_activeReader.Dispose();
_activeReader = null;
}
else
{
activity?.AddEvent("composite_reader.auto_close_no_reader");
_ = HiveServer2Reader.CloseOperationAsync(_statement, _response)
.ConfigureAwait(false).GetAwaiter().GetResult();
}

_operationClosed = true;
}
catch (Exception)
{
// Best-effort: the close may fail if the connection is already dead
// (e.g., called from finalizer after managed objects are finalized).
// The server will eventually clean up via CommandInactivityTimeout.
_operationClosed = true;
}
}

private void StopOperationStatusPoller()
{
operationStatusPoller?.Stop();
Expand Down
Loading