Skip to content

Commit e8ceb7f

Browse files
authored
Fix for async interface in SDK client (#606)
1 parent 9a0e918 commit e8ceb7f

File tree

3 files changed

+102
-14
lines changed

3 files changed

+102
-14
lines changed

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class DatabricksSdkClient implements IDatabricksClient {
3838

3939
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(DatabricksSdkClient.class);
4040
private static final String SYNC_TIMEOUT_VALUE = "10s";
41+
private static final String ASYNC_TIMEOUT_VALUE = "0s";
4142
private final IDatabricksConnectionContext connectionContext;
4243
private final ClientConfigurator clientConfigurator;
4344
private volatile WorkspaceClient workspaceClient;
@@ -135,7 +136,8 @@ public DatabricksResultSet executeStatement(
135136
((Warehouse) computeResource).getWarehouseId(),
136137
session,
137138
parameters,
138-
parentStatement);
139+
parentStatement,
140+
false);
139141
ExecuteStatementResponse response =
140142
workspaceClient
141143
.apiClient()
@@ -217,7 +219,8 @@ public DatabricksResultSet executeStatementAsync(
217219
((Warehouse) computeResource).getWarehouseId(),
218220
session,
219221
parameters,
220-
parentStatement);
222+
parentStatement,
223+
true);
221224
ExecuteStatementResponse response =
222225
workspaceClient
223226
.apiClient()
@@ -319,7 +322,8 @@ private ExecuteStatementRequest getRequest(
319322
String warehouseId,
320323
IDatabricksSession session,
321324
Map<Integer, ImmutableSqlParameter> parameters,
322-
IDatabricksStatementInternal parentStatement)
325+
IDatabricksStatementInternal parentStatement,
326+
boolean executeAsync)
323327
throws SQLException {
324328
Format format = useCloudFetchForResult(statementType) ? Format.ARROW_STREAM : Format.JSON_ARRAY;
325329
Disposition disposition =
@@ -341,9 +345,14 @@ private ExecuteStatementRequest getRequest(
341345
.setDisposition(disposition)
342346
.setFormat(format)
343347
.setResultCompression(compressionCodec)
344-
.setWaitTimeout(SYNC_TIMEOUT_VALUE)
345-
.setOnWaitTimeout(ExecuteStatementRequestOnWaitTimeout.CONTINUE)
346348
.setParameters(parameterListItems);
349+
if (executeAsync) {
350+
request.setWaitTimeout(ASYNC_TIMEOUT_VALUE);
351+
} else {
352+
request
353+
.setWaitTimeout(SYNC_TIMEOUT_VALUE)
354+
.setOnWaitTimeout(ExecuteStatementRequestOnWaitTimeout.CONTINUE);
355+
}
347356
if (maxRows != DEFAULT_ROW_LIMIT) {
348357
request.setRowLimit(maxRows);
349358
}

src/test/java/com/databricks/client/jdbc/DriverTest.java

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -434,13 +434,19 @@ void testAllPurposeClusters_errorHandling() throws Exception {
434434
void testAllPurposeClusters_async() throws Exception {
435435
String jdbcUrl =
436436
"jdbc:databricks://e2-dogfood.staging.cloud.databricks.com:443/default;ssl=1;AuthMech=3;httpPath=sql/protocolv1/o/6051921418418893/1115-130834-ms4m0yv;enableDirectResults=1";
437-
Connection con = DriverManager.getConnection(jdbcUrl, "token", "token");
437+
String token = "token";
438+
Connection con = DriverManager.getConnection(jdbcUrl, "token", token);
438439
System.out.println("Connection established...... con1");
439440
Statement s = con.createStatement();
440441
IDatabricksStatement ids = s.unwrap(IDatabricksStatement.class);
442+
try {
443+
s.execute("DROP TABLE JDBC_ASYNC_CLUSTER");
444+
} catch (Exception e) {
445+
// do nothing
446+
}
441447
long initialTime = System.currentTimeMillis();
442448
String sql =
443-
"CREATE TABLE TMP_P2P_EKKO_EKPO_ASYNC8 AS ("
449+
"CREATE TABLE JDBC_ASYNC_CLUSTER AS ("
444450
+ " SELECT * FROM ("
445451
+ " SELECT * FROM ("
446452
+ " SELECT t1.*"
@@ -471,7 +477,75 @@ void testAllPurposeClusters_async() throws Exception {
471477
+ (System.currentTimeMillis() - initialTime));
472478
}
473479

474-
Connection con2 = DriverManager.getConnection(jdbcUrl, "token", "token");
480+
Connection con2 = DriverManager.getConnection(jdbcUrl, "token", token);
481+
System.out.println("Connection established......con2");
482+
IDatabricksConnection idc = con2.unwrap(IDatabricksConnection.class);
483+
Statement stm = idc.getStatement(rs.unwrap(IDatabricksResultSet.class).getStatementId());
484+
ResultSet rs2 = stm.unwrap(IDatabricksStatement.class).getExecutionResult();
485+
486+
System.out.println(
487+
"Status of async execution using con2 "
488+
+ rs2.unwrap(IDatabricksResultSet.class).getStatementStatus().getState());
489+
490+
stm.cancel();
491+
stm.execute("DROP TABLE JDBC_ASYNC_CLUSTER");
492+
System.out.println("Statement cancelled using con2");
493+
s.close();
494+
System.out.println("Statement cancelled using con1");
495+
con2.close();
496+
con.close();
497+
System.out.println("Connection closed successfully......");
498+
}
499+
500+
@Test
501+
void testDBSQL_async() throws Exception {
502+
String jdbcUrl =
503+
"jdbc:databricks://e2-dogfood.staging.cloud.databricks.com:443/default;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/791ba2a31c7fd70a;enableDirectResults=1";
504+
String token = "token";
505+
Connection con = DriverManager.getConnection(jdbcUrl, "token", token);
506+
System.out.println("Connection established...... con1");
507+
Statement s = con.createStatement();
508+
IDatabricksStatement ids = s.unwrap(IDatabricksStatement.class);
509+
try {
510+
s.execute("DROP TABLE JDBC_ASYNC_DBSQL");
511+
} catch (Exception e) {
512+
// do nothing
513+
}
514+
long initialTime = System.currentTimeMillis();
515+
String sql =
516+
"CREATE TABLE JDBC_ASYNC_DBSQL AS ("
517+
+ " SELECT * FROM ("
518+
+ " SELECT * FROM ("
519+
+ " SELECT t1.*"
520+
+ " FROM main.streaming.random_large_table t1"
521+
+ " INNER JOIN main.streaming.random_large_table t2"
522+
+ " ON t1.prompt = t2.prompt"
523+
+ " ) nested_t1"
524+
+ " ) nested_t2"
525+
+ ")";
526+
ResultSet rs = ids.executeAsync(sql);
527+
System.out.println("Time taken: " + (System.currentTimeMillis() - initialTime));
528+
System.out.println(
529+
"Status of async execution " + rs.unwrap(IDatabricksResultSet.class).getStatementStatus());
530+
531+
System.out.println("StatementId " + rs.unwrap(IDatabricksResultSet.class).getStatementId());
532+
533+
int count = 1;
534+
StatementState state = rs.unwrap(IDatabricksResultSet.class).getStatementStatus().getState();
535+
while (state != StatementState.SUCCEEDED && state != StatementState.FAILED) {
536+
Thread.sleep(1000);
537+
rs = s.unwrap(IDatabricksStatement.class).getExecutionResult();
538+
state = rs.unwrap(IDatabricksResultSet.class).getStatementStatus().getState();
539+
System.out.println(
540+
"Status of async execution "
541+
+ state
542+
+ " attempt "
543+
+ count++
544+
+ " time taken "
545+
+ (System.currentTimeMillis() - initialTime));
546+
}
547+
548+
Connection con2 = DriverManager.getConnection(jdbcUrl, "token", token);
475549
System.out.println("Connection established......con2");
476550
IDatabricksConnection idc = con2.unwrap(IDatabricksConnection.class);
477551
Statement stm = idc.getStatement(rs.unwrap(IDatabricksResultSet.class).getStatementId());
@@ -482,7 +556,7 @@ void testAllPurposeClusters_async() throws Exception {
482556
+ rs2.unwrap(IDatabricksResultSet.class).getStatementStatus().getState());
483557

484558
stm.cancel();
485-
stm.execute("DROP TABLE TMP_P2P_EKKO_EKPO_ASYNC8");
559+
stm.execute("DROP TABLE JDBC_ASYNC_DBSQL");
486560
System.out.println("Statement cancelled using con2");
487561
s.close();
488562
System.out.println("Statement cancelled using con1");

src/test/java/com/databricks/jdbc/api/impl/DatabricksSdkClientTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private void setupSessionMocks() {
5757
.thenReturn(response);
5858
}
5959

60-
private void setupClientMocks(boolean includeResults) {
60+
private void setupClientMocks(boolean includeResults, boolean async) {
6161
List<StatementParameterListItem> params =
6262
new ArrayList<>() {
6363
{
@@ -76,10 +76,15 @@ private void setupClientMocks(boolean includeResults) {
7676
.setStatement(STATEMENT)
7777
.setDisposition(Disposition.EXTERNAL_LINKS)
7878
.setFormat(Format.ARROW_STREAM)
79-
.setWaitTimeout("10s")
8079
.setRowLimit(100L)
81-
.setOnWaitTimeout(ExecuteStatementRequestOnWaitTimeout.CONTINUE)
8280
.setParameters(params);
81+
if (async) {
82+
executeStatementRequest.setWaitTimeout("0s");
83+
} else {
84+
executeStatementRequest
85+
.setWaitTimeout("10s")
86+
.setOnWaitTimeout(ExecuteStatementRequestOnWaitTimeout.CONTINUE);
87+
}
8388
ExecuteStatementResponse response =
8489
new ExecuteStatementResponse()
8590
.setStatementId(STATEMENT_ID.toSQLExecStatementId())
@@ -142,7 +147,7 @@ public void testDeleteSession() throws DatabricksSQLException {
142147

143148
@Test
144149
public void testExecuteStatement() throws Exception {
145-
setupClientMocks(true);
150+
setupClientMocks(true, false);
146151
IDatabricksConnectionContext connectionContext =
147152
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
148153
DatabricksSdkClient databricksSdkClient =
@@ -176,7 +181,7 @@ public void testExecuteStatement() throws Exception {
176181

177182
@Test
178183
public void testExecuteStatementAsync() throws Exception {
179-
setupClientMocks(false);
184+
setupClientMocks(false, true);
180185
IDatabricksConnectionContext connectionContext =
181186
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
182187
DatabricksSdkClient databricksSdkClient =

0 commit comments

Comments
 (0)