Skip to content

Commit 0a84392

Browse files
authored
feat(csharp/src/Drivers/Databricks): Added support for connection param of maxBytesPerFetchRequest (apache#3474)
## Description Added support for the connection param of maxBytesPerFetchRequest. The parameter is used for direct results as well. [PECO-2734](https://databricks.atlassian.net/browse/PECO-2734)
1 parent 23c5bb2 commit 0a84392

File tree

10 files changed

+200
-4
lines changed

10 files changed

+200
-4
lines changed

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ internal class DatabricksConnection : SparkHttpConnection
7070
private bool _useCloudFetch = true;
7171
private bool _canDecompressLz4 = true;
7272
private long _maxBytesPerFile = DefaultMaxBytesPerFile;
73+
private const long DefaultMaxBytesPerFetchRequest = 400 * 1024 * 1024; // 400MB
74+
private long _maxBytesPerFetchRequest = DefaultMaxBytesPerFetchRequest;
7375
private const bool DefaultRetryOnUnavailable = true;
7476
private const int DefaultTemporarilyUnavailableRetryTimeout = 900;
7577
private bool _useDescTableExtended = true;
@@ -322,6 +324,26 @@ private void ValidateProperties()
322324
_maxBytesPerFile = maxBytesPerFileValue;
323325
}
324326

327+
if (Properties.TryGetValue(DatabricksParameters.MaxBytesPerFetchRequest, out string? maxBytesPerFetchRequestStr))
328+
{
329+
try
330+
{
331+
long maxBytesPerFetchRequestValue = ParseBytesWithUnits(maxBytesPerFetchRequestStr);
332+
if (maxBytesPerFetchRequestValue < 0)
333+
{
334+
throw new ArgumentOutOfRangeException(
335+
nameof(Properties),
336+
maxBytesPerFetchRequestValue,
337+
$"Parameter '{DatabricksParameters.MaxBytesPerFetchRequest}' value must be a non-negative integer. Use 0 for no limit.");
338+
}
339+
_maxBytesPerFetchRequest = maxBytesPerFetchRequestValue;
340+
}
341+
catch (FormatException)
342+
{
343+
throw new ArgumentException($"Parameter '{DatabricksParameters.MaxBytesPerFetchRequest}' value '{maxBytesPerFetchRequestStr}' could not be parsed. Valid formats: number with optional unit suffix (B, KB, MB, GB). Examples: '400MB', '1024KB', '1073741824'.");
344+
}
345+
}
346+
325347
// Parse default namespace
326348
string? defaultCatalog = null;
327349
string? defaultSchema = null;
@@ -477,6 +499,11 @@ protected internal override bool TrySetGetDirectResults(IRequest request)
477499
/// </summary>
478500
internal long MaxBytesPerFile => _maxBytesPerFile;
479501

502+
/// <summary>
503+
/// Gets the maximum bytes per fetch request.
504+
/// </summary>
505+
internal long MaxBytesPerFetchRequest => _maxBytesPerFetchRequest;
506+
480507
/// <summary>
481508
/// Gets the default namespace to use for SQL queries.
482509
/// </summary>
@@ -771,6 +798,61 @@ private string EscapeSqlString(string value)
771798
return "`" + value.Replace("`", "``") + "`";
772799
}
773800

