
Commit 2900917

Author: Jade Wang

reduce diff
squash and merge refactor

1 parent 8b28861 · commit 2900917
17 files changed: +3,187 −748 lines

csharp/doc/PECO-2788-cloudfetch-protocol-agnostic-design.md

Lines changed: 1,966 additions & 0 deletions (large diff not rendered by default)

csharp/src/Reader/BaseDatabricksReader.cs

Lines changed: 18 additions & 39 deletions
```diff
@@ -30,70 +30,49 @@
 namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
 {
     /// <summary>
-    /// Base class for Databricks readers that handles common functionality of DatabricksReader and CloudFetchReader
+    /// Base class for Databricks readers that handles common functionality of DatabricksReader and CloudFetchReader.
+    /// Protocol-agnostic - works with both Thrift and REST implementations.
     /// </summary>
     internal abstract class BaseDatabricksReader : TracingReader
     {
-        protected IHiveServer2Statement statement;
         protected readonly Schema schema;
-        protected readonly IResponse response;
+        protected readonly IResponse? response; // Nullable for protocol-agnostic usage
         protected readonly bool isLz4Compressed;
         protected bool hasNoMoreRows = false;
         private bool isDisposed;
-        private bool isClosed;
 
-        protected BaseDatabricksReader(IHiveServer2Statement statement, Schema schema, IResponse response, bool isLz4Compressed)
+        /// <summary>
+        /// Gets the statement for this reader. Subclasses can decide how to provide it.
+        /// Used for Thrift operations in DatabricksReader. Not used in CloudFetchReader.
+        /// </summary>
+        protected abstract ITracingStatement Statement { get; }
+
+        /// <summary>
+        /// Protocol-agnostic constructor.
+        /// </summary>
+        /// <param name="statement">The tracing statement (both Thrift and REST implement ITracingStatement).</param>
+        /// <param name="schema">The Arrow schema.</param>
+        /// <param name="response">The query response (nullable for REST API).</param>
+        /// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
+        protected BaseDatabricksReader(ITracingStatement statement, Schema schema, IResponse? response, bool isLz4Compressed)
             : base(statement)
         {
             this.schema = schema;
             this.response = response;
             this.isLz4Compressed = isLz4Compressed;
-            this.statement = statement;
         }
 
         public override Schema Schema { get { return schema; } }
 
         protected override void Dispose(bool disposing)
         {
-            try
-            {
-                if (!isDisposed)
-                {
-                    if (disposing)
-                    {
-                        _ = CloseOperationAsync().Result;
-                    }
-                }
-            }
-            finally
+            if (!isDisposed)
             {
                 base.Dispose(disposing);
                 isDisposed = true;
             }
         }
 
-        /// <summary>
-        /// Closes the current operation.
-        /// </summary>
-        /// <returns>Returns true if the close operation completes successfully, false otherwise.</returns>
-        /// <exception cref="HiveServer2Exception" />
-        public async Task<bool> CloseOperationAsync()
-        {
-            try
-            {
-                if (!isClosed)
-                {
-                    _ = await HiveServer2Reader.CloseOperationAsync(this.statement, this.response);
-                    return true;
-                }
-                return false;
-            }
-            finally
-            {
-                isClosed = true;
-            }
-        }
-
         protected void ThrowIfDisposed()
         {
             if (isDisposed)
```
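
This hunk removes the mutable `IHiveServer2Statement statement` field and the sync-over-async `CloseOperationAsync().Result` call in `Dispose`: the base class now accepts any `ITracingStatement` and reaches it through the abstract `Statement` property, so each subclass decides where its statement comes from. Below is a minimal sketch of a subclass satisfying the new contract; the class name, the REST-style `response: null` call, and the `ReadNextRecordBatchAsync` signature are illustrative assumptions, not code from this commit:

```csharp
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;

// Hypothetical subclass for illustration only. ITracingStatement, IResponse,
// and BaseDatabricksReader are the driver types from the diff above; the rest
// is assumed.
internal sealed class ExampleRestReader : BaseDatabricksReader
{
    private readonly ITracingStatement _statement;

    public ExampleRestReader(ITracingStatement statement, Schema schema, bool isLz4Compressed)
        // REST path: there is no Thrift response object, so response is null.
        : base(statement, schema, response: null, isLz4Compressed)
    {
        _statement = statement;
    }

    // The base class now reaches the statement through this property
    // instead of a protocol-specific IHiveServer2Statement field.
    protected override ITracingStatement Statement => _statement;

    // Assumed reader surface (the IArrowArrayStream shape); the real
    // CloudFetchReader implements the actual batch iteration.
    public override ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
    {
        ThrowIfDisposed();
        return new ValueTask<RecordBatch?>((RecordBatch?)null); // end of stream in this sketch
    }
}
```

Note also that the removed `CloseOperationAsync` went through `HiveServer2Reader`, i.e. the Thrift path; with it gone, disposal no longer performs blocking sync-over-async I/O in the protocol-agnostic base class.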
csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs

Lines changed: 206 additions & 0 deletions
@@ -0,0 +1,206 @@ (new file)

```csharp
/*
 * Copyright (c) 2025 ADBC Drivers Contributors
 *
 * This file has been modified from its original version, which is
 * under the Apache License:
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Buffers;
using System.Collections.Generic;
using Microsoft.IO;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
    /// <summary>
    /// Configuration for the CloudFetch download pipeline.
    /// Protocol-agnostic - works with both Thrift and REST implementations.
    /// </summary>
    internal sealed class CloudFetchConfiguration
    {
        // Default values
        internal const int DefaultParallelDownloads = 3;
        internal const int DefaultPrefetchCount = 2;
        internal const int DefaultMemoryBufferSizeMB = 100;
        internal const int DefaultTimeoutMinutes = 5;
        internal const int DefaultMaxRetries = 3;
        internal const int DefaultRetryDelayMs = 500;
        internal const int DefaultMaxUrlRefreshAttempts = 3;
        internal const int DefaultUrlExpirationBufferSeconds = 60;

        /// <summary>
        /// Number of parallel downloads to perform.
        /// </summary>
        public int ParallelDownloads { get; set; } = DefaultParallelDownloads;

        /// <summary>
        /// Number of files to prefetch ahead of the reader.
        /// </summary>
        public int PrefetchCount { get; set; } = DefaultPrefetchCount;

        /// <summary>
        /// Memory buffer size limit in MB for buffered files.
        /// </summary>
        public int MemoryBufferSizeMB { get; set; } = DefaultMemoryBufferSizeMB;

        /// <summary>
        /// HTTP client timeout for downloads (in minutes).
        /// </summary>
        public int TimeoutMinutes { get; set; } = DefaultTimeoutMinutes;

        /// <summary>
        /// Maximum retry attempts for failed downloads.
        /// </summary>
        public int MaxRetries { get; set; } = DefaultMaxRetries;

        /// <summary>
        /// Delay between retry attempts (in milliseconds).
        /// </summary>
        public int RetryDelayMs { get; set; } = DefaultRetryDelayMs;

        /// <summary>
        /// Maximum attempts to refresh expired URLs.
        /// </summary>
        public int MaxUrlRefreshAttempts { get; set; } = DefaultMaxUrlRefreshAttempts;

        /// <summary>
        /// Buffer time before URL expiration to trigger refresh (in seconds).
        /// </summary>
        public int UrlExpirationBufferSeconds { get; set; } = DefaultUrlExpirationBufferSeconds;

        /// <summary>
        /// Whether the result data is LZ4 compressed.
        /// </summary>
        public bool IsLz4Compressed { get; set; }

        /// <summary>
        /// The Arrow schema for the results.
        /// </summary>
        public Schema Schema { get; set; }

        /// <summary>
        /// RecyclableMemoryStreamManager for LZ4 decompression output streams.
        /// If not provided, a new instance will be created when needed.
        /// For optimal memory pooling, this should be shared from the connection/database level.
        /// </summary>
        public RecyclableMemoryStreamManager? MemoryStreamManager { get; set; }

        /// <summary>
        /// ArrayPool for LZ4 decompression buffers.
        /// If not provided, ArrayPool&lt;byte&gt;.Shared will be used.
        /// For optimal memory pooling, this should be shared from the connection/database level.
        /// </summary>
        public ArrayPool<byte>? Lz4BufferPool { get; set; }

        /// <summary>
        /// Creates configuration from connection properties.
        /// Works with UNIFIED properties that are shared across ALL protocols (Thrift, REST, future protocols).
        /// Same property names (e.g., "adbc.databricks.cloudfetch.parallel_downloads") work for all protocols.
        /// </summary>
        /// <param name="properties">Connection properties from either Thrift or REST connection.</param>
        /// <param name="schema">Arrow schema for the results.</param>
        /// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
        /// <returns>CloudFetch configuration parsed from unified properties.</returns>
        public static CloudFetchConfiguration FromProperties(
            IReadOnlyDictionary<string, string> properties,
            Schema schema,
            bool isLz4Compressed)
        {
            var config = new CloudFetchConfiguration
            {
                Schema = schema ?? throw new ArgumentNullException(nameof(schema)),
                IsLz4Compressed = isLz4Compressed
            };

            // Parse parallel downloads
            if (properties.TryGetValue(DatabricksParameters.CloudFetchParallelDownloads, out string? parallelStr))
            {
                if (int.TryParse(parallelStr, out int parallel) && parallel > 0)
                    config.ParallelDownloads = parallel;
                else
                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchParallelDownloads}: {parallelStr}. Expected a positive integer.");
            }

            // Parse prefetch count
            if (properties.TryGetValue(DatabricksParameters.CloudFetchPrefetchCount, out string? prefetchStr))
            {
                if (int.TryParse(prefetchStr, out int prefetch) && prefetch > 0)
                    config.PrefetchCount = prefetch;
                else
                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchPrefetchCount}: {prefetchStr}. Expected a positive integer.");
            }

            // Parse memory buffer size
            if (properties.TryGetValue(DatabricksParameters.CloudFetchMemoryBufferSize, out string? memoryStr))
            {
                if (int.TryParse(memoryStr, out int memory) && memory > 0)
                    config.MemoryBufferSizeMB = memory;
                else
                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchMemoryBufferSize}: {memoryStr}. Expected a positive integer.");
            }

            // Parse timeout
            if (properties.TryGetValue(DatabricksParameters.CloudFetchTimeoutMinutes, out string? timeoutStr))
            {
                if (int.TryParse(timeoutStr, out int timeout) && timeout > 0)
                    config.TimeoutMinutes = timeout;
                else
                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchTimeoutMinutes}: {timeoutStr}. Expected a positive integer.");
            }

            // Parse max retries
            if (properties.TryGetValue(DatabricksParameters.CloudFetchMaxRetries, out string? retriesStr))
            {
                if (int.TryParse(retriesStr, out int retries) && retries > 0)
                    config.MaxRetries = retries;
                else
                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchMaxRetries}: {retriesStr}. Expected a positive integer.");
            }

            // Parse retry delay
            if (properties.TryGetValue(DatabricksParameters.CloudFetchRetryDelayMs, out string? retryDelayStr))
            {
                if (int.TryParse(retryDelayStr, out int retryDelay) && retryDelay > 0)
                    config.RetryDelayMs = retryDelay;
                else
                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchRetryDelayMs}: {retryDelayStr}. Expected a positive integer.");
            }

            // Parse max URL refresh attempts
            if (properties.TryGetValue(DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, out string? maxUrlRefreshStr))
            {
                if (int.TryParse(maxUrlRefreshStr, out int maxUrlRefresh) && maxUrlRefresh > 0)
                    config.MaxUrlRefreshAttempts = maxUrlRefresh;
                else
                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchMaxUrlRefreshAttempts}: {maxUrlRefreshStr}. Expected a positive integer.");
            }

            // Parse URL expiration buffer
            if (properties.TryGetValue(DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, out string? urlExpirationStr))
            {
                if (int.TryParse(urlExpirationStr, out int urlExpiration) && urlExpiration > 0)
                    config.UrlExpirationBufferSeconds = urlExpiration;
                else
                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchUrlExpirationBufferSeconds}: {urlExpirationStr}. Expected a positive integer.");
            }

            return config;
        }
    }
}
```
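
Since `FromProperties` validates each unified key independently and falls back to the `Default*` constants for anything unset, callers can hand it a sparse property bag. A short usage sketch; the dictionary contents and the one-field schema are illustrative, and the key string is the unified name cited in the doc comment above:

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using Apache.Arrow;
using Apache.Arrow.Types;

var properties = new Dictionary<string, string>
{
    // Unified property name: the same key works under Thrift and REST.
    ["adbc.databricks.cloudfetch.parallel_downloads"] = "5",
};

// Stand-in for the real result schema delivered by the server.
Schema schema = new Schema.Builder()
    .Field(f => f.Name("id").DataType(Int64Type.Default))
    .Build();

var config = CloudFetchConfiguration.FromProperties(properties, schema, isLz4Compressed: true);

Debug.Assert(config.ParallelDownloads == 5);                                        // explicit key wins
Debug.Assert(config.PrefetchCount == CloudFetchConfiguration.DefaultPrefetchCount); // unset key falls back
Debug.Assert(config.Lz4BufferPool == null); // downstream code substitutes ArrayPool<byte>.Shared
```

An invalid value (for example `"0"` or `"abc"` for any of these keys) throws `ArgumentException` rather than silently falling back to a default, so misconfiguration surfaces at connection time instead of mid-download.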
