Skip to content

Commit 9c9ffe2

Browse files
feat: Add support for disabling CloudFetch via EnableQueryResultDownload parameter (databricks#1183)
## Summary Add support for disabling CloudFetch via `EnableQueryResultDownload=0` connection parameter to use inline Arrow results instead. ## Changes - Add `isCloudFetchEnabled()` method to `IDatabricksConnectionContext` interface - Implement the method in `DatabricksConnectionContext` using existing `EnableQueryResultDownload` parameter - Update `DatabricksThriftServiceClient` to respect this setting when making execute requests - Add unit tests for the new functionality ## Usage To disable CloudFetch and use inline Arrow results: ``` jdbc:databricks://host:port/default;EnableQueryResultDownload=0;... ``` --------- Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent 87b5527 commit 9c9ffe2

File tree

6 files changed

+111
-2
lines changed

6 files changed

+111
-2
lines changed

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## [Unreleased]
44

55
### Added
6+
- Added support for disabling CloudFetch via `EnableQueryResultDownload=0` to use inline Arrow results instead.
67

78
### Updated
89
- Implemented lazy loading for inline Arrow results, fetching arrow batches on demand instead of all at once. This improves memory usage and initial response time for large result sets when using the Thrift protocol with Arrow format.

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ public DatabricksClientType getClientTypeFromContext() {
469469
return DatabricksClientType.THRIFT;
470470
}
471471
// Check if CloudFetch is disabled - Thrift is required for inline mode
472-
if (!Objects.equals(getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH), "1")) {
472+
if (!isCloudFetchEnabled()) {
473473
return DatabricksClientType.THRIFT;
474474
}
475475
// Check feature flag to determine if SEA client should be enabled
@@ -1198,6 +1198,11 @@ public boolean isStreamingChunkProviderEnabled() {
11981198
return getParameter(DatabricksJdbcUrlParams.ENABLE_STREAMING_CHUNK_PROVIDER).equals("1");
11991199
}
12001200

1201+
@Override
1202+
public boolean isCloudFetchEnabled() {
1203+
return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1");
1204+
}
1205+
12011206
@Override
12021207
public int getLinkPrefetchWindow() {
12031208
return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.LINK_PREFETCH_WINDOW));