801+
/// <summary>
/// Parses a byte value that may include unit suffixes (B, KB, MB, GB).
/// Parsing is case-insensitive and ignores surrounding whitespace;
/// units are binary (1KB = 1024 bytes).
/// </summary>
/// <param name="value">The value to parse, e.g., "400MB", "1024KB", "1073741824"</param>
/// <returns>The value in bytes</returns>
/// <exception cref="FormatException">Thrown when the value cannot be parsed or overflows a 64-bit integer</exception>
internal static long ParseBytesWithUnits(string value)
{
    if (string.IsNullOrWhiteSpace(value))
    {
        throw new FormatException("Value cannot be null or empty");
    }

    value = value.Trim().ToUpperInvariant();

    // Determine the multiplier from an optional unit suffix. The two-letter
    // suffixes must be checked before the bare "B" so "KB"/"MB"/"GB" are not
    // misread as a number ending in "B". Use Ordinal comparison so suffix
    // detection does not depend on the current culture (CA1310).
    long multiplier = 1;
    string numberPart = value;

    if (value.EndsWith("GB", StringComparison.Ordinal))
    {
        multiplier = 1024L * 1024L * 1024L;
        numberPart = value.Substring(0, value.Length - 2);
    }
    else if (value.EndsWith("MB", StringComparison.Ordinal))
    {
        multiplier = 1024L * 1024L;
        numberPart = value.Substring(0, value.Length - 2);
    }
    else if (value.EndsWith("KB", StringComparison.Ordinal))
    {
        multiplier = 1024L;
        numberPart = value.Substring(0, value.Length - 2);
    }
    else if (value.EndsWith("B", StringComparison.Ordinal))
    {
        multiplier = 1L;
        numberPart = value.Substring(0, value.Length - 1);
    }

    // Parse with the invariant culture so connection-string values behave
    // the same regardless of the host machine's regional settings (CA1305).
    if (!long.TryParse(numberPart.Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out long number))
    {
        throw new FormatException($"Invalid number format: {numberPart}");
    }

    try
    {
        // checked: surface a multiplication exceeding long.MaxValue as an
        // OverflowException instead of silently wrapping around.
        return checked(number * multiplier);
    }
    catch (OverflowException)
    {
        throw new FormatException($"Value {value} results in overflow when converted to bytes");
    }
}
855+
774856
protected override void ValidateOptions()
775857
{
776858
base.ValidateOptions();

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class DatabricksParameters : SparkParameters
4040

4141
/// <summary>
4242
/// Maximum bytes per file for CloudFetch.
43+
/// The value can be specified with unit suffixes: B (bytes), KB (kilobytes), MB (megabytes), GB (gigabytes).
44+
/// If no unit is specified, the value is treated as bytes.
4345
/// Default value is 20MB if not specified.
4446
/// </summary>
4547
public const string MaxBytesPerFile = "adbc.databricks.cloudfetch.max_bytes_per_file";
@@ -131,6 +133,14 @@ public class DatabricksParameters : SparkParameters
131133
/// </summary>
132134
public const string CloudFetchPrefetchEnabled = "adbc.databricks.cloudfetch.prefetch_enabled";
133135

136+
/// <summary>
137+
/// Maximum bytes per fetch request when retrieving query results from the server.
138+
/// The value can be specified with unit suffixes: B (bytes), KB (kilobytes), MB (megabytes), GB (gigabytes).
139+
/// If no unit is specified, the value is treated as bytes.
140+
/// Default value is 400MB if not specified.
141+
/// </summary>
142+
public const string MaxBytesPerFetchRequest = "adbc.databricks.max_bytes_per_fetch_request";
143+
134144
/// <summary>
135145
/// The OAuth grant type to use for authentication.
136146
/// Supported values:

csharp/src/Drivers/Databricks/DatabricksStatement.cs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ internal class DatabricksStatement : SparkStatement, IHiveServer2Statement
3939
private bool useCloudFetch;
4040
private bool canDecompressLz4;
4141
private long maxBytesPerFile;
42+
private long maxBytesPerFetchRequest;
4243
private bool enableMultipleCatalogSupport;
4344
private bool enablePKFK;
4445
private bool runAsyncInThrift;
@@ -61,6 +62,7 @@ public DatabricksStatement(DatabricksConnection connection)
6162
useCloudFetch = connection.UseCloudFetch;
6263
canDecompressLz4 = connection.CanDecompressLz4;
6364
maxBytesPerFile = connection.MaxBytesPerFile;
65+
maxBytesPerFetchRequest = connection.MaxBytesPerFetchRequest;
6466
enableMultipleCatalogSupport = connection.EnableMultipleCatalogSupport;
6567
enablePKFK = connection.EnablePKFK;
6668

@@ -155,13 +157,25 @@ public override void SetOption(string key, string value)
155157
}
156158
break;
157159
case DatabricksParameters.MaxBytesPerFile:
158-
if (long.TryParse(value, out long maxBytesPerFileValue))
160+
try
159161
{
162+
long maxBytesPerFileValue = DatabricksConnection.ParseBytesWithUnits(value);
160163
this.maxBytesPerFile = maxBytesPerFileValue;
161164
}
162-
else
165+
catch (FormatException)
163166
{
164-
throw new ArgumentException($"Invalid value for {key}: {value}. Expected a long value.");
167+
throw new ArgumentException($"Invalid value for {key}: {value}. Valid formats: number with optional unit suffix (B, KB, MB, GB). Examples: '20MB', '1024KB', '1073741824'.");
168+
}
169+
break;
170+
case DatabricksParameters.MaxBytesPerFetchRequest:
171+
try
172+
{
173+
long maxBytesPerFetchRequestValue = DatabricksConnection.ParseBytesWithUnits(value);
174+
this.maxBytesPerFetchRequest = maxBytesPerFetchRequestValue;
175+
}
176+
catch (FormatException)
177+
{
178+
throw new ArgumentException($"Invalid value for {key}: {value}. Valid formats: number with optional unit suffix (B, KB, MB, GB). Examples: '400MB', '1024KB', '1073741824'.");
165179
}
166180
break;
167181
default:
@@ -194,6 +208,11 @@ internal void SetUseCloudFetch(bool useCloudFetch)
194208
/// </summary>
195209
public bool CanDecompressLz4 => canDecompressLz4;
196210

