
Commit d27d6c7

Run queries in async mode in thrift client (#570)
- Run execution queries in `async` mode
- Refactor result retrieval in Thrift by improving response handling and fixing polling
- The competing driver, SQL Gateway, and the SQL Execution API also execute queries in async mode
- Minor code cleanup

Running queries in async mode and using polling is beneficial because it:
- Frees up server resources under high load instead of holding the connection for ~5 seconds (direct results)
- Matches server behavior: if a query takes more than 5 seconds, the server requires the client to poll anyway

No regressions were observed (slightly better performance) under high concurrent load when running queries in async mode. Some empirical results:
- Workload: a simple select query on a table with 10-15 columns returning 10K rows, run concurrently from 250 connections executing the query in parallel
- Average results when running on `main`:
```
Average query time across connections: 73873.58ms
Min query time: 22075ms
Max query time: 88997ms
```
- Average results when running in `async` mode:
```
Average query time across connections: 68382.95ms
Min query time: 24718ms
Max query time: 95282ms
```
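For readers unfamiliar with the Thrift flow, the sketch below illustrates the async execute-then-poll pattern this commit adopts. It is a simplified illustration, not the driver's actual code: the helper class, client/session variables, poll interval, and the final FetchResults call are assumptions, while the request/response types and setters come from the generated TCLIService client shown in the diffs below.

```java
import com.databricks.jdbc.model.client.thrift.generated.*;
import org.apache.thrift.TException;

final class AsyncExecuteSketch {

  /**
   * Illustrative only: submit a statement asynchronously, poll until the operation
   * leaves a pending state, then fetch the results. Error-state handling is omitted.
   */
  static TFetchResultsResp executeAndFetch(
      TCLIService.Client client, TSessionHandle session, String sql, long maxRows)
      throws TException, InterruptedException {
    TExecuteStatementReq req =
        new TExecuteStatementReq()
            .setSessionHandle(session)
            .setStatement(sql)
            .setRunAsync(true); // server returns immediately instead of holding the connection

    TExecuteStatementResp resp = client.ExecuteStatement(req);

    // Poll the operation status until the query is no longer RUNNING/PENDING.
    TGetOperationStatusReq statusReq =
        new TGetOperationStatusReq()
            .setOperationHandle(resp.getOperationHandle())
            .setGetProgressUpdate(false);
    TGetOperationStatusResp statusResp = client.GetOperationStatus(statusReq);
    while (statusResp.getOperationState() == TOperationState.RUNNING_STATE
        || statusResp.getOperationState() == TOperationState.PENDING_STATE) {
      Thread.sleep(100); // illustrative poll interval; the driver's delay is configurable
      statusResp = client.GetOperationStatus(statusReq);
    }

    // Once the operation has finished, fetch the result set.
    TFetchResultsReq fetchReq =
        new TFetchResultsReq()
            .setOperationHandle(resp.getOperationHandle())
            .setOrientation(TFetchOrientation.FETCH_NEXT)
            .setMaxRows(maxRows);
    return client.FetchResults(fetchReq);
  }
}
```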
1 parent 56ef04f commit d27d6c7


11 files changed (+118, -62 lines)


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -6,3 +6,4 @@ TestFile
 logs/
 *.log
 *.pem
+dependency-reduced-pom.xml

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowResultChunk.java

Lines changed: 2 additions & 2 deletions
@@ -1,11 +1,11 @@
 package com.databricks.jdbc.api.impl.arrow;

-import static com.databricks.jdbc.common.DatabricksJdbcConstants.IS_FAKE_SERVICE_TEST_PROP;
 import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createExternalLink;
 import static com.databricks.jdbc.common.util.ValidationUtil.checkHTTPError;

 import com.databricks.jdbc.common.CompressionCodec;
 import com.databricks.jdbc.common.util.DecompressionUtil;
+import com.databricks.jdbc.common.util.DriverUtil;
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.exception.DatabricksParsingException;
 import com.databricks.jdbc.exception.DatabricksSQLException;
@@ -215,7 +215,7 @@ void setStatus(ChunkStatus status) {
   /** Checks if the link is valid */
   boolean isChunkLinkInvalid() {
     return status == ChunkStatus.PENDING
-        || (!Boolean.parseBoolean(System.getProperty(IS_FAKE_SERVICE_TEST_PROP))
+        || (!DriverUtil.isRunningAgainstFake()
            && expiryTime.minusSeconds(SECONDS_BUFFER_FOR_EXPIRY).isBefore(Instant.now()));
   }

src/main/java/com/databricks/jdbc/common/util/DatabricksThriftUtil.java

Lines changed: 3 additions & 4 deletions
@@ -22,7 +22,6 @@
 public class DatabricksThriftUtil {

   private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(DatabricksThriftUtil.class);
-
   public static final List<TStatusCode> SUCCESS_STATUS_LIST =
       List.of(TStatusCode.SUCCESS_STATUS, TStatusCode.SUCCESS_WITH_INFO_STATUS);

@@ -317,15 +316,15 @@ public static void checkDirectResultsForErrorStatus(
     }
     if (directResults.isSetResultSetMetadata()) {
       LOGGER.debug("direct results metadata being verified for success response");
-      verifySuccessStatus(directResults.getResultSetMetadata().status, context);
+      verifySuccessStatus(directResults.getResultSetMetadata().getStatus(), context);
     }
     if (directResults.isSetCloseOperation()) {
       LOGGER.debug("direct results close operation verified for success response");
-      verifySuccessStatus(directResults.getCloseOperation().status, context);
+      verifySuccessStatus(directResults.getCloseOperation().getStatus(), context);
     }
     if (directResults.isSetResultSet()) {
       LOGGER.debug("direct result set being verified for success response");
-      verifySuccessStatus(directResults.getResultSet().status, context);
+      verifySuccessStatus(directResults.getResultSet().getStatus(), context);
     }
   }
 }

src/main/java/com/databricks/jdbc/common/util/DriverUtil.java

Lines changed: 5 additions & 2 deletions
@@ -26,7 +26,6 @@ public class DriverUtil {
   private static final String DBSQL_VERSION_SQL = "SELECT current_version().dbsql_version";
   public static final int DBSQL_MIN_MAJOR_VERSION_FOR_SEA_SUPPORT = 2024;
   public static final int DBSQL_MIN_MINOR_VERSION_FOR_SEA_SUPPORT = 30;
-
   private static final String[] VERSION_PARTS = VERSION.split("[.-]");

   public static String getVersion() {
@@ -86,7 +85,7 @@ private static Throwable getRootCause(Throwable throwable) {
   static void ensureUpdatedDBRVersionInUse(IDatabricksConnection connection)
       throws DatabricksValidationException {
     if (connection.getConnectionContext().getClientType() != DatabricksClientType.SQL_EXEC
-        || Boolean.parseBoolean(System.getProperty(IS_FAKE_SERVICE_TEST_PROP))) {
+        || isRunningAgainstFake()) {
       // Check applicable only for SEA flow
       return;
     }
@@ -127,4 +126,8 @@ private static boolean doesDriverSupportSEA(String dbsqlVersion) {
     }
     return majorVersion > DBSQL_MIN_MAJOR_VERSION_FOR_SEA_SUPPORT;
   }
+
+  public static boolean isRunningAgainstFake() {
+    return Boolean.parseBoolean(System.getProperty(IS_FAKE_SERVICE_TEST_PROP));
+  }
 }

src/main/java/com/databricks/jdbc/dbclient/impl/http/DatabricksHttpClient.java

Lines changed: 3 additions & 2 deletions
@@ -5,6 +5,7 @@
 import static io.netty.util.NetUtil.LOCALHOST;

 import com.databricks.jdbc.api.IDatabricksConnectionContext;
+import com.databricks.jdbc.common.util.DriverUtil;
 import com.databricks.jdbc.common.util.UserAgentManager;
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.dbclient.impl.common.ConfiguratorUtils;
@@ -66,7 +67,7 @@ public class DatabricksHttpClient implements IDatabricksHttpClient, Closeable {
   public CloseableHttpResponse execute(HttpUriRequest request) throws DatabricksHttpException {
     LOGGER.debug(
         String.format("Executing HTTP request [{%s}]", RequestSanitizer.sanitizeRequest(request)));
-    if (!Boolean.parseBoolean(System.getProperty(IS_FAKE_SERVICE_TEST_PROP))) {
+    if (!DriverUtil.isRunningAgainstFake()) {
       // TODO : allow gzip in wiremock
       request.setHeader("Content-Encoding", "gzip");
     }
@@ -118,7 +119,7 @@ private CloseableHttpClient makeClosableHttpClient(
             .setRetryHandler(retryHandler)
             .addInterceptorFirst(retryHandler);
     setupProxy(connectionContext, builder);
-    if (Boolean.parseBoolean(System.getProperty(IS_FAKE_SERVICE_TEST_PROP))) {
+    if (DriverUtil.isRunningAgainstFake()) {
       setFakeServiceRouteInHttpClient(builder);
     }
     return builder.build();

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java

Lines changed: 81 additions & 37 deletions
@@ -1,15 +1,14 @@
 package com.databricks.jdbc.dbclient.impl.thrift;

-import static com.databricks.jdbc.common.DatabricksJdbcConstants.IS_FAKE_SERVICE_TEST_PROP;
 import static com.databricks.jdbc.common.EnvironmentVariables.*;
 import static com.databricks.jdbc.common.util.DatabricksThriftUtil.*;
-import static com.databricks.jdbc.model.client.thrift.generated.TStatusCode.*;

 import com.databricks.jdbc.api.IDatabricksConnectionContext;
 import com.databricks.jdbc.api.IDatabricksSession;
 import com.databricks.jdbc.api.impl.*;
 import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
 import com.databricks.jdbc.common.StatementType;
+import com.databricks.jdbc.common.util.DriverUtil;
 import com.databricks.jdbc.dbclient.impl.common.ClientConfigurator;
 import com.databricks.jdbc.dbclient.impl.common.StatementId;
 import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
@@ -47,9 +46,7 @@ final class DatabricksThriftAccessor {
     Map<String, String> authHeaders = databricksConfig.authenticate();
     String endPointUrl = connectionContext.getEndpointURL();

-    final boolean isFakeServiceTest =
-        Boolean.parseBoolean(System.getProperty(IS_FAKE_SERVICE_TEST_PROP));
-    if (!isFakeServiceTest) {
+    if (!DriverUtil.isRunningAgainstFake()) {
       // Create a new thrift client for each thread as client state is not thread safe. Note that
       // the underlying protocol uses the same http client which is thread safe
       this.thriftClient =
@@ -127,7 +124,7 @@ TFetchResultsResp getResultSetResp(TOperationHandle operationHandle, String cont
       throws DatabricksHttpException {
     refreshHeadersIfRequired();
     return getResultSetResp(
-        new TStatus().setStatusCode(SUCCESS_STATUS),
+        new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS),
         operationHandle,
         context,
         DEFAULT_ROW_LIMIT,
@@ -170,7 +167,7 @@ TFetchResultsResp getMoreResults(IDatabricksStatementInternal parentStatement)
             parentStatement.getStatementId().toSQLExecStatementId());
     int maxRows = (parentStatement == null) ? DEFAULT_ROW_LIMIT : parentStatement.getMaxRows();
     return getResultSetResp(
-        new TStatus().setStatusCode(SUCCESS_STATUS),
+        new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS),
         getOperationHandle(parentStatement.getStatementId()),
         context,
         maxRows,
@@ -213,50 +210,53 @@ private TFetchResultsResp getResultSetResp(
     return response;
   }

-  private void longPolling(TOperationHandle operationHandle)
-      throws TException, InterruptedException, DatabricksHttpException {
-    TGetOperationStatusReq request =
-        new TGetOperationStatusReq()
-            .setOperationHandle(operationHandle)
-            .setGetProgressUpdate(false);
-    TGetOperationStatusResp response;
-    TStatus status;
-    do {
-      response = getThriftClient().GetOperationStatus(request);
-      status = response.getStatus();
-      if (status.getStatusCode() == TStatusCode.STILL_EXECUTING_STATUS) {
-        Thread.sleep(DEFAULT_SLEEP_DELAY);
-      }
-    } while (status.getStatusCode() == TStatusCode.STILL_EXECUTING_STATUS);
-    verifySuccessStatus(status, String.format("Request {%s}, Response {%s}", request, response));
-  }
-
   DatabricksResultSet execute(
       TExecuteStatementReq request,
       IDatabricksStatementInternal parentStatement,
       IDatabricksSession session,
       StatementType statementType)
       throws SQLException {
     refreshHeadersIfRequired();
+
+    // Set direct result configuration
     int maxRows = (parentStatement == null) ? DEFAULT_ROW_LIMIT : parentStatement.getMaxRows();
     if (enableDirectResults) {
       TSparkGetDirectResults directResults =
           new TSparkGetDirectResults().setMaxBytes(DEFAULT_BYTE_LIMIT).setMaxRows(maxRows);
       request.setGetDirectResults(directResults);
     }
+
     TExecuteStatementResp response;
-    TFetchResultsResp resultSet = null;
+    TFetchResultsResp resultSet;
+
     try {
       response = getThriftClient().ExecuteStatement(request);
-      if (Arrays.asList(ERROR_STATUS, INVALID_HANDLE_STATUS).contains(response.status.statusCode)) {
-        throw new DatabricksSQLException(response.status.errorMessage, response.status.sqlState);
-      }
+      checkResponseForErrors(response);
+
+      TGetOperationStatusResp statusResp = null;
       if (response.isSetDirectResults()) {
         checkDirectResultsForErrorStatus(response.getDirectResults(), response.toString());
+        statusResp = response.getDirectResults().getOperationStatus();
+        checkOperationStatusForErrors(statusResp);
+      }
+
+      // Polling until query operation state is finished
+      TGetOperationStatusReq statusReq =
+          new TGetOperationStatusReq()
+              .setOperationHandle(response.getOperationHandle())
+              .setGetProgressUpdate(false);
+      while (shouldContinuePolling(statusResp)) {
+        statusResp = getThriftClient().GetOperationStatus(statusReq);
+        checkOperationStatusForErrors(statusResp);
+      }
+
+      if (hasResultDataInDirectResults(response)) {
+        // The first response has result data
+        // There is no polling in this case as status was already finished
         resultSet = response.getDirectResults().getResultSet();
         resultSet.setResultSetMetadata(response.getDirectResults().getResultSetMetadata());
       } else {
-        longPolling(response.getOperationHandle());
+        // Fetch the result data after polling
         resultSet =
             getResultSetResp(
                 response.getStatus(),
@@ -265,11 +265,11 @@ DatabricksResultSet execute(
                 maxRows,
                 true);
       }
-    } catch (TException | InterruptedException e) {
+    } catch (TException e) {
       String errorMessage =
           String.format(
               "Error while receiving response from Thrift server. Request {%s}, Error {%s}",
-              request.toString(), e.getMessage());
+              request, e.getMessage());
       LOGGER.error(e, errorMessage);
       throw new DatabricksHttpException(errorMessage, e);
     }
@@ -288,12 +288,11 @@ DatabricksResultSet executeAsync(
       StatementType statementType)
       throws SQLException {
     refreshHeadersIfRequired();
-    TExecuteStatementResp response = null;
-    DatabricksHttpTTransport transport =
-        (DatabricksHttpTTransport) getThriftClient().getInputProtocol().getTransport();
+    TExecuteStatementResp response;
     try {
       response = getThriftClient().ExecuteStatement(request);
-      if (Arrays.asList(ERROR_STATUS, INVALID_HANDLE_STATUS).contains(response.status.statusCode)) {
+      if (Arrays.asList(TStatusCode.ERROR_STATUS, TStatusCode.INVALID_HANDLE_STATUS)
+          .contains(response.status.statusCode)) {
         LOGGER.error(
             "Received error response {%s} from Thrift Server for request {%s}",
             response, request.toString());
@@ -335,7 +334,8 @@ DatabricksResultSet getStatementResult(
     try {
       response = getThriftClient().GetOperationStatus(request);
       statusCode = response.getStatus().getStatusCode();
-      if (statusCode == SUCCESS_STATUS || statusCode == SUCCESS_WITH_INFO_STATUS) {
+      if (statusCode == TStatusCode.SUCCESS_STATUS
+          || statusCode == TStatusCode.SUCCESS_WITH_INFO_STATUS) {
         resultSet =
             getResultSetResp(response.getStatus(), operationHandle, response.toString(), -1, true);
       }
@@ -512,4 +512,48 @@ private TCLIService.Client createThriftClient(

     return new TCLIService.Client(protocol);
   }
+
+  private void checkResponseForErrors(TExecuteStatementResp response) throws SQLException {
+    if (!response.isSetOperationHandle()) {
+      throw new DatabricksSQLException("Operation handle not set");
+    }
+    if (isErrorStatusCode(response.status.statusCode)) {
+      throw new DatabricksSQLException(
+          response.status.getErrorMessage(), response.status.getSqlState());
+    }
+  }
+
+  private void checkOperationStatusForErrors(TGetOperationStatusResp statusResp)
+      throws SQLException {
+    if (statusResp != null
+        && statusResp.isSetOperationState()
+        && isErrorOperationState(statusResp.getOperationState())) {
+      throw new DatabricksSQLException("Operation state erroneous");
+    }
+  }
+
+  private boolean shouldContinuePolling(TGetOperationStatusResp statusResp) {
+    return statusResp == null
+        || !statusResp.isSetOperationState()
+        || isPendingOperationState(statusResp.getOperationState());
+  }
+
+  private boolean hasResultDataInDirectResults(TExecuteStatementResp response) {
+    return response.isSetDirectResults()
+        && response.getDirectResults().isSetResultSet()
+        && response.getDirectResults().isSetResultSetMetadata();
+  }
+
+  private boolean isErrorStatusCode(TStatusCode statusCode) {
+    return statusCode == TStatusCode.ERROR_STATUS
+        || statusCode == TStatusCode.INVALID_HANDLE_STATUS;
+  }
+
+  private boolean isErrorOperationState(TOperationState state) {
+    return state == TOperationState.ERROR_STATE || state == TOperationState.CLOSED_STATE;
+  }
+
+  private boolean isPendingOperationState(TOperationState state) {
+    return state == TOperationState.RUNNING_STATE || state == TOperationState.PENDING_STATE;
+  }
 }

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java

Lines changed: 5 additions & 0 deletions
@@ -10,6 +10,7 @@
 import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
 import com.databricks.jdbc.common.IDatabricksComputeResource;
 import com.databricks.jdbc.common.StatementType;
+import com.databricks.jdbc.common.util.DriverUtil;
 import com.databricks.jdbc.dbclient.IDatabricksClient;
 import com.databricks.jdbc.dbclient.IDatabricksMetadataClient;
 import com.databricks.jdbc.dbclient.impl.common.MetadataResultSetBuilder;
@@ -140,6 +141,10 @@ public DatabricksResultSet executeStatement(
             .setQueryTimeout(parentStatement.getStatement().getQueryTimeout())
             .setCanReadArrowResult(this.connectionContext.shouldEnableArrow())
             .setCanDownloadResult(true);
+    if (!DriverUtil.isRunningAgainstFake()) {
+      // run queries in async mode if not using fake services
+      request.setRunAsync(true);
+    }
     return thriftAccessor.execute(request, parentStatement, session, statementType);
   }

src/test/java/com/databricks/jdbc/TestConstants.java

Lines changed: 0 additions & 3 deletions
@@ -43,7 +43,6 @@ public class TestConstants {
       "jdbc:databricks://e2-dogfood.staging.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/6051921418418893/1115-130834-ms4m0yv;AuthMech=3;UserAgentEntry=TEST/24.2.0.2712019";
   public static final String CLUSTER_JDBC_URL =
       "jdbc:databricks://e2-dogfood.staging.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/6051921418418893/1115-130834-ms4m0yv;AuthMech=3;UserAgentEntry=MyApp";
-
   public static final TRowSet binaryRowSet =
       new TRowSet()
           .setColumns(
@@ -83,7 +82,6 @@ public class TestConstants {
           .setColumns(
               Collections.singletonList(
                   TColumn.i64Val(new TI64Column().setValues(List.of(1L, 5L)))));
-
   public static final TRowSet stringRowSet =
       new TRowSet()
           .setColumns(
@@ -163,7 +161,6 @@ public class TestConstants {
       "jdbc:databricks://e2-dogfood.staging.cloud.databricks.com:443/default;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/5c89f447c476a5a8;UseSystemProxy=1;UseProxy=1;ProxyHost=127.0.0.1;ProxyPort=8080;ProxyAuth=1;ProxyUID=proxyUser;ProxyPwd=proxyPassword;UseCFProxy=1;CFProxyHost=127.0.1.2;CFProxyPort=8081;CFProxyAuth=2;CFProxyUID=cfProxyUser;CFProxyPwd=cfProxyPassword;";
   public static final String VALID_URL_POLLING =
       "jdbc:databricks://e2-dogfood.staging.cloud.databricks.com:4473;ssl=1;asyncexecpollinterval=500;AuthMech=3;httpPath=/sql/1.0/warehouses/5c89f447c476a5a8;QueryResultCompressionType=1";
-
   public static final List<TSparkArrowBatch> ARROW_BATCH_LIST =
       Collections.singletonList(
           new TSparkArrowBatch().setRowCount(0).setBatch(new byte[] {65, 66, 67}));
