Skip to content

Commit 2dfd110

Browse files
authored
feat(csharp/src/Drivers/Databricks): Added support for user-configurable Fetch heartbeat interval param (apache#3472)
## Description Added support for the Fetch heartbeat interval connection param. The default values are kept same as before. Users can now configure this param. [PECO-2736](https://databricks.atlassian.net/browse/PECO-2736)
1 parent 95fff2a commit 2dfd110

File tree

7 files changed

+171
-3
lines changed

7 files changed

+171
-3
lines changed

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ internal class DatabricksConnection : SparkHttpConnection
8383
// Identity federation client ID for token exchange
8484
private string? _identityFederationClientId;
8585

86+
// Heartbeat interval configuration
87+
private int _fetchHeartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds;
88+
89+
// Request timeout configuration
90+
private int _operationStatusRequestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds;
91+
8692
// Default namespace
8793
private TNamespace? _defaultNamespace;
8894

@@ -386,6 +392,40 @@ private void ValidateProperties()
386392
{
387393
_identityFederationClientId = identityFederationClientId;
388394
}
395+
396+
if (Properties.TryGetValue(DatabricksParameters.FetchHeartbeatInterval, out string? fetchHeartbeatIntervalStr))
397+
{
398+
if (!int.TryParse(fetchHeartbeatIntervalStr, out int fetchHeartbeatIntervalValue))
399+
{
400+
throw new ArgumentException($"Parameter '{DatabricksParameters.FetchHeartbeatInterval}' value '{fetchHeartbeatIntervalStr}' could not be parsed. Valid values are positive integers.");
401+
}
402+
403+
if (fetchHeartbeatIntervalValue <= 0)
404+
{
405+
throw new ArgumentOutOfRangeException(
406+
nameof(Properties),
407+
fetchHeartbeatIntervalValue,
408+
$"Parameter '{DatabricksParameters.FetchHeartbeatInterval}' value must be a positive integer.");
409+
}
410+
_fetchHeartbeatIntervalSeconds = fetchHeartbeatIntervalValue;
411+
}
412+
413+
if (Properties.TryGetValue(DatabricksParameters.OperationStatusRequestTimeout, out string? operationStatusRequestTimeoutStr))
414+
{
415+
if (!int.TryParse(operationStatusRequestTimeoutStr, out int operationStatusRequestTimeoutValue))
416+
{
417+
throw new ArgumentException($"Parameter '{DatabricksParameters.OperationStatusRequestTimeout}' value '{operationStatusRequestTimeoutStr}' could not be parsed. Valid values are positive integers.");
418+
}
419+
420+
if (operationStatusRequestTimeoutValue <= 0)
421+
{
422+
throw new ArgumentOutOfRangeException(
423+
nameof(Properties),
424+
operationStatusRequestTimeoutValue,
425+
$"Parameter '{DatabricksParameters.OperationStatusRequestTimeout}' value must be a positive integer.");
426+
}
427+
_operationStatusRequestTimeoutSeconds = operationStatusRequestTimeoutValue;
428+
}
389429
}
390430

391431
/// <summary>
@@ -428,6 +468,16 @@ protected internal override bool TrySetGetDirectResults(IRequest request)
428468
/// </summary>
429469
internal TNamespace? DefaultNamespace => _defaultNamespace;
430470

