Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions csharp/src/DatabricksParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,15 @@ public class DatabricksParameters : SparkParameters
/// </summary>
public const string EnableSessionManagement = "adbc.databricks.rest.enable_session_management";

/// <summary>
/// Whether to use preview API endpoints for Statement Execution API.
/// When true, uses "/2.0/sql/..." paths (without /api prefix).
/// When false (default), uses "/api/2.0/sql/..." paths.
/// Default: false
/// Only applicable when Protocol is "rest".
/// </summary>
public const string UsePreviewEndpoint = "adbc.databricks.rest.use_preview_endpoint";

/// <summary>
/// Whether to enable the feature flag cache for fetching remote configuration from the server.
/// When enabled, the driver fetches feature flags from the Databricks server and merges them with local properties.
Expand Down
24 changes: 16 additions & 8 deletions csharp/src/StatementExecution/StatementExecutionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ internal class StatementExecutionClient : IStatementExecutionClient

private const string SessionsEndpoint = "/api/2.0/sql/sessions";
private const string StatementsEndpoint = "/api/2.0/sql/statements";
private const string PreviewSessionsEndpoint = "/2.0/preview/sql/sessions";
private const string PreviewStatementsEndpoint = "/2.0/preview/sql/statements";

private readonly string _sessionsEndpoint;
private readonly string _statementsEndpoint;

// JSON serialization options - ignore null values when writing
private static readonly JsonSerializerOptions s_jsonOptions = new JsonSerializerOptions
Expand All @@ -110,7 +115,8 @@ internal class StatementExecutionClient : IStatementExecutionClient
/// </summary>
/// <param name="httpClient">The HTTP client to use for requests.</param>
/// <param name="host">The Databricks workspace host.</param>
public StatementExecutionClient(HttpClient httpClient, string host)
/// <param name="usePreviewEndpoint">When true, uses /2.0/sql/... paths instead of /api/2.0/sql/... paths.</param>
public StatementExecutionClient(HttpClient httpClient, string host, bool usePreviewEndpoint = false)
{
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));

Expand All @@ -127,6 +133,8 @@ public StatementExecutionClient(HttpClient httpClient, string host)
}

_baseUrl = host;
_sessionsEndpoint = usePreviewEndpoint ? PreviewSessionsEndpoint : SessionsEndpoint;
_statementsEndpoint = usePreviewEndpoint ? PreviewStatementsEndpoint : StatementsEndpoint;
}

/// <summary>
Expand All @@ -144,7 +152,7 @@ public async Task<CreateSessionResponse> CreateSessionAsync(
throw new ArgumentNullException(nameof(request));
}

var url = $"{_baseUrl}{SessionsEndpoint}";
var url = $"{_baseUrl}{_sessionsEndpoint}";
var jsonContent = JsonSerializer.Serialize(request, s_jsonOptions);
var content = new StringContent(jsonContent, Encoding.UTF8, "application/json");

Expand Down Expand Up @@ -188,7 +196,7 @@ public async Task DeleteSessionAsync(string sessionId, string warehouseId, Cance
}

// Databricks requires warehouse_id as query parameter even for DELETE
var url = $"{_baseUrl}{SessionsEndpoint}/{sessionId}?warehouse_id={Uri.EscapeDataString(warehouseId)}";
var url = $"{_baseUrl}{_sessionsEndpoint}/{sessionId}?warehouse_id={Uri.EscapeDataString(warehouseId)}";
var httpRequest = new HttpRequestMessage(HttpMethod.Delete, url);

var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false);
Expand All @@ -211,7 +219,7 @@ public async Task<ExecuteStatementResponse> ExecuteStatementAsync(
throw new ArgumentNullException(nameof(request));
}

var url = $"{_baseUrl}{StatementsEndpoint}";
var url = $"{_baseUrl}{_statementsEndpoint}";
var jsonContent = JsonSerializer.Serialize(request, s_jsonOptions);
var content = new StringContent(jsonContent, Encoding.UTF8, "application/json");

Expand Down Expand Up @@ -266,7 +274,7 @@ public async Task<GetStatementResponse> GetStatementAsync(
throw new ArgumentException("Statement ID cannot be null or whitespace.", nameof(statementId));
}

