Commit 1a68271

feat: Implement streaming prefetch for Thrift inline results (databricks#1184)
## Summary

Implements proactive prefetching with a sliding window for both Thrift columnar and inline Arrow results, eliminating blocking at batch boundaries and improving throughput.

## Key Components

### New Streaming Infrastructure

- **`ThriftStreamingProvider<T>`**: Generic type-safe streaming provider with background prefetch thread and configurable sliding window
- **`StreamingBatch<T>`**: Type-safe batch container with lifecycle management and error handling
- **`ThriftResponseProcessor<T>`**: Interface for pluggable response processors
  - `ColumnarResponseProcessor`: Processes Thrift columnar results
  - `InlineArrowResponseProcessor`: Processes inline Arrow results with schema caching

### Result Implementations

- **`StreamingInlineArrowResult`**: High-throughput streaming implementation for inline Arrow results with background prefetching
- **`StreamingColumnarResult`**: Streaming implementation for Thrift columnar results with prefetch

### Supporting Classes

- **`ThriftBatchFetcher`** / **`ThriftBatchFetcherImpl`**: Abstraction for fetching batches from the Thrift server

![streaming inline](https://github.com/user-attachments/assets/66ea9b83-a16b-42d5-9280-cb1fb81dadeb)

## Configuration

| Parameter | Description | Default |
|-----------|-------------|---------|
| `EnableInlineStreaming` | Toggle streaming mode for inline results | `1` (enabled) |
| `ThriftMaxBatchesInMemory` | Sliding window size (max batches kept in memory) | `3` |

## Key Features

1. **Background Prefetching**: Dedicated thread fetches batches ahead of consumption
2. **Sliding Window**: Configurable memory limit prevents unbounded memory growth
3. **Type Safety**: Generic `ThriftStreamingProvider<T>` eliminates unsafe casting
4. **Graceful Error Handling**:
   - Try-catch around resource cleanup to prevent cascading failures
   - Timeout on batch creation wait to prevent indefinite blocking
5. **Comprehensive Logging**: Debug/error logging for troubleshooting

## Testing

- Updated `ExecutionResultFactoryTest` for new factory logic
- Updated `DatabricksThriftServiceClientTest` for CloudFetch control
- Existing integration tests cover streaming behavior

## Usage

Streaming is enabled by default. To disable it and use lazy loading instead:

```
jdbc:databricks://host:port/default;EnableInlineStreaming=0;...
```

To adjust the sliding window size:

```
jdbc:databricks://host:port/default;ThriftMaxBatchesInMemory=5;...
```

---------

Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
Parent: 809c0b3 · Commit: 1a68271

29 files changed: +5151 -164 lines
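The core of the summary above, a background thread prefetching batches into a bounded sliding window, can be sketched minimally as follows. This is an illustrative sketch only, not the commit's `ThriftStreamingProvider<T>`: the class name, the `Supplier`-based fetcher, and the poll timeout are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Illustrative sketch of a sliding-window prefetch provider. The bounded
 * queue is the sliding window: the prefetch thread blocks once
 * maxBatchesInMemory batches are buffered, so memory stays capped while the
 * consumer rarely waits at a batch boundary that was already prefetched.
 */
public class PrefetchSketch<T> implements AutoCloseable {
  private final BlockingQueue<T> window;
  private final Thread prefetcher;
  private volatile boolean done;

  public PrefetchSketch(int maxBatchesInMemory, Supplier<T> fetchNextBatch) {
    this.window = new ArrayBlockingQueue<>(maxBatchesInMemory);
    this.prefetcher =
        new Thread(
            () -> {
              try {
                T batch;
                // fetchNextBatch returns null once the server has no more batches
                while (!done && (batch = fetchNextBatch.get()) != null) {
                  window.put(batch); // blocks while the window is full
                }
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } finally {
                done = true;
              }
            },
            "prefetch-sketch");
    prefetcher.setDaemon(true);
    prefetcher.start();
  }

  /** Returns the next batch, or null once the stream is exhausted. */
  public T nextBatch() throws InterruptedException {
    while (!done || !window.isEmpty()) {
      T batch = window.poll(100, TimeUnit.MILLISECONDS); // avoid indefinite blocking
      if (batch != null) {
        return batch;
      }
    }
    return null;
  }

  @Override
  public void close() {
    done = true;
    prefetcher.interrupt();
  }
}
```

The actual provider layers error propagation and lifecycle management on top, via `StreamingBatch<T>` and pluggable `ThriftResponseProcessor<T>` instances, as listed in the components above.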

NEXT_CHANGELOG.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -3,6 +3,9 @@
 ## [Unreleased]
 
 ### Added
+- Added streaming prefetch mode for Thrift inline results (columnar and Arrow) with background batch prefetching and configurable sliding window for improved throughput.
+- Added `EnableInlineStreaming` connection parameter to enable/disable streaming mode (default: enabled).
+- Added `ThriftMaxBatchesInMemory` connection parameter to control the sliding window size for streaming (default: 3).
 - Added support for disabling CloudFetch via `EnableQueryResultDownload=0` to use inline Arrow results instead.
 
 ### Updated
```

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 16 additions & 0 deletions
```diff
@@ -1198,11 +1198,27 @@ public boolean isStreamingChunkProviderEnabled() {
     return getParameter(DatabricksJdbcUrlParams.ENABLE_STREAMING_CHUNK_PROVIDER).equals("1");
   }
 
+  @Override
+  public boolean isInlineStreamingEnabled() {
+    return getParameter(DatabricksJdbcUrlParams.ENABLE_INLINE_STREAMING).equals("1");
+  }
+
   @Override
   public boolean isCloudFetchEnabled() {
     return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1");
   }
 
+  @Override
+  public int getThriftMaxBatchesInMemory() {
+    try {
+      return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.THRIFT_MAX_BATCHES_IN_MEMORY));
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Invalid value for ThriftMaxBatchesInMemory, using default value");
+      return Integer.parseInt(
+          DatabricksJdbcUrlParams.THRIFT_MAX_BATCHES_IN_MEMORY.getDefaultValue());
+    }
+  }
+
   @Override
   public int getLinkPrefetchWindow() {
     return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.LINK_PREFETCH_WINDOW));
```
src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 16 additions & 9 deletions
```diff
@@ -8,8 +8,11 @@
 import com.databricks.jdbc.api.IExecutionStatus;
 import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
 import com.databricks.jdbc.api.impl.arrow.ChunkProvider;
+import com.databricks.jdbc.api.impl.arrow.LazyThriftInlineArrowResult;
+import com.databricks.jdbc.api.impl.arrow.StreamingInlineArrowResult;
 import com.databricks.jdbc.api.impl.converters.ConverterHelper;
 import com.databricks.jdbc.api.impl.converters.ObjectConverter;
+import com.databricks.jdbc.api.impl.thrift.StreamingColumnarResult;
 import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
 import com.databricks.jdbc.api.internal.IDatabricksResultSetInternal;
 import com.databricks.jdbc.api.internal.IDatabricksSession;
@@ -592,12 +595,11 @@ public boolean isBeforeFirst() throws SQLException {
   /**
    * {@inheritDoc}
    *
-   * <p><b>Limitation:</b> For lazy-loaded result sets ({@link LazyThriftResult}), particularly
-   * those using {@link
-   * com.databricks.jdbc.model.client.thrift.generated.TSparkRowSetType#COLUMN_BASED_SET}, this
-   * method cannot reliably determine the cursor position. The total row count remains unknown until
-   * all rows are fetched, preventing accurate detection of whether the cursor is after the last
-   * row. This is specific to Databricks JDBC dialect.
+   * <p><b>Limitation:</b> For lazy/streaming result sets ({@link LazyThriftResult}, {@link
+   * StreamingColumnarResult}, {@link LazyThriftInlineArrowResult}, {@link
+   * StreamingInlineArrowResult}), this method cannot reliably determine the cursor position. The
+   * total row count remains unknown until all rows are fetched, preventing accurate detection of
+   * whether the cursor is after the last row. This is specific to Databricks JDBC dialect.
    */
   @Override
   public boolean isAfterLast() throws SQLException {
@@ -617,8 +619,10 @@ public boolean isFirst() throws SQLException {
    * <p>This method uses different strategies based on the result set type:
    *
    * <ul>
-   *   <li>For {@link LazyThriftResult} instances: Checks if there are no more rows available (using
-   *       {@code hasNext()}), since the total row count is unknown until all rows are fetched.
+   *   <li>For lazy/streaming result types ({@link LazyThriftResult}, {@link
+   *       StreamingColumnarResult}, {@link LazyThriftInlineArrowResult}, {@link
+   *       StreamingInlineArrowResult}): Checks if there are no more rows available (using {@code
+   *       hasNext()}), since the total row count is unknown until all rows are fetched.
    *   <li>For other result types: Compares the current row position against the known total row
    *       count.
    * </ul>
@@ -629,7 +633,10 @@ public boolean isFirst() throws SQLException {
   @Override
   public boolean isLast() throws SQLException {
     checkIfClosed();
-    if (executionResult instanceof LazyThriftResult) {
+    if (executionResult instanceof LazyThriftResult
+        || executionResult instanceof StreamingColumnarResult
+        || executionResult instanceof LazyThriftInlineArrowResult
+        || executionResult instanceof StreamingInlineArrowResult) {
       return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
     }
     return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1;
```
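Because streaming results answer `isLast()` by peeking ahead with `hasNext()` rather than comparing against a total row count, callers see the same JDBC semantics either way. A hedged consumer-side sketch (the connection URL and query are placeholders, not values from this commit):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class IsLastDemo {
  public static void main(String[] args) throws Exception {
    // Placeholder connection string; streaming is the default, so no extra flags needed.
    String url = "jdbc:databricks://host:port/default;...";
    try (Connection conn = DriverManager.getConnection(url);
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT * FROM some_table")) {
      while (rs.next()) {
        // For streaming results this is answered via hasNext() on the underlying
        // execution result, since the total row count is unknown until all batches arrive.
        if (rs.isLast()) {
          System.out.println("on the final row");
        }
      }
    }
  }
}
```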

src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java

Lines changed: 49 additions & 2 deletions
```diff
@@ -2,7 +2,10 @@
 
 import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
 import com.databricks.jdbc.api.impl.arrow.LazyThriftInlineArrowResult;
+import com.databricks.jdbc.api.impl.arrow.StreamingInlineArrowResult;
+import com.databricks.jdbc.api.impl.thrift.StreamingColumnarResult;
 import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
+import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
 import com.databricks.jdbc.api.internal.IDatabricksSession;
 import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
 import com.databricks.jdbc.common.util.DatabricksThriftUtil;
@@ -96,9 +99,9 @@ private static IExecutionResult getResultHandler(
     LOGGER.info("Processing result of format {} from Thrift server", resultFormat);
     switch (resultFormat) {
       case COLUMN_BASED_SET:
-        return new LazyThriftResult(resultsResp, parentStatement, session);
+        return createThriftColumnarResult(resultsResp, parentStatement, session);
       case ARROW_BASED_SET:
-        return new LazyThriftInlineArrowResult(resultsResp, parentStatement, session);
+        return createInlineArrowResult(resultsResp, parentStatement, session);
       case URL_BASED_SET:
         return new ArrowStreamResult(resultsResp, parentStatement, session);
       case ROW_BASED_SET:
@@ -110,6 +113,50 @@ private static IExecutionResult getResultHandler(
     }
   }
 
+  /**
+   * Creates the appropriate result handler for Thrift columnar results. Uses
+   * StreamingColumnarResult by default for improved throughput, otherwise falls back to
+   * LazyThriftResult if disabled.
+   */
+  private static IExecutionResult createThriftColumnarResult(
+      TFetchResultsResp resultsResp,
+      IDatabricksStatementInternal parentStatement,
+      IDatabricksSession session)
+      throws DatabricksSQLException {
+    IDatabricksConnectionContext connectionContext = session.getConnectionContext();
+
+    // Streaming is enabled by default (ENABLE_INLINE_STREAMING defaults to "1")
+    if (connectionContext.isInlineStreamingEnabled()) {
+      LOGGER.info("Using StreamingColumnarResult for improved throughput (default)");
+      return new StreamingColumnarResult(resultsResp, parentStatement, session);
+    } else {
+      LOGGER.info("Using LazyThriftResult (streaming explicitly disabled)");
+      return new LazyThriftResult(resultsResp, parentStatement, session);
+    }
+  }
+
+  /**
+   * Creates the appropriate result handler for inline Arrow results. Uses
+   * StreamingInlineArrowResult by default for improved throughput, otherwise falls back to
+   * LazyThriftInlineArrowResult.
+   */
+  private static IExecutionResult createInlineArrowResult(
+      TFetchResultsResp resultsResp,
+      IDatabricksStatementInternal parentStatement,
+      IDatabricksSession session)
+      throws DatabricksSQLException {
+    IDatabricksConnectionContext connectionContext = session.getConnectionContext();
+
+    // Streaming is enabled by default (ENABLE_INLINE_STREAMING defaults to "1")
+    if (connectionContext.isInlineStreamingEnabled()) {
+      LOGGER.info("Using StreamingInlineArrowResult for improved throughput (default)");
+      return new StreamingInlineArrowResult(resultsResp, parentStatement, session);
+    } else {
+      LOGGER.info("Using LazyThriftInlineArrowResult (streaming explicitly disabled)");
+      return new LazyThriftInlineArrowResult(resultsResp, parentStatement, session);
+    }
+  }
+
   static IExecutionResult getResultSet(Object[][] rows) {
     return new InlineJsonResult(rows);
   }
```
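The factory branch is driven purely by the connection string; there is no separate API call to pick an implementation. For example (host, port, and auth segments are placeholders, per the usage section in the commit message):

```java
public class StreamingToggleExamples {
  // Default: the factory selects StreamingColumnarResult / StreamingInlineArrowResult.
  static final String STREAMING_URL = "jdbc:databricks://host:port/default;...";

  // Explicitly disabled: falls back to LazyThriftResult / LazyThriftInlineArrowResult.
  static final String LAZY_URL =
      "jdbc:databricks://host:port/default;EnableInlineStreaming=0;...";

  // Streaming with a wider sliding window of 5 batches held in memory.
  static final String WIDER_WINDOW_URL =
      "jdbc:databricks://host:port/default;ThriftMaxBatchesInMemory=5;...";
}
```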

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 2 additions & 22 deletions
```diff
@@ -1,24 +1,21 @@
 package com.databricks.jdbc.api.impl.arrow;
 
+import static com.databricks.jdbc.common.util.ArrowUtil.getColumnInfoList;
 import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createExternalLink;
-import static com.databricks.jdbc.common.util.DatabricksThriftUtil.getColumnInfoFromTColumnDesc;
 
 import com.databricks.jdbc.api.impl.ComplexDataTypeParser;
 import com.databricks.jdbc.api.impl.IExecutionResult;
 import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
 import com.databricks.jdbc.api.internal.IDatabricksSession;
 import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
 import com.databricks.jdbc.common.CompressionCodec;
-import com.databricks.jdbc.common.util.DatabricksThriftUtil;
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.dbclient.impl.common.StatementId;
 import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
 import com.databricks.jdbc.exception.DatabricksSQLException;
 import com.databricks.jdbc.log.JdbcLogger;
 import com.databricks.jdbc.log.JdbcLoggerFactory;
-import com.databricks.jdbc.model.client.thrift.generated.TColumnDesc;
 import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
-import com.databricks.jdbc.model.client.thrift.generated.TGetResultSetMetadataResp;
 import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink;
 import com.databricks.jdbc.model.core.ChunkLinkFetchResult;
 import com.databricks.jdbc.model.core.ColumnInfo;
@@ -163,7 +160,7 @@ public ArrowStreamResult(
       IDatabricksHttpClient httpClient)
       throws DatabricksSQLException {
     this.session = session;
-    setColumnInfo(resultsResp.getResultSetMetadata());
+    this.columnInfos = getColumnInfoList(resultsResp.getResultSetMetadata());
     this.chunkProvider =
         createThriftRemoteChunkProvider(resultsResp, parentStatement, session, httpClient);
   }
@@ -337,22 +334,6 @@ public ChunkProvider getChunkProvider() {
     return chunkProvider;
   }
 
-  private void setColumnInfo(TGetResultSetMetadataResp resultManifest)
-      throws DatabricksSQLException {
-    columnInfos = new ArrayList<>();
-    List<String> arrowMetadataList = DatabricksThriftUtil.getArrowMetadata(resultManifest);
-    if (resultManifest.getSchema() == null) {
-      return;
-    }
-    List<TColumnDesc> columns = resultManifest.getSchema().getColumns();
-    for (int columnIndex = 0; columnIndex < columns.size(); columnIndex++) {
-      TColumnDesc tColumnDesc = columns.get(columnIndex);
-      String columnArrowMetadata =
-          arrowMetadataList != null ? arrowMetadataList.get(columnIndex) : null;
-      columnInfos.add(getColumnInfoFromTColumnDesc(tColumnDesc, columnArrowMetadata));
-    }
-  }
-
  /**
   * Helper method to handle complex type and geospatial type conversion when support is disabled.
   *
@@ -397,7 +378,6 @@ protected static Object getObjectWithComplexTypeHandling(
 
     if (!isComplexDatatypeSupportEnabled && isComplexType(requiredType)) {
       LOGGER.debug("Complex datatype support is disabled, converting complex type to STRING");
-
       Object result =
           chunkIterator.getColumnObjectAtCurrentRow(
               columnIndex, ColumnInfoTypeName.STRING, "STRING", columnInfo);
```