Skip to content

Commit 9051acf

Browse files
authored
fix(csharp/src/Drivers/BigQuery): handle dispose of Statement before Stream (apache#3608)
This fix handles the possibility that the parent statement disposes before the created stream.
1 parent f6d5cfe commit 9051acf

File tree

2 files changed

+65
-8
lines changed

2 files changed

+65
-8
lines changed

csharp/src/Drivers/BigQuery/BigQueryStatement.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
106106
activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds);
107107
}
108108

109-
JobCancellationContext cancellationContext = new JobCancellationContext(cancellationRegistry, job);
109+
using JobCancellationContext cancellationContext = new JobCancellationContext(cancellationRegistry, job);
110+
110111
// We can't checkJobStatus, Otherwise, the timeout in QueryResultsOptions is meaningless.
111112
// When encountering a long-running job, it should be controlled by the timeout in the Google SDK instead of blocking in a while loop.
112113
Func<Task<BigQueryResults>> getJobResults = async () =>
@@ -215,7 +216,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
215216
IEnumerable<IArrowReader> readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity).ConfigureAwait(false);
216217

217218
// Note: MultiArrowReader must dispose the cancellationContext.
218-
IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, cancellationContext);
219+
IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, new CancellationContext(cancellationRegistry));
219220
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, totalRows);
220221
return new QueryResult(totalRows, stream);
221222
});
@@ -641,20 +642,27 @@ public JobCancellationContext(CancellationRegistry cancellationRegistry, BigQuer
641642
private sealed class CancellationRegistry : IDisposable
642643
{
643644
private readonly ConcurrentDictionary<CancellationContext, byte> contexts = new();
645+
private bool disposed;
644646

645647
public CancellationContext Register(CancellationContext context)
646648
{
649+
if (disposed) throw new ObjectDisposedException(nameof(CancellationRegistry));
650+
647651
contexts.TryAdd(context, 0);
648652
return context;
649653
}
650654

651655
public bool Unregister(CancellationContext context)
652656
{
657+
if (disposed) return false;
658+
653659
return contexts.TryRemove(context, out _);
654660
}
655661

656662
public void CancelAll()
657663
{
664+
if (disposed) throw new ObjectDisposedException(nameof(CancellationRegistry));
665+
658666
foreach (CancellationContext context in contexts.Keys)
659667
{
660668
context.Cancel();
@@ -663,11 +671,11 @@ public void CancelAll()
663671

664672
public void Dispose()
665673
{
666-
foreach (CancellationContext context in contexts.Keys)
674+
if (!disposed)
667675
{
668-
context.Dispose();
676+
contexts.Clear();
677+
disposed = true;
669678
}
670-
contexts.Clear();
671679
}
672680
}
673681

@@ -706,6 +714,7 @@ public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable<
706714

707715
while (true)
708716
{
717+
linkedCts.Token.ThrowIfCancellationRequested();
709718
if (this.reader == null)
710719
{
711720
if (!this.readers.MoveNext())

csharp/test/Drivers/BigQuery/StatementTests.cs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,62 @@ public async Task CanCancelStatement()
114114
}
115115

116116
[Fact]
117-
public async Task CanCancelStreamFromStatement()
117+
public async Task CanCancelStreamAndDisposeStatement()
118118
{
119119
foreach (BigQueryTestEnvironment environment in _environments)
120120
{
121-
AdbcConnection adbcConnection = GetAdbcConnection(environment.Name);
121+
using AdbcConnection adbcConnection = GetAdbcConnection(environment.Name);
122122

123123
AdbcStatement statement = adbcConnection.CreateStatement();
124124

125+
// Execute the query/cancel multiple times to validate consistent behavior
126+
const int iterations = 3;
127+
QueryResult[] results = new QueryResult[iterations];
128+
for (int i = 0; i < iterations; i++)
129+
{
130+
_outputHelper?.WriteLine($"Iteration {i + 1} of {iterations}");
131+
// Generate unique column names so query will not be served from cache
132+
string columnName1 = Guid.NewGuid().ToString("N");
133+
string columnName2 = Guid.NewGuid().ToString("N");
134+
statement.SqlQuery = $"SELECT `{columnName2}` AS `{columnName1}` FROM UNNEST(GENERATE_ARRAY(1, 100)) AS `{columnName2}`";
135+
_outputHelper?.WriteLine($"Query: {statement.SqlQuery}");
136+
137+
// Expect this to take about 10 seconds without cancellation
138+
results[i] = statement.ExecuteQuery();
139+
}
140+
statement.Cancel();
141+
statement.Dispose();
142+
for (int index = 0; index < iterations; index++)
143+
{
144+
try
145+
{
146+
QueryResult queryResult = results[index];
147+
using IArrowArrayStream? stream = queryResult.Stream;
148+
Assert.NotNull(stream);
149+
RecordBatch batch = await stream.ReadNextRecordBatchAsync();
150+
151+
Assert.Fail("Expecting OperationCanceledException to be thrown.");
152+
}
153+
catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out OperationCanceledException? _))
154+
{
155+
_outputHelper?.WriteLine($"Received expected OperationCanceledException: {ex.Message}");
156+
}
157+
catch (Exception ex) when (ex is not FailException)
158+
{
159+
Assert.Fail($"Expecting OperationCanceledException to be thrown. Instead, received {ex.GetType().Name}: {ex.Message}");
160+
}
161+
}
162+
}
163+
}
164+
165+
[Fact]
166+
public async Task CanCancelStreamFromStatement()
167+
{
168+
foreach (BigQueryTestEnvironment environment in _environments)
169+
{
170+
using AdbcConnection adbcConnection = GetAdbcConnection(environment.Name);
171+
using AdbcStatement statement = adbcConnection.CreateStatement();
172+
125173
// Execute the query/cancel multiple times to validate consistent behavior
126174
const int iterations = 3;
127175
QueryResult[] results = new QueryResult[iterations];
@@ -143,7 +191,7 @@ public async Task CanCancelStreamFromStatement()
143191
try
144192
{
145193
QueryResult queryResult = results[index];
146-
IArrowArrayStream? stream = queryResult.Stream;
194+
using IArrowArrayStream? stream = queryResult.Stream;
147195
Assert.NotNull(stream);
148196
RecordBatch batch = await stream.ReadNextRecordBatchAsync();
149197

0 commit comments

Comments
 (0)