var url = $"{_baseUrl}{StatementsEndpoint}/{statementId}";
var url = $"{_baseUrl}{_statementsEndpoint}/{statementId}";
var httpRequest = new HttpRequestMessage(HttpMethod.Get, url);

var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -306,7 +314,7 @@ public async Task<ResultData> GetResultChunkAsync(
throw new ArgumentException("Chunk index must be non-negative.", nameof(chunkIndex));
}

var url = $"{_baseUrl}{StatementsEndpoint}/{statementId}/result/chunks/{chunkIndex}";
var url = $"{_baseUrl}{_statementsEndpoint}/{statementId}/result/chunks/{chunkIndex}";
var httpRequest = new HttpRequestMessage(HttpMethod.Get, url);

var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -337,7 +345,7 @@ public async Task CancelStatementAsync(string statementId, CancellationToken can
throw new ArgumentException("Statement ID cannot be null or whitespace.", nameof(statementId));
}

var url = $"{_baseUrl}{StatementsEndpoint}/{statementId}/cancel";
var url = $"{_baseUrl}{_statementsEndpoint}/{statementId}/cancel";
var httpRequest = new HttpRequestMessage(HttpMethod.Post, url);

var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false);
Expand All @@ -360,7 +368,7 @@ public async Task CloseStatementAsync(string statementId, CancellationToken canc
}

// Databricks uses DELETE on /statements/{statement_id}, not POST to /close
var url = $"{_baseUrl}{StatementsEndpoint}/{statementId}";
var url = $"{_baseUrl}{_statementsEndpoint}/{statementId}";
var httpRequest = new HttpRequestMessage(HttpMethod.Delete, url);

var response = await _httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ private StatementExecutionConnection(
_cloudFetchHttpClient = HttpClientFactory.CreateCloudFetchHttpClient(properties);

// Create REST API client
_client = new StatementExecutionClient(_httpClient, baseUrl);
bool usePreviewEndpoint = PropertyHelper.GetBooleanPropertyWithValidation(properties, DatabricksParameters.UsePreviewEndpoint, false);
_client = new StatementExecutionClient(_httpClient, baseUrl, usePreviewEndpoint);
}

/// <summary>
Expand Down
188 changes: 188 additions & 0 deletions csharp/test/Unit/StatementExecution/StatementExecutionClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,194 @@ public void Constructor_WithNullHost_ThrowsArgumentException()
new StatementExecutionClient(_httpClient, null!));
}

[Fact]
public void Constructor_WithPreviewEndpointTrue_CreatesClient()
{
var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true);
Assert.NotNull(client);
}

[Fact]
public void Constructor_WithPreviewEndpointFalse_CreatesClient()
{
var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: false);
Assert.NotNull(client);
}

#endregion

#region Preview Endpoint Tests

[Fact]
public async Task CreateSessionAsync_WithPreviewEndpoint_UsesPreviewPath()
{
var request = new CreateSessionRequest { WarehouseId = "warehouse-123" };
var responseJson = JsonSerializer.Serialize(new { session_id = "test-session" });
HttpRequestMessage? capturedRequest = null;

SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson,
req => capturedRequest = req,
_ => { });

var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true);
await client.CreateSessionAsync(request, CancellationToken.None);

Assert.NotNull(capturedRequest);
Assert.Equal("https://test.databricks.com/2.0/preview/sql/sessions",
capturedRequest.RequestUri?.ToString());
}

[Fact]
public async Task CreateSessionAsync_WithoutPreviewEndpoint_UsesStandardPath()
{
var request = new CreateSessionRequest { WarehouseId = "warehouse-123" };
var responseJson = JsonSerializer.Serialize(new { session_id = "test-session" });
HttpRequestMessage? capturedRequest = null;

SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson,
req => capturedRequest = req,
_ => { });

var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: false);
await client.CreateSessionAsync(request, CancellationToken.None);

Assert.NotNull(capturedRequest);
Assert.Equal("https://test.databricks.com/api/2.0/sql/sessions",
capturedRequest.RequestUri?.ToString());
}

