Skip to content

Commit c23b0a5

Browse files
committed
Merge branch 'metadata-benchmarks' into large-queries-benchmark
2 parents f94820b + fc77d1d commit c23b0a5

File tree

5 files changed

+232
-20
lines changed

5 files changed

+232
-20
lines changed

src/main/java/com/databricks/jdbc/client/impl/thrift/commons/DatabricksThriftAccessor.java

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public class DatabricksThriftAccessor {
2727
private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksThriftAccessor.class);
2828
private final DatabricksConfig databricksConfig;
2929
private final TCLIService.Client thriftClient;
30+
private static final TSparkGetDirectResults DEFAULT_DIRECT_RESULTS =
31+
new TSparkGetDirectResults().setMaxRows(DEFAULT_ROW_LIMIT).setMaxBytes(DEFAULT_BYTE_LIMIT);
3032

3133
public DatabricksThriftAccessor(IDatabricksConnectionContext connectionContext) {
3234
DatabricksHttpTTransport transport =
@@ -117,9 +119,7 @@ public TFetchResultsResp getResultSetResp(
117119
TFetchResultsReq request =
118120
new TFetchResultsReq()
119121
.setOperationHandle(operationHandle)
120-
.setIncludeResultSetMetadata(
121-
true) // TODO check: even though this is set, we had to make another call for
122-
// metadata.
122+
.setIncludeResultSetMetadata(true)
123123
.setFetchType((short) 0) // 0 represents Query output. 1 represents Log
124124
.setMaxRows(maxRows)
125125
.setMaxBytes(DEFAULT_BYTE_LIMIT);
@@ -139,26 +139,37 @@ public TFetchResultsResp getResultSetResp(
139139
}
140140
verifySuccessStatus(
141141
response.getStatus().getStatusCode(),
142-
String.format("Error while fetching results. TFetchResultsResp {}. "));
142+
String.format(
143+
"Error while fetching results. TFetchResultsResp {%s}. ", response.toString()));
143144
return response;
144145
}
145146

146147
private TFetchResultsResp execute(
147148
TExecuteStatementReq request, IDatabricksStatement parentStatement)
148149
throws TException, SQLException {
149-
TExecuteStatementResp tExecuteStatementResp = thriftClient.ExecuteStatement(request);
150150
int maxRows = (parentStatement == null) ? DEFAULT_ROW_LIMIT : parentStatement.getMaxRows();
151+
TSparkGetDirectResults directResults =
152+
new TSparkGetDirectResults().setMaxBytes(DEFAULT_BYTE_LIMIT).setMaxRows(maxRows);
153+
request.setGetDirectResults(directResults);
154+
TExecuteStatementResp response = thriftClient.ExecuteStatement(request);
155+
if (response.isSetDirectResults()) {
156+
return response.getDirectResults().getResultSet();
157+
}
151158
return getResultSetResp(
152-
tExecuteStatementResp.getStatus().getStatusCode(),
153-
tExecuteStatementResp.getOperationHandle(),
154-
tExecuteStatementResp.toString(),
159+
response.getStatus().getStatusCode(),
160+
response.getOperationHandle(),
161+
response.toString(),
155162
maxRows,
156163
true);
157164
}
158165

