
Commit 7dd0465

Author: Jade Wang
Commit message: reduce diff (squash and merge refactor)
1 parent: 8b28861

16 files changed: +3068 -676 lines

csharp/doc/PECO-2788-cloudfetch-protocol-agnostic-design.md

Lines changed: 1946 additions & 0 deletions
(Large diff not rendered.)

csharp/src/Reader/BaseDatabricksReader.cs

Lines changed: 18 additions & 39 deletions
```diff
@@ -30,70 +30,49 @@
 namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
 {
     /// <summary>
-    /// Base class for Databricks readers that handles common functionality of DatabricksReader and CloudFetchReader
+    /// Base class for Databricks readers that handles common functionality of DatabricksReader and CloudFetchReader.
+    /// Protocol-agnostic - works with both Thrift and REST implementations.
     /// </summary>
     internal abstract class BaseDatabricksReader : TracingReader
     {
-        protected IHiveServer2Statement statement;
         protected readonly Schema schema;
-        protected readonly IResponse response;
+        protected readonly IResponse? response; // Nullable for protocol-agnostic usage
         protected readonly bool isLz4Compressed;
         protected bool hasNoMoreRows = false;
         private bool isDisposed;
-        private bool isClosed;
 
-        protected BaseDatabricksReader(IHiveServer2Statement statement, Schema schema, IResponse response, bool isLz4Compressed)
+        /// <summary>
+        /// Gets the statement for this reader. Subclasses can decide how to provide it.
+        /// Used for Thrift operations in DatabricksReader. Not used in CloudFetchReader.
+        /// </summary>
+        protected abstract ITracingStatement Statement { get; }
+
+        /// <summary>
+        /// Protocol-agnostic constructor.
+        /// </summary>
+        /// <param name="statement">The tracing statement (both Thrift and REST implement ITracingStatement).</param>
+        /// <param name="schema">The Arrow schema.</param>
+        /// <param name="response">The query response (nullable for REST API).</param>
+        /// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
+        protected BaseDatabricksReader(ITracingStatement statement, Schema schema, IResponse? response, bool isLz4Compressed)
             : base(statement)
         {
             this.schema = schema;
             this.response = response;
             this.isLz4Compressed = isLz4Compressed;
-            this.statement = statement;
         }
 
         public override Schema Schema { get { return schema; } }
 
         protected override void Dispose(bool disposing)
         {
-            try
-            {
-                if (!isDisposed)
-                {
-                    if (disposing)
-                    {
-                        _ = CloseOperationAsync().Result;
-                    }
-                }
-            }
-            finally
+            if (!isDisposed)
             {
                 base.Dispose(disposing);
                 isDisposed = true;
             }
         }
 
-        /// <summary>
-        /// Closes the current operation.
-        /// </summary>
-        /// <returns>Returns true if the close operation completes successfully, false otherwise.</returns>
-        /// <exception cref="HiveServer2Exception" />
-        public async Task<bool> CloseOperationAsync()
-        {
-            try
-            {
-                if (!isClosed)
-                {
-                    _ = await HiveServer2Reader.CloseOperationAsync(this.statement, this.response);
-                    return true;
-                }
-                return false;
-            }
-            finally
-            {
-                isClosed = true;
-            }
-        }
-
         protected void ThrowIfDisposed()
         {
             if (isDisposed)
```
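
The substance of this diff: BaseDatabricksReader no longer stores a Thrift-typed IHiveServer2Statement field, Dispose no longer blocks on CloseOperationAsync().Result, and the Thrift-specific CloseOperationAsync is removed from the shared base. Subclasses instead satisfy an abstract Statement property, so the base class depends only on the protocol-agnostic ITracingStatement. A self-contained sketch of that pattern, using stand-in types rather than the driver's real interfaces (every name below is illustrative, not from this commit):

```csharp
// Stand-in for ITracingStatement: the only contract the base type needs.
interface IStatementLike { string TraceId { get; } }

// Stand-in for a protocol-specific statement (e.g., Thrift).
sealed class ThriftStatementLike : IStatementLike
{
    public string TraceId => "thrift-op-1";
    public void CloseOperation() { /* protocol-specific close call */ }
}

abstract class ReaderBase
{
    protected ReaderBase(IStatementLike statement)
    {
        // Shared behavior (tracing, disposal) keys off the agnostic interface only.
        _ = statement.TraceId;
    }

    // Subclasses decide how to provide the statement; the base stores no field.
    protected abstract IStatementLike Statement { get; }
}

sealed class ThriftReaderLike : ReaderBase
{
    private readonly ThriftStatementLike thriftStatement;

    public ThriftReaderLike(ThriftStatementLike statement) : base(statement)
        => thriftStatement = statement;

    // The protocol-typed reference lives in the subclass, where
    // Thrift-only calls such as CloseOperation() belong.
    protected override IStatementLike Statement => thriftStatement;
}
```

A REST-based reader would override Statement the same way without touching any Thrift type, which is what lets the base class drop the Thrift field entirely.
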
csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs

Lines changed: 190 additions & 0 deletions
```diff
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2025 ADBC Drivers Contributors
+ *
+ * This file has been modified from its original version, which is
+ * under the Apache License:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
+{
+    /// <summary>
+    /// Configuration for the CloudFetch download pipeline.
+    /// Protocol-agnostic - works with both Thrift and REST implementations.
+    /// </summary>
+    internal sealed class CloudFetchConfiguration
+    {
+        // Default values
+        private const int DefaultParallelDownloads = 3;
+        private const int DefaultPrefetchCount = 2;
+        internal const int DefaultMemoryBufferSizeMB = 100;
+        private const int DefaultTimeoutMinutes = 5;
+        private const int DefaultMaxRetries = 3;
+        private const int DefaultRetryDelayMs = 500;
+        private const int DefaultMaxUrlRefreshAttempts = 3;
+        private const int DefaultUrlExpirationBufferSeconds = 60;
+
+        /// <summary>
+        /// Number of parallel downloads to perform.
+        /// </summary>
+        public int ParallelDownloads { get; set; } = DefaultParallelDownloads;
+
+        /// <summary>
+        /// Number of files to prefetch ahead of the reader.
+        /// </summary>
+        public int PrefetchCount { get; set; } = DefaultPrefetchCount;
+
+        /// <summary>
+        /// Memory buffer size limit in MB for buffered files.
+        /// </summary>
+        public int MemoryBufferSizeMB { get; set; } = DefaultMemoryBufferSizeMB;
+
+        /// <summary>
+        /// HTTP client timeout for downloads (in minutes).
+        /// </summary>
+        public int TimeoutMinutes { get; set; } = DefaultTimeoutMinutes;
+
+        /// <summary>
+        /// Maximum retry attempts for failed downloads.
+        /// </summary>
+        public int MaxRetries { get; set; } = DefaultMaxRetries;
+
+        /// <summary>
+        /// Delay between retry attempts (in milliseconds).
+        /// </summary>
+        public int RetryDelayMs { get; set; } = DefaultRetryDelayMs;
+
+        /// <summary>
+        /// Maximum attempts to refresh expired URLs.
+        /// </summary>
+        public int MaxUrlRefreshAttempts { get; set; } = DefaultMaxUrlRefreshAttempts;
+
+        /// <summary>
+        /// Buffer time before URL expiration to trigger refresh (in seconds).
+        /// </summary>
+        public int UrlExpirationBufferSeconds { get; set; } = DefaultUrlExpirationBufferSeconds;
+
+        /// <summary>
+        /// Whether the result data is LZ4 compressed.
+        /// </summary>
+        public bool IsLz4Compressed { get; set; }
+
+        /// <summary>
+        /// The Arrow schema for the results.
+        /// </summary>
+        public Schema Schema { get; set; }
+
+        /// <summary>
+        /// Creates configuration from connection properties.
+        /// Works with UNIFIED properties that are shared across ALL protocols (Thrift, REST, future protocols).
+        /// Same property names (e.g., "adbc.databricks.cloudfetch.parallel_downloads") work for all protocols.
+        /// </summary>
+        /// <param name="properties">Connection properties from either Thrift or REST connection.</param>
+        /// <param name="schema">Arrow schema for the results.</param>
+        /// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
+        /// <returns>CloudFetch configuration parsed from unified properties.</returns>
+        public static CloudFetchConfiguration FromProperties(
+            IReadOnlyDictionary<string, string> properties,
+            Schema schema,
+            bool isLz4Compressed)
+        {
+            var config = new CloudFetchConfiguration
+            {
+                Schema = schema ?? throw new ArgumentNullException(nameof(schema)),
+                IsLz4Compressed = isLz4Compressed
+            };
+
+            // Parse parallel downloads
+            if (properties.TryGetValue(DatabricksParameters.CloudFetchParallelDownloads, out string? parallelStr))
+            {
+                if (int.TryParse(parallelStr, out int parallel) && parallel > 0)
+                    config.ParallelDownloads = parallel;
+                else
+                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchParallelDownloads}: {parallelStr}. Expected a positive integer.");
+            }
+
+            // Parse prefetch count
+            if (properties.TryGetValue(DatabricksParameters.CloudFetchPrefetchCount, out string? prefetchStr))
+            {
+                if (int.TryParse(prefetchStr, out int prefetch) && prefetch > 0)
+                    config.PrefetchCount = prefetch;
+                else
+                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchPrefetchCount}: {prefetchStr}. Expected a positive integer.");
+            }
+
+            // Parse memory buffer size
+            if (properties.TryGetValue(DatabricksParameters.CloudFetchMemoryBufferSize, out string? memoryStr))
+            {
+                if (int.TryParse(memoryStr, out int memory) && memory > 0)
+                    config.MemoryBufferSizeMB = memory;
+                else
+                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchMemoryBufferSize}: {memoryStr}. Expected a positive integer.");
+            }
+
+            // Parse timeout
+            if (properties.TryGetValue(DatabricksParameters.CloudFetchTimeoutMinutes, out string? timeoutStr))
+            {
+                if (int.TryParse(timeoutStr, out int timeout) && timeout > 0)
+                    config.TimeoutMinutes = timeout;
+                else
+                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchTimeoutMinutes}: {timeoutStr}. Expected a positive integer.");
+            }
+
+            // Parse max retries
+            if (properties.TryGetValue(DatabricksParameters.CloudFetchMaxRetries, out string? retriesStr))
+            {
+                if (int.TryParse(retriesStr, out int retries) && retries > 0)
+                    config.MaxRetries = retries;
+                else
+                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchMaxRetries}: {retriesStr}. Expected a positive integer.");
+            }
+
+            // Parse retry delay
+            if (properties.TryGetValue(DatabricksParameters.CloudFetchRetryDelayMs, out string? retryDelayStr))
+            {
+                if (int.TryParse(retryDelayStr, out int retryDelay) && retryDelay > 0)
+                    config.RetryDelayMs = retryDelay;
+                else
+                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchRetryDelayMs}: {retryDelayStr}. Expected a positive integer.");
+            }
+
+            // Parse max URL refresh attempts
+            if (properties.TryGetValue(DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, out string? maxUrlRefreshStr))
+            {
+                if (int.TryParse(maxUrlRefreshStr, out int maxUrlRefresh) && maxUrlRefresh > 0)
+                    config.MaxUrlRefreshAttempts = maxUrlRefresh;
+                else
+                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchMaxUrlRefreshAttempts}: {maxUrlRefreshStr}. Expected a positive integer.");
+            }
+
+            // Parse URL expiration buffer
+            if (properties.TryGetValue(DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, out string? urlExpirationStr))
+            {
+                if (int.TryParse(urlExpirationStr, out int urlExpiration) && urlExpiration > 0)
+                    config.UrlExpirationBufferSeconds = urlExpiration;
+                else
+                    throw new ArgumentException($"Invalid {DatabricksParameters.CloudFetchUrlExpirationBufferSeconds}: {urlExpirationStr}. Expected a positive integer.");
+            }
+
+            return config;
+        }
+    }
+}
```
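
FromProperties applies one rule to every unified key: if the key is present its value must parse as a positive integer, otherwise an ArgumentException is thrown. A hypothetical usage sketch; the parallel_downloads key string is the one cited in the doc comment above, while the empty schema and the property value are illustrative stand-ins:

```csharp
using System.Collections.Generic;
using Apache.Arrow;

// Minimal empty schema just to make the sketch self-contained; a real reader
// would pass the result set's actual Arrow schema.
Schema schema = new Schema.Builder().Build();

var properties = new Dictionary<string, string>
{
    // Key named in the FromProperties doc comment; the same string works for
    // Thrift and REST connections.
    ["adbc.databricks.cloudfetch.parallel_downloads"] = "8",
};

CloudFetchConfiguration config = CloudFetchConfiguration.FromProperties(
    properties, schema, isLz4Compressed: true);

// Unset keys keep their defaults (PrefetchCount = 2, MemoryBufferSizeMB = 100, ...).
// A value like "0" or "abc" would throw ArgumentException rather than being
// silently corrected.
```
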
