
Commit 6854d5b

[ES-1422040] [ES-1421766] Add fixes for unimplemented + preparedStatementType (#797)

* Add fixes for unimplemented + preparedStatementType
* add comments
* fix integration
* fix integ test
* temp
* fix integ test
* Add tests
* address comment

1 parent 3f281ed commit 6854d5b

File tree

8 files changed: +113 additions, −18 deletions

src/main/java/com/databricks/jdbc/api/impl/DatabricksPreparedStatement.java

Lines changed: 20 additions & 0 deletions
@@ -4,6 +4,7 @@
 import static com.databricks.jdbc.common.util.DatabricksTypeUtil.getDatabricksTypeFromSQLType;
 import static com.databricks.jdbc.common.util.DatabricksTypeUtil.inferDatabricksType;
 import static com.databricks.jdbc.common.util.SQLInterpolator.interpolateSQL;
+import static com.databricks.jdbc.common.util.ValidationUtil.throwErrorIfNull;

 import com.databricks.jdbc.common.StatementType;
 import com.databricks.jdbc.common.util.DatabricksTypeUtil;
@@ -250,6 +251,25 @@ public void clearParameters() throws SQLException {
     this.databricksParameterMetaData.getParameterBindings().clear();
   }

+  @Override
+  public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException {
+    throwErrorIfNull("Prepared statement SQL setObject targetSqlType", targetSqlType);
+    this.setObject(parameterIndex, x, targetSqlType.getVendorTypeNumber());
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x, SQLType targetSqlType, int scaleOrLength)
+      throws SQLException {
+    throwErrorIfNull("Prepared statement SQL setObject targetSqlType", targetSqlType);
+    this.setObject(parameterIndex, x, targetSqlType.getVendorTypeNumber(), scaleOrLength);
+  }
+
+  @Override
+  public long executeLargeUpdate() throws SQLException {
+    throw new DatabricksSQLFeatureNotImplementedException(
+        "executeLargeUpdate in preparedStatement is not implemented in OSS JDBC");
+  }
+
   @Override
   public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
     LOGGER.debug("public void setObject(int parameterIndex, Object x, int targetSqlType)");
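
For reference, a minimal caller-side sketch of the JDBC 4.2 setObject overloads added above. This is not driver code: the connection URL, table name, and values are placeholders, and it assumes a reachable Databricks SQL endpoint.

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.JDBCType;
import java.sql.PreparedStatement;

public class SetObjectSqlTypeSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder URL; substitute a real Databricks JDBC connection string.
    try (Connection conn = DriverManager.getConnection("jdbc:databricks://<host>:443/default;...");
        PreparedStatement ps =
            conn.prepareStatement("SELECT * FROM orders WHERE id = ? AND total > ?")) {
      // Delegates to setObject(int, Object, int) via SQLType#getVendorTypeNumber();
      // a null SQLType is rejected by throwErrorIfNull instead of causing an NPE.
      ps.setObject(1, 42L, JDBCType.BIGINT);
      // The scaleOrLength overload behaves the same way for DECIMAL parameters.
      ps.setObject(2, new BigDecimal("99.95"), JDBCType.DECIMAL, 2);
      ps.executeQuery();
    }
  }
}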

src/main/java/com/databricks/jdbc/common/util/ValidationUtil.java

Lines changed: 9 additions & 0 deletions
@@ -38,6 +38,15 @@ public static void throwErrorIfNull(Map<String, String> fields, String context)
     }
   }

+  public static void throwErrorIfNull(String field, Object value) throws DatabricksSQLException {
+    if (value != null) {
+      return;
+    }
+    String errorMessage = String.format("Unsupported null Input for field {%s}", field);
+    LOGGER.error(errorMessage);
+    throw new DatabricksValidationException(errorMessage);
+  }
+
   public static void checkHTTPError(HttpResponse response) throws DatabricksHttpException {
     int statusCode = response.getStatusLine().getStatusCode();
     String statusLine = response.getStatusLine().toString();
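
A small JUnit-style sketch of how the new null guard behaves. The test class name is illustrative and the exception import path is an assumption, not taken from this commit.

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.databricks.jdbc.common.util.ValidationUtil;
import com.databricks.jdbc.exception.DatabricksSQLException; // assumed package path
import org.junit.jupiter.api.Test;

public class ThrowErrorIfNullSketchTest {
  @Test
  void nullValueIsRejected() {
    // The helper logs and throws a DatabricksValidationException (a DatabricksSQLException).
    assertThrows(
        DatabricksSQLException.class,
        () -> ValidationUtil.throwErrorIfNull("targetSqlType", null));
  }

  @Test
  void nonNullValuePassesThrough() {
    assertDoesNotThrow(
        () -> ValidationUtil.throwErrorIfNull("targetSqlType", java.sql.JDBCType.INTEGER));
  }
}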

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java

Lines changed: 2 additions & 2 deletions
@@ -161,8 +161,8 @@ public DatabricksResultSet executeStatement(
       throws SQLException {
     LOGGER.debug(
         String.format(
-            "public DatabricksResultSet executeStatement(String sql = {%s}, compute resource = {%s}, Map<Integer, ImmutableSqlParameter> parameters, StatementType statementType = {%s}, IDatabricksSession session)",
-            sql, computeResource.toString(), statementType));
+            "public DatabricksResultSet executeStatement(String sql = {%s}, compute resource = {%s}, Map<Integer, ImmutableSqlParameter> parameters = {%s}, StatementType statementType = {%s}, IDatabricksSession session)",
+            sql, computeResource.toString(), parameters, statementType));
     long pollCount = 0;
     long executionStartTime = Instant.now().toEpochMilli();
     DatabricksThreadContextHolder.setStatementType(statementType);

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java

Lines changed: 10 additions & 7 deletions
@@ -208,7 +208,6 @@ DatabricksResultSet execute(
           new TSparkGetDirectResults().setMaxBytes(DEFAULT_BYTE_LIMIT).setMaxRows(maxRowsPerBlock);
       request.setGetDirectResults(directResults);
     }
-
     TExecuteStatementResp response;
     TFetchResultsResp resultSet;
     int timeoutInSeconds =
@@ -567,13 +566,12 @@ TFetchResultsResp fetchMetadataResults(TResp response, String contextDescription
   private <T extends TBase<T, F>, F extends TFieldIdEnum> void checkResponseForErrors(
       TBase<T, F> response) throws DatabricksSQLException {
     F operationHandleField = response.fieldForId(operationHandleFieldId);
-    if (!response.isSet(operationHandleField)) {
-      throw new DatabricksSQLException(
-          "Operation handle not set", DatabricksDriverErrorCode.INVALID_STATE);
-    }
     F statusField = response.fieldForId(statusFieldId);
     TStatus status = (TStatus) response.getFieldValue(statusField);
-    if (isErrorStatusCode(status.getStatusCode())) {
+
+    if (!response.isSet(operationHandleField) || isErrorStatusCode(status)) {
+      // if the operationHandle has not been set, it is an error from the server.
+      LOGGER.error("Error thrift response {%s}", response);
       throw new DatabricksSQLException(status.getErrorMessage(), status.getSqlState());
     }
   }
@@ -607,7 +605,12 @@ private <T extends TBase<T, F>, F extends TFieldIdEnum> boolean hasResultDataInD
     return directResults.isSetResultSet() && directResults.isSetResultSetMetadata();
   }

-  private boolean isErrorStatusCode(TStatusCode statusCode) {
+  private boolean isErrorStatusCode(TStatus status) {
+    if (status == null || !status.isSetStatusCode()) {
+      LOGGER.error("Status code is not set, marking the response as failed");
+      return true;
+    }
+    TStatusCode statusCode = status.getStatusCode();
     return statusCode == TStatusCode.ERROR_STATUS
         || statusCode == TStatusCode.INVALID_HANDLE_STATUS;
   }
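
To make the consolidated check easier to follow, here is a self-contained sketch of the decision logic only. The enum and helper names are hypothetical, not driver or Thrift types: a response is treated as failed when the operation handle is missing, the status code is unset, or the code is an error/invalid-handle status.

public class ResponseCheckSketch {
  // Hypothetical stand-in for TStatusCode; only the values relevant to the check.
  enum StatusCode { SUCCESS_STATUS, ERROR_STATUS, INVALID_HANDLE_STATUS }

  static boolean isFailure(boolean operationHandleSet, StatusCode statusCode) {
    if (!operationHandleSet) {
      return true; // the server did not return an operation handle
    }
    if (statusCode == null) {
      return true; // status code not set: conservatively mark the response as failed
    }
    return statusCode == StatusCode.ERROR_STATUS
        || statusCode == StatusCode.INVALID_HANDLE_STATUS;
  }

  public static void main(String[] args) {
    System.out.println(isFailure(false, StatusCode.SUCCESS_STATUS)); // true
    System.out.println(isFailure(true, null));                       // true
    System.out.println(isFailure(true, StatusCode.SUCCESS_STATUS));  // false
  }
}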

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java

Lines changed: 9 additions & 3 deletions
@@ -167,14 +167,20 @@ public DatabricksResultSet executeStatementAsync(
     return thriftAccessor.executeAsync(request, parentStatement, session, StatementType.SQL);
   }

-  private TSparkParameter mapToSparkParameterListItem(ImmutableSqlParameter parameter) {
+  @VisibleForTesting
+  TSparkParameter mapToSparkParameterListItem(ImmutableSqlParameter parameter) {
     Object value = parameter.value();
     String typeString = parameter.type().name();
     if (typeString.equals(DECIMAL)) {
       // Add precision and scale info to type
       if (value instanceof BigDecimal) {
-        typeString +=
-            "(" + ((BigDecimal) value).precision() + "," + ((BigDecimal) value).scale() + ")";
+        int precision = ((BigDecimal) value).precision();
+        int scale = ((BigDecimal) value).scale();
+        if (precision < scale) {
+          // In type(p,q) -> p should not be less than q
+          precision = scale;
+        }
+        typeString += "(" + precision + "," + scale + ")";
       }
     }
     return new TSparkParameter()
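
The precision clamp matters for values whose leading zeros push the scale above the precision. A standalone, runnable sketch (not driver code) of the same p/s handling: java.math.BigDecimal counts only significant digits toward precision, so "0.000123" has precision 3 but scale 6, and a literal DECIMAL(3,6) would be invalid because scale must not exceed precision.

import java.math.BigDecimal;

public class DecimalTypeStringDemo {
  // Mirrors the precision/scale handling above: clamp precision up to scale
  // so the rendered DECIMAL(p,s) type string is always valid (p >= s).
  static String decimalTypeString(BigDecimal value) {
    int precision = value.precision();
    int scale = value.scale();
    if (precision < scale) {
      precision = scale;
    }
    return "DECIMAL(" + precision + "," + scale + ")";
  }

  public static void main(String[] args) {
    System.out.println(decimalTypeString(new BigDecimal("123.45")));   // DECIMAL(5,2)
    System.out.println(decimalTypeString(new BigDecimal("0.000123"))); // DECIMAL(6,6)
  }
}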

src/test/java/com/databricks/jdbc/api/impl/DatabricksPreparedStatementTest.java

Lines changed: 4 additions & 5 deletions
@@ -1,6 +1,7 @@
 package com.databricks.jdbc.api.impl;

 import static com.databricks.jdbc.TestConstants.*;
+import static java.sql.JDBCType.DECIMAL;
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -319,7 +320,8 @@ public void testSetObject() throws SQLException {

     DatabricksPreparedStatement preparedStatement =
         new DatabricksPreparedStatement(connection, STATEMENT);
-
+    assertDoesNotThrow(() -> preparedStatement.setObject(1, 1, DECIMAL));
+    assertDoesNotThrow(() -> preparedStatement.setObject(1, 1, Types.INTEGER, 1));
     assertDoesNotThrow(() -> preparedStatement.setObject(1, 1, Types.INTEGER));
     assertEquals(Types.INTEGER, preparedStatement.getParameterMetaData().getParameterType(1));
     assertThrows(
@@ -531,10 +533,6 @@ void testUnsupportedMethods() throws DatabricksSQLException {
         DatabricksSQLFeatureNotSupportedException.class, () -> preparedStatement.setTime(1, null));
     assertThrows(
         UnsupportedOperationException.class, () -> preparedStatement.setTime(1, null, null));
-    assertThrows(
-        SQLFeatureNotSupportedException.class, () -> preparedStatement.setObject(1, null, null));
-    assertThrows(
-        SQLFeatureNotSupportedException.class, () -> preparedStatement.setObject(1, null, null, 1));
     assertThrows(
         DatabricksSQLException.class, () -> preparedStatement.executeUpdate("SELECT * from table"));
     assertThrows(
@@ -558,6 +556,7 @@ void testUnsupportedMethods() throws DatabricksSQLException {
         () ->
             preparedStatement.executeUpdate(
                 "UPDATE table SET column = 1", new String[] {"column"}));
+    assertThrows(DatabricksSQLException.class, preparedStatement::executeLargeUpdate);
     assertThrows(
         DatabricksSQLException.class,
         () -> preparedStatement.execute("SELECT * FROM table", new int[] {1}));

src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClientTest.java

Lines changed: 58 additions & 0 deletions
@@ -7,6 +7,7 @@
 import static com.databricks.jdbc.common.MetadataResultConstants.*;
 import static com.databricks.jdbc.common.util.DatabricksThriftUtil.getNamespace;
 import static com.databricks.jdbc.dbclient.impl.common.CommandConstants.GET_TABLE_TYPE_STATEMENT_ID;
+import static com.databricks.sdk.service.sql.ColumnInfoTypeName.*;
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -25,6 +26,7 @@
 import com.databricks.jdbc.model.core.ResultColumn;
 import com.databricks.sdk.core.DatabricksConfig;
 import com.databricks.sdk.service.sql.StatementState;
+import java.math.BigDecimal;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.stream.Stream;
@@ -796,4 +798,60 @@ void testResetAccessToken() {
     client.resetAccessToken(NEW_ACCESS_TOKEN);
     verify(mockDatabricksHttpTTransport).resetAccessToken(NEW_ACCESS_TOKEN);
   }
+
+  @Test
+  public void testDecimalTypeWithValidPrecisionAndScale() {
+    BigDecimal decimalValue = new BigDecimal("123.45"); // precision: 5, scale: 2
+    DatabricksThriftServiceClient client =
+        new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
+    ImmutableSqlParameter parameter =
+        ImmutableSqlParameter.builder().cardinal(0).type(DECIMAL).value(decimalValue).build();
+
+    TSparkParameter result = client.mapToSparkParameterListItem(parameter);
+
+    assertEquals("DECIMAL(5,2)", result.getType());
+    assertEquals("123.45", result.getValue().getStringValue());
+    assertEquals(0, result.getOrdinal());
+  }
+
+  @Test
+  public void testDecimalTypeWithScaleGreaterThanPrecision() {
+    BigDecimal decimalValue = new BigDecimal("0.000123"); // scale: 6, precision: 3
+    DatabricksThriftServiceClient client =
+        new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
+    ImmutableSqlParameter parameter =
+        ImmutableSqlParameter.builder().cardinal(1).type(DECIMAL).value(decimalValue).build();
+
+    TSparkParameter result = client.mapToSparkParameterListItem(parameter);
+
+    assertEquals("DECIMAL(6,6)", result.getType());
+    assertEquals("0.000123", result.getValue().getStringValue());
+    assertEquals(1, result.getOrdinal());
+  }
+
+  @Test
+  public void testNonDecimalType() {
+    ImmutableSqlParameter parameter =
+        ImmutableSqlParameter.builder().cardinal(2).type(STRING).value(TEST_STRING).build();
+    DatabricksThriftServiceClient client =
+        new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
+    TSparkParameter result = client.mapToSparkParameterListItem(parameter);

+    assertEquals("STRING", result.getType());
+    assertEquals(TEST_STRING, result.getValue().getStringValue());
+    assertEquals(2, result.getOrdinal());
+  }
+
+  @Test
+  public void testNullValue() {
+    ImmutableSqlParameter parameter =
+        ImmutableSqlParameter.builder().cardinal(3).type(INT).value(null).build();
+    DatabricksThriftServiceClient client =
+        new DatabricksThriftServiceClient(thriftAccessor, connectionContext);
+    TSparkParameter result = client.mapToSparkParameterListItem(parameter);
+
+    assertEquals("INT", result.getType());
+    assertNull(result.getValue());
+    assertEquals(3, result.getOrdinal());
+  }
 }

src/test/java/com/databricks/jdbc/integration/fakeservice/tests/ErrorHandlingIntegrationTests.java

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ void testQuerySyntaxError() {
       // Operation handle is not provided
       // @see
      // com.databricks.jdbc.dbclient.impl.thrift.DatabricksThriftAccessor#checkResponseForErrors(TBase)
-      assertTrue(e.getMessage().contains("Operation handle not set"));
+      assertTrue(e.getMessage().contains("Error running query"));
     } else {
       assertTrue(e.getMessage().contains("Syntax error"));
     }
