Skip to content

Commit 6a60a13

Browse files
authored
feat(csharp/src/Drivers/Databricks): Support server side property passthrough (apache#2692)
We want to be able to pass server side properties through the driver, e.g. use_cached_result. In ODBC driver, the behavior is like this: - If ApplySSPWithQueries = 1, set server side properties using "set x=y" commands during open session - If ApplySSPWithQueries = 0, set server side properties using thrift configuration This PR adds support in the ADBC driver for this Tested E2E by setting ``` { "adbc.databricks.apply_ssp_with_queries", "false" }, { "adbc.databricks.SSP_use_cached_result", "false"}, ``` which disabled result cache for the executed query. When using `{ "adbc.databricks.apply_ssp_with_queries", "true" }`, I see a set query in the query history <img width="620" alt="image" src="https://github.com/user-attachments/assets/cabb2708-04d7-40e7-a120-cd956c527fb8" />
1 parent 6027c11 commit 6a60a13

File tree

4 files changed

+196
-0
lines changed

4 files changed

+196
-0
lines changed

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
using System;
1919
using System.Collections.Generic;
20+
using System.Diagnostics;
21+
using System.Linq;
2022
using System.Threading;
2123
using System.Threading.Tasks;
2224
using Apache.Arrow.Adbc.Drivers.Apache;
@@ -29,10 +31,22 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
2931
{
3032
internal class DatabricksConnection : SparkHttpConnection
3133
{
34+
private bool _applySSPWithQueries = false;
35+
3236
public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(properties)
3337
{
38+
if (Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string? applySSPWithQueriesStr) &&
39+
bool.TryParse(applySSPWithQueriesStr, out bool applySSPWithQueriesValue))
40+
{
41+
_applySSPWithQueries = applySSPWithQueriesValue;
42+
}
3443
}
3544

45+
/// <summary>
46+
/// Gets whether server side properties should be applied using queries.
47+
/// </summary>
48+
internal bool ApplySSPWithQueries => _applySSPWithQueries;
49+
3650
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null)
3751
{
3852
// Get result format from metadata response if available
@@ -86,9 +100,90 @@ protected override TOpenSessionReq CreateSessionRequest()
86100
Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
87101
CanUseMultipleCatalogs = true,
88102
};
103+
104+
// If not using queries to set server-side properties, include them in Configuration
105+
if (!_applySSPWithQueries)
106+
{
107+
req.Configuration = new Dictionary<string, string>();
108+
var serverSideProperties = GetServerSideProperties();
109+
foreach (var property in serverSideProperties)
110+
{
111+
req.Configuration[property.Key] = property.Value;
112+
}
113+
}
89114
return req;
90115
}
91116

117+
/// <summary>
118+
/// Gets a dictionary of server-side properties extracted from connection properties.
119+
/// </summary>
120+
/// <returns>Dictionary of server-side properties with prefix removed from keys.</returns>
121+
private Dictionary<string, string> GetServerSideProperties()
122+
{
123+
return Properties
124+
.Where(p => p.Key.StartsWith(DatabricksParameters.ServerSidePropertyPrefix))
125+
.ToDictionary(
126+
p => p.Key.Substring(DatabricksParameters.ServerSidePropertyPrefix.Length),
127+
p => p.Value
128+
);
129+
}
130+
131+
/// <summary>
132+
/// Applies server-side properties by executing "set key=value" queries.
133+
/// </summary>
134+
/// <returns>A task representing the asynchronous operation.</returns>
135+
public async Task ApplyServerSidePropertiesAsync()
136+
{
137+
if (!_applySSPWithQueries)
138+
{
139+
return;
140+
}
141+
142+
var serverSideProperties = GetServerSideProperties();
143+
144+
if (serverSideProperties.Count == 0)
145+
{
146+
return;
147+
}
148+
149+
using var statement = new DatabricksStatement(this);
150+
151+
foreach (var property in serverSideProperties)
152+
{
153+
if (!IsValidPropertyName(property.Key))
154+
{
155+
Debug.WriteLine($"Skipping invalid property name: {property.Key}");
156+
continue;
157+
}
158+
159+
string escapedValue = EscapeSqlString(property.Value);
160+
string query = $"SET {property.Key}={escapedValue}";
161+
statement.SqlQuery = query;
162+
163+
try
164+
{
165+
await statement.ExecuteUpdateAsync();
166+
}
167+
catch (Exception ex)
168+
{
169+
Debug.WriteLine($"Error setting server-side property '{property.Key}': {ex.Message}");
170+
}
171+
}
172+
}
173+
174+
private bool IsValidPropertyName(string propertyName)
175+
{
176+
// Allow only letters and underscores in property names
177+
return System.Text.RegularExpressions.Regex.IsMatch(
178+
propertyName,
179+
@"^[a-zA-Z_]+$");
180+
}
181+
182+
private string EscapeSqlString(string value)
183+
{
184+
return "`" + value.Replace("`", "``") + "`";
185+
}
186+
92187
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken cancellationToken = default) =>
93188
Task.FromResult(response.DirectResults.ResultSetMetadata);
94189
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken cancellationToken = default) =>

