Skip to content

Commit aafefb0

Browse files
Add ChunkProvider interface and unify chunk management (#554)
- Added a new ChunkProvider interface to abstract chunk retrieval methods:
  - boolean hasNextChunk();
  - boolean next();
  - ArrowResultChunk getChunk() throws DatabricksSQLException;
  - void close();
- Implemented ChunkProvider in both ChunkDownloader (renamed to RemoteChunkProvider) and ChunkExtractor (renamed to InlineChunkProvider).
- Updated ArrowStreamResult to use ChunkProvider for chunk management.
1 parent 4a44967 commit aafefb0

File tree

8 files changed

+136
-89
lines changed

8 files changed

+136
-89
lines changed

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@
2323

2424
public class ArrowStreamResult implements IExecutionResult {
2525

26-
private ChunkDownloader chunkDownloader;
27-
private ChunkExtractor chunkExtractor;
26+
private final ChunkProvider chunkProvider;
2827
private long currentRowIndex = -1;
29-
private boolean isInlineArrow;
3028
private boolean isClosed;
3129
private ArrowResultChunk.ArrowResultChunkIterator chunkIterator;
3230
private List<ColumnInfo> columnInfos;
@@ -53,8 +51,8 @@ public ArrowStreamResult(
5351
IDatabricksSession session,
5452
IDatabricksHttpClient httpClient)
5553
throws DatabricksParsingException {
56-
this.chunkDownloader =
57-
new ChunkDownloader(
54+
this.chunkProvider =
55+
new RemoteChunkProvider(
5856
statementId,
5957
resultManifest,
6058
resultData,
@@ -93,22 +91,19 @@ public ArrowStreamResult(
9391
IDatabricksHttpClient httpClient)
9492
throws DatabricksParsingException {
9593
setColumnInfo(resultManifest);
96-
this.isInlineArrow = isInlineArrow;
9794
if (isInlineArrow) {
98-
this.chunkExtractor =
99-
new ChunkExtractor(resultData.getArrowBatches(), resultManifest, statementId);
100-
this.chunkDownloader = null;
95+
this.chunkProvider =
96+
new InlineChunkProvider(resultData.getArrowBatches(), resultManifest, statementId);
10197
} else {
10298
CompressionType compressionType = CompressionType.getCompressionMapping(resultManifest);
103-
this.chunkDownloader =
104-
new ChunkDownloader(
99+
this.chunkProvider =
100+
new RemoteChunkProvider(
105101
statementId,
106102
resultData,
107103
session,
108104
httpClient,
109105
session.getConnectionContext().getCloudFetchThreadPoolSize(),
110106
compressionType);
111-
this.chunkExtractor = null;
112107
}
113108
}
114109

@@ -132,20 +127,13 @@ public boolean next() throws DatabricksSQLException {
132127
if (!hasNext()) {
133128
return false;
134129
}
130+
135131
currentRowIndex++;
136-
if (isInlineArrow) {
137-
if (chunkIterator == null) {
138-
chunkIterator = chunkExtractor.next().getChunkIterator();
139-
}
140-
return chunkIterator.nextRow();
141-
}
142-
// Either this is first chunk or we are crossing chunk boundary
143132
if (chunkIterator == null || !chunkIterator.hasNextRow()) {
144-
chunkDownloader.next();
145-
chunkIterator = chunkDownloader.getChunk().getChunkIterator();
146-
return chunkIterator.nextRow();
133+
chunkProvider.next();
134+
chunkIterator = chunkProvider.getChunk().getChunkIterator();
147135
}
148-
// Traversing within a chunk
136+
149137
return chunkIterator.nextRow();
150138
}
151139

@@ -163,18 +151,14 @@ public boolean hasNext() {
163151

164152
// For inline arrow, check if the chunk extractor has more chunks
165153
// Otherwise, check the chunk downloader
166-
return isInlineArrow ? chunkExtractor.hasNext() : chunkDownloader.hasNextChunk();
154+
return chunkProvider.hasNextChunk();
167155
}
168156

169157
/** {@inheritDoc} */
170158
@Override
171159
public void close() {
172160
isClosed = true;
173-
if (isInlineArrow) {
174-
chunkExtractor.releaseChunk();
175-
} else {
176-
chunkDownloader.releaseAllChunks();
177-
}
161+
chunkProvider.close();
178162
}
179163

180164
private void setColumnInfo(TGetResultSetMetadataResp resultManifest) {

src/main/java/com/databricks/jdbc/api/impl/arrow/SingleChunkDownloader.java renamed to src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
import java.util.concurrent.Callable;
1010

1111
/** Task class to manage download for a single chunk. */
12-
class SingleChunkDownloader implements Callable<Void> {
12+
class ChunkDownloadTask implements Callable<Void> {
1313

14-
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(SingleChunkDownloader.class);
14+
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ChunkDownloadTask.class);
1515
public static final int MAX_RETRIES = 5;
1616
private static final long RETRY_DELAY_MS = 1500; // 1.5 seconds
1717
private final ArrowResultChunk chunk;
1818
private final IDatabricksHttpClient httpClient;
1919
private final ChunkDownloadCallback chunkDownloader;
2020

21-
SingleChunkDownloader(
21+
ChunkDownloadTask(
2222
ArrowResultChunk chunk,
2323
IDatabricksHttpClient httpClient,
2424
ChunkDownloadCallback chunkDownloader) {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.databricks.jdbc.api.impl.arrow;
2+
3+
import com.databricks.jdbc.exception.DatabricksSQLException;
4+
5+
/**
6+
* Implementations of this interface manage the retrieval and iteration over {@link
7+
* ArrowResultChunk}s.
8+
*/
9+
public interface ChunkProvider {
10+
11+
/**
12+
* Checks if there are more chunks available to iterate over.
13+
*
14+
* @return {@code true} if there are additional chunks to be retrieved; {@code false} otherwise.
15+
*/
16+
boolean hasNextChunk();
17+
18+
/**
19+
* Advances to the next available chunk. This method should be called before calling {@link
20+
* #getChunk()} to retrieve the data from the next chunk.
21+
*
22+
* @return {@code true} if the next chunk was successfully moved to; {@code false} if there are no
23+
* more chunks.
24+
*/
25+
boolean next();
26+
27+
/**
28+
* Retrieves the current chunk of data after a successful call to {@link #next()}.
29+
*
30+
* @return The current {@link ArrowResultChunk} containing the data.
31+
* @throws DatabricksSQLException if an error occurs while fetching the chunk.
32+
*/
33+
ArrowResultChunk getChunk() throws DatabricksSQLException;
34+
35+
/**
36+
* Closes the chunk provider and releases any resources associated with it. After calling this
37+
* method, the chunk provider should not be used again.
38+
*/
39+
void close();
40+
}

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkExtractor.java renamed to src/main/java/com/databricks/jdbc/api/impl/arrow/InlineChunkProvider.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@
2323
import org.apache.arrow.vector.util.SchemaUtility;
2424

2525
/** Class to manage inline Arrow chunks */
26-
public class ChunkExtractor {
26+
public class InlineChunkProvider implements ChunkProvider {
2727

28-
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ChunkExtractor.class);
28+
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(InlineChunkProvider.class);
2929
private long totalRows;
3030
private long currentChunkIndex;
3131

3232
ArrowResultChunk arrowResultChunk; // There is only one packet of data in case of inline arrow
3333

34-
ChunkExtractor(
34+
InlineChunkProvider(
3535
List<TSparkArrowBatch> arrowBatches, TGetResultSetMetadataResp metadata, String statementId)
3636
throws DatabricksParsingException {
3737
this.currentChunkIndex = -1;
@@ -40,18 +40,34 @@ public class ChunkExtractor {
4040
arrowResultChunk = ArrowResultChunk.builder().withInputStream(byteStream, totalRows).build();
4141
}
4242

43-
public boolean hasNext() {
43+
/** {@inheritDoc} */
44+
@Override
45+
public boolean hasNextChunk() {
4446
return this.currentChunkIndex == -1;
4547
}
4648

47-
public ArrowResultChunk next() {
48-
if (this.currentChunkIndex != -1) {
49-
return null;
49+
/** {@inheritDoc} */
50+
@Override
51+
public boolean next() {
52+
if (!hasNextChunk()) {
53+
return false;
5054
}
5155
this.currentChunkIndex++;
56+
return true;
57+
}
58+
59+
/** {@inheritDoc} */
60+
@Override
61+
public ArrowResultChunk getChunk() {
5262
return arrowResultChunk;
5363
}
5464

65+
/** {@inheritDoc} */
66+
@Override
67+
public void close() {
68+
arrowResultChunk.releaseChunk();
69+
}
70+
5571
private ByteArrayInputStream initializeByteStream(
5672
List<TSparkArrowBatch> arrowBatches, TGetResultSetMetadataResp metadata, String statementId)
5773
throws DatabricksParsingException {
@@ -120,8 +136,7 @@ private static Schema hiveSchemaToArrowSchema(TTableSchema hiveSchema)
120136

121137
private static Field getArrowField(TColumnDesc columnDesc) throws SQLException {
122138
TTypeId thriftType = getThriftTypeFromTypeDesc(columnDesc.getTypeDesc());
123-
ArrowType arrowType = null;
124-
arrowType = mapThriftToArrowType(thriftType);
139+
ArrowType arrowType = mapThriftToArrowType(thriftType);
125140
FieldType fieldType = new FieldType(true, arrowType, null);
126141
return new Field(columnDesc.getColumnName(), fieldType, null);
127142
}
@@ -132,8 +147,4 @@ static void handleError(Exception e) throws DatabricksParsingException {
132147
LOGGER.error(errorMessage);
133148
throw new DatabricksParsingException(errorMessage, e);
134149
}
135-
136-
public void releaseChunk() {
137-
this.arrowResultChunk.releaseChunk();
138-
}
139150
}

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloader.java renamed to src/main/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProvider.java

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@
2323
import java.util.concurrent.atomic.AtomicInteger;
2424

2525
/** Class to manage Arrow chunks and fetch them on proactive basis. */
26-
public class ChunkDownloader implements ChunkDownloadCallback {
26+
public class RemoteChunkProvider implements ChunkProvider, ChunkDownloadCallback {
2727

28-
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ChunkDownloader.class);
28+
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(RemoteChunkProvider.class);
2929
private static final String CHUNKS_DOWNLOADER_THREAD_POOL_PREFIX =
3030
"databricks-jdbc-chunks-downloader-";
31+
private static int chunksDownloaderThreadPoolSize;
3132
private final IDatabricksSession session;
3233
private final String statementId;
3334
private final long totalChunks;
3435
private final ExecutorService chunkDownloaderExecutorService;
3536
private final IDatabricksHttpClient httpClient;
36-
private static int chunksDownloaderThreadPoolSize;
3737
private Long currentChunkIndex;
3838
private long nextChunkToDownload;
3939
private Long totalChunksInMemory;
@@ -42,15 +42,15 @@ public class ChunkDownloader implements ChunkDownloadCallback {
4242
private final CompressionType compressionType;
4343
private final ConcurrentHashMap<Long, ArrowResultChunk> chunkIndexToChunksMap;
4444

45-
ChunkDownloader(
45+
RemoteChunkProvider(
4646
String statementId,
4747
ResultManifest resultManifest,
4848
ResultData resultData,
4949
IDatabricksSession session,
5050
IDatabricksHttpClient httpClient,
5151
int chunksDownloaderThreadPoolSize)
5252
throws DatabricksParsingException {
53-
ChunkDownloader.chunksDownloaderThreadPoolSize = chunksDownloaderThreadPoolSize;
53+
RemoteChunkProvider.chunksDownloaderThreadPoolSize = chunksDownloaderThreadPoolSize;
5454
this.chunkDownloaderExecutorService = createChunksDownloaderExecutorService();
5555
this.httpClient = httpClient;
5656
this.session = session;
@@ -61,7 +61,7 @@ public class ChunkDownloader implements ChunkDownloadCallback {
6161
initializeData();
6262
}
6363

64-
ChunkDownloader(
64+
RemoteChunkProvider(
6565
String statementId,
6666
TRowSet resultData,
6767
IDatabricksSession session,
@@ -78,15 +78,15 @@ public class ChunkDownloader implements ChunkDownloadCallback {
7878
}
7979

8080
@VisibleForTesting
81-
ChunkDownloader(
81+
RemoteChunkProvider(
8282
String statementId,
8383
TRowSet resultData,
8484
IDatabricksSession session,
8585
IDatabricksHttpClient httpClient,
8686
int chunksDownloaderThreadPoolSize,
8787
CompressionType compressionType)
8888
throws DatabricksParsingException {
89-
ChunkDownloader.chunksDownloaderThreadPoolSize = chunksDownloaderThreadPoolSize;
89+
RemoteChunkProvider.chunksDownloaderThreadPoolSize = chunksDownloaderThreadPoolSize;
9090
this.chunkDownloaderExecutorService = createChunksDownloaderExecutorService();
9191
this.httpClient = httpClient;
9292
this.compressionType = compressionType;
@@ -117,12 +117,15 @@ public void downloadLinks(long chunkIndexToDownloadLink) throws DatabricksSQLExc
117117
}
118118

119119
/**
120-
* Fetches the chunk for the given index. If chunk is not already downloaded, will download the
120+
* {@inheritDoc}
121+
*
122+
* <p>Fetches the chunk for the given index. If chunk is not already downloaded, will download the
121123
* chunk first
122124
*
123125
* @return the chunk at given index
124126
*/
125-
ArrowResultChunk getChunk() throws DatabricksSQLException {
127+
@Override
128+
public ArrowResultChunk getChunk() throws DatabricksSQLException {
126129
if (currentChunkIndex < 0) {
127130
return null;
128131
}
@@ -153,11 +156,15 @@ public CompressionType getCompressionType() {
153156
return compressionType;
154157
}
155158

156-
boolean hasNextChunk() {
159+
/** {@inheritDoc} */
160+
@Override
161+
public boolean hasNextChunk() {
157162
return currentChunkIndex < totalChunks - 1;
158163
}
159164

160-
boolean next() {
165+
/** {@inheritDoc} */
166+
@Override
167+
public boolean next() {
161168
if (currentChunkIndex >= 0) {
162169
// release current chunk
163170
releaseChunk();
@@ -170,6 +177,19 @@ boolean next() {
170177
return true;
171178
}
172179

180+
/**
181+
* {@inheritDoc}
182+
*
183+
* <p>Release all chunks from memory. This would be called when result-set has been closed.
184+
*/
185+
@Override
186+
public void close() {
187+
this.isClosed = true;
188+
this.chunkDownloaderExecutorService.shutdownNow();
189+
this.chunkIndexToChunksMap.values().forEach(ArrowResultChunk::releaseChunk);
190+
httpClient.closeExpiredAndIdleConnections();
191+
}
192+
173193
/** Release the memory for previous chunk since it is already consumed */
174194
void releaseChunk() {
175195
if (chunkIndexToChunksMap.get(currentChunkIndex).releaseChunk()) {
@@ -189,22 +209,13 @@ void setChunkLink(ExternalLink chunkLink) {
189209
}
190210
}
191211

192-
/** Release all chunks from memory. This would be called when result-set has been closed. */
193-
void releaseAllChunks() {
194-
this.isClosed = true;
195-
this.chunkDownloaderExecutorService.shutdownNow();
196-
this.chunkIndexToChunksMap.values().forEach(ArrowResultChunk::releaseChunk);
197-
httpClient.closeExpiredAndIdleConnections();
198-
}
199-
200212
void downloadNextChunks() {
201213
while (!this.isClosed
202214
&& nextChunkToDownload < totalChunks
203215
&& totalChunksInMemory < allowedChunksInMemory) {
204216
ArrowResultChunk chunk = chunkIndexToChunksMap.get(nextChunkToDownload);
205217
if (chunk.getStatus() != ArrowResultChunk.ChunkStatus.DOWNLOAD_SUCCEEDED) {
206-
this.chunkDownloaderExecutorService.submit(
207-
new SingleChunkDownloader(chunk, httpClient, this));
218+
this.chunkDownloaderExecutorService.submit(new ChunkDownloadTask(chunk, httpClient, this));
208219
totalChunksInMemory++;
209220
}
210221
nextChunkToDownload++;

0 commit comments

Comments (0)