Skip to content

Commit e1e6a07

Browse files
authored
Merge pull request #193 from databricks/abortstatement
[PECO-1683] Add cancel Statement support
2 parents 1aa675f + f8fda4a commit e1e6a07

File tree

8 files changed

+104
-4
lines changed

8 files changed

+104
-4
lines changed

src/main/java/com/databricks/jdbc/client/DatabricksClient.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,16 @@ DatabricksResultSet executeStatement(
5959
* Closes a statement in Databricks server
6060
*
6161
* @param statementId statement which should be closed
62-
* @return response for statement execution
6362
*/
6463
void closeStatement(String statementId) throws DatabricksSQLException;
6564

65+
/**
66+
* Cancels a statement in Databricks server
67+
*
68+
* @param statementId statement which should be aborted
69+
*/
70+
void cancelStatement(String statementId) throws DatabricksSQLException;
71+
6672
/**
6773
* Fetches the chunk details for given chunk index and statement-Id.
6874
*

src/main/java/com/databricks/jdbc/client/impl/sdk/DatabricksSdkClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,14 @@ public void closeStatement(String statementId) {
205205
workspaceClient.apiClient().DELETE(path, request, Void.class, getHeaders());
206206
}
207207

208+
@Override
209+
public void cancelStatement(String statementId) {
210+
LOGGER.debug("public void cancelStatement(String statementId = {})", statementId);
211+
CancelStatementRequest request = new CancelStatementRequest().setStatementId(statementId);
212+
String path = String.format(CANCEL_STATEMENT_PATH_WITH_ID, request.getStatementId());
213+
workspaceClient.apiClient().POST(path, request, Void.class, getHeaders());
214+
}
215+
208216
@Override
209217
public Collection<ExternalLink> getResultChunks(String statementId, long chunkIndex) {
210218
LOGGER.debug(

src/main/java/com/databricks/jdbc/client/impl/sdk/PathConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ public class PathConstants {
77
public static final String STATEMENT_PATH = BASE_PATH + "statements/";
88
public static final String DELETE_SESSION_PATH_WITH_ID = STATEMENT_PATH + "sessions/%s";
99
public static final String STATEMENT_PATH_WITH_ID = STATEMENT_PATH + "%s";
10+
public static final String CANCEL_STATEMENT_PATH_WITH_ID = STATEMENT_PATH + "%s/cancel";
1011
public static final String RESULT_CHUNK_PATH = STATEMENT_PATH_WITH_ID + "/result/chunks/%s";
1112
}

src/main/java/com/databricks/jdbc/client/impl/thrift/DatabricksThriftServiceClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ public void closeStatement(String statementId) throws DatabricksSQLException {
119119
"closeStatement for all purpose cluster not implemented");
120120
}
121121

122+
@Override
123+
public void cancelStatement(String statementId) throws DatabricksSQLException {
124+
LOGGER.debug(
125+
"public void cancelStatement(String statementId = {}) for all purpose cluster",
126+
statementId);
127+
throw new DatabricksSQLFeatureNotImplementedException(
128+
"abortStatement for all purpose cluster not implemented");
129+
}
130+
122131
@Override
123132
public Collection<ExternalLink> getResultChunks(String statementId, long chunkIndex)
124133
throws DatabricksSQLException {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.databricks.jdbc.client.sqlexec;
2+
3+
import com.databricks.sdk.support.ToStringer;
4+
import java.util.Objects;
5+
6+
public class CancelStatementRequest {
7+
private String statementId;
8+
9+
public CancelStatementRequest() {}
10+
11+
public CancelStatementRequest setStatementId(String statementId) {
12+
this.statementId = statementId;
13+
return this;
14+
}
15+
16+
public String getStatementId() {
17+
return this.statementId;
18+
}
19+
20+
public boolean equals(Object o) {
21+
if (this == o) {
22+
return true;
23+
} else if (o != null && this.getClass() == o.getClass()) {
24+
CancelStatementRequest that = (CancelStatementRequest) o;
25+
return Objects.equals(this.statementId, that.statementId);
26+
} else {
27+
return false;
28+
}
29+
}
30+
31+
public int hashCode() {
32+
return Objects.hash(new Object[] {this.statementId});
33+
}
34+
35+
public String toString() {
36+
return (new ToStringer(CancelStatementRequest.class))
37+
.add("statementId", this.statementId)
38+
.toString();
39+
}
40+
}

src/main/java/com/databricks/jdbc/core/DatabricksStatement.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,14 @@ public void setQueryTimeout(int seconds) throws SQLException {
132132
@Override
133133
public void cancel() throws SQLException {
134134
LOGGER.debug("public void cancel()");
135-
throw new DatabricksSQLFeatureNotSupportedException(
136-
"Not implemented in DatabricksStatement - cancel()");
135+
checkIfClosed();
136+
137+
if (statementId != null) {
138+
this.connection.getSession().getDatabricksClient().cancelStatement(statementId);
139+
} else {
140+
WarningUtil.addWarning(
141+
warnings, "The statement you are trying to cancel does not have an ID yet.");
142+
}
137143
}
138144

139145
@Override

src/test/java/com/databricks/jdbc/core/DatabricksSdkClientTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,31 @@ public void testExecuteStatement() throws Exception {
167167
assertEquals(STATEMENT_ID, statement.getStatementId());
168168
}
169169

170+
@Test
171+
public void testCloseStatement() throws DatabricksSQLException {
172+
String path = String.format(STATEMENT_PATH_WITH_ID, STATEMENT_ID);
173+
IDatabricksConnectionContext connectionContext =
174+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
175+
DatabricksSdkClient databricksSdkClient =
176+
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
177+
CloseStatementRequest request = new CloseStatementRequest().setStatementId(STATEMENT_ID);
178+
databricksSdkClient.closeStatement(STATEMENT_ID);
179+
180+
verify(apiClient).DELETE(eq(path), eq(request), eq(Void.class), eq(headers));
181+
}
182+
183+
@Test
184+
public void testCancelStatement() throws DatabricksSQLException {
185+
String path = String.format(CANCEL_STATEMENT_PATH_WITH_ID, STATEMENT_ID);
186+
IDatabricksConnectionContext connectionContext =
187+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
188+
DatabricksSdkClient databricksSdkClient =
189+
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
190+
CancelStatementRequest request = new CancelStatementRequest().setStatementId(STATEMENT_ID);
191+
databricksSdkClient.cancelStatement(STATEMENT_ID);
192+
verify(apiClient).POST(eq(path), eq(request), eq(Void.class), eq(headers));
193+
}
194+
170195
private StatementParameterListItem getParam(String type, String value, int ordinal) {
171196
return new PositionalStatementParameterListItem()
172197
.setOrdinal(ordinal)

src/test/java/com/databricks/jdbc/core/DatabricksStatementTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ public void testExecuteStatement() throws Exception {
8181

8282
when(resultSet.hasUpdateCount()).thenReturn(true);
8383
assertFalse(statement.execute(STATEMENT, Statement.NO_GENERATED_KEYS));
84+
85+
assertFalse(statement.isClosed());
86+
statement.cancel();
87+
88+
statement.close();
89+
assertThrows(DatabricksSQLException.class, () -> statement.cancel());
8490
}
8591

8692
@Test
@@ -199,7 +205,6 @@ public void testFeatureNotSupported() throws SQLException {
199205
() -> statement.isWrapperFor(java.sql.Connection.class));
200206
assertThrows(
201207
DatabricksSQLFeatureNotSupportedException.class, () -> statement.setCursorName("name"));
202-
assertThrows(DatabricksSQLFeatureNotSupportedException.class, statement::cancel);
203208
assertThrows(DatabricksSQLFeatureNotSupportedException.class, statement::getMaxFieldSize);
204209
assertThrows(DatabricksSQLFeatureNotSupportedException.class, statement::getMoreResults);
205210
assertThrows(

0 commit comments

Comments
 (0)