
Commit 5c08711

[ES-1006739] Add content length parameter for input stream commands (#386)
* Adding GET extended command for UC volume operation
* Add changes for content length for input commands
1 parent 003c746 commit 5c08711


10 files changed (+44, -26 lines)


src/main/java/com/databricks/jdbc/client/IDatabricksUCVolumeClient.java

Lines changed: 5 additions & 2 deletions
@@ -3,6 +3,7 @@
 import java.io.InputStream;
 import java.sql.SQLException;
 import java.util.List;
+import org.apache.http.entity.InputStreamEntity;

 public interface IDatabricksUCVolumeClient {

@@ -88,9 +89,9 @@ boolean getObject(
    * @param schema the schema name of the cloud storage
    * @param volume the UC volume name of the cloud storage
    * @param objectPath the path of the object (file) from the volume as the root directory
-   * @return an instance of input stream
+   * @return an instance of input stream entity
    */
-  InputStream getObject(String catalog, String schema, String volume, String objectPath)
+  InputStreamEntity getObject(String catalog, String schema, String volume, String objectPath)
       throws SQLException;

   /**
@@ -123,6 +124,7 @@ boolean putObject(
    * @param objectPath the destination path where the object (file) is to be uploaded from the
    *     volume as the root directory
    * @param inputStream the input stream from where the data is to be uploaded
+   * @param contentLength the length of the input stream
    * @param toOverwrite a boolean indicating whether to overwrite the object if it already exists
    * @return a boolean value indicating status of the PUT operation
    */
@@ -132,6 +134,7 @@ boolean putObject(
       String volume,
       String objectPath,
       InputStream inputStream,
+      long contentLength,
       boolean toOverwrite)
       throws SQLException;
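
For reference, a minimal caller-side sketch of the updated interface (not part of the commit; the catalog/schema/volume names, local file path, and helper method are placeholders):

import com.databricks.jdbc.client.IDatabricksUCVolumeClient;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.http.entity.InputStreamEntity;

public class VolumeStreamSketch {

  // Hypothetical helper: `client` is an already-constructed UC volume client.
  static void uploadAndDownload(IDatabricksUCVolumeClient client) throws Exception {
    // PUT: pass the exact stream size through the new contentLength parameter.
    File localFile = new File("data.csv");
    try (InputStream in = new FileInputStream(localFile)) {
      boolean uploaded = client.putObject(
          "main", "default", "my_volume", "data.csv", in, localFile.length(), true);
      System.out.println("PUT succeeded: " + uploaded);
    }

    // GET: the return type is now InputStreamEntity, which exposes the
    // content length alongside the raw stream.
    InputStreamEntity entity = client.getObject("main", "default", "my_volume", "data.csv");
    System.out.println("Downloading " + entity.getContentLength() + " bytes");
    try (InputStream download = entity.getContent()) {
      byte[] buffer = new byte[8192];
      while (download.read(buffer) != -1) {
        // consume or copy the downloaded bytes
      }
    }
  }
}

InputStreamEntity is Apache HttpCore's stream-backed HttpEntity: it pairs an InputStream with a declared length, where a negative value means the length is unknown.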

src/main/java/com/databricks/jdbc/client/impl/sdk/DatabricksUCVolumeClient.java

Lines changed: 6 additions & 3 deletions
@@ -12,6 +12,7 @@
 import java.sql.*;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.http.entity.InputStreamEntity;

 /** Implementation for DatabricksUCVolumeClient */
 public class DatabricksUCVolumeClient implements IDatabricksUCVolumeClient {
@@ -301,8 +302,8 @@ public boolean getObject(
   }

   @Override
-  public InputStream getObject(String catalog, String schema, String volume, String objectPath)
-      throws SQLException {
+  public InputStreamEntity getObject(
+      String catalog, String schema, String volume, String objectPath) throws SQLException {

     LoggingUtil.log(
         LogLevel.DEBUG,
@@ -375,6 +376,7 @@ public boolean putObject(
       String volume,
       String objectPath,
       InputStream inputStream,
+      long contentLength,
       boolean toOverwrite)
       throws SQLException {

@@ -392,7 +394,8 @@ public boolean putObject(
     try (Statement statement = connection.createStatement()) {
       IDatabricksStatement databricksStatement = (IDatabricksStatement) statement;
       databricksStatement.allowInputStreamForVolumeOperation(true);
-      databricksStatement.setInputStreamForUCVolume(inputStream);
+      databricksStatement.setInputStreamForUCVolume(
+          new InputStreamEntity(inputStream, contentLength));

       ResultSet resultSet = statement.executeQuery(putObjectQueryForInputStream);
       LoggingUtil.log(LogLevel.INFO, "PUT query executed successfully");

src/main/java/com/databricks/jdbc/core/DatabricksResultSet.java

Lines changed: 5 additions & 2 deletions
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.http.HttpEntity;
+import org.apache.http.entity.InputStreamEntity;

 public class DatabricksResultSet implements ResultSet, IDatabricksResultSet {
   protected static final String AFFECTED_ROWS_COUNT = "num_affected_rows";
@@ -41,6 +42,7 @@ public class DatabricksResultSet implements ResultSet, IDatabricksResultSet {
   private SQLWarning warnings = null;
   private boolean wasNull;
   private VolumeInputStream volumeInputStream = null;
+  private long volumeStreamContentLength = -1L;

   public DatabricksResultSet(
       StatementStatus statementStatus,
@@ -1623,12 +1625,13 @@ public void setVolumeOperationEntityStream(HttpEntity httpEntity)
     checkIfClosed();
     this.volumeInputStream =
         new VolumeInputStream(httpEntity, executionResult, this.parentStatement);
+    this.volumeStreamContentLength = httpEntity.getContentLength();
   }

   @Override
-  public InputStream getVolumeOperationInputStream() throws SQLException {
+  public InputStreamEntity getVolumeOperationInputStream() throws SQLException {
     checkIfClosed();
-    return this.volumeInputStream;
+    return new InputStreamEntity(this.volumeInputStream, this.volumeStreamContentLength);
   }

   private Object getObjectInternal(int columnIndex) throws SQLException {
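
With this change, getVolumeOperationInputStream() returns the underlying volume stream wrapped together with the content length reported by the HTTP entity. A consumer-side sketch (not part of the commit; the target path and helper are placeholders):

import com.databricks.jdbc.core.IDatabricksResultSet;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.sql.SQLException;
import org.apache.http.entity.InputStreamEntity;

public class VolumeDownloadSketch {

  // `resultSet` is assumed to come from a GET volume operation executed by this driver.
  static void saveTo(IDatabricksResultSet resultSet, Path target)
      throws SQLException, IOException {
    InputStreamEntity entity = resultSet.getVolumeOperationInputStream();
    long expectedBytes = entity.getContentLength(); // -1 when no length was reported
    try (InputStream in = entity.getContent()) {
      Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
    }
    System.out.println("Reported content length: " + expectedBytes + " bytes");
  }
}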

src/main/java/com/databricks/jdbc/core/DatabricksStatement.java

Lines changed: 5 additions & 4 deletions
@@ -10,11 +10,11 @@
 import com.databricks.jdbc.commons.LogLevel;
 import com.databricks.jdbc.commons.util.*;
 import com.google.common.annotations.VisibleForTesting;
-import java.io.InputStream;
 import java.sql.*;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.*;
+import org.apache.http.entity.InputStreamEntity;

 public class DatabricksStatement implements IDatabricksStatement, Statement {

@@ -27,7 +27,7 @@ public class DatabricksStatement implements IDatabricksStatement, Statement {
   private SQLWarning warnings = null;
   private int maxRows = DEFAULT_ROW_LIMIT;
   private boolean escapeProcessing = DEFAULT_ESCAPE_PROCESSING;
-  private InputStream inputStream = null;
+  private InputStreamEntity inputStream = null;
   private boolean allowInputStreamForUCVolume = false;

   public DatabricksStatement(DatabricksConnection connection) {
@@ -657,7 +657,8 @@ public boolean isAllowedInputStreamForVolumeOperation() throws DatabricksSQLExce
   }

   @Override
-  public void setInputStreamForUCVolume(InputStream inputStream) throws DatabricksSQLException {
+  public void setInputStreamForUCVolume(InputStreamEntity inputStream)
+      throws DatabricksSQLException {
     if (isAllowedInputStreamForVolumeOperation()) {
       this.inputStream = inputStream;
     } else {
@@ -666,7 +667,7 @@ public void setInputStreamForUCVolume(InputStream inputStream) throws Databricks
   }

   @Override
-  public InputStream getInputStreamForUCVolume() throws DatabricksSQLException {
+  public InputStreamEntity getInputStreamForUCVolume() throws DatabricksSQLException {
     if (isAllowedInputStreamForVolumeOperation()) {
       return inputStream;
     }
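
The statement now stores Apache HttpCore's InputStreamEntity rather than a bare InputStream, so the declared length travels with the stream. A small illustrative sketch of the length convention (not from the commit; class name is a placeholder):

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.apache.http.entity.InputStreamEntity;

public class EntityLengthSketch {
  public static void main(String[] args) {
    byte[] payload = "test data".getBytes(StandardCharsets.UTF_8);

    // Known length: the entity reports exactly the length passed to the constructor.
    InputStreamEntity sized =
        new InputStreamEntity(new ByteArrayInputStream(payload), payload.length);
    System.out.println(sized.getContentLength()); // 9

    // Unknown length: a negative value (-1) means "length not known",
    // which is how the updated tests below construct throwaway entities.
    InputStreamEntity unsized =
        new InputStreamEntity(new ByteArrayInputStream(payload), -1L);
    System.out.println(unsized.getContentLength()); // -1
  }
}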

src/main/java/com/databricks/jdbc/core/IDatabricksResultSet.java

Lines changed: 2 additions & 2 deletions
@@ -2,9 +2,9 @@

 import com.databricks.sdk.service.sql.StatementStatus;
 import java.io.IOException;
-import java.io.InputStream;
 import java.sql.SQLException;
 import org.apache.http.HttpEntity;
+import org.apache.http.entity.InputStreamEntity;

 public interface IDatabricksResultSet {
   String statementId();
@@ -17,5 +17,5 @@ public interface IDatabricksResultSet {

   void setVolumeOperationEntityStream(HttpEntity httpEntity) throws SQLException, IOException;

-  InputStream getVolumeOperationInputStream() throws SQLException;
+  InputStreamEntity getVolumeOperationInputStream() throws SQLException;
 }

src/main/java/com/databricks/jdbc/core/IDatabricksStatement.java

Lines changed: 3 additions & 3 deletions
@@ -1,8 +1,8 @@
 package com.databricks.jdbc.core;

-import java.io.InputStream;
 import java.sql.SQLException;
 import java.sql.Statement;
+import org.apache.http.entity.InputStreamEntity;

 /** Interface for Databricks specific statement. */
 public interface IDatabricksStatement {
@@ -26,7 +26,7 @@ public interface IDatabricksStatement {

   boolean isAllowedInputStreamForVolumeOperation() throws DatabricksSQLException;

-  void setInputStreamForUCVolume(InputStream inputStream) throws DatabricksSQLException;
+  void setInputStreamForUCVolume(InputStreamEntity inputStream) throws DatabricksSQLException;

-  InputStream getInputStreamForUCVolume() throws DatabricksSQLException;
+  InputStreamEntity getInputStreamForUCVolume() throws DatabricksSQLException;
 }

src/test/java/com/databricks/jdbc/core/DatabricksResultSetTest.java

Lines changed: 2 additions & 1 deletion
@@ -202,7 +202,6 @@ void testGetFloat() throws SQLException {
   void testGetUnicode() throws SQLException, UnsupportedEncodingException {
     DatabricksResultSet resultSet = getResultSet(StatementState.SUCCEEDED, null);
     String testString = "test";
-    InputStream expectedStream = new java.io.ByteArrayInputStream(testString.getBytes("UTF-16"));
     when(mockedExecutionResult.getObject(0)).thenReturn(testString);
     when(mockedResultSetMetadata.getColumnType(1)).thenReturn(Types.VARCHAR);
     assertNotNull(resultSet.getUnicodeStream(1));
@@ -716,8 +715,10 @@ void testVolumeOperationInputStream() throws Exception {
     DatabricksResultSet resultSet =
         getResultSet(StatementState.SUCCEEDED, mockedDatabricksStatement);
     HttpEntity mockEntity = mock(HttpEntity.class);
+    when(mockEntity.getContentLength()).thenReturn(10L);
     resultSet.setVolumeOperationEntityStream(mockEntity);
     assertNotNull(resultSet.getVolumeOperationInputStream());
+    assertEquals(10L, resultSet.getVolumeOperationInputStream().getContentLength());
   }

   @Test

src/test/java/com/databricks/jdbc/core/DatabricksStatementTest.java

Lines changed: 6 additions & 3 deletions
@@ -20,6 +20,7 @@
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.http.entity.InputStreamEntity;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -271,18 +272,20 @@ public void testInputStreamForVolumeOperation() throws Exception {
     assertFalse(statement.isAllowedInputStreamForVolumeOperation());
     assertNull(statement.getInputStreamForUCVolume());
     assertThrows(
-        DatabricksSQLException.class, () -> statement.setInputStreamForUCVolume(mockStream));
+        DatabricksSQLException.class,
+        () -> statement.setInputStreamForUCVolume(new InputStreamEntity(mockStream, -1L)));

     statement.allowInputStreamForVolumeOperation(true);
-    statement.setInputStreamForUCVolume(mockStream);
+    statement.setInputStreamForUCVolume(new InputStreamEntity(mockStream));

     assertTrue(statement.isAllowedInputStreamForVolumeOperation());
     assertNotNull(statement.getInputStreamForUCVolume());

     statement.close();
     assertThrows(DatabricksSQLException.class, statement::getInputStreamForUCVolume);
     assertThrows(
-        DatabricksSQLException.class, () -> statement.setInputStreamForUCVolume(mockStream));
+        DatabricksSQLException.class,
+        () -> statement.setInputStreamForUCVolume(new InputStreamEntity(mockStream, -1L)));
     assertThrows(DatabricksSQLException.class, statement::isAllowedInputStreamForVolumeOperation);
     assertThrows(
         DatabricksSQLException.class, () -> statement.allowInputStreamForVolumeOperation(false));

src/test/java/com/databricks/jdbc/core/DatabricksUCVolumeClientTest.java

Lines changed: 9 additions & 5 deletions
@@ -15,6 +15,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
+import org.apache.http.entity.InputStreamEntity;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -527,7 +528,7 @@ private static Stream<Arguments> provideParametersForDeleteObject_InvalidObjectP
   @ParameterizedTest
   @MethodSource("provideParametersForGetObjectWithInputStream")
   public void testGetObjectWithInputStream(
-      String catalog, String schema, String volume, String objectPath, InputStream expected)
+      String catalog, String schema, String volume, String objectPath, InputStreamEntity expected)
       throws SQLException {
     DatabricksUCVolumeClient client = new DatabricksUCVolumeClient(connection);

@@ -541,16 +542,17 @@ public void testGetObjectWithInputStream(
     when(databricksResultSet.next()).thenReturn(true);
     when(databricksResultSet.getVolumeOperationInputStream()).thenReturn(expected);

-    InputStream result = client.getObject(catalog, schema, volume, objectPath);
+    InputStreamEntity result = client.getObject(catalog, schema, volume, objectPath);

     assertEquals(expected, result);
     verify(databricksStatement).executeQuery(getObjectQuery);
     verify(databricksStatement).allowInputStreamForVolumeOperation(true);
   }

   private static Stream<Arguments> provideParametersForGetObjectWithInputStream() {
-    InputStream inputStream =
-        new ByteArrayInputStream("test data".getBytes(StandardCharsets.UTF_8));
+    InputStreamEntity inputStream =
+        new InputStreamEntity(
+            new ByteArrayInputStream("test data".getBytes(StandardCharsets.UTF_8)), 10L);
     return Stream.of(
         Arguments.of("test_catalog", "test_schema", "test_volume", "test_objectpath", inputStream));
   }
@@ -563,6 +565,7 @@ public void testPutObjectWithInputStream(
       String volume,
       String objectPath,
       InputStream inputStream,
+      long length,
       boolean toOverwrite,
       boolean expected)
       throws SQLException {
@@ -579,7 +582,7 @@ public void testPutObjectWithInputStream(
         .thenReturn(VOLUME_OPERATION_STATUS_SUCCEEDED);

     boolean result =
-        client.putObject(catalog, schema, volume, objectPath, inputStream, toOverwrite);
+        client.putObject(catalog, schema, volume, objectPath, inputStream, length, toOverwrite);

     assertEquals(expected, result);
     verify(databricksStatement).executeQuery(putObjectQuery);
@@ -596,6 +599,7 @@ private static Stream<Arguments> provideParametersForPutObjectWithInputStream()
         "test_volume",
         "test_objectpath",
         inputStream,
+        10L,
         false,
         true));
   }

src/test/java/com/databricks/jdbc/integration/e2e/UCVolumeTests.java

Lines changed: 1 addition & 1 deletion
@@ -527,7 +527,7 @@ public void testPutObjectWithInputStream() throws Exception {

     boolean result =
         client.putObject(
-            UC_VOLUME_CATALOG, UC_VOLUME_SCHEMA, volume, objectPath, inputStream, toOverwrite);
+            UC_VOLUME_CATALOG, UC_VOLUME_SCHEMA, volume, objectPath, inputStream, 10L, toOverwrite);
     assertEquals(expected, result);
   }
 }
