Skip to content

Commit 336e2a1

Browse files
authored
[PECO-1959] Added support for async execution (#534)
* Refactoring IDatabricksStatement and IDatabricksResult interfaces into private *Handle interfaces and public interfaces
1 parent 6ef1117 commit 336e2a1

38 files changed

+1086
-473
lines changed
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.databricks.jdbc.api;
22

3-
import com.databricks.jdbc.exception.DatabricksSQLException;
43
import java.sql.ResultSet;
4+
import java.sql.SQLException;
55

66
/** Interface for Databricks specific statement. */
77
public interface IDatabricksStatement {
@@ -11,15 +11,15 @@ public interface IDatabricksStatement {
1111
*
1212
* @param sql SQL command to be executed
1313
* @return result set for given execution
14-
* @throws DatabricksSQLException in case of error
14+
* @throws SQLException in case of error
1515
*/
16-
ResultSet executeAsync(String sql) throws DatabricksSQLException;
16+
ResultSet executeAsync(String sql) throws SQLException;
1717

1818
/**
1919
* Returns result set response for the executed statement
2020
*
2121
* @return result set for underlying execution
22-
* @throws DatabricksSQLException if statement was never executed
22+
* @throws SQLException if statement was never executed
2323
*/
24-
ResultSet getExecutionResult() throws DatabricksSQLException;
24+
ResultSet getExecutionResult() throws SQLException;
2525
}

src/main/java/com/databricks/jdbc/api/callback/IDatabricksResultSetHandle.java

Lines changed: 0 additions & 14 deletions
This file was deleted.

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
import com.databricks.jdbc.api.IDatabricksSession;
66
import com.databricks.jdbc.api.IDatabricksStatement;
77
import com.databricks.jdbc.api.IDatabricksUCVolumeClient;
8-
import com.databricks.jdbc.api.callback.IDatabricksStatementHandle;
98
import com.databricks.jdbc.api.impl.volume.DatabricksUCVolumeClient;
9+
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
1010
import com.databricks.jdbc.common.DatabricksJdbcConstants;
1111
import com.databricks.jdbc.common.util.UserAgentManager;
1212
import com.databricks.jdbc.common.util.ValidationUtil;
1313
import com.databricks.jdbc.dbclient.IDatabricksClient;
14+
import com.databricks.jdbc.dbclient.impl.common.StatementId;
1415
import com.databricks.jdbc.exception.DatabricksSQLClientInfoException;
1516
import com.databricks.jdbc.exception.DatabricksSQLException;
1617
import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException;
@@ -30,7 +31,7 @@
3031
public class DatabricksConnection implements IDatabricksConnection, Connection {
3132
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(DatabricksConnection.class);
3233
private IDatabricksSession session;
33-
private final Set<IDatabricksStatementHandle> statementSet = ConcurrentHashMap.newKeySet();
34+
private final Set<IDatabricksStatementInternal> statementSet = ConcurrentHashMap.newKeySet();
3435
private SQLWarning warnings = null;
3536
private volatile IDatabricksUCVolumeClient ucVolumeClient = null;
3637

@@ -59,7 +60,7 @@ public void open() throws DatabricksSQLException {
5960

6061
@Override
6162
public Statement getStatement(String statementId) throws SQLException {
62-
throw new DatabricksSQLFeatureNotSupportedException("Not implemented");
63+
return new DatabricksStatement(this, StatementId.deserialize(statementId));
6364
}
6465

6566
@Override
@@ -127,7 +128,7 @@ public void rollback() throws SQLException {
127128
@Override
128129
public void close() throws DatabricksSQLException {
129130
LOGGER.debug("public void close()");
130-
for (IDatabricksStatementHandle statement : statementSet) {
131+
for (IDatabricksStatementInternal statement : statementSet) {
131132
statement.close(false);
132133
statementSet.remove(statement);
133134
}
@@ -488,15 +489,18 @@ public int getNetworkTimeout() throws SQLException {
488489
@Override
489490
public <T> T unwrap(Class<T> iface) throws SQLException {
490491
LOGGER.debug("public <T> T unwrap(Class<T> iface)");
491-
throw new DatabricksSQLFeatureNotSupportedException(
492-
"Not implemented in DatabricksConnection - unwrap(Class<T> iface)");
492+
if (iface.isInstance(this)) {
493+
return (T) this;
494+
}
495+
throw new DatabricksSQLException(
496+
String.format(
497+
"Class {%s} cannot be wrapped from {%s}", this.getClass().getName(), iface.getName()));
493498
}
494499

495500
@Override
496501
public boolean isWrapperFor(Class<?> iface) throws SQLException {
497502
LOGGER.debug("public boolean isWrapperFor(Class<?> iface)");
498-
throw new DatabricksSQLFeatureNotSupportedException(
499-
"Not implemented in DatabricksConnection - isWrapperFor(Class<?> iface)");
503+
return iface.isInstance(this);
500504
}
501505

502506
@Override

src/main/java/com/databricks/jdbc/api/impl/DatabricksDatabaseMetaData.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.databricks.jdbc.common.StatementType;
88
import com.databricks.jdbc.common.util.DriverUtil;
99
import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder;
10+
import com.databricks.jdbc.dbclient.impl.common.StatementId;
1011
import com.databricks.jdbc.exception.DatabricksSQLException;
1112
import com.databricks.jdbc.log.JdbcLogger;
1213
import com.databricks.jdbc.log.JdbcLoggerFactory;
@@ -877,7 +878,7 @@ public ResultSet getProcedures(String catalog, String schemaPattern, String proc
877878
throwExceptionIfConnectionIsClosed();
878879
return new DatabricksResultSet(
879880
new StatementStatus().setState(StatementState.SUCCEEDED),
880-
"getprocedures-metadata",
881+
new StatementId("getprocedures-metadata"),
881882
Arrays.asList(
882883
"PROCEDURE_CAT",
883884
"PROCEDURE_SCHEM",
@@ -1179,7 +1180,7 @@ public ResultSet getUDTs(
11791180
throwExceptionIfConnectionIsClosed();
11801181
return new DatabricksResultSet(
11811182
new StatementStatus().setState(StatementState.SUCCEEDED),
1182-
"getudts-metadata",
1183+
new StatementId("getudts-metadata"),
11831184
Arrays.asList(
11841185
"TYPE_CAT",
11851186
"TYPE_SCHEM",

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 56 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44

55
import com.databricks.jdbc.api.IDatabricksResultSet;
66
import com.databricks.jdbc.api.IDatabricksSession;
7-
import com.databricks.jdbc.api.callback.IDatabricksResultSetHandle;
8-
import com.databricks.jdbc.api.callback.IDatabricksStatementHandle;
97
import com.databricks.jdbc.api.impl.converters.ConverterHelper;
108
import com.databricks.jdbc.api.impl.converters.ObjectConverter;
11-
import com.databricks.jdbc.api.impl.volume.VolumeInputStream;
9+
import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
10+
import com.databricks.jdbc.api.internal.IDatabricksResultSetInternal;
11+
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
1212
import com.databricks.jdbc.common.StatementType;
1313
import com.databricks.jdbc.common.util.WarningUtil;
14+
import com.databricks.jdbc.dbclient.impl.common.StatementId;
1415
import com.databricks.jdbc.exception.DatabricksParsingException;
1516
import com.databricks.jdbc.exception.DatabricksSQLException;
1617
import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException;
@@ -25,7 +26,6 @@
2526
import com.databricks.sdk.service.sql.StatementState;
2627
import com.databricks.sdk.service.sql.StatementStatus;
2728
import com.google.common.annotations.VisibleForTesting;
28-
import java.io.IOException;
2929
import java.io.InputStream;
3030
import java.io.Reader;
3131
import java.math.BigDecimal;
@@ -36,43 +36,47 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.function.Supplier;
39-
import org.apache.http.HttpEntity;
4039
import org.apache.http.entity.InputStreamEntity;
4140

4241
public class DatabricksResultSet
43-
implements ResultSet, IDatabricksResultSet, IDatabricksResultSetHandle {
42+
implements ResultSet, IDatabricksResultSet, IDatabricksResultSetInternal {
4443

4544
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(DatabricksResultSet.class);
4645
protected static final String AFFECTED_ROWS_COUNT = "num_affected_rows";
4746
private final StatementStatus statementStatus;
48-
private final String statementId;
47+
private final StatementId statementId;
4948
private final IExecutionResult executionResult;
5049
private final DatabricksResultSetMetaData resultSetMetaData;
5150
private final StatementType statementType;
52-
private final IDatabricksStatementHandle parentStatement;
51+
private final IDatabricksStatementInternal parentStatement;
5352
private Long updateCount;
5453
private boolean isClosed;
5554
private SQLWarning warnings = null;
5655
private boolean wasNull;
57-
private VolumeInputStream volumeInputStream = null;
58-
private long volumeStreamContentLength = -1L;
56+
private boolean isResultInitialized = true;
5957

6058
// Constructor for SEA result set
6159
public DatabricksResultSet(
6260
StatementStatus statementStatus,
63-
String statementId,
61+
StatementId statementId,
6462
ResultData resultData,
6563
ResultManifest resultManifest,
6664
StatementType statementType,
6765
IDatabricksSession session,
68-
IDatabricksStatementHandle parentStatement)
66+
IDatabricksStatementInternal parentStatement)
6967
throws DatabricksParsingException {
7068
this.statementStatus = statementStatus;
7169
this.statementId = statementId;
72-
this.executionResult =
73-
ExecutionResultFactory.getResultSet(
74-
resultData, resultManifest, statementId, session, parentStatement, this);
75-
this.resultSetMetaData = new DatabricksResultSetMetaData(statementId, resultManifest);
70+
if (resultData != null) {
71+
this.executionResult =
72+
ExecutionResultFactory.getResultSet(
73+
resultData, resultManifest, statementId, session, parentStatement);
74+
this.resultSetMetaData = new DatabricksResultSetMetaData(statementId, resultManifest);
75+
} else {
76+
executionResult = null;
77+
resultSetMetaData = null;
78+
isResultInitialized = false;
79+
}
7680
this.statementType = statementType;
7781
this.updateCount = null;
7882
this.parentStatement = parentStatement;
@@ -83,9 +87,9 @@ public DatabricksResultSet(
8387
@VisibleForTesting
8488
public DatabricksResultSet(
8589
StatementStatus statementStatus,
86-
String statementId,
90+
StatementId statementId,
8791
StatementType statementType,
88-
IDatabricksStatementHandle parentStatement,
92+
IDatabricksStatementInternal parentStatement,
8993
IExecutionResult executionResult,
9094
DatabricksResultSetMetaData resultSetMetaData) {
9195
this.statementStatus = statementStatus;
@@ -102,26 +106,39 @@ public DatabricksResultSet(
102106
// Constructor for thrift result set
103107
public DatabricksResultSet(
104108
TStatus statementStatus,
105-
String statementId,
109+
StatementId statementId,
106110
TRowSet resultData,
107111
TGetResultSetMetadataResp resultManifest,
108112
StatementType statementType,
109-
IDatabricksStatementHandle parentStatement,
113+
IDatabricksStatementInternal parentStatement,
110114
IDatabricksSession session)
111115
throws SQLException {
112-
if (SUCCESS_STATUS_LIST.contains(statementStatus.getStatusCode())) {
113-
this.statementStatus = new StatementStatus().setState(StatementState.SUCCEEDED);
114-
} else {
115-
this.statementStatus = new StatementStatus().setState(StatementState.FAILED);
116+
switch (statementStatus.getStatusCode()) {
117+
case SUCCESS_STATUS:
118+
case SUCCESS_WITH_INFO_STATUS:
119+
this.statementStatus = new StatementStatus().setState(StatementState.SUCCEEDED);
120+
break;
121+
case STILL_EXECUTING_STATUS:
122+
this.statementStatus = new StatementStatus().setState(StatementState.RUNNING);
123+
break;
124+
default:
125+
this.statementStatus = new StatementStatus().setState(StatementState.FAILED);
116126
}
127+
117128
this.statementId = statementId;
118-
this.executionResult =
119-
ExecutionResultFactory.getResultSet(
120-
resultData, resultManifest, statementId, session, parentStatement, this);
121-
long rowSize = getRowCount(resultData);
122-
this.resultSetMetaData =
123-
new DatabricksResultSetMetaData(
124-
statementId, resultManifest, rowSize, resultData.getResultLinksSize());
129+
if (resultData != null) {
130+
this.executionResult =
131+
ExecutionResultFactory.getResultSet(
132+
resultData, resultManifest, statementId, session, parentStatement);
133+
long rowSize = getRowCount(resultData);
134+
this.resultSetMetaData =
135+
new DatabricksResultSetMetaData(
136+
statementId, resultManifest, rowSize, resultData.getResultLinksSize());
137+
} else {
138+
this.executionResult = null;
139+
this.resultSetMetaData = null;
140+
this.isResultInitialized = false;
141+
}
125142
this.statementType = statementType;
126143
this.updateCount = null;
127144
this.parentStatement = parentStatement;
@@ -132,7 +149,7 @@ public DatabricksResultSet(
132149
// Constructing results for getUDTs, getTypeInfo, getProcedures metadata calls
133150
public DatabricksResultSet(
134151
StatementStatus statementStatus,
135-
String statementId,
152+
StatementId statementId,
136153
List<String> columnNames,
137154
List<String> columnTypeText,
138155
List<Integer> columnTypes,
@@ -160,7 +177,7 @@ public DatabricksResultSet(
160177
// Constructing metadata result set in thrift flow
161178
public DatabricksResultSet(
162179
StatementStatus statementStatus,
163-
String statementId,
180+
StatementId statementId,
164181
List<String> columnNames,
165182
List<String> columnTypeText,
166183
List<Integer> columnTypes,
@@ -188,7 +205,7 @@ public DatabricksResultSet(
188205
// Constructing metadata result set in SEA flow
189206
public DatabricksResultSet(
190207
StatementStatus statementStatus,
191-
String statementId,
208+
StatementId statementId,
192209
List<ColumnMetadata> columnMetadataList,
193210
List<List<Object>> rows,
194211
StatementType statementType) {
@@ -1517,7 +1534,7 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
15171534

15181535
@Override
15191536
public String getStatementId() {
1520-
return statementId;
1537+
return statementId.toString();
15211538
}
15221539

15231540
@Override
@@ -1555,18 +1572,13 @@ public boolean hasUpdateCount() throws SQLException {
15551572
&& this.resultSetMetaData.getTotalRows() == 1;
15561573
}
15571574

1558-
@Override
1559-
public void setVolumeOperationEntityStream(HttpEntity httpEntity)
1560-
throws SQLException, IOException {
1561-
checkIfClosed();
1562-
this.volumeInputStream = new VolumeInputStream(httpEntity);
1563-
this.volumeStreamContentLength = httpEntity.getContentLength();
1564-
}
1565-
15661575
@Override
15671576
public InputStreamEntity getVolumeOperationInputStream() throws SQLException {
15681577
checkIfClosed();
1569-
return new InputStreamEntity(this.volumeInputStream, this.volumeStreamContentLength);
1578+
if (executionResult instanceof VolumeOperationResult) {
1579+
return ((VolumeOperationResult) executionResult).getVolumeOperationInputStream();
1580+
}
1581+
throw new DatabricksSQLException("Invalid volume operation");
15701582
}
15711583

15721584
private void addWarningAndLog(String warningMessage) {

0 commit comments

Comments
 (0)