
Commit fe6c5e8

Added support for connection param of maxBytesPerFetchRequest
1 parent 11f92d0 commit fe6c5e8


8 files changed: +252 -3 lines changed


csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 91 additions & 3 deletions
@@ -61,8 +61,8 @@ internal class DatabricksConnection : SparkHttpConnection
 
         internal static TSparkGetDirectResults defaultGetDirectResults = new()
         {
-            MaxRows = 2000000,
-            MaxBytes = 404857600
+            MaxRows = 1000,
+            MaxBytes = DefaultMaxBytesPerFetchRequest
         };
 
         // CloudFetch configuration
@@ -71,6 +71,8 @@ internal class DatabricksConnection : SparkHttpConnection
         private bool _useCloudFetch = true;
         private bool _canDecompressLz4 = true;
         private long _maxBytesPerFile = DefaultMaxBytesPerFile;
+        private const long DefaultMaxBytesPerFetchRequest = 300 * 1024 * 1024; // 300MB
+        private long _maxBytesPerFetchRequest = DefaultMaxBytesPerFetchRequest;
         private const bool DefaultRetryOnUnavailable = true;
         private const int DefaultTemporarilyUnavailableRetryTimeout = 900;
         private bool _useDescTableExtended = true;
@@ -317,6 +319,26 @@ private void ValidateProperties()
                 _maxBytesPerFile = maxBytesPerFileValue;
             }
 
+            if (Properties.TryGetValue(DatabricksParameters.MaxBytesPerFetchRequest, out string? maxBytesPerFetchRequestStr))
+            {
+                try
+                {
+                    long maxBytesPerFetchRequestValue = ParseBytesWithUnits(maxBytesPerFetchRequestStr);
+                    if (maxBytesPerFetchRequestValue < 0)
+                    {
+                        throw new ArgumentOutOfRangeException(
+                            nameof(Properties),
+                            maxBytesPerFetchRequestValue,
+                            $"Parameter '{DatabricksParameters.MaxBytesPerFetchRequest}' value must be a non-negative integer. Use 0 for no limit.");
+                    }
+                    _maxBytesPerFetchRequest = maxBytesPerFetchRequestValue;
+                }
+                catch (FormatException)
+                {
+                    throw new ArgumentException($"Parameter '{DatabricksParameters.MaxBytesPerFetchRequest}' value '{maxBytesPerFetchRequestStr}' could not be parsed. Valid formats: number with optional unit suffix (B, KB, MB, GB). Examples: '300MB', '1024KB', '1073741824'.");
+                }
+            }
+
             // Parse default namespace
             string? defaultCatalog = null;
             string? defaultSchema = null;
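Note: as a rough usage sketch (not part of this commit), the new key is passed with the other connection properties handed to AdbcDriver.Open and AdbcDatabase.Connect, mirroring the pattern used in the E2E tests further down; the host and token values below are placeholders.

    // Connection properties; host and token are placeholders.
    var parameters = new Dictionary<string, string>
    {
        [SparkParameters.HostName] = "my-workspace.cloud.databricks.com",
        [SparkParameters.Token] = "<personal-access-token>",
        // Plain byte count, or a value with a B/KB/MB/GB suffix; "0" requests no limit.
        [DatabricksParameters.MaxBytesPerFetchRequest] = "64MB",
    };
    // AdbcDatabase database = driver.Open(parameters);
    // using var connection = database.Connect(parameters);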
@@ -403,7 +425,13 @@ protected internal override bool TrySetGetDirectResults(IRequest request)
         {
             if (EnableDirectResults)
             {
-                return base.TrySetGetDirectResults(request);
+                // Use the configured MaxBytesPerFetchRequest for direct results
+                request.GetDirectResults = new TSparkGetDirectResults
+                {
+                    MaxRows = defaultGetDirectResults.MaxRows,
+                    MaxBytes = _maxBytesPerFetchRequest
+                };
+                return true;
             }
             return false;
         }
@@ -423,6 +451,11 @@ protected internal override bool TrySetGetDirectResults(IRequest request)
         /// </summary>
         internal long MaxBytesPerFile => _maxBytesPerFile;
 
+        /// <summary>
+        /// Gets the maximum bytes per fetch request.
+        /// </summary>
+        internal long MaxBytesPerFetchRequest => _maxBytesPerFetchRequest;
+
         /// <summary>
         /// Gets the default namespace to use for SQL queries.
         /// </summary>
@@ -707,6 +740,61 @@ private string EscapeSqlString(string value)
             return "`" + value.Replace("`", "``") + "`";
         }
 
+        /// <summary>
+        /// Parses a byte value that may include unit suffixes (B, KB, MB, GB).
+        /// </summary>
+        /// <param name="value">The value to parse, e.g., "300MB", "1024KB", "1073741824"</param>
+        /// <returns>The value in bytes</returns>
+        /// <exception cref="FormatException">Thrown when the value cannot be parsed</exception>
+        private static long ParseBytesWithUnits(string value)
+        {
+            if (string.IsNullOrWhiteSpace(value))
+            {
+                throw new FormatException("Value cannot be null or empty");
+            }
+
+            value = value.Trim().ToUpperInvariant();
+
+            // Check for unit suffixes
+            long multiplier = 1;
+            string numberPart = value;
+
+            if (value.EndsWith("GB"))
+            {
+                multiplier = 1024L * 1024L * 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("MB"))
+            {
+                multiplier = 1024L * 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("KB"))
+            {
+                multiplier = 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("B"))
+            {
+                multiplier = 1L;
+                numberPart = value.Substring(0, value.Length - 1);
+            }
+
+            if (!long.TryParse(numberPart.Trim(), out long number))
+            {
+                throw new FormatException($"Invalid number format: {numberPart}");
+            }
+
+            try
+            {
+                return checked(number * multiplier);
+            }
+            catch (OverflowException)
+            {
+                throw new FormatException($"Value {value} results in overflow when converted to bytes");
+            }
+        }
+
         protected override void ValidateOptions()
         {
             base.ValidateOptions();
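For reference, a few illustrative inputs and the byte values the ParseBytesWithUnits helper above would produce (units are binary, so 1 KB = 1024 bytes; the input is trimmed and upper-cased before the suffix is matched):

    // ParseBytesWithUnits("300MB")       -> 314572800   (300 * 1024 * 1024, the default)
    // ParseBytesWithUnits("1024KB")      -> 1048576
    // ParseBytesWithUnits("1073741824")  -> 1073741824  (no suffix, treated as raw bytes)
    // ParseBytesWithUnits(" 2gb ")       -> 2147483648  (case-insensitive, surrounding whitespace tolerated)
    // ParseBytesWithUnits("12.5MB")      -> FormatException ("12.5" is not a valid long)
    // ParseBytesWithUnits("-1GB")        -> -1073741824 (negative results are then rejected by ValidateProperties)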

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 8 additions & 0 deletions
@@ -131,6 +131,14 @@ public class DatabricksParameters : SparkParameters
         /// </summary>
         public const string CloudFetchPrefetchEnabled = "adbc.databricks.cloudfetch.prefetch_enabled";
 
+        /// <summary>
+        /// Maximum bytes per fetch request when retrieving query results from servers.
+        /// The value can be specified with unit suffixes: B (bytes), KB (kilobytes), MB (megabytes), GB (gigabytes).
+        /// If no unit is specified, the value is treated as bytes.
+        /// Default value is 300MB if not specified.
+        /// </summary>
+        public const string MaxBytesPerFetchRequest = "adbc.databricks.max_bytes_per_fetch_request";
+
         /// <summary>
         /// The OAuth grant type to use for authentication.
         /// Supported values:

csharp/src/Drivers/Databricks/DatabricksStatement.cs

Lines changed: 73 additions & 0 deletions
@@ -39,6 +39,7 @@ internal class DatabricksStatement : SparkStatement, IHiveServer2Statement
         private bool useCloudFetch;
         private bool canDecompressLz4;
         private long maxBytesPerFile;
+        private long maxBytesPerFetchRequest;
         private bool enableMultipleCatalogSupport;
         private bool enablePKFK;
         private bool runAsyncInThrift;
@@ -61,6 +62,7 @@ public DatabricksStatement(DatabricksConnection connection)
             useCloudFetch = connection.UseCloudFetch;
             canDecompressLz4 = connection.CanDecompressLz4;
             maxBytesPerFile = connection.MaxBytesPerFile;
+            maxBytesPerFetchRequest = connection.MaxBytesPerFetchRequest;
             enableMultipleCatalogSupport = connection.EnableMultipleCatalogSupport;
             enablePKFK = connection.EnablePKFK;
 
@@ -164,6 +166,17 @@ public override void SetOption(string key, string value)
                         throw new ArgumentException($"Invalid value for {key}: {value}. Expected a long value.");
                     }
                     break;
+                case DatabricksParameters.MaxBytesPerFetchRequest:
+                    try
+                    {
+                        long maxBytesPerFetchRequestValue = ParseBytesWithUnits(value);
+                        this.maxBytesPerFetchRequest = maxBytesPerFetchRequestValue;
+                    }
+                    catch (FormatException)
+                    {
+                        throw new ArgumentException($"Invalid value for {key}: {value}. Valid formats: number with optional unit suffix (B, KB, MB, GB). Examples: '300MB', '1024KB', '1073741824'.");
+                    }
+                    break;
                 default:
                     base.SetOption(key, value);
                     break;
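As a usage sketch (not part of this commit), the same key can also be applied to an individual statement through SetOption, overriding the value the statement inherited from its connection at construction time:

    using var statement = connection.CreateStatement();
    // Cap each FetchResults response for this statement at roughly 32 MB.
    statement.SetOption(DatabricksParameters.MaxBytesPerFetchRequest, "32MB");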
@@ -194,6 +207,11 @@ internal void SetUseCloudFetch(bool useCloudFetch)
         /// </summary>
         public bool CanDecompressLz4 => canDecompressLz4;
 
+        /// <summary>
+        /// Gets the maximum bytes per fetch request.
+        /// </summary>
+        public long MaxBytesPerFetchRequest => maxBytesPerFetchRequest;
+
         /// <summary>
         /// Sets whether the client can decompress LZ4 compressed results.
         /// </summary>
@@ -679,6 +697,61 @@ private static IArrowArray[] CreateColumnMetadataEmptyArray()
             ];
         }
 
+        /// <summary>
+        /// Parses a byte value that may include unit suffixes (B, KB, MB, GB).
+        /// </summary>
+        /// <param name="value">The value to parse, e.g., "300MB", "1024KB", "1073741824"</param>
+        /// <returns>The value in bytes</returns>
+        /// <exception cref="FormatException">Thrown when the value cannot be parsed</exception>
+        private static long ParseBytesWithUnits(string value)
+        {
+            if (string.IsNullOrWhiteSpace(value))
+            {
+                throw new FormatException("Value cannot be null or empty");
+            }
+
+            value = value.Trim().ToUpperInvariant();
+
+            // Check for unit suffixes
+            long multiplier = 1;
+            string numberPart = value;
+
+            if (value.EndsWith("GB"))
+            {
+                multiplier = 1024L * 1024L * 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("MB"))
+            {
+                multiplier = 1024L * 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("KB"))
+            {
+                multiplier = 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("B"))
+            {
+                multiplier = 1L;
+                numberPart = value.Substring(0, value.Length - 1);
+            }
+
+            if (!long.TryParse(numberPart.Trim(), out long number))
+            {
+                throw new FormatException($"Invalid number format: {numberPart}");
+            }
+
+            try
+            {
+                return checked(number * multiplier);
+            }
+            catch (OverflowException)
+            {
+                throw new FormatException($"Value {value} results in overflow when converted to bytes");
+            }
+        }
+
         private QueryResult CreateExtendedColumnsResult(Schema columnMetadataSchema, DescTableExtendedResult descResult)
         {
             var allFields = new List<Field>(columnMetadataSchema.FieldsList);

csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs

Lines changed: 7 additions & 0 deletions
@@ -123,6 +123,13 @@ private BaseDatabricksReader DetermineReader(TFetchResultsResp initialResults)
             // Make a FetchResults call to get the initial result set
             // and determine the reader based on the result set
             TFetchResultsReq request = new TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, this._statement.BatchSize);
+
+            // Set MaxBytes from DatabricksStatement
+            if (this._statement is DatabricksStatement databricksStatement)
+            {
+                request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
+            }
+
             TFetchResultsResp response = await this._statement.Client!.FetchResults(request, cancellationToken);
             _activeReader = DetermineReader(response);
         }

csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs

Lines changed: 7 additions & 0 deletions
@@ -83,6 +83,13 @@ public DatabricksReader(IHiveServer2Statement statement, Schema schema, IRespons
             }
             // TODO: use an expiring cancellationtoken
             TFetchResultsReq request = new TFetchResultsReq(this.response.OperationHandle!, TFetchOrientation.FETCH_NEXT, this.statement.BatchSize);
+
+            // Set MaxBytes from DatabricksStatement
+            if (this.statement is DatabricksStatement databricksStatement)
+            {
+                request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
+            }
+
             TFetchResultsResp response = await this.statement.Connection.Client!.FetchResults(request, cancellationToken);
 
             // Make sure we get the arrowBatches
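Both readers apply the limit the same way: the type check against DatabricksStatement means MaxBytes is only set on the Thrift TFetchResultsReq when the statement is the Databricks implementation; for any other IHiveServer2Statement the request is sent without the cap, as before.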

csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs

Lines changed: 59 additions & 0 deletions
@@ -56,6 +56,62 @@ internal void CanDetectConnectionParameterErrors(ParametersWithExceptions test)
             OutputHelper?.WriteLine(exception.Message);
         }
 
+        /// <summary>
+        /// Validates that MaxBytesPerFetchRequest parameter accepts valid values with unit suffixes.
+        /// </summary>
+        [SkippableTheory]
+        [InlineData("300MB", 300L * 1024L * 1024L)]
+        [InlineData("1GB", 1024L * 1024L * 1024L)]
+        [InlineData("512KB", 512L * 1024L)]
+        [InlineData("1024B", 1024L)]
+        [InlineData("1024", 1024L)]
+        [InlineData("0", 0L)]
+        internal void CanParseMaxBytesPerFetchRequestParameter(string parameterValue, long expectedBytes)
+        {
+            var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
+            var parameters = TestEnvironment.GetDriverParameters(testConfig);
+            parameters[DatabricksParameters.MaxBytesPerFetchRequest] = parameterValue;
+
+            AdbcDriver driver = NewDriver;
+            AdbcDatabase database = driver.Open(parameters);
+
+            // This should not throw an exception
+            using var connection = database.Connect(parameters);
+
+            // Verify the parameter was parsed correctly by creating a statement and checking the property
+            using var statement = connection.CreateStatement();
+            if (statement is DatabricksStatement databricksStatement)
+            {
+                Assert.Equal(expectedBytes, databricksStatement.MaxBytesPerFetchRequest);
+            }
+        }
+
+        /// <summary>
+        /// Validates that MaxBytesPerFetchRequest parameter can be set via test configuration.
+        /// </summary>
+        [SkippableTheory]
+        [InlineData("500MB", 500L * 1024L * 1024L)]
+        [InlineData("2GB", 2L * 1024L * 1024L * 1024L)]
+        internal void CanSetMaxBytesPerFetchRequestViaTestConfiguration(string configValue, long expectedBytes)
+        {
+            var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
+            testConfig.MaxBytesPerFetchRequest = configValue;
+            var parameters = TestEnvironment.GetDriverParameters(testConfig);
+
+            AdbcDriver driver = NewDriver;
+            AdbcDatabase database = driver.Open(parameters);
+
+            // This should not throw an exception
+            using var connection = database.Connect(parameters);
+
+            // Verify the parameter was set correctly via test configuration
+            using var statement = connection.CreateStatement();
+            if (statement is DatabricksStatement databricksStatement)
+            {
+                Assert.Equal(expectedBytes, databricksStatement.MaxBytesPerFetchRequest);
+            }
+        }
+
         /// <summary>
         /// Tests connection timeout to establish a session with the backend.
         /// </summary>
@@ -304,6 +360,9 @@ public InvalidConnectionParametersTestData()
             Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.CanDecompressLz4] = "notabool"}, typeof(ArgumentException)));
             Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFile] = "notanumber" }, typeof(ArgumentException)));
             Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFile] = "-100" }, typeof(ArgumentOutOfRangeException)));
+            Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "notanumber" }, typeof(ArgumentException)));
+            Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "-100" }, typeof(ArgumentOutOfRangeException)));
+            Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "invalid_unit" }, typeof(ArgumentException)));
             Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.EnableDirectResults] = "notabool" }, typeof(ArgumentException)));
             Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = "-1" }, typeof(ArgumentOutOfRangeException)));
             Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = IPEndPoint.MinPort.ToString(CultureInfo.InvariantCulture) }, typeof(ArgumentOutOfRangeException)));

csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs

Lines changed: 3 additions & 0 deletions
@@ -57,5 +57,8 @@ public class DatabricksTestConfiguration : SparkTestConfiguration
 
         [JsonPropertyName("enableDirectResults"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
         public string EnableDirectResults { get; set; } = string.Empty;
+
+        [JsonPropertyName("maxBytesPerFetchRequest"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+        public string MaxBytesPerFetchRequest { get; set; } = string.Empty;
     }
 }
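In practice this means a Databricks test configuration JSON file can presumably carry an entry such as "maxBytesPerFetchRequest": "500MB"; GetDriverParameters in the next file forwards that value as the adbc.databricks.max_bytes_per_fetch_request connection property, which is how the CanSetMaxBytesPerFetchRequestViaTestConfiguration test above exercises it.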

csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs

Lines changed: 4 additions & 0 deletions
@@ -165,6 +165,10 @@ public override Dictionary<string, string> GetDriverParameters(DatabricksTestCon
             {
                 parameters.Add(DatabricksParameters.EnableDirectResults, testConfiguration.EnableDirectResults!);
             }
+            if (!string.IsNullOrEmpty(testConfiguration.MaxBytesPerFetchRequest))
+            {
+                parameters.Add(DatabricksParameters.MaxBytesPerFetchRequest, testConfiguration.MaxBytesPerFetchRequest!);
+            }
             if (testConfiguration.HttpOptions != null)
             {
                 if (testConfiguration.HttpOptions.Tls != null)
