Skip to content

Commit ead21e0

Browse files
authored
Adding GET extended command for UC volume operation (#384)
1 parent 9609354 commit ead21e0

File tree

3 files changed

+85
-3
lines changed

3 files changed

+85
-3
lines changed

src/main/java/com/databricks/jdbc/client/IDatabricksUCVolumeClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ boolean getObject(
8181
String catalog, String schema, String volume, String objectPath, String localPath)
8282
throws SQLException;
8383

84+
/**
85+
* getObject(): Retrieves an object as input stream from the UC Volume
86+
*
87+
* @param catalog the catalog name of the cloud storage
88+
* @param schema the schema name of the cloud storage
89+
* @param volume the UC volume name of the cloud storage
90+
* @param objectPath the path of the object (file) from the volume as the root directory
91+
* @return an instance of input stream
92+
*/
93+
InputStream getObject(String catalog, String schema, String volume, String objectPath)
94+
throws SQLException;
95+
8496
/**
8597
* putObject(): Upload data from a local path to a specified path within a UC Volume.
8698
*

src/main/java/com/databricks/jdbc/client/impl/sdk/DatabricksUCVolumeClient.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import com.databricks.jdbc.client.IDatabricksUCVolumeClient;
77
import com.databricks.jdbc.commons.LogLevel;
88
import com.databricks.jdbc.commons.util.LoggingUtil;
9-
import com.databricks.jdbc.core.DatabricksStatement;
9+
import com.databricks.jdbc.core.IDatabricksResultSet;
10+
import com.databricks.jdbc.core.IDatabricksStatement;
1011
import java.io.InputStream;
1112
import java.sql.*;
1213
import java.util.ArrayList;
@@ -41,6 +42,12 @@ private String createGetObjectQuery(
4142
"GET '/Volumes/%s/%s/%s/%s' TO '%s'", catalog, schema, volume, objectPath, localPath);
4243
}
4344

45+
private String createGetObjectQueryForInputStream(
46+
String catalog, String schema, String volume, String objectPath) {
47+
return String.format(
48+
"GET '/Volumes/%s/%s/%s/%s' TO '__input_stream__'", catalog, schema, volume, objectPath);
49+
}
50+
4451
private String createPutObjectQuery(
4552
String catalog,
4653
String schema,
@@ -293,6 +300,36 @@ public boolean getObject(
293300
return volumeOperationStatus;
294301
}
295302

303+
@Override
304+
public InputStream getObject(String catalog, String schema, String volume, String objectPath)
305+
throws SQLException {
306+
307+
LoggingUtil.log(
308+
LogLevel.DEBUG,
309+
String.format(
310+
"Entering getObject method with parameters: catalog={%s}, schema={%s}, volume={%s}, objectPath={%s}",
311+
catalog, schema, volume, objectPath));
312+
313+
String getObjectQuery = createGetObjectQueryForInputStream(catalog, schema, volume, objectPath);
314+
315+
Statement statement = connection.createStatement();
316+
IDatabricksStatement databricksStatement = (IDatabricksStatement) statement;
317+
databricksStatement.allowInputStreamForVolumeOperation(true);
318+
319+
ResultSet resultSet = statement.executeQuery(getObjectQuery);
320+
LoggingUtil.log(LogLevel.INFO, "GET query executed successfully");
321+
322+
try {
323+
if (resultSet.next()) {
324+
return ((IDatabricksResultSet) resultSet).getVolumeOperationInputStream();
325+
}
326+
return null;
327+
} catch (SQLException e) {
328+
LoggingUtil.log(LogLevel.ERROR, "GET query execution failed " + e);
329+
throw e;
330+
}
331+
}
332+
296333
public boolean putObject(
297334
String catalog,
298335
String schema,
@@ -353,11 +390,11 @@ public boolean putObject(
353390
boolean volumeOperationStatus = false;
354391

355392
try (Statement statement = connection.createStatement()) {
356-
DatabricksStatement databricksStatement = (DatabricksStatement) statement;
393+
IDatabricksStatement databricksStatement = (IDatabricksStatement) statement;
357394
databricksStatement.allowInputStreamForVolumeOperation(true);
358395
databricksStatement.setInputStreamForUCVolume(inputStream);
359396

360-
ResultSet resultSet = databricksStatement.executeQuery(putObjectQueryForInputStream);
397+
ResultSet resultSet = statement.executeQuery(putObjectQueryForInputStream);
361398
LoggingUtil.log(LogLevel.INFO, "PUT query executed successfully");
362399

363400
if (resultSet.next()) {

src/test/java/com/databricks/jdbc/core/DatabricksUCVolumeClientTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class DatabricksUCVolumeClientTest {
3030
@Mock Statement statement;
3131

3232
@Mock DatabricksStatement databricksStatement;
33+
@Mock DatabricksResultSet databricksResultSet;
3334

3435
@Mock ResultSet resultSet;
3536
@Mock ResultSet resultSet_abc_volume1;
@@ -523,6 +524,37 @@ private static Stream<Arguments> provideParametersForDeleteObject_InvalidObjectP
523524
Arguments.of("test_catalog", "test_schema", "test_volume", "invalid_objectpath"));
524525
}
525526

527+
@ParameterizedTest
528+
@MethodSource("provideParametersForGetObjectWithInputStream")
529+
public void testGetObjectWithInputStream(
530+
String catalog, String schema, String volume, String objectPath, InputStream expected)
531+
throws SQLException {
532+
DatabricksUCVolumeClient client = new DatabricksUCVolumeClient(connection);
533+
534+
when(connection.createStatement()).thenReturn(databricksStatement);
535+
536+
String getObjectQuery =
537+
String.format(
538+
"GET '/Volumes/%s/%s/%s/%s' TO '__input_stream__'",
539+
catalog, schema, volume, objectPath);
540+
when(databricksStatement.executeQuery(getObjectQuery)).thenReturn(databricksResultSet);
541+
when(databricksResultSet.next()).thenReturn(true);
542+
when(databricksResultSet.getVolumeOperationInputStream()).thenReturn(expected);
543+
544+
InputStream result = client.getObject(catalog, schema, volume, objectPath);
545+
546+
assertEquals(expected, result);
547+
verify(databricksStatement).executeQuery(getObjectQuery);
548+
verify(databricksStatement).allowInputStreamForVolumeOperation(true);
549+
}
550+
551+
private static Stream<Arguments> provideParametersForGetObjectWithInputStream() {
552+
InputStream inputStream =
553+
new ByteArrayInputStream("test data".getBytes(StandardCharsets.UTF_8));
554+
return Stream.of(
555+
Arguments.of("test_catalog", "test_schema", "test_volume", "test_objectpath", inputStream));
556+
}
557+
526558
@ParameterizedTest
527559
@MethodSource("provideParametersForPutObjectWithInputStream")
528560
public void testPutObjectWithInputStream(
@@ -551,6 +583,7 @@ public void testPutObjectWithInputStream(
551583

552584
assertEquals(expected, result);
553585
verify(databricksStatement).executeQuery(putObjectQuery);
586+
verify(databricksStatement).allowInputStreamForVolumeOperation(true);
554587
}
555588

556589
private static Stream<Arguments> provideParametersForPutObjectWithInputStream() {

0 commit comments

Comments
 (0)