diff --git a/csharp/src/DatabricksParameters.cs b/csharp/src/DatabricksParameters.cs index b68c24fc..26ea2986 100644 --- a/csharp/src/DatabricksParameters.cs +++ b/csharp/src/DatabricksParameters.cs @@ -357,6 +357,15 @@ public class DatabricksParameters : SparkParameters /// public const string EnableSessionManagement = "adbc.databricks.rest.enable_session_management"; + /// + /// Whether to use preview API endpoints for Statement Execution API. + /// When true, uses "/2.0/sql/..." paths (without /api prefix). + /// When false (default), uses "/api/2.0/sql/..." paths. + /// Default: false + /// Only applicable when Protocol is "rest". + /// + public const string UsePreviewEndpoint = "adbc.databricks.rest.use_preview_endpoint"; + /// /// Whether to enable the feature flag cache for fetching remote configuration from the server. /// When enabled, the driver fetches feature flags from the Databricks server and merges them with local properties. diff --git a/csharp/src/StatementExecution/StatementExecutionClient.cs b/csharp/src/StatementExecution/StatementExecutionClient.cs index 7b169338..8439a850 100644 --- a/csharp/src/StatementExecution/StatementExecutionClient.cs +++ b/csharp/src/StatementExecution/StatementExecutionClient.cs @@ -98,6 +98,11 @@ internal class StatementExecutionClient : IStatementExecutionClient private const string SessionsEndpoint = "/api/2.0/sql/sessions"; private const string StatementsEndpoint = "/api/2.0/sql/statements"; + private const string PreviewSessionsEndpoint = "/2.0/preview/sql/sessions"; + private const string PreviewStatementsEndpoint = "/2.0/preview/sql/statements"; + + private readonly string _sessionsEndpoint; + private readonly string _statementsEndpoint; // JSON serialization options - ignore null values when writing private static readonly JsonSerializerOptions s_jsonOptions = new JsonSerializerOptions @@ -110,7 +115,8 @@ internal class StatementExecutionClient : IStatementExecutionClient /// /// The HTTP client to use for requests. /// The Databricks workspace host. - public StatementExecutionClient(HttpClient httpClient, string host) + /// When true, uses /2.0/sql/... paths instead of /api/2.0/sql/... paths. + public StatementExecutionClient(HttpClient httpClient, string host, bool usePreviewEndpoint = false) { _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); @@ -127,6 +133,8 @@ public StatementExecutionClient(HttpClient httpClient, string host) } _baseUrl = host; + _sessionsEndpoint = usePreviewEndpoint ? PreviewSessionsEndpoint : SessionsEndpoint; + _statementsEndpoint = usePreviewEndpoint ? PreviewStatementsEndpoint : StatementsEndpoint; } /// @@ -144,7 +152,7 @@ public async Task CreateSessionAsync( throw new ArgumentNullException(nameof(request)); } - var url = $"{_baseUrl}{SessionsEndpoint}"; + var url = $"{_baseUrl}{_sessionsEndpoint}"; var jsonContent = JsonSerializer.Serialize(request, s_jsonOptions); var content = new StringContent(jsonContent, Encoding.UTF8, "application/json"); @@ -188,7 +196,7 @@ public async Task DeleteSessionAsync(string sessionId, string warehouseId, Cance } // Databricks requires warehouse_id as query parameter even for DELETE - var url = $"{_baseUrl}{SessionsEndpoint}/{sessionId}?warehouse_id={Uri.EscapeDataString(warehouseId)}"; + var url = $"{_baseUrl}{_sessionsEndpoint}/{sessionId}?warehouse_id={Uri.EscapeDataString(warehouseId)}"; var httpRequest = new HttpRequestMessage(HttpMethod.Delete, url); var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false); @@ -211,7 +219,7 @@ public async Task ExecuteStatementAsync( throw new ArgumentNullException(nameof(request)); } - var url = $"{_baseUrl}{StatementsEndpoint}"; + var url = $"{_baseUrl}{_statementsEndpoint}"; var jsonContent = JsonSerializer.Serialize(request, s_jsonOptions); var content = new StringContent(jsonContent, Encoding.UTF8, "application/json"); @@ -266,7 +274,7 @@ public async Task GetStatementAsync( throw new ArgumentException("Statement ID cannot be null or whitespace.", nameof(statementId)); } - var url = $"{_baseUrl}{StatementsEndpoint}/{statementId}"; + var url = $"{_baseUrl}{_statementsEndpoint}/{statementId}"; var httpRequest = new HttpRequestMessage(HttpMethod.Get, url); var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false); @@ -306,7 +314,7 @@ public async Task GetResultChunkAsync( throw new ArgumentException("Chunk index must be non-negative.", nameof(chunkIndex)); } - var url = $"{_baseUrl}{StatementsEndpoint}/{statementId}/result/chunks/{chunkIndex}"; + var url = $"{_baseUrl}{_statementsEndpoint}/{statementId}/result/chunks/{chunkIndex}"; var httpRequest = new HttpRequestMessage(HttpMethod.Get, url); var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false); @@ -337,7 +345,7 @@ public async Task CancelStatementAsync(string statementId, CancellationToken can throw new ArgumentException("Statement ID cannot be null or whitespace.", nameof(statementId)); } - var url = $"{_baseUrl}{StatementsEndpoint}/{statementId}/cancel"; + var url = $"{_baseUrl}{_statementsEndpoint}/{statementId}/cancel"; var httpRequest = new HttpRequestMessage(HttpMethod.Post, url); var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false); @@ -360,7 +368,7 @@ public async Task CloseStatementAsync(string statementId, CancellationToken canc } // Databricks uses DELETE on /statements/{statement_id}, not POST to /close - var url = $"{_baseUrl}{StatementsEndpoint}/{statementId}"; + var url = $"{_baseUrl}{_statementsEndpoint}/{statementId}"; var httpRequest = new HttpRequestMessage(HttpMethod.Delete, url); var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false); diff --git a/csharp/src/StatementExecution/StatementExecutionConnection.cs b/csharp/src/StatementExecution/StatementExecutionConnection.cs index c49c69dd..285e4a58 100644 --- a/csharp/src/StatementExecution/StatementExecutionConnection.cs +++ b/csharp/src/StatementExecution/StatementExecutionConnection.cs @@ -241,7 +241,8 @@ private StatementExecutionConnection( _cloudFetchHttpClient = HttpClientFactory.CreateCloudFetchHttpClient(properties); // Create REST API client - _client = new StatementExecutionClient(_httpClient, baseUrl); + bool usePreviewEndpoint = PropertyHelper.GetBooleanPropertyWithValidation(properties, DatabricksParameters.UsePreviewEndpoint, false); + _client = new StatementExecutionClient(_httpClient, baseUrl, usePreviewEndpoint); } /// diff --git a/csharp/test/Unit/StatementExecution/StatementExecutionClientTests.cs b/csharp/test/Unit/StatementExecution/StatementExecutionClientTests.cs index c6769b16..77c4673e 100644 --- a/csharp/test/Unit/StatementExecution/StatementExecutionClientTests.cs +++ b/csharp/test/Unit/StatementExecution/StatementExecutionClientTests.cs @@ -86,6 +86,194 @@ public void Constructor_WithNullHost_ThrowsArgumentException() new StatementExecutionClient(_httpClient, null!)); } + [Fact] + public void Constructor_WithPreviewEndpointTrue_CreatesClient() + { + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true); + Assert.NotNull(client); + } + + [Fact] + public void Constructor_WithPreviewEndpointFalse_CreatesClient() + { + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: false); + Assert.NotNull(client); + } + + #endregion + + #region Preview Endpoint Tests + + [Fact] + public async Task CreateSessionAsync_WithPreviewEndpoint_UsesPreviewPath() + { + var request = new CreateSessionRequest { WarehouseId = "warehouse-123" }; + var responseJson = JsonSerializer.Serialize(new { session_id = "test-session" }); + HttpRequestMessage? capturedRequest = null; + + SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson, + req => capturedRequest = req, + _ => { }); + + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true); + await client.CreateSessionAsync(request, CancellationToken.None); + + Assert.NotNull(capturedRequest); + Assert.Equal("https://test.databricks.com/2.0/preview/sql/sessions", + capturedRequest.RequestUri?.ToString()); + } + + [Fact] + public async Task CreateSessionAsync_WithoutPreviewEndpoint_UsesStandardPath() + { + var request = new CreateSessionRequest { WarehouseId = "warehouse-123" }; + var responseJson = JsonSerializer.Serialize(new { session_id = "test-session" }); + HttpRequestMessage? capturedRequest = null; + + SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson, + req => capturedRequest = req, + _ => { }); + + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: false); + await client.CreateSessionAsync(request, CancellationToken.None); + + Assert.NotNull(capturedRequest); + Assert.Equal("https://test.databricks.com/api/2.0/sql/sessions", + capturedRequest.RequestUri?.ToString()); + } + + [Fact] + public async Task DeleteSessionAsync_WithPreviewEndpoint_UsesPreviewPath() + { + var sessionId = "test-session-id"; + var warehouseId = "test-warehouse-id"; + HttpRequestMessage? capturedRequest = null; + + SetupMockResponseWithCapture(HttpStatusCode.OK, "", + req => capturedRequest = req, + _ => { }); + + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true); + await client.DeleteSessionAsync(sessionId, warehouseId, CancellationToken.None); + + Assert.NotNull(capturedRequest); + Assert.Contains("/2.0/preview/sql/sessions/", capturedRequest.RequestUri?.ToString()); + } + + [Fact] + public async Task ExecuteStatementAsync_WithPreviewEndpoint_UsesPreviewPath() + { + var request = new ExecuteStatementRequest + { + Statement = "SELECT 1", + WarehouseId = "warehouse-123" + }; + var responseJson = JsonSerializer.Serialize(new + { + statement_id = "stmt-123", + status = new { state = "SUCCEEDED" }, + manifest = new { schema = new { columns = new object[0] } }, + result = new { data_array = new object[0] } + }); + HttpRequestMessage? capturedRequest = null; + + SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson, + req => capturedRequest = req, + _ => { }); + + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true); + await client.ExecuteStatementAsync(request, CancellationToken.None); + + Assert.NotNull(capturedRequest); + Assert.Equal("https://test.databricks.com/2.0/preview/sql/statements", + capturedRequest.RequestUri?.ToString()); + } + + [Fact] + public async Task GetStatementAsync_WithPreviewEndpoint_UsesPreviewPath() + { + var statementId = "stmt-123"; + var responseJson = JsonSerializer.Serialize(new + { + statement_id = statementId, + status = new { state = "SUCCEEDED" }, + manifest = new { schema = new { columns = new object[0] } }, + result = new { data_array = new object[0] } + }); + HttpRequestMessage? capturedRequest = null; + + SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson, + req => capturedRequest = req, + _ => { }); + + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true); + await client.GetStatementAsync(statementId, CancellationToken.None); + + Assert.NotNull(capturedRequest); + Assert.Equal($"https://test.databricks.com/2.0/preview/sql/statements/{statementId}", + capturedRequest.RequestUri?.ToString()); + } + + [Fact] + public async Task CancelStatementAsync_WithPreviewEndpoint_UsesPreviewPath() + { + var statementId = "stmt-123"; + SetupMockResponse(HttpStatusCode.OK, ""); + HttpRequestMessage? capturedRequest = null; + + SetupMockResponseWithCapture(HttpStatusCode.OK, "", + req => capturedRequest = req, + _ => { }); + + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true); + await client.CancelStatementAsync(statementId, CancellationToken.None); + + Assert.NotNull(capturedRequest); + Assert.Equal($"https://test.databricks.com/2.0/preview/sql/statements/{statementId}/cancel", + capturedRequest.RequestUri?.ToString()); + } + + [Fact] + public async Task CloseStatementAsync_WithPreviewEndpoint_UsesPreviewPath() + { + var statementId = "stmt-123"; + HttpRequestMessage? capturedRequest = null; + + SetupMockResponseWithCapture(HttpStatusCode.OK, "", + req => capturedRequest = req, + _ => { }); + + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true); + await client.CloseStatementAsync(statementId, CancellationToken.None); + + Assert.NotNull(capturedRequest); + Assert.Equal($"https://test.databricks.com/2.0/preview/sql/statements/{statementId}", + capturedRequest.RequestUri?.ToString()); + } + + [Fact] + public async Task GetResultChunkAsync_WithPreviewEndpoint_UsesPreviewPath() + { + var statementId = "stmt-123"; + var chunkIndex = 2; + var responseJson = JsonSerializer.Serialize(new + { + data_array = new object[0] + }); + HttpRequestMessage? capturedRequest = null; + + SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson, + req => capturedRequest = req, + _ => { }); + + var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true); + await client.GetResultChunkAsync(statementId, chunkIndex, CancellationToken.None); + + Assert.NotNull(capturedRequest); + Assert.Equal($"https://test.databricks.com/2.0/preview/sql/statements/{statementId}/result/chunks/{chunkIndex}", + capturedRequest.RequestUri?.ToString()); + } + #endregion #region CreateSessionAsync Tests diff --git a/csharp/test/Unit/StatementExecution/StatementExecutionConnectionPreviewEndpointTests.cs b/csharp/test/Unit/StatementExecution/StatementExecutionConnectionPreviewEndpointTests.cs new file mode 100644 index 00000000..560d1aad --- /dev/null +++ b/csharp/test/Unit/StatementExecution/StatementExecutionConnectionPreviewEndpointTests.cs @@ -0,0 +1,111 @@ +/* +* Copyright (c) 2025 ADBC Drivers Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Http; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using AdbcDrivers.Databricks.StatementExecution; +using AdbcDrivers.HiveServer2.Spark; +using Moq; +using Moq.Protected; +using Xunit; + +namespace AdbcDrivers.Databricks.Tests.Unit.StatementExecution +{ + /// + /// Tests that the UsePreviewEndpoint property routes requests to + /// /2.0/preview/sql/... instead of /api/2.0/sql/... paths. + /// + public class StatementExecutionConnectionPreviewEndpointTests + { + private static Dictionary BaseProperties() => new() + { + { SparkParameters.HostName, "test.databricks.com" }, + { DatabricksParameters.WarehouseId, "test-warehouse" }, + { SparkParameters.AccessToken, "test-token" }, + { "adbc.connection.catalog", "main" } + }; + + private static (HttpClient, List) CreateCapturingHttpClient() + { + var capturedUris = new List(); + var mockHandler = new Mock(); + var sessionResponse = JsonSerializer.Serialize(new { session_id = "session-123" }); + + mockHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .Returns((req, _) => + { + capturedUris.Add(req.RequestUri); + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent(sessionResponse) + }); + }); + + return (new HttpClient(mockHandler.Object), capturedUris); + } + + [Fact] + public async Task OpenAsync_WithPreviewEndpointTrue_UsesPreviewSessionPath() + { + var (httpClient, uris) = CreateCapturingHttpClient(); + var properties = BaseProperties(); + properties[DatabricksParameters.UsePreviewEndpoint] = "true"; + + using var conn = new StatementExecutionConnection(properties, httpClient); + await conn.OpenAsync(CancellationToken.None); + + Assert.Single(uris); + Assert.Contains("/2.0/preview/sql/sessions", uris[0]?.ToString()); + } + + [Fact] + public async Task OpenAsync_WithPreviewEndpointFalse_UsesStandardSessionPath() + { + var (httpClient, uris) = CreateCapturingHttpClient(); + var properties = BaseProperties(); + properties[DatabricksParameters.UsePreviewEndpoint] = "false"; + + using var conn = new StatementExecutionConnection(properties, httpClient); + await conn.OpenAsync(CancellationToken.None); + + Assert.Single(uris); + Assert.Contains("/api/2.0/sql/sessions", uris[0]?.ToString()); + } + + [Fact] + public async Task OpenAsync_WithoutPreviewEndpointProperty_UsesStandardSessionPath() + { + var (httpClient, uris) = CreateCapturingHttpClient(); + var properties = BaseProperties(); + // UsePreviewEndpoint not set — should default to standard path + + using var conn = new StatementExecutionConnection(properties, httpClient); + await conn.OpenAsync(CancellationToken.None); + + Assert.Single(uris); + Assert.Contains("/api/2.0/sql/sessions", uris[0]?.ToString()); + } + } +}