159166
private TFetchResultsResp listFunctions(TGetFunctionsReq request)
160167
throws DatabricksHttpException, TException {
161168
TGetFunctionsResp response = thriftClient.GetFunctions(request);
169+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
170+
if (response.isSetDirectResults()) {
171+
return response.getDirectResults().getResultSet();
172+
}
162173
return getResultSetResp(
163174
response.getStatus().getStatusCode(),
164175
response.getOperationHandle(),
@@ -169,10 +180,11 @@ private TFetchResultsResp listFunctions(TGetFunctionsReq request)
169180

170181
private TFetchResultsResp listPrimaryKeys(TGetPrimaryKeysReq request)
171182
throws DatabricksHttpException, TException {
172-
request.setGetDirectResults(
173-
new TSparkGetDirectResults().setMaxRows(100000).setMaxBytes(1000000));
183+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
174184
TGetPrimaryKeysResp response = thriftClient.GetPrimaryKeys(request);
175-
System.out.println("Without resultSet is : " + response.toString());
185+
if (response.isSetDirectResults()) {
186+
return response.getDirectResults().getResultSet();
187+
}
176188
return getResultSetResp(
177189
response.getStatus().getStatusCode(),
178190
response.getOperationHandle(),
@@ -183,7 +195,12 @@ private TFetchResultsResp listPrimaryKeys(TGetPrimaryKeysReq request)
183195

184196
private TFetchResultsResp getTables(TGetTablesReq request)
185197
throws TException, DatabricksHttpException {
198+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
186199
TGetTablesResp response = thriftClient.GetTables(request);
200+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
201+
if (response.isSetDirectResults()) {
202+
return response.getDirectResults().getResultSet();
203+
}
187204
return getResultSetResp(
188205
response.getStatus().getStatusCode(),
189206
response.getOperationHandle(),
@@ -194,7 +211,11 @@ private TFetchResultsResp getTables(TGetTablesReq request)
194211

195212
private TFetchResultsResp getTableTypes(TGetTableTypesReq request)
196213
throws TException, DatabricksHttpException {
214+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
197215
TGetTableTypesResp response = thriftClient.GetTableTypes(request);
216+
if (response.isSetDirectResults()) {
217+
return response.getDirectResults().getResultSet();
218+
}
198219
return getResultSetResp(
199220
response.getStatus().getStatusCode(),
200221
response.getOperationHandle(),
@@ -205,7 +226,11 @@ private TFetchResultsResp getTableTypes(TGetTableTypesReq request)
205226

206227
private TFetchResultsResp getCatalogs(TGetCatalogsReq request)
207228
throws TException, DatabricksHttpException {
229+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
208230
TGetCatalogsResp response = thriftClient.GetCatalogs(request);
231+
if (response.isSetDirectResults()) {
232+
return response.getDirectResults().getResultSet();
233+
}
209234
return getResultSetResp(
210235
response.getStatus().getStatusCode(),
211236
response.getOperationHandle(),
@@ -216,7 +241,11 @@ private TFetchResultsResp getCatalogs(TGetCatalogsReq request)
216241

217242
private TFetchResultsResp listSchemas(TGetSchemasReq request)
218243
throws TException, DatabricksHttpException {
244+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
219245
TGetSchemasResp response = thriftClient.GetSchemas(request);
246+
if (response.isSetDirectResults()) {
247+
return response.getDirectResults().getResultSet();
248+
}
220249
return getResultSetResp(
221250
response.getStatus().getStatusCode(),
222251
response.getOperationHandle(),
@@ -227,7 +256,11 @@ private TFetchResultsResp listSchemas(TGetSchemasReq request)
227256

228257
private TFetchResultsResp getTypeInfo(TGetTypeInfoReq request)
229258
throws TException, DatabricksHttpException {
259+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
230260
TGetTypeInfoResp response = thriftClient.GetTypeInfo(request);
261+
if (response.isSetDirectResults()) {
262+
return response.getDirectResults().getResultSet();
263+
}
231264
return getResultSetResp(
232265
response.getStatus().getStatusCode(),
233266
response.getOperationHandle(),
@@ -238,11 +271,15 @@ private TFetchResultsResp getTypeInfo(TGetTypeInfoReq request)
238271

239272
private TFetchResultsResp listColumns(TGetColumnsReq request)
240273
throws TException, DatabricksHttpException {
241-
TGetColumnsResp tGetColumnsResp = thriftClient.GetColumns(request);
274+
request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
275+
TGetColumnsResp response = thriftClient.GetColumns(request);
276+
if (response.isSetDirectResults()) {
277+
return response.getDirectResults().getResultSet();
278+
}
242279
return getResultSetResp(
243-
tGetColumnsResp.getStatus().getStatusCode(),
244-
tGetColumnsResp.getOperationHandle(),
245-
tGetColumnsResp.toString(),
280+
response.getStatus().getStatusCode(),
281+
response.getOperationHandle(),
282+
response.toString(),
246283
DEFAULT_ROW_LIMIT,
247284
false);
248285
}

src/main/java/com/databricks/jdbc/commons/EnvironmentVariables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ public final class EnvironmentVariables {
1212
false; // By default, we should not process the sql
1313

1414
public static final TProtocolVersion JDBC_THRIFT_VERSION =
15-
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
15+
TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V9;
1616
}

src/test/java/com/databricks/jdbc/client/impl/thrift/DatabricksThriftServiceClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void testCreateSession() throws DatabricksSQLException {
3636
.setInitialNamespace(getNamespace(CATALOG, SCHEMA))
3737
.setConfiguration(EMPTY_MAP)
3838
.setCanUseMultipleCatalogs(true)
39-
.setClient_protocol(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10);
39+
.setClient_protocol(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V9);
4040
TOpenSessionResp openSessionResp =
4141
new TOpenSessionResp()
4242
.setSessionHandle(SESSION_HANDLE)

0 commit comments

Comments
 (0)