211+
/// <summary>
212+
/// Gets the maximum bytes per fetch request.
213+
/// </summary>
214+
public long MaxBytesPerFetchRequest => maxBytesPerFetchRequest;
215+
197216
/// <summary>
198217
/// Sets whether the client can decompress LZ4 compressed results.
199218
/// </summary>

csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,11 @@ private async Task FetchNextResultBatchAsync(long? offset, CancellationToken can
280280
// Create fetch request
281281
TFetchResultsReq request = new TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, _batchSize);
282282

283+
if (_statement is DatabricksStatement databricksStatement)
284+
{
285+
request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
286+
}
287+
283288
// Set the start row offset
284289
long startOffset = offset ?? _startOffset;
285290
if (startOffset > 0)

csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ private BaseDatabricksReader DetermineReader(TFetchResultsResp initialResults)
123123
// Make a FetchResults call to get the initial result set
124124
// and determine the reader based on the result set
125125
TFetchResultsReq request = new TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, this._statement.BatchSize);
126+
127+
// Set MaxBytes from DatabricksStatement
128+
if (this._statement is DatabricksStatement databricksStatement)
129+
{
130+
request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
131+
}
132+
126133
TFetchResultsResp response = await this._statement.Client!.FetchResults(request, cancellationToken);
127134
_activeReader = DetermineReader(response);
128135
}

csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ public DatabricksReader(IHiveServer2Statement statement, Schema schema, IRespons
8383
}
8484
// TODO: use an expiring cancellationtoken
8585
TFetchResultsReq request = new TFetchResultsReq(this.response.OperationHandle!, TFetchOrientation.FETCH_NEXT, this.statement.BatchSize);
86+
87+
// Set MaxBytes from DatabricksStatement
88+
if (this.statement is DatabricksStatement databricksStatement)
89+
{
90+
request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
91+
}
92+
8693
TFetchResultsResp response = await this.statement.Connection.Client!.FetchResults(request, cancellationToken);
8794

8895
// Make sure we get the arrowBatches

csharp/src/Drivers/Databricks/readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ CloudFetch is Databricks' high-performance result retrieval system that download
103103
| :--- | :--- | :--- |
104104
| `adbc.databricks.cloudfetch.enabled` | Whether to use CloudFetch for retrieving results | `true` |
105105
| `adbc.databricks.cloudfetch.lz4.enabled` | Whether the client can decompress LZ4 compressed results | `true` |
106-
| `adbc.databricks.cloudfetch.max_bytes_per_file` | Maximum bytes per file for CloudFetch (e.g., `20971520` for 20MB) | `20971520` |
106+
| `adbc.databricks.cloudfetch.max_bytes_per_file` | Maximum bytes per file for CloudFetch. Supports unit suffixes (B, KB, MB, GB). Examples: `20MB`, `1024KB`, `20971520` | `20MB` |
107107
| `adbc.databricks.cloudfetch.parallel_downloads` | Maximum number of parallel downloads | `3` |
108108
| `adbc.databricks.cloudfetch.prefetch_count` | Number of files to prefetch | `2` |
109109
| `adbc.databricks.cloudfetch.memory_buffer_size_mb` | Maximum memory buffer size in MB for prefetched files | `200` |

csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,62 @@ internal void CanDetectConnectionParameterErrors(ParametersWithExceptions test)
5656
OutputHelper?.WriteLine(exception.Message);
5757
}
5858