[Fact]
public async Task DeleteSessionAsync_WithPreviewEndpoint_UsesPreviewPath()
{
var sessionId = "test-session-id";
var warehouseId = "test-warehouse-id";
HttpRequestMessage? capturedRequest = null;

SetupMockResponseWithCapture(HttpStatusCode.OK, "",
req => capturedRequest = req,
_ => { });

var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true);
await client.DeleteSessionAsync(sessionId, warehouseId, CancellationToken.None);

Assert.NotNull(capturedRequest);
Assert.Contains("/2.0/preview/sql/sessions/", capturedRequest.RequestUri?.ToString());
}

[Fact]
public async Task ExecuteStatementAsync_WithPreviewEndpoint_UsesPreviewPath()
{
var request = new ExecuteStatementRequest
{
Statement = "SELECT 1",
WarehouseId = "warehouse-123"
};
var responseJson = JsonSerializer.Serialize(new
{
statement_id = "stmt-123",
status = new { state = "SUCCEEDED" },
manifest = new { schema = new { columns = new object[0] } },
result = new { data_array = new object[0] }
});
HttpRequestMessage? capturedRequest = null;

SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson,
req => capturedRequest = req,
_ => { });

var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true);
await client.ExecuteStatementAsync(request, CancellationToken.None);

Assert.NotNull(capturedRequest);
Assert.Equal("https://test.databricks.com/2.0/preview/sql/statements",
capturedRequest.RequestUri?.ToString());
}

[Fact]
public async Task GetStatementAsync_WithPreviewEndpoint_UsesPreviewPath()
{
var statementId = "stmt-123";
var responseJson = JsonSerializer.Serialize(new
{
statement_id = statementId,
status = new { state = "SUCCEEDED" },
manifest = new { schema = new { columns = new object[0] } },
result = new { data_array = new object[0] }
});
HttpRequestMessage? capturedRequest = null;

SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson,
req => capturedRequest = req,
_ => { });

var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true);
await client.GetStatementAsync(statementId, CancellationToken.None);

Assert.NotNull(capturedRequest);
Assert.Equal($"https://test.databricks.com/2.0/preview/sql/statements/{statementId}",
capturedRequest.RequestUri?.ToString());
}

[Fact]
public async Task CancelStatementAsync_WithPreviewEndpoint_UsesPreviewPath()
{
var statementId = "stmt-123";
SetupMockResponse(HttpStatusCode.OK, "");
HttpRequestMessage? capturedRequest = null;

SetupMockResponseWithCapture(HttpStatusCode.OK, "",
req => capturedRequest = req,
_ => { });

var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true);
await client.CancelStatementAsync(statementId, CancellationToken.None);

Assert.NotNull(capturedRequest);
Assert.Equal($"https://test.databricks.com/2.0/preview/sql/statements/{statementId}/cancel",
capturedRequest.RequestUri?.ToString());
}

[Fact]
public async Task CloseStatementAsync_WithPreviewEndpoint_UsesPreviewPath()
{
var statementId = "stmt-123";
HttpRequestMessage? capturedRequest = null;

SetupMockResponseWithCapture(HttpStatusCode.OK, "",
req => capturedRequest = req,
_ => { });

var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true);
await client.CloseStatementAsync(statementId, CancellationToken.None);

Assert.NotNull(capturedRequest);
Assert.Equal($"https://test.databricks.com/2.0/preview/sql/statements/{statementId}",
capturedRequest.RequestUri?.ToString());
}

[Fact]
public async Task GetResultChunkAsync_WithPreviewEndpoint_UsesPreviewPath()
{
var statementId = "stmt-123";
var chunkIndex = 2;
var responseJson = JsonSerializer.Serialize(new
{
data_array = new object[0]
});
HttpRequestMessage? capturedRequest = null;

SetupMockResponseWithCapture(HttpStatusCode.OK, responseJson,
req => capturedRequest = req,
_ => { });

var client = new StatementExecutionClient(_httpClient, _testHost, usePreviewEndpoint: true);
await client.GetResultChunkAsync(statementId, chunkIndex, CancellationToken.None);

Assert.NotNull(capturedRequest);
Assert.Equal($"https://test.databricks.com/2.0/preview/sql/statements/{statementId}/result/chunks/{chunkIndex}",
capturedRequest.RequestUri?.ToString());
}

#endregion

#region CreateSessionAsync Tests
Expand Down
Loading
Loading