src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,16 @@ public interface IDatabricksConnectionContext {
426426
/** Returns whether streaming chunk provider is enabled for result fetching. */
427427
boolean isStreamingChunkProviderEnabled();
428428

429+
/**
430+
* Returns whether CloudFetch (URL-based result download) is enabled.
431+
*
432+
* <p>When enabled (default), the server may return URL_BASED_SET results that are downloaded from
433+
* cloud storage. When disabled, the server returns ARROW_BASED_SET with inline Arrow data.
434+
*
435+
* @return true if CloudFetch is enabled, false otherwise
436+
*/
437+
boolean isCloudFetchEnabled();
438+
429439
/**
430440
* Returns the number of chunk links to prefetch ahead of consumption.
431441
*

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ private TExecuteStatementReq getRequest(
232232
request.setCanDecompressLZ4Result(true);
233233
}
234234
if (ProtocolFeatureUtil.supportsCloudFetch(serverProtocolVersion)) {
235-
request.setCanDownloadResult(true);
235+
// Use EnableQueryResultDownload param to control CloudFetch vs inline Arrow
236+
request.setCanDownloadResult(this.connectionContext.isCloudFetchEnabled());
236237
}
237238
if (ProtocolFeatureUtil.supportsAdvancedArrowTypes(serverProtocolVersion)) {
238239
arrowNativeTypes

src/test/java/com/databricks/jdbc/api/impl/DatabricksConnectionContextTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,4 +1210,47 @@ public void testEnableTokenFederation() throws DatabricksSQLException {
12101210
DatabricksConnectionContext.parse(TestConstants.VALID_URL_1, props);
12111211
assertFalse(ctx.isTokenFederationEnabled());
12121212
}
1213+
1214+
@Test
1215+
public void testIsCloudFetchEnabled() throws DatabricksSQLException {
1216+
// Test default value (should be enabled by default)
1217+
DatabricksConnectionContext ctx =
1218+
(DatabricksConnectionContext)
1219+
DatabricksConnectionContext.parse(TestConstants.VALID_URL_1, properties);
1220+
assertTrue(ctx.isCloudFetchEnabled()); // Default should be true
1221+
1222+
// Test via URL parameter - enabled
1223+
String urlWithCloudFetchEnabled =
1224+
"jdbc:databricks://sample-host.18.azuredatabricks.net:9999/default;httpPath=/sql/1.0/warehouses/999999999;EnableQueryResultDownload=1";
1225+
ctx =
1226+
(DatabricksConnectionContext)
1227+
DatabricksConnectionContext.parse(urlWithCloudFetchEnabled, properties);
1228+
assertTrue(ctx.isCloudFetchEnabled());
1229+
1230+
// Test via URL parameter - disabled
1231+
String urlWithCloudFetchDisabled =
1232+
"jdbc:databricks://sample-host.18.azuredatabricks.net:9999/default;httpPath=/sql/1.0/warehouses/999999999;EnableQueryResultDownload=0";
1233+
ctx =
1234+
(DatabricksConnectionContext)
1235+
DatabricksConnectionContext.parse(urlWithCloudFetchDisabled, properties);
1236+
assertFalse(ctx.isCloudFetchEnabled());
1237+
1238+
// Test via Properties - enabled
1239+
Properties props = new Properties();
1240+
props.setProperty("password", "passwd");
1241+
props.setProperty("EnableQueryResultDownload", "1");
1242+
ctx =
1243+
(DatabricksConnectionContext)
1244+
DatabricksConnectionContext.parse(TestConstants.VALID_URL_1, props);
1245+
assertTrue(ctx.isCloudFetchEnabled());
1246+
1247+
// Test via Properties - disabled
1248+
props = new Properties();
1249+
props.setProperty("password", "passwd");
1250+
props.setProperty("EnableQueryResultDownload", "0");
1251+
ctx =
1252+
(DatabricksConnectionContext)
1253+
DatabricksConnectionContext.parse(TestConstants.VALID_URL_1, props);
1254+
assertFalse(ctx.isCloudFetchEnabled());
1255+
}
12131256
}

src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClientTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import static org.mockito.ArgumentMatchers.any;
1313
import static org.mockito.ArgumentMatchers.anyLong;
1414
import static org.mockito.ArgumentMatchers.eq;
15+
import static org.mockito.Mockito.lenient;
1516
import static org.mockito.Mockito.verify;
1617
import static org.mockito.Mockito.when;
1718

@@ -164,6 +165,9 @@ private static Stream<Arguments> protocolVersionProvider() {
164165
void testGetRequestWithDifferentProtocolVersions(TProtocolVersion protocolVersion)
165166
throws SQLException {
166167
when(connectionContext.shouldEnableArrow()).thenReturn(true);
168+
// Use lenient() because isCloudFetchEnabled() is only called for protocols that support
169+
// CloudFetch
170+
lenient().when(connectionContext.isCloudFetchEnabled()).thenReturn(true);
167171
DatabricksThriftServiceClient client =
168172
new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
169173
when(session.getSessionInfo()).thenReturn(SESSION_INFO);
@@ -243,6 +247,7 @@ void testGetRequestWithDifferentProtocolVersions(TProtocolVersion protocolVersio
243247
@Test
244248
void testExecute() throws SQLException {
245249
when(connectionContext.shouldEnableArrow()).thenReturn(true);
250+
when(connectionContext.isCloudFetchEnabled()).thenReturn(true);
246251
DatabricksThriftServiceClient client =
247252
new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
248253
when(session.getSessionInfo()).thenReturn(SESSION_INFO);
@@ -281,9 +286,52 @@ void testExecute() throws SQLException {
281286
assertEquals(resultSet, actualResultSet);
282287
}
283288

289+
@Test
290+
void testExecuteWithCloudFetchDisabled() throws SQLException {
291+
when(connectionContext.shouldEnableArrow()).thenReturn(true);
292+
when(connectionContext.isCloudFetchEnabled()).thenReturn(false);
293+
DatabricksThriftServiceClient client =
294+
new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
295+
when(session.getSessionInfo()).thenReturn(SESSION_INFO);
296+
when(parentStatement.getStatement()).thenReturn(statement);
297+
when(parentStatement.getMaxRows()).thenReturn(10);
298+
when(statement.getQueryTimeout()).thenReturn(10);
299+
TSparkArrowTypes arrowNativeTypes =
300+
new TSparkArrowTypes()
301+
.setComplexTypesAsArrow(true)
302+
.setIntervalTypesAsArrow(true)
303+
.setNullTypeAsArrow(true)
304+
.setDecimalAsArrow(true)
305+
.setTimestampAsArrow(true);
306+
TExecuteStatementReq executeStatementReq =
307+
new TExecuteStatementReq()
308+
.setStatement(TEST_STRING)
309+
.setSessionHandle(SESSION_HANDLE)
310+
.setCanReadArrowResult(true)
311+
.setQueryTimeout(10)
312+
.setResultRowLimit(10)
313+
.setCanDecompressLZ4Result(true)
314+
.setCanDownloadResult(false)
315+
.setParameters(Collections.emptyList())
316+
.setRunAsync(true)
317+
.setUseArrowNativeTypes(arrowNativeTypes);
318+
when(thriftAccessor.execute(executeStatementReq, parentStatement, session, StatementType.SQL))
319+
.thenReturn(resultSet);
320+
DatabricksResultSet actualResultSet =
321+
client.executeStatement(
322+
TEST_STRING,
323+
CLUSTER_COMPUTE,
324+
Collections.emptyMap(),
325+
StatementType.SQL,
326+
session,
327+
parentStatement);
328+
assertEquals(resultSet, actualResultSet);
329+
}
330+
284331
@Test
285332
void testExecuteAsync() throws SQLException {
286333
when(connectionContext.shouldEnableArrow()).thenReturn(true);
334+
when(connectionContext.isCloudFetchEnabled()).thenReturn(true);
287335
DatabricksThriftServiceClient client =
288336
new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
289337
when(session.getSessionInfo()).thenReturn(SESSION_INFO);
@@ -630,6 +678,7 @@ void testListFunctionsWithSQLEnabled() throws SQLException {
630678
new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
631679
when(connectionContext.enableShowCommandsForGetFunctions()).thenReturn(true);
632680
when(connectionContext.shouldEnableArrow()).thenReturn(true);
681+
when(connectionContext.isCloudFetchEnabled()).thenReturn(true);
633682
when(session.getSessionInfo()).thenReturn(SESSION_INFO);
634683
TSparkArrowTypes arrowNativeTypes =
635684
new TSparkArrowTypes()

0 commit comments

Comments
 (0)