Skip to content

Commit f63269e

Browse files
authored
Added the GET and DELETE inside the DBFS client (#587)
* Added the GET and DELETE inside the DBFS client * Added java doc and refactored workspace client initialisation * Added unit tests * Improved the unit test coverage to 85% * Fixed the DBFS client to use ConnectionContext for initializing * Refactored the code for Volume Factory * Added java docs
1 parent a93c4b7 commit f63269e

21 files changed

+813
-165
lines changed

src/main/java/com/databricks/client/jdbc/Driver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public Connection connect(String url, Properties info) throws DatabricksSQLExcep
4848
DriverUtil.setUpLogging(connectionContext);
4949
UserAgentManager.setUserAgent(connectionContext);
5050
DeviceInfoLogUtil.logProperties();
51+
5152
DatabricksConnection connection = new DatabricksConnection(connectionContext);
53+
5254
boolean isConnectionOpen = false;
5355
try {
5456
connection.open();

src/main/java/com/databricks/jdbc/api/IDatabricksConnection.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ public interface IDatabricksConnection extends Connection {
2121
/** Returns the corresponding sql connection object */
2222
Connection getConnection();
2323

24-
/**
25-
* Returns the respective volume client instance. Currently supports - UCVolumeClient and
26-
* DBFSVolumeClient
27-
*/
28-
IDatabricksVolumeClient getVolumeClient();
29-
3024
/** Opens the connection and initiates the underlying session */
3125
void open() throws DatabricksSQLException;
3226

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import com.databricks.jdbc.api.IDatabricksConnectionContext;
66
import com.databricks.jdbc.api.IDatabricksSession;
77
import com.databricks.jdbc.api.IDatabricksStatement;
8-
import com.databricks.jdbc.api.impl.volume.DBFSVolumeClient;
9-
import com.databricks.jdbc.api.impl.volume.DatabricksUCVolumeClient;
108
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
119
import com.databricks.jdbc.common.DatabricksJdbcConstants;
1210
import com.databricks.jdbc.common.util.UserAgentManager;
@@ -519,22 +517,6 @@ public Connection getConnection() {
519517
return this;
520518
}
521519

522-
@Override
523-
public IDatabricksVolumeClient getVolumeClient() {
524-
if (volumeClient == null) {
525-
synchronized (this) {
526-
if (volumeClient == null) {
527-
if (this.session.getConnectionContext().useFileSystemAPI()) {
528-
volumeClient = new DBFSVolumeClient(this);
529-
} else {
530-
volumeClient = new DatabricksUCVolumeClient(this);
531-
}
532-
}
533-
}
534-
}
535-
return volumeClient;
536-
}
537-
538520
@Override
539521
public IDatabricksConnectionContext getConnectionContext() {
540522
return connectionContext;

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContextFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,28 @@ public static IDatabricksConnectionContext create(String url, Properties propert
1919
throws DatabricksSQLException {
2020
return DatabricksConnectionContext.parse(url, properties);
2121
}
22+
23+
/**
24+
* Creates an instance of {@link IDatabricksConnectionContext} from the given URL, user and
25+
* password
26+
*
27+
* @param url JDBC URL
28+
* @param user JDBC connection username
29+
* @param password JDBC connection password
30+
* @return an instance of {@link IDatabricksConnectionContext}
31+
* @throws DatabricksSQLException if the URL or properties are invalid
32+
*/
33+
public static IDatabricksConnectionContext create(String url, String user, String password)
34+
throws DatabricksSQLException {
35+
java.util.Properties info = new java.util.Properties();
36+
37+
if (user != null) {
38+
info.put("user", user);
39+
}
40+
if (password != null) {
41+
info.put("password", password);
42+
}
43+
44+
return create(url, info);
45+
}
2246
}

src/main/java/com/databricks/jdbc/api/impl/volume/DBFSVolumeClient.java

Lines changed: 121 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,38 @@
22

33
import static com.databricks.jdbc.api.impl.volume.DatabricksUCVolumeClient.getObjectFullPath;
44
import static com.databricks.jdbc.common.DatabricksJdbcConstants.JSON_HTTP_HEADERS;
5-
import static com.databricks.jdbc.dbclient.impl.sqlexec.PathConstants.CREATE_UPLOAD_URL_PATH;
5+
import static com.databricks.jdbc.dbclient.impl.sqlexec.PathConstants.*;
66

77
import com.databricks.jdbc.api.IDatabricksConnectionContext;
88
import com.databricks.jdbc.api.IDatabricksVolumeClient;
9-
import com.databricks.jdbc.api.impl.DatabricksConnection;
10-
import com.databricks.jdbc.dbclient.IDatabricksClient;
119
import com.databricks.jdbc.dbclient.impl.common.ClientConfigurator;
1210
import com.databricks.jdbc.exception.DatabricksVolumeOperationException;
1311
import com.databricks.jdbc.log.JdbcLogger;
1412
import com.databricks.jdbc.log.JdbcLoggerFactory;
15-
import com.databricks.jdbc.model.client.filesystem.CreateUploadUrlRequest;
16-
import com.databricks.jdbc.model.client.filesystem.CreateUploadUrlResponse;
13+
import com.databricks.jdbc.model.client.filesystem.*;
1714
import com.databricks.sdk.WorkspaceClient;
1815
import com.databricks.sdk.core.DatabricksException;
16+
import com.google.common.annotations.VisibleForTesting;
1917
import java.io.InputStream;
20-
import java.sql.Connection;
2118
import java.util.List;
2219
import org.apache.http.entity.InputStreamEntity;
2320

2421
/** Implementation of Volume Client that directly calls SQL Exec API for the Volume Operations */
2522
public class DBFSVolumeClient implements IDatabricksVolumeClient {
2623

2724
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(DBFSVolumeClient.class);
28-
private final DatabricksConnection connection;
25+
private final IDatabricksConnectionContext connectionContext;
26+
@VisibleForTesting final WorkspaceClient workspaceClient;
2927

30-
public DBFSVolumeClient(Connection connection) {
31-
this.connection = (DatabricksConnection) connection;
28+
@VisibleForTesting
29+
public DBFSVolumeClient(WorkspaceClient workspaceClient) {
30+
this.connectionContext = null;
31+
this.workspaceClient = workspaceClient;
32+
}
33+
34+
public DBFSVolumeClient(IDatabricksConnectionContext connectionContext) {
35+
this.connectionContext = connectionContext;
36+
this.workspaceClient = getWorkspaceClientFromConnectionContext(connectionContext);
3237
}
3338

3439
/** {@inheritDoc} */
@@ -75,10 +80,30 @@ public List<String> listObjects(
7580
@Override
7681
public boolean getObject(
7782
String catalog, String schema, String volume, String objectPath, String localPath)
78-
throws UnsupportedOperationException {
79-
String errorMessage = "getObject returning boolean function is unsupported in DBFSVolumeClient";
80-
LOGGER.error(errorMessage);
81-
throw new UnsupportedOperationException(errorMessage);
83+
throws DatabricksVolumeOperationException {
84+
LOGGER.debug(
85+
String.format(
86+
"Entering getObject method with parameters: catalog={%s}, schema={%s}, volume={%s}, objectPath={%s}, localPath={%s}",
87+
catalog, schema, volume, objectPath, localPath));
88+
89+
boolean isOperationSucceeded = false;
90+
91+
try {
92+
// Fetching the Pre signed URL
93+
CreateDownloadUrlResponse response =
94+
getCreateDownloadUrlResponse(getObjectFullPath(catalog, schema, volume, objectPath));
95+
96+
// Downloading the object from the pre signed Url
97+
VolumeOperationProcessorDirect volumeOperationProcessorDirect =
98+
getVolumeOperationProcessorDirect(response.getUrl(), localPath);
99+
volumeOperationProcessorDirect.executeGetOperation();
100+
isOperationSucceeded = true;
101+
} catch (DatabricksVolumeOperationException e) {
102+
String errorMessage = String.format("Failed to get object - {%s}", e.getMessage());
103+
LOGGER.error(e, errorMessage);
104+
throw e;
105+
}
106+
return isOperationSucceeded;
82107
}
83108

84109
/** {@inheritDoc} */
@@ -115,7 +140,7 @@ public boolean putObject(
115140

116141
// Uploading the object to the Pre Signed Url
117142
VolumeOperationProcessorDirect volumeOperationProcessorDirect =
118-
new VolumeOperationProcessorDirect(response.getUrl(), localPath, connection.getSession());
143+
getVolumeOperationProcessorDirect(response.getUrl(), localPath);
119144
volumeOperationProcessorDirect.executePutOperation();
120145
isOperationSucceeded = true;
121146
} catch (DatabricksVolumeOperationException e) {
@@ -145,23 +170,48 @@ public boolean putObject(
145170
/** {@inheritDoc} */
146171
@Override
147172
public boolean deleteObject(String catalog, String schema, String volume, String objectPath)
148-
throws UnsupportedOperationException {
149-
String errorMessage = "deleteObject function is unsupported in DBFSVolumeClient";
150-
LOGGER.error(errorMessage);
151-
throw new UnsupportedOperationException(errorMessage);
173+
throws DatabricksVolumeOperationException {
174+
LOGGER.debug(
175+
String.format(
176+
"Entering deleteObject method with parameters: catalog={%s}, schema={%s}, volume={%s}, objectPath={%s}",
177+
catalog, schema, volume, objectPath));
178+
179+
boolean isOperationSucceeded = false;
180+
try {
181+
// Fetching the Pre Signed Url
182+
CreateDeleteUrlResponse response =
183+
getCreateDeleteUrlResponse(getObjectFullPath(catalog, schema, volume, objectPath));
184+
185+
// Deleting the object using the Pre Signed Url
186+
VolumeOperationProcessorDirect volumeOperationProcessorDirect =
187+
getVolumeOperationProcessorDirect(response.getUrl(), null);
188+
volumeOperationProcessorDirect.executeDeleteOperation();
189+
isOperationSucceeded = true;
190+
} catch (DatabricksVolumeOperationException e) {
191+
String errorMessage = String.format("Failed to delete object - {%s}", e.getMessage());
192+
LOGGER.error(e, errorMessage);
193+
throw e;
194+
}
195+
return isOperationSucceeded;
196+
}
197+
198+
WorkspaceClient getWorkspaceClientFromConnectionContext(
199+
IDatabricksConnectionContext connectionContext) {
200+
return new ClientConfigurator(connectionContext).getWorkspaceClient();
201+
}
202+
203+
VolumeOperationProcessorDirect getVolumeOperationProcessorDirect(
204+
String operationUrl, String localFilePath) {
205+
return new VolumeOperationProcessorDirect(operationUrl, localFilePath, connectionContext);
152206
}
153207

154-
/** Fetches the pre signed url using the SQL Exec API */
155-
private CreateUploadUrlResponse getCreateUploadUrlResponse(String objectPath)
208+
/** Fetches the pre signed url for uploading to the volume using the SQL Exec API */
209+
CreateUploadUrlResponse getCreateUploadUrlResponse(String objectPath)
156210
throws DatabricksVolumeOperationException {
157211
LOGGER.debug(
158212
String.format(
159213
"Entering getCreateUploadUrlResponse method with parameters: objectPath={%s}",
160214
objectPath));
161-
IDatabricksClient client = connection.getSession().getDatabricksClient();
162-
IDatabricksConnectionContext connectionContext = client.getConnectionContext();
163-
WorkspaceClient workspaceClient =
164-
new ClientConfigurator(connectionContext).getWorkspaceClient();
165215

166216
CreateUploadUrlRequest request = new CreateUploadUrlRequest(objectPath);
167217
try {
@@ -175,4 +225,51 @@ private CreateUploadUrlResponse getCreateUploadUrlResponse(String objectPath)
175225
throw new DatabricksVolumeOperationException(errorMessage, e);
176226
}
177227
}
228+
229+
/** Fetches the pre signed url for downloading the object contents using the SQL Exec API */
230+
CreateDownloadUrlResponse getCreateDownloadUrlResponse(String objectPath)
231+
throws DatabricksVolumeOperationException {
232+
LOGGER.debug(
233+
String.format(
234+
"Entering getCreateDownloadUrlResponse method with parameters: objectPath={%s}",
235+
objectPath));
236+
237+
CreateDownloadUrlRequest request = new CreateDownloadUrlRequest(objectPath);
238+
239+
try {
240+
return workspaceClient
241+
.apiClient()
242+
.POST(
243+
CREATE_DOWNLOAD_URL_PATH,
244+
request,
245+
CreateDownloadUrlResponse.class,
246+
JSON_HTTP_HEADERS);
247+
} catch (DatabricksException e) {
248+
String errorMessage =
249+
String.format("Failed to get create download url response - {%s}", e.getMessage());
250+
LOGGER.error(e, errorMessage);
251+
throw new DatabricksVolumeOperationException(errorMessage, e);
252+
}
253+
}
254+
255+
/** Fetches the pre signed url for deleting object from the volume using the SQL Exec API */
256+
CreateDeleteUrlResponse getCreateDeleteUrlResponse(String objectPath)
257+
throws DatabricksVolumeOperationException {
258+
LOGGER.debug(
259+
String.format(
260+
"Entering getCreateDeleteUrlResponse method with parameters: objectPath={%s}",
261+
objectPath));
262+
CreateDeleteUrlRequest request = new CreateDeleteUrlRequest(objectPath);
263+
264+
try {
265+
return workspaceClient
266+
.apiClient()
267+
.POST(CREATE_DELETE_URL_PATH, request, CreateDeleteUrlResponse.class, JSON_HTTP_HEADERS);
268+
} catch (DatabricksException e) {
269+
String errorMessage =
270+
String.format("Failed to get create delete url response - {%s}", e.getMessage());
271+
LOGGER.error(e, errorMessage);
272+
throw new DatabricksVolumeOperationException(errorMessage, e);
273+
}
274+
}
178275
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.databricks.jdbc.api.impl.volume;
2+
3+
import com.databricks.jdbc.api.IDatabricksConnectionContext;
4+
import com.databricks.jdbc.api.IDatabricksVolumeClient;
5+
import com.databricks.jdbc.log.JdbcLogger;
6+
import com.databricks.jdbc.log.JdbcLoggerFactory;
7+
import java.sql.Connection;
8+
9+
/** Factory class for creating instances of {@link IDatabricksVolumeClient}. */
10+
public class DatabricksVolumeClientFactory {
11+
private static final JdbcLogger LOGGER =
12+
JdbcLoggerFactory.getLogger(DatabricksVolumeClientFactory.class);
13+
14+
/**
15+
* Creates an instance of the DatabricksUCVolumeClient from the given connection.
16+
*
17+
* @param con Connection
18+
* @return an instance of {@link IDatabricksVolumeClient}
19+
*/
20+
public static IDatabricksVolumeClient getVolumeClient(Connection con) {
21+
LOGGER.debug(
22+
String.format(
23+
"Entering public static IDatabricksVolumeClient getVolumeClient with Connection con = {%s}",
24+
con));
25+
return new DatabricksUCVolumeClient(con);
26+
}
27+
28+
/**
29+
* Creates an instance of the DBFSVolumeClient from the given connectionContext.
30+
*
31+
* @param connectionContext IDatabricksConnectionContext
32+
* @return an instance of {@link IDatabricksVolumeClient}
33+
*/
34+
public static IDatabricksVolumeClient getVolumeClient(
35+
IDatabricksConnectionContext connectionContext) {
36+
LOGGER.debug(
37+
String.format(
38+
"Entering public static IDatabricksVolumeClient getVolumeClient with IDatabricksConnectionContext connectionContext = {%s}",
39+
connectionContext));
40+
return new DBFSVolumeClient(connectionContext);
41+
}
42+
}

src/main/java/com/databricks/jdbc/api/impl/volume/VolumeOperationProcessor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -333,11 +333,6 @@ private void executeDeleteOperation() {
333333
}
334334
}
335335

336-
private boolean isSuccessfulHttpResponse(CloseableHttpResponse response) {
337-
return response.getStatusLine().getStatusCode() >= 200
338-
&& response.getStatusLine().getStatusCode() < 300;
339-
}
340-
341336
enum VolumeOperationStatus {
342337
PENDING,
343338
RUNNING,

0 commit comments

Comments
 (0)