Skip to content

Commit 3c3ebde

Browse files
author
Jade Wang
committed
squash and merge
1 parent 8b28861 commit 3c3ebde

17 files changed

+3349
-500
lines changed

.rat-excludes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@
1515
.github/pull_request_template.md
1616
.gitmodules
1717
*.csproj
18+
*.sln

csharp/Apache.Arrow.Adbc.Drivers.Databricks.sln

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
2-
Microsoft Visual Studio Solution File, Format Version 12.00
1+
Microsoft Visual Studio Solution File, Format Version 12.00
32
# Visual Studio Version 17
43
# Copyright (c) 2025 ADBC Drivers Contributors
54
#

csharp/doc/PECO-2788-cloudfetch-protocol-agnostic-design.md

Lines changed: 2246 additions & 0 deletions
Large diffs are not rendered by default.

csharp/src/Reader/BaseDatabricksReader.cs

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,70 +30,49 @@
3030
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
3131
{
3232
/// <summary>
33-
/// Base class for Databricks readers that handles common functionality of DatabricksReader and CloudFetchReader
33+
/// Base class for Databricks readers that handles common functionality of DatabricksReader and CloudFetchReader.
34+
/// Protocol-agnostic - works with both Thrift and REST implementations.
3435
/// </summary>
3536
internal abstract class BaseDatabricksReader : TracingReader
3637
{
37-
protected IHiveServer2Statement statement;
3838
protected readonly Schema schema;
39-
protected readonly IResponse response;
39+
protected readonly IResponse? response; // Nullable for protocol-agnostic usage
4040
protected readonly bool isLz4Compressed;
4141
protected bool hasNoMoreRows = false;
4242
private bool isDisposed;
43-
private bool isClosed;
4443

45-
protected BaseDatabricksReader(IHiveServer2Statement statement, Schema schema, IResponse response, bool isLz4Compressed)
44+
/// <summary>
45+
/// Gets the statement for this reader. Subclasses can decide how to provide it.
46+
/// Used for Thrift operations in DatabricksReader. Not used in CloudFetchReader.
47+
/// </summary>
48+
protected abstract ITracingStatement Statement { get; }
49+
50+
/// <summary>
51+
/// Protocol-agnostic constructor.
52+
/// </summary>
53+
/// <param name="statement">The tracing statement (both Thrift and REST implement ITracingStatement).</param>
54+
/// <param name="schema">The Arrow schema.</param>
55+
/// <param name="response">The query response (nullable for REST API).</param>
56+
/// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
57+
protected BaseDatabricksReader(ITracingStatement statement, Schema schema, IResponse? response, bool isLz4Compressed)
4658
: base(statement)
4759
{
4860
this.schema = schema;
4961
this.response = response;
5062
this.isLz4Compressed = isLz4Compressed;
51-
this.statement = statement;
5263
}
5364

5465
public override Schema Schema { get { return schema; } }
5566

5667
protected override void Dispose(bool disposing)
5768
{
58-
try
59-
{
60-
if (!isDisposed)
61-
{
62-
if (disposing)
63-
{
64-
_ = CloseOperationAsync().Result;
65-
}
66-
}
67-
}
68-
finally
69+
if (!isDisposed)
6970
{
7071
base.Dispose(disposing);
7172
isDisposed = true;
7273
}
7374
}
7475

75-
/// <summary>
76-
/// Closes the current operation.
77-
/// </summary>
78-
/// <returns>Returns true if the close operation completes successfully, false otherwise.</returns>
79-
/// <exception cref="HiveServer2Exception" />
80-
public async Task<bool> CloseOperationAsync()
81-
{
82-
try
83-
{
84-
if (!isClosed)
85-
{
86-
_ = await HiveServer2Reader.CloseOperationAsync(this.statement, this.response);
87-
return true;
88-
}
89-
return false;
90-
}
91-
finally
92-
{
93-
isClosed = true;
94-
}
95-
}
96-
9776
protected void ThrowIfDisposed()
9877
{
9978
if (isDisposed)
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Copyright (c) 2025 ADBC Drivers Contributors
3+
*
4+
* This file has been modified from its original version, which is
5+
* under the Apache License:
6+
*
7+
* Licensed to the Apache Software Foundation (ASF) under one
8+
* or more contributor license agreements. See the NOTICE file
9+
* distributed with this work for additional information
10+
* regarding copyright ownership. The ASF licenses this file
11+
* to you under the Apache License, Version 2.0 (the
12+
* "License"); you may not use this file except in compliance
13+
* with the License. You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*/
23+
24+
using System;
25+
using System.Collections.Concurrent;
26+
using System.Collections.Generic;
27+
using System.Diagnostics;
28+
using System.Threading;
29+
using System.Threading.Tasks;
30+
using Apache.Arrow.Adbc.Tracing;
31+
32+
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
33+
{
34+
/// <summary>
/// Base class for result fetchers that extracts the common pipeline management logic:
/// starting and stopping the background fetch task, tracking completion and error state,
/// and terminating the download queue with <see cref="EndOfResultsGuard"/>.
/// Subclasses implement protocol-specific fetching logic (Thrift, REST, etc.).
/// </summary>
internal abstract class BaseResultFetcher : ICloudFetchResultFetcher
{
    protected BlockingCollection<IDownloadResult>? _downloadQueue;
    protected ICloudFetchMemoryBufferManager? _memoryManager;
    protected volatile bool _hasMoreResults;
    protected volatile bool _isCompleted;
    protected Exception? _error;
    private Task? _fetchTask;
    private CancellationTokenSource? _cancellationTokenSource;

    /// <summary>
    /// Initializes a new instance of the <see cref="BaseResultFetcher"/> class.
    /// </summary>
    /// <param name="memoryManager">The memory buffer manager (can be null, will be initialized later).</param>
    /// <param name="downloadQueue">The queue to add download tasks to (can be null, will be initialized later).</param>
    protected BaseResultFetcher(
        ICloudFetchMemoryBufferManager? memoryManager,
        BlockingCollection<IDownloadResult>? downloadQueue)
    {
        _memoryManager = memoryManager;
        _downloadQueue = downloadQueue;
        _hasMoreResults = true;
        _isCompleted = false;
    }

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="memoryManager"/> or <paramref name="downloadQueue"/> is null.
    /// </exception>
    public virtual void Initialize(
        ICloudFetchMemoryBufferManager memoryManager,
        BlockingCollection<IDownloadResult> downloadQueue)
    {
        _memoryManager = memoryManager ?? throw new ArgumentNullException(nameof(memoryManager));
        _downloadQueue = downloadQueue ?? throw new ArgumentNullException(nameof(downloadQueue));
    }

    /// <inheritdoc />
    public bool HasMoreResults => _hasMoreResults;

    /// <inheritdoc />
    public bool IsCompleted => _isCompleted;

    /// <inheritdoc />
    public bool HasError => _error != null;

    /// <inheritdoc />
    public Exception? Error => _error;

    /// <inheritdoc />
    /// <remarks>
    /// The fetch runs as a background task that is not awaited here; failures are reported
    /// through <see cref="Error"/> rather than thrown from this method. The task reference is
    /// only cleared by <see cref="StopAsync"/>, so calling this method a second time without
    /// stopping first throws even if the previous fetch has already finished.
    /// </remarks>
    /// <exception cref="InvalidOperationException">Thrown when a fetch task is already tracked.</exception>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        if (_fetchTask != null)
        {
            throw new InvalidOperationException("Fetcher is already running.");
        }

        // Reset state
        _hasMoreResults = true;
        _isCompleted = false;
        _error = null;
        ResetState();

        // Link to the caller's token so that either the caller or StopAsync can cancel the fetch.
        _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _fetchTask = FetchResultsWrapperAsync(_cancellationTokenSource.Token);

        await Task.Yield();
    }

    /// <inheritdoc />
    /// <remarks>
    /// Requests cancellation and waits for the background fetch task to finish. Exceptions
    /// from the task are not rethrown: cancellation is ignored and any other failure is
    /// recorded as a tracing event. Does nothing when no fetch task is tracked.
    /// </remarks>
    public async Task StopAsync()
    {
        if (_fetchTask == null)
        {
            return;
        }

        _cancellationTokenSource?.Cancel();

        try
        {
            await _fetchTask.ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Expected when cancellation is requested
        }
        catch (Exception ex)
        {
            Activity.Current?.AddEvent("cloudfetch.fetcher_stop_error", [
                new("error_message", ex.Message)
            ]);
        }
        finally
        {
            // Clearing the task reference is what allows StartAsync to be called again.
            _cancellationTokenSource?.Dispose();
            _cancellationTokenSource = null;
            _fetchTask = null;
        }
    }

    /// <summary>
    /// Gets a download result for the specified offset, fetching or refreshing as needed.
    /// </summary>
    /// <param name="offset">The row offset for which to get a download result.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>The download result for the specified offset, or null if not available.</returns>
    public abstract Task<IDownloadResult?> GetDownloadResultAsync(long offset, CancellationToken cancellationToken);

    /// <summary>
    /// Re-fetches URLs for chunks in the specified range.
    /// Used when URLs expire before download completes.
    /// </summary>
    /// <param name="startRowOffset">The starting row offset to fetch from (for Thrift protocol).</param>
    /// <param name="startChunkIndex">The starting chunk index (inclusive, for REST protocol).</param>
    /// <param name="endChunkIndex">The ending chunk index (inclusive, for REST protocol).</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A collection of download results with refreshed URLs.</returns>
    public abstract Task<IEnumerable<IDownloadResult>> RefreshUrlsAsync(long startRowOffset, long startChunkIndex, long endChunkIndex, CancellationToken cancellationToken);

    /// <summary>
    /// Resets the fetcher state. Called at the beginning of StartAsync.
    /// Subclasses can override to reset protocol-specific state.
    /// </summary>
    protected virtual void ResetState()
    {
        // Base implementation does nothing. Subclasses can override.
    }

    /// <summary>
    /// Protocol-specific logic to fetch all results and populate the download queue.
    /// This method must add IDownloadResult objects to _downloadQueue using AddDownloadResult().
    /// It should also set _hasMoreResults appropriately and throw exceptions on error.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task representing the asynchronous operation.</returns>
    protected abstract Task FetchAllResultsAsync(CancellationToken cancellationToken);

    /// <summary>
    /// Helper method for subclasses to add download results to the queue.
    /// </summary>
    /// <param name="result">The download result to add.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <exception cref="InvalidOperationException">Thrown when the download queue has not been set.</exception>
    protected void AddDownloadResult(IDownloadResult result, CancellationToken cancellationToken)
    {
        if (_downloadQueue == null)
        {
            throw new InvalidOperationException("Fetcher not initialized. Call Initialize() first.");
        }

        _downloadQueue.Add(result, cancellationToken);
    }

    /// <summary>
    /// Runs <see cref="FetchAllResultsAsync"/> and converts its outcome into fetcher state.
    /// On success the end-of-results guard is queued and the fetcher is marked completed.
    /// On cancellation or failure the fetcher is marked completed and the guard is queued
    /// on a best-effort basis; failures are additionally stored in <see cref="Error"/>.
    /// This method catches every exception, so the returned task does not fault.
    /// </summary>
    /// <param name="cancellationToken">The linked token created by StartAsync.</param>
    private async Task FetchResultsWrapperAsync(CancellationToken cancellationToken)
    {
        try
        {
            await FetchAllResultsAsync(cancellationToken).ConfigureAwait(false);

            // Add the end of results guard to the queue
            if (_downloadQueue == null)
            {
                throw new InvalidOperationException("Fetcher not initialized. Call Initialize() first.");
            }

            _downloadQueue.Add(EndOfResultsGuard.Instance, cancellationToken);
            _isCompleted = true;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Expected when cancellation is requested
            _isCompleted = true;

            // Add the end of results guard to the queue
            TryAddEndOfResultsGuard();
        }
        catch (Exception ex)
        {
            Activity.Current?.AddEvent("cloudfetch.fetcher_unhandled_error", [
                new("error_message", ex.Message),
                new("error_type", ex.GetType().Name)
            ]);
            // Publish the error before the volatile write to _isCompleted.
            _error = ex;
            _hasMoreResults = false;
            _isCompleted = true;

            // Add the end of results guard to the queue even in case of error
            TryAddEndOfResultsGuard();
        }
    }

    /// <summary>
    /// Best-effort, non-blocking attempt to queue the end-of-results guard.
    /// Does nothing when the queue was never set, instead of relying on a swallowed
    /// NullReferenceException, and ignores any failure reported by the queue itself.
    /// </summary>
    private void TryAddEndOfResultsGuard()
    {
        try
        {
            // Zero timeout: never block the shutdown/error path waiting for queue capacity.
            _downloadQueue?.TryAdd(EndOfResultsGuard.Instance, 0);
        }
        catch (Exception)
        {
            // Ignore any errors when adding the guard; the fetcher state set by the
            // caller already records completion (and the error, if any).
        }
    }
}
236+
}

0 commit comments

Comments
 (0)