59+
/// <summary>
60+
/// Validates that MaxBytesPerFetchRequest parameter accepts valid values with unit suffixes.
61+
/// </summary>
62+
[SkippableTheory]
63+
[InlineData("300MB", 300L * 1024L * 1024L)]
64+
[InlineData("1GB", 1024L * 1024L * 1024L)]
65+
[InlineData("512KB", 512L * 1024L)]
66+
[InlineData("1024B", 1024L)]
67+
[InlineData("1024", 1024L)]
68+
[InlineData("0", 0L)]
69+
internal void CanParseMaxBytesPerFetchRequestParameter(string parameterValue, long expectedBytes)
70+
{
71+
var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
72+
var parameters = TestEnvironment.GetDriverParameters(testConfig);
73+
parameters[DatabricksParameters.MaxBytesPerFetchRequest] = parameterValue;
74+
75+
AdbcDriver driver = NewDriver;
76+
AdbcDatabase database = driver.Open(parameters);
77+
78+
// This should not throw an exception
79+
using var connection = database.Connect(parameters);
80+
81+
// Verify the parameter was parsed correctly by creating a statement and checking the property
82+
using var statement = connection.CreateStatement();
83+
if (statement is DatabricksStatement databricksStatement)
84+
{
85+
Assert.Equal(expectedBytes, databricksStatement.MaxBytesPerFetchRequest);
86+
}
87+
}
88+
89+
/// <summary>
90+
/// Validates that MaxBytesPerFetchRequest parameter can be set via test configuration.
91+
/// </summary>
92+
[SkippableTheory]
93+
[InlineData("500MB", 500L * 1024L * 1024L)]
94+
[InlineData("2GB", 2L * 1024L * 1024L * 1024L)]
95+
internal void CanSetMaxBytesPerFetchRequestViaTestConfiguration(string configValue, long expectedBytes)
96+
{
97+
var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
98+
testConfig.MaxBytesPerFetchRequest = configValue;
99+
var parameters = TestEnvironment.GetDriverParameters(testConfig);
100+
101+
AdbcDriver driver = NewDriver;
102+
AdbcDatabase database = driver.Open(parameters);
103+
104+
// This should not throw an exception
105+
using var connection = database.Connect(parameters);
106+
107+
// Verify the parameter was set correctly via test configuration
108+
using var statement = connection.CreateStatement();
109+
if (statement is DatabricksStatement databricksStatement)
110+
{
111+
Assert.Equal(expectedBytes, databricksStatement.MaxBytesPerFetchRequest);
112+
}
113+
}
114+
59115
/// <summary>
60116
/// Tests connection timeout to establish a session with the backend.
61117
/// </summary>
@@ -304,6 +360,9 @@ public InvalidConnectionParametersTestData()
304360
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.CanDecompressLz4] = "notabool"}, typeof(ArgumentException)));
305361
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFile] = "notanumber" }, typeof(ArgumentException)));
306362
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFile] = "-100" }, typeof(ArgumentOutOfRangeException)));
363+
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "notanumber" }, typeof(ArgumentException)));
364+
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "-100" }, typeof(ArgumentOutOfRangeException)));
365+
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "invalid_unit" }, typeof(ArgumentException)));
307366
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.EnableDirectResults] = "notabool" }, typeof(ArgumentException)));
308367
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = "-1" }, typeof(ArgumentOutOfRangeException)));
309368
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = IPEndPoint.MinPort.ToString(CultureInfo.InvariantCulture) }, typeof(ArgumentOutOfRangeException)));

csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,8 @@ public class DatabricksTestConfiguration : SparkTestConfiguration
6363

6464
[JsonPropertyName("enableDirectResults"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
6565
public string EnableDirectResults { get; set; } = string.Empty;
66+
67+
[JsonPropertyName("maxBytesPerFetchRequest"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
68+
public string MaxBytesPerFetchRequest { get; set; } = string.Empty;
6669
}
6770
}

csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,10 @@ public override Dictionary<string, string> GetDriverParameters(DatabricksTestCon
173173
{
174174
parameters.Add(DatabricksParameters.EnableDirectResults, testConfiguration.EnableDirectResults!);
175175
}
176+
if (!string.IsNullOrEmpty(testConfiguration.MaxBytesPerFetchRequest))
177+
{
178+
parameters.Add(DatabricksParameters.MaxBytesPerFetchRequest, testConfiguration.MaxBytesPerFetchRequest!);
179+
}
176180
if (testConfiguration.HttpOptions != null)
177181
{
178182
if (testConfiguration.HttpOptions.Tls != null)

0 commit comments

Comments
 (0)