Skip to content

Commit e5bc274

Browse files
authored
[ES-1006739] Input stream support for UC Volume Operation implementation (#382)
* changes * Minor test changes * Updated changes * Address comments * minor fix
1 parent 5c08711 commit e5bc274

File tree

8 files changed

+520
-63
lines changed

8 files changed

+520
-63
lines changed

src/main/java/com/databricks/jdbc/core/DatabricksResultSet.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public DatabricksResultSet(
5555
this.statementStatus = statementStatus;
5656
this.statementId = statementId;
5757
this.executionResult =
58-
ExecutionResultFactory.getResultSet(resultData, resultManifest, statementId, session);
58+
ExecutionResultFactory.getResultSet(
59+
resultData, resultManifest, statementId, session, parentStatement, this);
5960
this.resultSetMetaData = new DatabricksResultSetMetaData(statementId, resultManifest);
6061
this.statementType = statementType;
6162
this.updateCount = null;
@@ -99,7 +100,8 @@ public DatabricksResultSet(
99100
}
100101
this.statementId = statementId;
101102
this.executionResult =
102-
ExecutionResultFactory.getResultSet(resultData, resultManifest, statementId, session);
103+
ExecutionResultFactory.getResultSet(
104+
resultData, resultManifest, statementId, session, parentStatement, this);
103105
long rowSize = getRowCount(resultData);
104106
this.resultSetMetaData =
105107
new DatabricksResultSetMetaData(

src/main/java/com/databricks/jdbc/core/EmptyResultSet.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
package com.databricks.jdbc.core;
22

3+
import com.databricks.sdk.service.sql.StatementStatus;
4+
import java.io.IOException;
35
import java.io.InputStream;
46
import java.io.Reader;
57
import java.math.BigDecimal;
68
import java.net.URL;
79
import java.sql.*;
810
import java.util.Calendar;
911
import java.util.Map;
12+
import org.apache.http.HttpEntity;
13+
import org.apache.http.entity.InputStreamEntity;
1014

1115
/** Empty implementation of ResultSet */
12-
class EmptyResultSet implements ResultSet {
16+
class EmptyResultSet implements ResultSet, IDatabricksResultSet {
1317
private boolean isClosed;
18+
private HttpEntity httpEntity = null;
1419

1520
EmptyResultSet() {
1621
isClosed = false;
@@ -1082,4 +1087,39 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
10821087
checkIfClosed();
10831088
return false;
10841089
}
1090+
1091+
@Override
1092+
public String statementId() {
1093+
return null;
1094+
}
1095+
1096+
@Override
1097+
public StatementStatus getStatementStatus() {
1098+
return null;
1099+
}
1100+
1101+
@Override
1102+
public long getUpdateCount() throws SQLException {
1103+
return 0;
1104+
}
1105+
1106+
@Override
1107+
public boolean hasUpdateCount() throws SQLException {
1108+
return false;
1109+
}
1110+
1111+
@Override
1112+
public void setVolumeOperationEntityStream(HttpEntity httpEntity)
1113+
throws SQLException, IOException {
1114+
this.httpEntity = httpEntity;
1115+
}
1116+
1117+
@Override
1118+
public InputStreamEntity getVolumeOperationInputStream() throws SQLException {
1119+
try {
1120+
return new InputStreamEntity(httpEntity.getContent(), httpEntity.getContentLength());
1121+
} catch (IOException e) {
1122+
return null;
1123+
}
1124+
}
10851125
}

src/main/java/com/databricks/jdbc/core/ExecutionResultFactory.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,22 @@
1111

1212
class ExecutionResultFactory {
1313
static IExecutionResult getResultSet(
14-
ResultData data, ResultManifest manifest, String statementId, IDatabricksSession session) {
14+
ResultData data,
15+
ResultManifest manifest,
16+
String statementId,
17+
IDatabricksSession session,
18+
IDatabricksStatement statement,
19+
IDatabricksResultSet resultSet) {
1520
IExecutionResult resultHandler = getResultHandler(data, manifest, statementId, session);
1621
if (manifest.getIsVolumeOperation() != null && manifest.getIsVolumeOperation()) {
1722
return new VolumeOperationResult(
1823
statementId,
1924
manifest.getTotalRowCount(),
2025
manifest.getSchema().getColumnCount(),
2126
session,
22-
resultHandler);
27+
resultHandler,
28+
statement,
29+
resultSet);
2330
} else {
2431
return resultHandler;
2532
}
@@ -46,7 +53,9 @@ static IExecutionResult getResultSet(
4653
TRowSet data,
4754
TGetResultSetMetadataResp manifest,
4855
String statementId,
49-
IDatabricksSession session)
56+
IDatabricksSession session,
57+
IDatabricksStatement statement,
58+
IDatabricksResultSet resultSet)
5059
throws DatabricksSQLException {
5160
IExecutionResult resultHandler = getResultHandler(data, manifest, statementId, session);
5261
if (manifest.isSetIsStagingOperation() && manifest.isIsStagingOperation()) {
@@ -55,7 +64,9 @@ static IExecutionResult getResultSet(
5564
DatabricksThriftHelper.getRowCount(data),
5665
manifest.getSchema().getColumnsSize(),
5766
session,
58-
resultHandler);
67+
resultHandler,
68+
statement,
69+
resultSet);
5970
} else {
6071
return resultHandler;
6172
}

src/main/java/com/databricks/jdbc/core/VolumeOperationExecutor.java

Lines changed: 98 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.databricks.jdbc.commons.LogLevel;
66
import com.databricks.jdbc.commons.util.LoggingUtil;
77
import java.io.*;
8+
import java.sql.SQLException;
89
import java.util.*;
910
import org.apache.http.HttpEntity;
1011
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -13,6 +14,7 @@
1314
import org.apache.http.client.methods.HttpPut;
1415
import org.apache.http.entity.ContentType;
1516
import org.apache.http.entity.FileEntity;
17+
import org.apache.http.entity.InputStreamEntity;
1618
import org.apache.http.util.EntityUtils;
1719

1820
/** Executor for volume operations */
@@ -30,8 +32,10 @@ class VolumeOperationExecutor implements Runnable {
3032
private final String localFilePath;
3133
private final Map<String, String> headers;
3234
private final Set<String> allowedVolumeIngestionPaths;
35+
private final IDatabricksStatement statement;
36+
private final IDatabricksResultSet resultSet;
37+
private final IDatabricksHttpClient databricksHttpClient;
3338
private VolumeOperationStatus status;
34-
private IDatabricksHttpClient databricksHttpClient;
3539
private String errorMessage;
3640

3741
VolumeOperationExecutor(
@@ -40,13 +44,17 @@ class VolumeOperationExecutor implements Runnable {
4044
Map<String, String> headers,
4145
String localFilePath,
4246
String allowedVolumeIngestionPathString,
43-
IDatabricksHttpClient databricksHttpClient) {
47+
IDatabricksHttpClient databricksHttpClient,
48+
IDatabricksStatement statement,
49+
IDatabricksResultSet resultSet) {
4450
this.operationType = operationType;
4551
this.operationUrl = operationUrl;
4652
this.localFilePath = localFilePath;
4753
this.headers = headers;
4854
this.allowedVolumeIngestionPaths = getAllowedPaths(allowedVolumeIngestionPathString);
4955
this.databricksHttpClient = databricksHttpClient;
56+
this.statement = statement;
57+
this.resultSet = resultSet;
5058
this.status = VolumeOperationStatus.PENDING;
5159
this.errorMessage = null;
5260
}
@@ -101,6 +109,16 @@ String getErrorMessage() {
101109
}
102110

103111
private void validateLocalFilePath() {
112+
try {
113+
if (statement.isAllowedInputStreamForVolumeOperation()) {
114+
return;
115+
}
116+
} catch (DatabricksSQLException e) {
117+
status = VolumeOperationStatus.ABORTED;
118+
errorMessage = "Volume operation called on closed statement: " + e.getMessage();
119+
LoggingUtil.log(LogLevel.ERROR, errorMessage);
120+
return;
121+
}
104122
if (allowedVolumeIngestionPaths.isEmpty()) {
105123
LoggingUtil.log(LogLevel.ERROR, "Volume ingestion paths are not set");
106124
status = VolumeOperationStatus.ABORTED;
@@ -136,6 +154,36 @@ private void executeGetOperation() {
136154
HttpGet httpGet = new HttpGet(operationUrl);
137155
headers.forEach(httpGet::addHeader);
138156

157+
HttpEntity entity = null;
158+
try {
159+
// If the client wants to consume the result as an input stream, return the stream to it directly
160+
if (statement.isAllowedInputStreamForVolumeOperation()) {
161+
CloseableHttpResponse response = databricksHttpClient.execute(httpGet);
162+
if (!isSuccessfulHttpResponse(response)) {
163+
status = VolumeOperationStatus.FAILED;
164+
errorMessage =
165+
String.format(
166+
"Failed to fetch content from volume with error code {%s} for input stream and error {%s}",
167+
response.getStatusLine().getStatusCode(),
168+
response.getStatusLine().getReasonPhrase());
169+
LoggingUtil.log(LogLevel.ERROR, errorMessage);
170+
return;
171+
}
172+
entity = response.getEntity();
173+
if (entity != null) {
174+
this.resultSet.setVolumeOperationEntityStream(entity);
175+
}
176+
status = VolumeOperationStatus.SUCCEEDED;
177+
return;
178+
}
179+
} catch (SQLException | IOException e) {
180+
status = VolumeOperationStatus.FAILED;
181+
errorMessage = "Failed to execute GET operation for input stream: " + e.getMessage();
182+
LoggingUtil.log(LogLevel.ERROR, errorMessage);
183+
return;
184+
}
185+
186+
// Copy the data into the local file, as requested by the user
139187
File localFile = new File(localFilePath);
140188
if (localFile.exists()) {
141189
LoggingUtil.log(
@@ -151,13 +199,13 @@ private void executeGetOperation() {
151199
LoggingUtil.log(
152200
LogLevel.ERROR,
153201
String.format(
154-
"Failed to fetch content from volume with error {} for local file {%s}",
202+
"Failed to fetch content from volume with error {%s} for local file {%s}",
155203
response.getStatusLine().getStatusCode(), localFilePath));
156204
status = VolumeOperationStatus.FAILED;
157205
errorMessage = "Failed to download file";
158206
return;
159207
}
160-
HttpEntity entity = response.getEntity();
208+
entity = response.getEntity();
161209
if (entity != null) {
162210
// Get the content of the HttpEntity
163211
InputStream inputStream = entity.getContent();
@@ -200,34 +248,31 @@ private void executePutOperation() {
200248
HttpPut httpPut = new HttpPut(operationUrl);
201249
headers.forEach(httpPut::addHeader);
202250

203-
// Set the FileEntity as the request body
204-
File file = new File(localFilePath);
205-
if (!file.exists() || file.isDirectory()) {
206-
LoggingUtil.log(
207-
LogLevel.ERROR,
208-
String.format("Local file does not exist or is a directory {%s}", localFilePath));
209-
status = VolumeOperationStatus.ABORTED;
210-
errorMessage = "Local file does not exist or is a directory";
211-
return;
212-
}
213-
if (file.length() == 0) {
214-
215-
LoggingUtil.log(LogLevel.ERROR, String.format("Local file is empty {%s}", localFilePath));
216-
status = VolumeOperationStatus.ABORTED;
217-
errorMessage = "Local file is empty";
218-
return;
219-
}
251+
try {
252+
if (statement.isAllowedInputStreamForVolumeOperation()) {
253+
InputStreamEntity inputStream = statement.getInputStreamForUCVolume();
254+
if (inputStream == null) {
255+
status = VolumeOperationStatus.ABORTED;
256+
errorMessage = "InputStream not set for PUT operation";
257+
LoggingUtil.log(LogLevel.ERROR, errorMessage);
258+
return;
259+
}
260+
httpPut.setEntity(inputStream);
261+
} else {
262+
// Set the FileEntity as the request body
263+
File file = new File(localFilePath);
220264

221-
if (file.length() > PUT_SIZE_LIMITS) {
222-
LoggingUtil.log(LogLevel.ERROR, String.format("Local file too large {%s}", localFilePath));
265+
if (localFileHasErrorForPutOperation(file)) {
266+
return;
267+
}
268+
httpPut.setEntity(new FileEntity(file, ContentType.DEFAULT_BINARY));
269+
}
270+
} catch (DatabricksSQLException e) {
223271
status = VolumeOperationStatus.ABORTED;
224-
errorMessage = "Local file too large";
225-
return;
272+
errorMessage = "PUT operation called on closed statement";
273+
LoggingUtil.log(LogLevel.ERROR, errorMessage);
226274
}
227275

228-
FileEntity fileEntity = new FileEntity(file, ContentType.DEFAULT_BINARY);
229-
httpPut.setEntity(fileEntity);
230-
231276
// Execute the request
232277
try (CloseableHttpResponse response = databricksHttpClient.execute(httpPut)) {
233278
// Process the response
@@ -254,6 +299,31 @@ private void executePutOperation() {
254299
}
255300
}
256301

302+
private boolean localFileHasErrorForPutOperation(File file) {
303+
if (!file.exists() || file.isDirectory()) {
304+
LoggingUtil.log(
305+
LogLevel.ERROR,
306+
String.format("Local file does not exist or is a directory {%s}", localFilePath));
307+
status = VolumeOperationStatus.ABORTED;
308+
errorMessage = "Local file does not exist or is a directory";
309+
return true;
310+
}
311+
if (file.length() == 0) {
312+
LoggingUtil.log(LogLevel.ERROR, String.format("Local file is empty {%s}", localFilePath));
313+
status = VolumeOperationStatus.ABORTED;
314+
errorMessage = "Local file is empty";
315+
return true;
316+
}
317+
318+
if (file.length() > PUT_SIZE_LIMITS) {
319+
LoggingUtil.log(LogLevel.ERROR, String.format("Local file too large {%s}", localFilePath));
320+
status = VolumeOperationStatus.ABORTED;
321+
errorMessage = "Local file too large";
322+
return true;
323+
}
324+
return false;
325+
}
326+
257327
private void executeDeleteOperation() {
258328
// TODO: Check for AWS specific handling
259329
HttpDelete httpDelete = new HttpDelete(operationUrl);

src/main/java/com/databricks/jdbc/core/VolumeOperationResult.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ class VolumeOperationResult implements IExecutionResult {
2121
private final IDatabricksSession session;
2222
private final String statementId;
2323
private final IExecutionResult resultHandler;
24+
private final IDatabricksResultSet resultSet;
25+
private final IDatabricksStatement statement;
2426
private final IDatabricksHttpClient httpClient;
2527
private final long rowCount;
2628
private final long columnCount;
@@ -33,12 +35,16 @@ class VolumeOperationResult implements IExecutionResult {
3335
long totalRows,
3436
long totalColumns,
3537
IDatabricksSession session,
36-
IExecutionResult resultHandler) {
38+
IExecutionResult resultHandler,
39+
IDatabricksStatement statement,
40+
IDatabricksResultSet resultSet) {
3741
this.statementId = statementId;
3842
this.rowCount = totalRows;
3943
this.columnCount = totalColumns;
4044
this.session = session;
4145
this.resultHandler = resultHandler;
46+
this.statement = statement;
47+
this.resultSet = resultSet;
4248
this.httpClient = DatabricksHttpClient.getInstance(session.getConnectionContext());
4349
this.currentRowIndex = -1;
4450
}
@@ -49,12 +55,16 @@ class VolumeOperationResult implements IExecutionResult {
4955
ResultManifest manifest,
5056
IDatabricksSession session,
5157
IExecutionResult resultHandler,
52-
IDatabricksHttpClient httpClient) {
58+
IDatabricksHttpClient httpClient,
59+
IDatabricksStatement statement,
60+
IDatabricksResultSet resultSet) {
5361
this.statementId = statementId;
5462
this.rowCount = manifest.getTotalRowCount();
5563
this.columnCount = manifest.getSchema().getColumnCount();
5664
this.session = session;
5765
this.resultHandler = resultHandler;
66+
this.statement = statement;
67+
this.resultSet = resultSet;
5868
this.httpClient = httpClient;
5969
this.currentRowIndex = -1;
6070
}
@@ -73,7 +83,9 @@ private void initHandler(IExecutionResult resultHandler) throws DatabricksSQLExc
7383
session
7484
.getClientInfoProperties()
7585
.getOrDefault(ALLOWED_VOLUME_INGESTION_PATHS.toLowerCase(), ""),
76-
httpClient);
86+
httpClient,
87+
statement,
88+
resultSet);
7789
Thread thread = new Thread(volumeOperationExecutor);
7890
thread.setName("VolumeOperationExecutor " + statementId);
7991
thread.start();

0 commit comments

Comments
 (0)