471+
/// <summary>
472+
/// Gets the heartbeat interval in seconds for long-running operations.
473+
/// </summary>
474+
internal int FetchHeartbeatIntervalSeconds => _fetchHeartbeatIntervalSeconds;
475+
476+
/// <summary>
477+
/// Gets the request timeout in seconds for operation status polling requests.
478+
/// </summary>
479+
internal int OperationStatusRequestTimeoutSeconds => _operationStatusRequestTimeoutSeconds;
480+
431481
/// <summary>
432482
/// Gets whether multiple catalog is supported
433483
/// </summary>

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,22 @@ public class DatabricksParameters : SparkParameters
228228
/// Default value is false if not specified.
229229
/// </summary>
230230
public const string DriverConfigTakePrecedence = "adbc.databricks.driver_config_take_precedence";
231+
232+
/// <summary>
233+
/// The interval in seconds for heartbeat polling during long-running operations.
234+
/// This prevents queries from timing out by periodically checking operation status.
235+
/// Default value is 60 seconds if not specified.
236+
/// Must be a positive integer value.
237+
/// </summary>
238+
public const string FetchHeartbeatInterval = "adbc.databricks.fetch_heartbeat_interval";
239+
240+
/// <summary>
241+
/// The timeout in seconds for operation status polling requests.
242+
/// This controls how long to wait for each individual polling request to complete.
243+
/// Default value is 30 seconds if not specified.
244+
/// Must be a positive integer value.
245+
/// </summary>
246+
public const string OperationStatusRequestTimeout = "adbc.databricks.operation_status_request_timeout";
231247
}
232248

233249
/// <summary>
@@ -236,12 +252,12 @@ public class DatabricksParameters : SparkParameters
236252
public class DatabricksConstants
237253
{
238254
/// <summary>
239-
/// Default heartbeat interval in seconds for long-running operations. TODO: make this user-configurable
255+
/// Default heartbeat interval in seconds for long-running operations.
240256
/// </summary>
241257
public const int DefaultOperationStatusPollingIntervalSeconds = 60;
242258

243259
/// <summary>
244-
/// Default timeout in seconds for operation status polling requests. TODO: make this user-configurable
260+
/// Default timeout in seconds for operation status polling requests.
245261
/// </summary>
246262
public const int DefaultOperationStatusRequestTimeoutSeconds = 30;
247263

csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ internal DatabricksCompositeReader(
8080
}
8181
if (_response.DirectResults?.ResultSet?.HasMoreRows ?? true)
8282
{
83-
operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response);
83+
operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection());
8484
operationStatusPoller.Start();
8585
}
8686
}
@@ -197,5 +197,41 @@ private void StopOperationStatusPoller()
197197
operationStatusPoller?.Dispose();
198198
operationStatusPoller = null;
199199
}
200+
201+
/// <summary>
202+
/// Gets the heartbeat interval from the statement's connection.
203+
/// </summary>
204+
/// <returns>The heartbeat interval in seconds, or default if not available.</returns>
205+
private int GetHeartbeatIntervalFromConnection()
206+
{
207+
if (_statement is DatabricksStatement databricksStatement)
208+
{
209+
var connection = databricksStatement.Connection;
210+
if (connection is DatabricksConnection databricksConnection)
211+
{
212+
return databricksConnection.FetchHeartbeatIntervalSeconds;
213+
}
214+
}
215+
216+
return DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds;
217+
}
218+
219+
/// <summary>
220+
/// Gets the request timeout from the statement's connection.
221+
/// </summary>
222+
/// <returns>The request timeout in seconds, or default if not available.</returns>
223+
private int GetRequestTimeoutFromConnection()
224+
{
225+
if (_statement is DatabricksStatement databricksStatement)
226+
{
227+
var connection = databricksStatement.Connection;
228+
if (connection is DatabricksConnection databricksConnection)
229+
{
230+
return databricksConnection.OperationStatusRequestTimeoutSeconds;
231+
}
232+
}
233+
234+
return DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds;
235+
}
200236
}
201237
}

csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,16 @@ public InvalidConnectionParametersTestData()
325325
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TracePropagationEnabled] = "notabool" }, typeof(ArgumentException)));
326326
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TraceParentHeaderName] = "" }, typeof(ArgumentException)));
327327
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.TraceStateEnabled] = "notabool" }, typeof(ArgumentException)));
328+
329+
// Tests for fetch heartbeat interval parameter
330+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.FetchHeartbeatInterval] = "notanumber" }, typeof(ArgumentException)));
331+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.FetchHeartbeatInterval] = "0" }, typeof(ArgumentOutOfRangeException)));
332+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.FetchHeartbeatInterval] = "-1" }, typeof(ArgumentOutOfRangeException)));
333+
334+
// Tests for operation status request timeout parameter
335+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.OperationStatusRequestTimeout] = "notanumber" }, typeof(ArgumentException)));
336+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.OperationStatusRequestTimeout] = "0" }, typeof(ArgumentOutOfRangeException)));
337+
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [DatabricksParameters.OperationStatusRequestTimeout] = "-1" }, typeof(ArgumentOutOfRangeException)));
328338
}
329339
}
330340

csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ public class DatabricksTestConfiguration : SparkTestConfiguration
5555
[JsonPropertyName("enableRunAsyncInThriftOp"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
5656
public string EnableRunAsyncInThriftOp { get; set; } = string.Empty;
5757

58+
[JsonPropertyName("fetchHeartbeatInterval"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
59+
public string FetchHeartbeatInterval { get; set; } = string.Empty;
60+
61+
[JsonPropertyName("operationStatusRequestTimeout"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
62+
public string OperationStatusRequestTimeout { get; set; } = string.Empty;
63+
5864
[JsonPropertyName("enableDirectResults"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
5965
public string EnableDirectResults { get; set; } = string.Empty;
6066
}

csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,14 @@ public override Dictionary<string, string> GetDriverParameters(DatabricksTestCon
141141
{
142142
parameters.Add(ApacheParameters.QueryTimeoutSeconds, testConfiguration.QueryTimeoutSeconds!);
143143
}
144+
if (!string.IsNullOrEmpty(testConfiguration.FetchHeartbeatInterval))
145+
{
146+
parameters.Add(DatabricksParameters.FetchHeartbeatInterval, testConfiguration.FetchHeartbeatInterval!);
147+
}
148+
if (!string.IsNullOrEmpty(testConfiguration.OperationStatusRequestTimeout))
149+
{
150+
parameters.Add(DatabricksParameters.OperationStatusRequestTimeout, testConfiguration.OperationStatusRequestTimeout!);
151+
}
144152
if (!string.IsNullOrEmpty(testConfiguration.EnableMultipleCatalogSupport))
145153
{
146154
parameters.Add(DatabricksParameters.EnableMultipleCatalogSupport, testConfiguration.EnableMultipleCatalogSupport!);

csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,5 +190,47 @@ public async Task StopsPollingOnException()
190190
{
191191
}
192192
}
193+
194+
[Fact]
195+
public async Task UsesCustomHeartbeatInterval()
196+
{
197+
// Arrange
198+
int customHeartbeatInterval = 2; // 2 seconds
199+
using var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, customHeartbeatInterval);
200+
var pollCount = 0;
201+
_mockClient.Setup(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()))
202+
.ReturnsAsync(new TGetOperationStatusResp())
203+
.Callback(() => pollCount++);
204+
205+
// Act
206+
poller.Start();
207+
await Task.Delay(TimeSpan.FromSeconds(customHeartbeatInterval * 3)); // Wait for 3 intervals
208+
209+
// Assert
210+
Assert.True(pollCount > 0, "Should have polled at least once");
211+
_mockClient.Verify(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()), Times.AtLeastOnce);
212+
}
213+
214+
[Fact]
215+
public async Task UsesCustomRequestTimeout()
216+
{
217+
// Arrange
218+
int customRequestTimeout = 5;
219+
using var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds, customRequestTimeout);
220+
var pollCount = 0;
221+
222+
_mockClient.Setup(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()))
223+
.ReturnsAsync(new TGetOperationStatusResp())
224+
.Callback(() => pollCount++);
225+
226+
// Act
227+
poller.Start();
228+
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds * 2));
229+
230+
// Assert: This test verifies that the poller can be instantiated with a custom request timeout
231+
232+
Assert.True(pollCount > 0, "Should have polled at least once");
233+
_mockClient.Verify(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()), Times.AtLeastOnce);
234+
}
193235
}
194236
}

0 commit comments

Comments
 (0)