csharp/src/Drivers/Databricks/DatabricksDatabase.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public override AdbcConnection Connect(IReadOnlyDictionary<string, string>? opti
4141
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
4242
DatabricksConnection connection = new DatabricksConnection(mergedProperties);
4343
connection.OpenAsync().Wait();
44+
connection.ApplyServerSidePropertiesAsync().Wait();
4445
return connection;
4546
}
4647
}

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,21 @@ public class DatabricksParameters : SparkParameters
4242
/// Default value is 5 minutes if not specified.
4343
/// </summary>
4444
public const string CloudFetchTimeoutMinutes = "adbc.databricks.cloudfetch.timeout_minutes";
45+
46+
/// <summary>
47+
/// Whether to apply service side properties (SSP) with queries. If false, SSP will be applied
48+
/// by setting the Thrift configuration when the session is opened.
49+
/// Default value is false if not specified.
50+
/// </summary>
51+
public const string ApplySSPWithQueries = "adbc.databricks.apply_ssp_with_queries";
52+
53+
/// <summary>
54+
/// Prefix for server-side properties. Properties with this prefix will be passed to the server
55+
/// by executing a "set key=value" query when opening a session.
56+
/// For example, a property with key "adbc.databricks.SSP_use_cached_result"
57+
/// and value "true" will result in executing "set use_cached_result=true" on the server.
58+
/// </summary>
59+
public const string ServerSidePropertyPrefix = "adbc.databricks.SSP_";
4560
}
4661

4762
/// <summary>
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System;
19+
using System.Collections.Generic;
20+
using System.Threading.Tasks;
21+
using Apache.Arrow.Adbc.Drivers.Databricks;
22+
using Xunit;
23+
using Xunit.Abstractions;
24+
25+
namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
26+
{
27+
/// <summary>
28+
/// End-to-end tests for the server-side property passthrough feature in the Databricks ADBC driver.
29+
/// </summary>
30+
public class ServerSidePropertyE2ETest : TestBase<DatabricksTestConfiguration, DatabricksTestEnvironment>
31+
{
32+
public ServerSidePropertyE2ETest(ITestOutputHelper? outputHelper)
33+
: base(outputHelper, new DatabricksTestEnvironment.Factory())
34+
{
35+
// Skip the test if the DATABRICKS_TEST_CONFIG_FILE environment variable is not set
36+
Skip.IfNot(Utils.CanExecuteTestConfig(TestConfigVariable));
37+
}
38+
39+
/// <summary>
40+
/// Tests setting server-side properties.
41+
/// </summary>
42+
[Theory]
43+
[InlineData(true)]
44+
[InlineData(false)]
45+
public async Task TestServerSideProperty(bool applyWithQueries)
46+
{
47+
var additionalConnectionParams = new Dictionary<string, string>()
48+
{
49+
[DatabricksParameters.ServerSidePropertyPrefix + "use_cached_result"] = "false",
50+
[DatabricksParameters.ServerSidePropertyPrefix + "statement_timeout"] = "12345",
51+
[DatabricksParameters.ApplySSPWithQueries] = applyWithQueries.ToString().ToLower()
52+
};
53+
using var connection = NewConnection(TestConfiguration, additionalConnectionParams);
54+
55+
// Verify the server-side property was set by querying it
56+
using var statement = connection.CreateStatement();
57+
statement.SqlQuery = "SET";
58+
59+
var result = await statement.ExecuteQueryAsync();
60+
Assert.NotNull(result.Stream);
61+
62+
var batch = await result.Stream.ReadNextRecordBatchAsync();
63+
Assert.NotNull(batch);
64+
Assert.True(batch.Length > 0);
65+
Assert.Equal(2, batch.ColumnCount);
66+
67+
var returnedProperties = new Dictionary<string, string>();
68+
var keys = (StringArray)batch.Column(0);
69+
var values = (StringArray)batch.Column(1);
70+
for (int i = 0; i < batch.Length; i++)
71+
{
72+
string key = keys.GetString(i);
73+
string value = values.GetString(i);
74+
returnedProperties[key] = value;
75+
Console.WriteLine($"Property: {key} = {value}");
76+
}
77+
78+
Assert.True(returnedProperties.ContainsKey("use_cached_result"));
79+
Assert.Equal("false", returnedProperties["use_cached_result"]);
80+
81+
Assert.True(returnedProperties.ContainsKey("statement_timeout"));
82+
Assert.Equal("12345", returnedProperties["statement_timeout"]);
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)