
Commit d3c767d

[PECOBLR-301] Handle server returned Thrift version as part of open session response gracefully (#779)
* init changes
* Refactor TSparkArrowTypes initialization based on protocol version
* added comments
* added several unit tests
* added more unit tests
* ran ProtocolFeatureUtilTest for all protocol versions
* store protocol version as TProtocolVersion enum
* move db compute check to protocol feature util
1 parent 5b6437e commit d3c767d

File tree

6 files changed: +734 -52 lines changed

src/main/java/com/databricks/jdbc/common/util/ProtocolFeatureUtil.java

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
package com.databricks.jdbc.common.util;

import com.databricks.jdbc.model.client.thrift.generated.TProtocolVersion;

/**
 * Utility class for checking Spark protocol version features. Provides methods to determine if
 * specific protocol features are supported.
 */
public final class ProtocolFeatureUtil {
  // Prevent instantiation
  private ProtocolFeatureUtil() {}

  /**
   * Checks if the given protocol version supports getting additional information in OpenSession.
   *
   * @param protocolVersion The protocol version to check
   * @return true if getInfos in OpenSession is supported, false otherwise
   */
  public static boolean supportsGetInfosInOpenSession(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V1) >= 0;
  }

  /**
   * Checks if the given protocol version supports direct results.
   *
   * @param protocolVersion The protocol version to check
   * @return true if direct results are supported, false otherwise
   */
  public static boolean supportsDirectResults(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V1) >= 0;
  }

  /**
   * Checks if the given protocol version supports modified hasMoreRows semantics.
   *
   * @param protocolVersion The protocol version to check
   * @return true if modified hasMoreRows semantics are supported, false otherwise
   */
  public static boolean supportsModifiedHasMoreRowsSemantics(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V1) >= 0;
  }

  /**
   * Checks if the given protocol version supports cloud result fetching.
   *
   * @param protocolVersion The protocol version to check
   * @return true if cloud fetch is supported, false otherwise
   */
  public static boolean supportsCloudFetch(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V3) >= 0;
  }

  /**
   * Checks if the given protocol version supports multiple catalogs in metadata operations.
   *
   * @param protocolVersion The protocol version to check
   * @return true if multiple catalogs are supported, false otherwise
   */
  public static boolean supportsMultipleCatalogs(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V4) >= 0;
  }

  /**
   * Checks if the given protocol version supports Arrow metadata in result sets.
   *
   * @param protocolVersion The protocol version to check
   * @return true if Arrow metadata is supported, false otherwise
   */
  public static boolean supportsArrowMetadata(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5) >= 0;
  }

  /**
   * Checks if the given protocol version supports getting result set metadata from fetch results.
   *
   * @param protocolVersion The protocol version to check
   * @return true if getting result set metadata from fetch is supported, false otherwise
   */
  public static boolean supportsResultSetMetadataFromFetch(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5) >= 0;
  }

  /**
   * Checks if the given protocol version supports advanced Arrow types.
   *
   * @param protocolVersion The protocol version to check
   * @return true if advanced Arrow types are supported, false otherwise
   */
  public static boolean supportsAdvancedArrowTypes(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5) >= 0;
  }

  /**
   * Checks if the given protocol version supports compressed Arrow batches.
   *
   * @param protocolVersion The protocol version to check
   * @return true if compressed Arrow batches are supported, false otherwise
   */
  public static boolean supportsCompressedArrowBatches(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6) >= 0;
  }

  /**
   * Checks if the given protocol version supports async metadata execution.
   *
   * @param protocolVersion The protocol version to check
   * @return true if async metadata execution is supported, false otherwise
   */
  public static boolean supportsAsyncMetadataExecution(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6) >= 0;
  }

  /**
   * Checks if the given protocol version supports result persistence mode.
   *
   * @param protocolVersion The protocol version to check
   * @return true if result persistence mode is supported, false otherwise
   */
  public static boolean supportsResultPersistenceMode(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7) >= 0;
  }

  /**
   * Checks if the given protocol version supports parameterized queries.
   *
   * @param protocolVersion The protocol version to check
   * @return true if parameterized queries are supported, false otherwise
   */
  public static boolean supportsParameterizedQueries(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8) >= 0;
  }

  /**
   * Checks if the given protocol version supports async metadata operations.
   *
   * @param protocolVersion The protocol version to check
   * @return true if async metadata operations are supported, false otherwise
   */
  public static boolean supportsAsyncMetadataOperations(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V9) >= 0;
  }

  /**
   * Checks if the given protocol version indicates a non-Databricks compute.
   *
   * @param protocolVersion The protocol version to check
   * @return true if this is a non-Databricks compute, false otherwise
   */
  public static boolean isNonDatabricksCompute(TProtocolVersion protocolVersion) {
    return protocolVersion.compareTo(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) <= 0;
  }
}
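
For reference, callers gate optional behavior on the version negotiated during session open. The sketch below is illustrative only and is not part of this commit; it assumes the version comes from the TOpenSessionResp returned by the server (via getServerProtocolVersion()), and the method name configureForServer is made up for the example.

  // Illustrative usage sketch (not part of this diff): branch on the protocol
  // version the server reported in its open-session response.
  static void configureForServer(TOpenSessionResp openSessionResp) {
    TProtocolVersion serverVersion = openSessionResp.getServerProtocolVersion();
    if (ProtocolFeatureUtil.supportsCloudFetch(serverVersion)) {
      // safe to request cloud-fetch (URL-based) result delivery
    }
    if (!ProtocolFeatureUtil.supportsParameterizedQueries(serverVersion)) {
      // fall back to substituting parameters on the client side
    }
    if (ProtocolFeatureUtil.isNonDatabricksCompute(serverVersion)) {
      // endpoint only speaks the Hive protocol; skip Databricks-specific extensions
    }
  }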

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java

Lines changed: 10 additions & 3 deletions
@@ -9,6 +9,7 @@
 import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
 import com.databricks.jdbc.common.StatementType;
 import com.databricks.jdbc.common.util.DriverUtil;
+import com.databricks.jdbc.common.util.ProtocolFeatureUtil;
 import com.databricks.jdbc.dbclient.impl.common.ClientConfigurator;
 import com.databricks.jdbc.dbclient.impl.common.StatementId;
 import com.databricks.jdbc.dbclient.impl.common.TimeoutHandler;
@@ -53,6 +54,7 @@ final class DatabricksThriftAccessor {
   private final boolean enableDirectResults;
   private final int asyncPollIntervalMillis;
   private final int maxRowsPerBlock;
+  private TProtocolVersion serverProtocolVersion = JDBC_THRIFT_VERSION;
 
   DatabricksThriftAccessor(IDatabricksConnectionContext connectionContext)
       throws DatabricksParsingException {
@@ -373,7 +375,7 @@ DatabricksConfig getDatabricksConfig() {
     return databricksConfig;
   }
 
-  private TFetchResultsResp getResultSetResp(
+  TFetchResultsResp getResultSetResp(
       TStatus responseStatus,
       TOperationHandle operationHandle,
       String context,
@@ -388,8 +390,9 @@ private TFetchResultsResp getResultSetResp(
             .setMaxRows(
                 maxRowsPerBlock) // Max number of rows that should be returned in the rowset.
             .setMaxBytes(DEFAULT_BYTE_LIMIT);
-    if (fetchMetadata) {
-      request.setIncludeResultSetMetadata(true);
+    if (fetchMetadata
+        && ProtocolFeatureUtil.supportsResultSetMetadataFromFetch(serverProtocolVersion)) {
+      request.setIncludeResultSetMetadata(true); // fetch metadata if supported
     }
     TFetchResultsResp response;
     try {
@@ -617,6 +620,10 @@ private boolean isPendingOperationState(TOperationState state) {
     return state == TOperationState.RUNNING_STATE || state == TOperationState.PENDING_STATE;
   }
 
+  void setServerProtocolVersion(TProtocolVersion protocolVersion) {
+    serverProtocolVersion = protocolVersion;
+  }
+
   private TimeoutHandler getTimeoutHandler(TExecuteStatementResp response, int timeoutInSeconds) {
     final TOperationHandle operationHandle = response.getOperationHandle();
 

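The new package-private setServerProtocolVersion hook is how the negotiated version reaches the accessor; the call site itself is outside this diff. A minimal sketch of how the session-open path might wire it up (client, openSessionReq, and thriftAccessor are assumed names, not taken from the driver source):

  // Hypothetical wiring sketch -- the real call site is not shown in this commit.
  TOpenSessionResp openSessionResp = client.OpenSession(openSessionReq);
  TProtocolVersion serverVersion = openSessionResp.getServerProtocolVersion();
  if (serverVersion != null) {
    // Replace the default JDBC_THRIFT_VERSION with the version the server actually
    // speaks, so later checks such as supportsResultSetMetadataFromFetch are accurate.
    thriftAccessor.setServerProtocolVersion(serverVersion);
  }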