Commit 8ec57f0

fix(csharp): always send CloseOperation from DatabricksCompositeReader.Dispose (#360)

## Problem

`DatabricksCompositeReader.Dispose` only called `CloseOperation` when `_activeReader` was `null`. When a `CloudFetchReader` was active, it delegated `Dispose` to the reader, but `CloudFetchReader` is protocol-agnostic (it downloads from cloud storage over HTTP) and never sends `CloseOperation` to the Thrift endpoint.

**Result:** every CloudFetch query orphaned the server-side operation for ~1 hour, until SQL Gateway's `driver-router` detected inactivity and fired `CommandInactivityTimeout`. This produced `thriftOperationCloseReason=CommandInactivityTimeout` in usage logs for all CloudFetch queries in connection-pooled scenarios (where `CloseSession` is never sent).

## Root cause

The bug was introduced in commit `0039ee3`, *"refactor(csharp): make CloudFetch pipeline protocol-agnostic (#14)"*. Before that commit:

- `BaseDatabricksReader` had a public `CloseOperationAsync()` method that sent the Thrift RPC
- `DatabricksCompositeReader.Dispose` called `_activeReader.CloseOperationAsync()`, which was correct for both `DatabricksReader` and `CloudFetchReader` (both inherited from `BaseDatabricksReader`)

The protocol-agnostic refactor made three changes that together caused the regression:

1. **Removed** `CloseOperationAsync()` from `BaseDatabricksReader`
2. Changed `DatabricksCompositeReader.Dispose` to call `_activeReader.Dispose()` instead of `_activeReader.CloseOperationAsync()`, with the comment *"Have the contained reader close the operation to avoid duplicate calls"*
3. Made `CloudFetchReader` protocol-agnostic (no Thrift dependency): its `Dispose` only cleans up HTTP/download resources and never sends `CloseOperation`

`DatabricksReader` got its own `CloseOperationAsync()` called from `Dispose`, so inline results remained correct. But the CloudFetch path silently lost its cleanup.

## Fix

Move `CloseOperation` ownership entirely to `DatabricksCompositeReader.Dispose`, which holds both `_statement` (Thrift client) and `_response` (operation handle).

`HiveServer2Reader.CloseOperationAsync` already handles the DirectResults case correctly: it is a no-op when the server already closed the operation inline.

Remove `CloseOperation` from `DatabricksReader.Dispose` to avoid a duplicate call; `DatabricksReader` is only ever constructed from `DatabricksCompositeReader.CreateDatabricksReader`.

## Behavior after fix

| Result delivery path | CloseOperation sent? |
|---|---|
| Inline + DirectResults enabled | No (server closed inline, so `CloseOperationAsync` is a no-op) |
| Inline + DirectResults disabled | Yes (explicit Thrift RPC from composite reader) |
| CloudFetch | Yes (explicit Thrift RPC from composite reader; was previously missing) |

## Testing

Validated with 4 new proxy-based regression tests in `databricks/databricks-driver-test` (CLOUDFETCH-013 through 016) that simulate connection pooling (reader disposed without closing the connection) and assert the correct number of `CloseOperation` Thrift calls for each path.

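
The behavior table can be read as one rule: the composite reader always invokes the close helper, and the helper skips the RPC when the server already closed the operation inline. The following is a minimal C# sketch of that rule under stated assumptions. `OperationResponse`, `ThriftClientStub`, and `OperationCloser` are hypothetical stand-ins, not the driver's types, and the boolean flag replaces whatever the real `HiveServer2Reader.CloseOperationAsync` inspects in the response.

```csharp
using System.Threading.Tasks;

// Hypothetical stand-in for the execute response. The real driver looks at
// the Thrift response rather than a boolean flag.
public sealed class OperationResponse
{
    public bool ClosedInlineByServer { get; set; }
}

// Hypothetical client stub that only counts how many close RPCs were "sent".
public sealed class ThriftClientStub
{
    public int CloseRpcCount { get; private set; }

    public Task SendCloseOperationAsync()
    {
        CloseRpcCount++;
        return Task.CompletedTask;
    }
}

public static class OperationCloser
{
    // Always safe to call: a no-op when the server closed the operation inline,
    // otherwise exactly one close RPC.
    public static async Task CloseOperationAsync(ThriftClientStub client, OperationResponse response)
    {
        if (response.ClosedInlineByServer)
        {
            return;
        }

        await client.SendCloseOperationAsync().ConfigureAwait(false);
    }
}
```

Because the helper in this sketch is safe to call unconditionally, the caller does not need to know which delivery path produced the results.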
1 parent d4083ce commit 8ec57f0

3 files changed: +198 -12 lines changed

csharp/src/Reader/DatabricksCompositeReader.cs

Lines changed: 7 additions & 8 deletions
@@ -237,18 +237,17 @@ protected override void Dispose(bool disposing)
                 {
                     activity?.AddEvent("composite_reader.disposing");
                     StopOperationStatusPoller();
-                    if (_activeReader == null)
-                    {
-                        activity?.AddEvent("composite_reader.close_operation_no_reader");
-                        _ = HiveServer2Reader.CloseOperationAsync(_statement, _response)
-                            .ConfigureAwait(false).GetAwaiter().GetResult();
-                    }
-                    else
+                    // Always close the operation here at the composite level.
+                    // CloudFetchReader is protocol-agnostic and does not send CloseOperation,
+                    // so we must not rely on the contained reader to do it.
+                    activity?.AddEvent("composite_reader.close_operation");
+                    _ = HiveServer2Reader.CloseOperationAsync(_statement, _response)
+                        .ConfigureAwait(false).GetAwaiter().GetResult();
+                    if (_activeReader != null)
                     {
                         activity?.AddEvent("composite_reader.disposing_active_reader", [
                             new("reader_type", _activeReader.GetType().Name)
                         ]);
-                        // Note: Have the contained reader close the operation to avoid duplicate calls.
                         _activeReader.Dispose();
                         _activeReader = null;
                     }
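
The hunk above moves ownership of the close call to the object that holds the operation handle. A minimal sketch of that ownership pattern follows. All types here (`IOperationCloser`, `CountingCloser`, `InnerReader`, `CompositeReader`) are hypothetical illustrations rather than the driver's classes, and the `_disposed` guard is an addition for the sketch that does not appear in the diff.

```csharp
#nullable enable
using System;

// Hypothetical abstraction over "send CloseOperation for this handle".
public interface IOperationCloser
{
    void CloseOperation();
}

public sealed class CountingCloser : IOperationCloser
{
    public int Calls { get; private set; }
    public void CloseOperation() => Calls++;
}

// Models a protocol-agnostic inner reader: Dispose releases local resources only.
public sealed class InnerReader : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}

// Models the composite: it closes the operation itself, whether or not an
// inner reader was ever created, then disposes the inner reader.
public sealed class CompositeReader : IDisposable
{
    private readonly IOperationCloser _closer;
    private InnerReader? _activeReader;
    private bool _disposed; // sketch-only guard against double Dispose

    public CompositeReader(IOperationCloser closer, InnerReader? activeReader)
    {
        _closer = closer;
        _activeReader = activeReader;
    }

    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        _closer.CloseOperation();   // always, at the composite level
        _activeReader?.Dispose();   // inner reader only cleans up its own resources
        _activeReader = null;
    }
}
```

With this shape, the inner reader never needs a reference to the protocol client, which is consistent with keeping `CloudFetchReader` protocol-agnostic.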

csharp/src/Reader/DatabricksReader.cs

Lines changed: 0 additions & 4 deletions
@@ -241,10 +241,6 @@ _ when ex.GetType().Name.Contains("LZ4") => $"Batch {this.index}: LZ4 decompress
 
         protected override void Dispose(bool disposing)
         {
-            if (disposing)
-            {
-                _ = CloseOperationAsync().Result;
-            }
             base.Dispose(disposing);
         }
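
The removed `Dispose` body blocked on the task with `.Result`, while the composite reader blocks with `.ConfigureAwait(false).GetAwaiter().GetResult()`. In .NET these differ in how a failure surfaces: `.Result` wraps it in an `AggregateException`, whereas `GetAwaiter().GetResult()` rethrows the original exception. The commit message does not discuss this, so the sketch below is only a side observation. `FailingCloseAsync` is a hypothetical stand-in for a close call that fails.

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingWaitDemo
{
    // Hypothetical failing async call standing in for a close RPC.
    public static async Task<int> FailingCloseAsync()
    {
        await Task.Delay(1).ConfigureAwait(false);
        throw new InvalidOperationException("close failed");
    }

    // Blocks with .Result and reports the type of exception the caller sees.
    public static string ExceptionTypeViaResult()
    {
        try
        {
            _ = FailingCloseAsync().Result;
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // Blocks with GetAwaiter().GetResult() and reports the exception type.
    public static string ExceptionTypeViaGetAwaiter()
    {
        try
        {
            _ = FailingCloseAsync().ConfigureAwait(false).GetAwaiter().GetResult();
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }
}
```

Here `ExceptionTypeViaResult()` returns `AggregateException` and `ExceptionTypeViaGetAwaiter()` returns `InvalidOperationException`, which matters to any caller that catches specific exception types around `Dispose`.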

Lines changed: 191 additions & 0 deletions
@@ -0,0 +1,191 @@
+/*
+* Copyright (c) 2025 ADBC Drivers Contributors
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Arrow;
+using Apache.Arrow.Adbc;
+using Apache.Arrow.Adbc.Tests;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace AdbcDrivers.Databricks.Tests
+{
+    /// <summary>
+    /// E2E regression tests for CloseOperation behavior in DatabricksCompositeReader.
+    ///
+    /// Validates that the driver executes the CloseOperation code path in Dispose across
+    /// all three result delivery modes when the reader is disposed without closing the
+    /// connection (simulating connection pooling).
+    ///
+    /// Uses ActivityListener to capture the composite_reader.close_operation trace event
+    /// emitted inside DatabricksCompositeReader.Dispose. This event is only present after
+    /// the fix; without it, CloudFetch operations are orphaned server-side for ~1 hour,
+    /// producing thriftOperationCloseReason=CommandInactivityTimeout.
+    ///
+    /// Bug root cause: refactor(csharp): make CloudFetch pipeline protocol-agnostic (#14)
+    /// removed CloseOperationAsync() from BaseDatabricksReader and changed
+    /// DatabricksCompositeReader.Dispose to call _activeReader.Dispose() instead.
+    /// CloudFetchReader.Dispose() is protocol-agnostic and never sends CloseOperation.
+    /// </summary>
+    public class CloseOperationE2ETest : TestBase<DatabricksTestConfiguration, DatabricksTestEnvironment>, IDisposable
+    {
+        private readonly List<(string ActivityName, string EventName)> _capturedEvents = new();
+        private readonly object _capturedEventsLock = new();
+        private readonly ActivityListener _activityListener;
+        private bool _disposed;
+
+        public CloseOperationE2ETest(ITestOutputHelper? outputHelper)
+            : base(outputHelper, new DatabricksTestEnvironment.Factory())
+        {
+            Skip.IfNot(Utils.CanExecuteTestConfig(TestConfigVariable));
+
+            _activityListener = new ActivityListener
+            {
+                ShouldListenTo = source => source.Name == "AdbcDrivers.Databricks",
+                Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
+                ActivityStopped = activity =>
+                {
+                    lock (_capturedEventsLock)
+                    {
+                        foreach (var evt in activity.Events)
+                        {
+                            _capturedEvents.Add((activity.OperationName, evt.Name));
+                        }
+                    }
+                }
+            };
+            ActivitySource.AddActivityListener(_activityListener);
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (!_disposed)
+            {
+                if (disposing)
+                {
+                    _activityListener.Dispose();
+                }
+                _disposed = true;
+            }
+            base.Dispose(disposing);
+        }
+
+        /// <summary>
+        /// Test cases: (description, query, useCloudFetch, enableDirectResults)
+        /// </summary>
+        public static IEnumerable<object[]> TestCases() =>
+        [
+            // Scenario 1: Inline + DirectResults enabled.
+            // Server closes the operation inline (DirectResults.CloseOperation).
+            // DatabricksCompositeReader.Dispose still calls CloseOperationAsync which is a no-op,
+            // but the composite_reader.close_operation trace event must be emitted to confirm
+            // the correct code path was reached.
+            new object[] { "Inline+DirectResults", "SELECT 1 AS val", false, true },
+
+            // Scenario 2: Inline + DirectResults disabled.
+            // Server does NOT close inline; driver must send explicit CloseOperation on reader
+            // dispose. composite_reader.close_operation event confirms the code path was reached.
+            new object[] { "Inline+NoDirectResults", "SELECT * FROM range(1, 100)", false, false },
+
+            // Scenario 3: CloudFetch.
+            // CloudFetchReader is protocol-agnostic and never sends CloseOperation.
+            // DatabricksCompositeReader.Dispose must own the cleanup.
+            // Without the fix, composite_reader.close_operation is never emitted for this path.
+            new object[] { "CloudFetch", "SELECT * FROM main.tpcds_sf100_delta.store_sales LIMIT 1000000", true, true },
+        ];
+
+        /// <summary>
+        /// Validates that DatabricksCompositeReader.Dispose emits the composite_reader.close_operation
+        /// trace event for all result delivery modes when the reader is disposed without closing
+        /// the underlying connection (simulating connection pooling).
+        ///
+        /// The composite_reader.close_operation event is only present in the code path introduced
+        /// by the fix. Without the fix:
+        ///   - Inline (DirectResults or not): event missing because _activeReader != null caused
+        ///     the old code to delegate to _activeReader.Dispose() instead.
+        ///   - CloudFetch: same delegation, but CloseOperation is never sent at all, orphaning
+        ///     the server operation for ~1 hour.
+        ///
+        /// For Thrift wire-level assertions, see proxy-based tests in databricks-driver-test:
+        /// CLOUDFETCH-013 through CLOUDFETCH-016.
+        /// </summary>
+        [Theory]
+        [MemberData(nameof(TestCases))]
+        public async Task DisposeEmitsCloseOperationEvent(string description, string query, bool useCloudFetch, bool enableDirectResults)
+        {
+            lock (_capturedEventsLock) { _capturedEvents.Clear(); }
+
+            var parameters = new Dictionary<string, string>
+            {
+                [DatabricksParameters.Protocol] = "thrift",
+                [DatabricksParameters.UseCloudFetch] = useCloudFetch.ToString(),
+                [DatabricksParameters.EnableDirectResults] = enableDirectResults.ToString(),
+            };
+
+            // Keep connection alive without disposing — simulates a connection pool.
+            // In a pool CloseSession is never sent, so CloseOperation is the only mechanism
+            // that releases the server-side operation promptly.
+            var connection = NewConnection(TestConfiguration, parameters);
+            try
+            {
+                var statement = connection.CreateStatement();
+                statement.SqlQuery = query;
+                var result = await statement.ExecuteQueryAsync();
+
+                long totalRows = 0;
+                using (var reader = result.Stream!)
+                {
+                    RecordBatch? batch;
+                    while ((batch = await reader.ReadNextRecordBatchAsync()) != null)
+                    {
+                        totalRows += batch.Length;
+                    }
+                }
+                // reader.Dispose() called here — DatabricksCompositeReader.Dispose runs.
+                statement.Dispose();
+
+                OutputHelper?.WriteLine($"[{description}] Read {totalRows} rows, reader disposed.");
+
+                // Collect the events emitted by DatabricksCompositeReader.Dispose.
+                List<string> disposeEvents;
+                lock (_capturedEventsLock)
+                {
+                    disposeEvents = _capturedEvents
+                        .Where(e => e.ActivityName == "DatabricksCompositeReader.Dispose")
+                        .Select(e => e.EventName)
+                        .ToList();
+                }
+
+                OutputHelper?.WriteLine($"[{description}] Dispose events: [{string.Join(", ", disposeEvents)}]");
+
+                // The composite_reader.close_operation event is only present after the fix.
+                // Without it, the CloudFetch path silently skips CloseOperation entirely.
+                Assert.True(disposeEvents.Contains("composite_reader.close_operation"),
+                    $"[{description}] composite_reader.close_operation event not found in " +
+                    $"DatabricksCompositeReader.Dispose. Without the fix, server operations are " +
+                    $"orphaned until SQL Gateway closes them with CommandInactivityTimeout (~1 hour).");
+            }
+            finally
+            {
+                connection.Dispose();
+            }
+        }
+    }
+}
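
The test above relies on `ActivityListener` delivering an activity's events once the activity stops. A self-contained sketch of that capture mechanism, using only `System.Diagnostics`, might look like the following. The source name `Demo.Source`, the activity name `Demo.Dispose`, and the event name `demo.close_operation` are hypothetical. The sketch calls the framework's `Activity.AddEvent(ActivityEvent)` overload directly, whereas the driver's string-based `AddEvent` call seen in the diff is assumed to be a helper of its own.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

public static class TraceCaptureDemo
{
    // Hypothetical source name; the test above listens to "AdbcDrivers.Databricks".
    private static readonly ActivitySource Source = new ActivitySource("Demo.Source");

    // Starts one activity, adds one event to it, and returns what the listener saw.
    public static List<string> CaptureEvents()
    {
        var captured = new List<string>();

        using var listener = new ActivityListener
        {
            // Only subscribe to the demo source.
            ShouldListenTo = source => source.Name == "Demo.Source",
            // Sample everything so StartActivity returns a recording activity.
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
            // Invoked when the activity stops (here: when it is disposed).
            ActivityStopped = activity =>
            {
                foreach (var evt in activity.Events)
                {
                    captured.Add($"{activity.OperationName}:{evt.Name}");
                }
            }
        };
        ActivitySource.AddActivityListener(listener);

        using (var activity = Source.StartActivity("Demo.Dispose"))
        {
            activity?.AddEvent(new ActivityEvent("demo.close_operation"));
        }

        return captured;
    }
}
```

Asserting on trace events this way confirms that a code path ran, not that an RPC reached the server, which is why the commit points to the proxy-based tests for wire-level checks.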
