Commit 8d54184

Refactor HTTP client (#524)

- Add a singleton factory that manages an HTTP client per connection.
- Make the connection manager an instance variable: each client/JDBC connection gets its own pool of reusable TCP connections.
- Make the HTTP client closeable.
- Lint changes.

1 parent 68aaba8 · commit 8d54184

22 files changed: +261, -265 lines
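
The factory named in the message is among the 22 changed files but does not appear in this excerpt. A minimal sketch of the pattern its call sites imply — a process-wide singleton handing out one closeable client per connection context — assuming the map type, the client constructor, and the close handling, none of which are confirmed by the diff below:

// Sketch only: the real DatabricksHttpClientFactory is not shown in this
// excerpt. Only getInstance(), getClient(context), and removeClient(context)
// are confirmed by the call sites; everything else is an assumption.
package com.databricks.jdbc.dbclient.impl.http;

import com.databricks.jdbc.api.IDatabricksConnectionContext;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

public class DatabricksHttpClientFactory {
  private static final DatabricksHttpClientFactory INSTANCE = new DatabricksHttpClientFactory();

  // One client (and therefore one pool of reusable TCP connections) per
  // JDBC connection, keyed by its connection context.
  private final ConcurrentHashMap<IDatabricksConnectionContext, DatabricksHttpClient> clients =
      new ConcurrentHashMap<>();

  private DatabricksHttpClientFactory() {}

  public static DatabricksHttpClientFactory getInstance() {
    return INSTANCE;
  }

  public IDatabricksHttpClient getClient(IDatabricksConnectionContext context) {
    // Assumes DatabricksHttpClient has a context-taking constructor.
    return clients.computeIfAbsent(context, DatabricksHttpClient::new);
  }

  public void removeClient(IDatabricksConnectionContext context) {
    DatabricksHttpClient client = clients.remove(context);
    if (client != null) {
      try {
        client.close(); // client is now closeable; releases its pooled connections
      } catch (IOException e) {
        // best-effort cleanup on connection close
      }
    }
  }
}

Keying on the connection context is what gives each JDBC connection its own TCP connection pool while keeping the old call-site ergonomics.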

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java

Lines changed: 2 additions & 1 deletion
@@ -12,6 +12,7 @@
 import com.databricks.jdbc.common.util.ValidationUtil;
 import com.databricks.jdbc.dbclient.IDatabricksClient;
 import com.databricks.jdbc.dbclient.impl.common.StatementId;
+import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
 import com.databricks.jdbc.exception.DatabricksSQLClientInfoException;
 import com.databricks.jdbc.exception.DatabricksSQLException;
 import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException;

@@ -135,8 +136,8 @@ public void close() throws DatabricksSQLException {
       statement.close(false);
       statementSet.remove(statement);
     }
-
     this.session.close();
+    DatabricksHttpClientFactory.getInstance().removeClient(this.session.getConnectionContext());
   }

   @Override
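
The new removeClient call ties the HTTP client's lifetime to the connection that owns it. A hypothetical caller's view of the lifecycle (jdbcUrl is a placeholder; only the close() chain comes from the diff above):

// Hypothetical lifecycle illustration; jdbcUrl is a placeholder.
static void runAndClose(String jdbcUrl) throws java.sql.SQLException {
  try (java.sql.Connection conn = java.sql.DriverManager.getConnection(jdbcUrl)) {
    // all HTTP traffic for this connection reuses one factory-managed client
  }
  // on close(): open statements close, the session closes, then the factory
  // evicts and closes this connection's HTTP client and its TCP connection pool
}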

src/main/java/com/databricks/jdbc/api/impl/DatabricksPreparedStatement.java

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ public int[] executeBatch() {
             sql, databricksParameterMetaData.getParameterBindings(), StatementType.UPDATE, false);
         updateCount[i] = (int) resultSet.getUpdateCount();
       } catch (SQLException e) {
-        LOGGER.error(e.getMessage());
+        LOGGER.error(e, e.getMessage());
         updateCount[i] = -1;
       }
     }
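
This call shape — throwable first, then a format string and arguments — recurs in the files below. The JdbcLogger interface itself is not part of this excerpt; a minimal sketch of the overloads the call sites appear to rely on, assuming java.util.Formatter-style format strings:

// Assumed shape of com.databricks.jdbc.log.JdbcLogger, inferred from the
// call sites in this commit; not the actual interface.
public interface JdbcLogger {
  void error(String message);

  // e.g. LOGGER.error("Local file is empty {%s}", localFilePath)
  void error(String format, Object... args);

  // e.g. LOGGER.error(e, "BatchUpdateException occurred: %s", e.getMessage())
  void error(Throwable throwable, String format, Object... args);
}

Passing the throwable as its own argument lets the logger attach the stack trace instead of dropping it, which is the point of most of the logging edits in this commit.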

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowResultChunk.java

Lines changed: 5 additions & 3 deletions
@@ -361,9 +361,11 @@ private static List<List<ValueVector>> getRecordBatchList(
     } catch (ClosedByInterruptException e) {
       // release resources if thread is interrupted when reading arrow data
       LOGGER.error(
-          String.format(
-              "Data parsing interrupted for chunk index [%s] and statement [%s]. Error [%s]",
-              chunkIndex, statementId, e));
+          e,
+          "Data parsing interrupted for chunk index [%s] and statement [%s]. Error [%s]",
+          chunkIndex,
+          statementId,
+          e.getMessage());
       purgeArrowData(recordBatchList);
     } catch (IOException e) {
       LOGGER.error(

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 3 additions & 3 deletions
@@ -8,7 +8,7 @@
 import com.databricks.jdbc.common.CompressionType;
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.dbclient.impl.common.StatementId;
-import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClient;
+import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
 import com.databricks.jdbc.exception.DatabricksParsingException;
 import com.databricks.jdbc.exception.DatabricksSQLException;
 import com.databricks.jdbc.model.client.thrift.generated.TColumnDesc;

@@ -41,7 +41,7 @@ public ArrowStreamResult(
         resultData,
         statementId,
         session,
-        DatabricksHttpClient.getInstance(session.getConnectionContext()));
+        DatabricksHttpClientFactory.getInstance().getClient(session.getConnectionContext()));
   }

   @VisibleForTesting

@@ -79,7 +79,7 @@ public ArrowStreamResult(
         isInlineArrow,
         parentStatementId,
         session,
-        DatabricksHttpClient.getInstance(session.getConnectionContext()));
+        DatabricksHttpClientFactory.getInstance().getClient(session.getConnectionContext()));
   }

   @VisibleForTesting

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java

Lines changed: 5 additions & 3 deletions
@@ -44,9 +44,11 @@ public Void call() throws DatabricksSQLException {
         retries++;
         if (retries >= MAX_RETRIES) {
           LOGGER.error(
-              String.format(
-                  "Failed to download chunk after %d attempts. Chunk index: %d, Error: %s",
-                  MAX_RETRIES, chunk.getChunkIndex(), e.getMessage()));
+              e,
+              "Failed to download chunk after %d attempts. Chunk index: %d, Error: %s",
+              MAX_RETRIES,
+              chunk.getChunkIndex(),
+              e.getMessage());
           chunk.setStatus(ArrowResultChunk.ChunkStatus.DOWNLOAD_FAILED);
           throw new DatabricksSQLException("Failed to download chunk after multiple attempts", e);
         } else {

src/main/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProvider.java

Lines changed: 4 additions & 22 deletions
@@ -4,7 +4,6 @@
 import com.databricks.jdbc.common.CompressionType;
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.dbclient.impl.common.StatementId;
-import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClient;
 import com.databricks.jdbc.exception.DatabricksParsingException;
 import com.databricks.jdbc.exception.DatabricksSQLException;
 import com.databricks.jdbc.log.JdbcLogger;

@@ -15,7 +14,6 @@
 import com.databricks.jdbc.model.core.ResultData;
 import com.databricks.jdbc.model.core.ResultManifest;
 import com.databricks.sdk.service.sql.BaseChunkInfo;
-import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;

@@ -62,23 +60,6 @@ public class RemoteChunkProvider implements ChunkProvider, ChunkDownloadCallback
     initializeData();
   }

-  RemoteChunkProvider(
-      StatementId statementId,
-      TRowSet resultData,
-      IDatabricksSession session,
-      int chunksDownloaderThreadPoolSize,
-      CompressionType compressionType)
-      throws DatabricksParsingException {
-    this(
-        statementId,
-        resultData,
-        session,
-        DatabricksHttpClient.getInstance(session.getConnectionContext()),
-        chunksDownloaderThreadPoolSize,
-        compressionType);
-  }
-
-  @VisibleForTesting
   RemoteChunkProvider(
       StatementId statementId,
       TRowSet resultData,

@@ -142,9 +123,10 @@ public ArrowResultChunk getChunk() throws DatabricksSQLException {
     } catch (InterruptedException e) {
       LOGGER.error(
           e,
-          String.format(
-              "Caught interrupted exception while waiting for chunk [%s] for statement [%s]. Exception [%s]",
-              chunk.getChunkIndex(), statementId, e));
+          "Caught interrupted exception while waiting for chunk [%s] for statement [%s]. Exception [%s]",
+          chunk.getChunkIndex(),
+          statementId,
+          e.getMessage());
     }
   }
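
With the delegating constructor gone, RemoteChunkProvider has a single constructor that takes the HTTP client explicitly, so the @VisibleForTesting escape hatch is no longer needed: tests inject a stub the same way production code passes the factory-managed client. A hypothetical same-package test helper (the parameter types mirror the removed constructor above; the stub client and thread count are placeholders):

// Hypothetical test wiring; types are taken from the removed delegating
// constructor, the stub client and thread count are placeholders.
static RemoteChunkProvider providerWithStubClient(
    StatementId statementId,
    TRowSet resultData,
    IDatabricksSession session,
    IDatabricksHttpClient stubHttpClient,
    CompressionType compressionType)
    throws DatabricksParsingException {
  return new RemoteChunkProvider(
      statementId, resultData, session, stubHttpClient, /* threads */ 4, compressionType);
}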

src/main/java/com/databricks/jdbc/api/impl/batch/DatabricksBatchExecutor.java

Lines changed: 2 additions & 3 deletions
@@ -120,8 +120,7 @@ public int[] executeBatch() throws DatabricksBatchUpdateException {

         logCommandExecutionTime(i, commandStartTime, false);

-        LOGGER.error(
-            String.format("Error executing batch command at index %d: %s", i, e.getMessage()), e);
+        LOGGER.error(e, "Error executing batch command at index %d: %s", i, e.getMessage());

         String message =
             String.format("Batch execution failed at command %d: %s", i, e.getMessage());

@@ -136,7 +135,7 @@ public int[] executeBatch() throws DatabricksBatchUpdateException {

       return updateCounts;
     } catch (DatabricksBatchUpdateException e) {
-      LOGGER.error(String.format("BatchUpdateException occurred: %s", e.getMessage()), e);
+      LOGGER.error(e, "BatchUpdateException occurred: %s", e.getMessage());
       throw e;
     }
   }

src/main/java/com/databricks/jdbc/api/impl/volume/VolumeOperationProcessor.java

Lines changed: 24 additions & 31 deletions
@@ -76,9 +76,9 @@ void process() {
             "Running volume operation {%s} on local file {%s}",
             operationType, localFilePath == null ? "" : localFilePath));
     if (operationUrl == null || operationUrl.isEmpty()) {
-      LOGGER.error("Volume operation URL is not set");
       status = VolumeOperationStatus.ABORTED;
       errorMessage = "Volume operation URL is not set";
+      LOGGER.error(errorMessage);
       return;
     }
     validateLocalFilePath();

@@ -116,9 +116,9 @@ private void validateLocalFilePath() {
     }

     if (allowedVolumeIngestionPaths.isEmpty()) {
-      LOGGER.error("Volume ingestion paths are not set");
       status = VolumeOperationStatus.ABORTED;
-      errorMessage = "Volume operation not supported";
+      errorMessage = "Volume ingestion paths are not set";
+      LOGGER.error(errorMessage);
       return;
     }
     if (operationType.equalsIgnoreCase(REMOVE_OPERATION)) {

@@ -127,7 +127,7 @@ private void validateLocalFilePath() {
     if (localFilePath == null
         || localFilePath.isEmpty()
         || localFilePath.contains(PARENT_DIRECTORY_REF)) {
-      LOGGER.error(String.format("Local file path is invalid {%s}", localFilePath));
+      LOGGER.error("Local file path is invalid {%s}", localFilePath);
       status = VolumeOperationStatus.ABORTED;
       errorMessage = "Local file path is invalid";
       return;

@@ -138,7 +138,7 @@ private void validateLocalFilePath() {
             .filter(x -> x)
             .findFirst();
     if (pathMatched.isEmpty() || !pathMatched.get()) {
-      LOGGER.error(String.format("Local file path is not allowed {%s}", localFilePath));
+      LOGGER.error("Local file path is not allowed {%s}", localFilePath);
       status = VolumeOperationStatus.ABORTED;
       errorMessage = "Local file path is not allowed";
     }

@@ -193,8 +193,7 @@ private void executeGetOperation() {
     // Copy the data in local file as requested by user
     File localFile = new File(localFilePath);
     if (localFile.exists()) {
-      LOGGER.error(
-          String.format("Local file already exists for GET operation {%s}", localFilePath));
+      LOGGER.error("Local file already exists for GET operation {%s}", localFilePath);
       status = VolumeOperationStatus.ABORTED;
       errorMessage = "Local file already exists";
       return;

@@ -203,9 +202,8 @@ private void executeGetOperation() {
     try (CloseableHttpResponse response = databricksHttpClient.execute(httpGet)) {
       if (!isSuccessfulHttpResponse(response)) {
         LOGGER.error(
-            String.format(
-                "Failed to fetch content from volume with error {%s} for local file {%s}",
-                response.getStatusLine().getStatusCode(), localFilePath));
+            "Failed to fetch content from volume with error {%s} for local file {%s}",
+            response.getStatusLine().getStatusCode(), localFilePath);
         status = VolumeOperationStatus.FAILED;
         errorMessage = "Failed to download file";
         return;

@@ -224,16 +222,16 @@ private void executeGetOperation() {
       }
       status = VolumeOperationStatus.SUCCEEDED;
     } catch (FileNotFoundException e) {
-      LOGGER.error(
-          String.format("Local file path is invalid or a directory {%s}", localFilePath));
+      LOGGER.error("Local file path is invalid or a directory {%s}", localFilePath);
       status = VolumeOperationStatus.FAILED;
       errorMessage = "Local file path is invalid or a directory";
     } catch (IOException e) {
       // TODO: Add retries
       LOGGER.error(
-          String.format(
-              "Failed to write to local file {%s} with error {%s}",
-              localFilePath, e.getMessage()));
+          e,
+          "Failed to write to local file {%s} with error {%s}",
+          localFilePath,
+          e.getMessage());
       status = VolumeOperationStatus.FAILED;
       errorMessage = "Failed to write to local file: " + e.getMessage();
     } finally {

@@ -276,40 +274,36 @@ private void executePutOperation() {
         status = VolumeOperationStatus.SUCCEEDED;
       } else {
         LOGGER.error(
-            String.format(
-                "Failed to upload file {%s} with error code: {%s}",
-                localFilePath, response.getStatusLine().getStatusCode()));
+            "Failed to upload file {%s} with error code: {%s}",
+            localFilePath, response.getStatusLine().getStatusCode());
         // TODO: Add retries
         status = VolumeOperationStatus.FAILED;
         errorMessage =
             "Failed to upload file with error code: " + response.getStatusLine().getStatusCode();
       }
     } catch (IOException | DatabricksHttpException e) {
-      LOGGER.error(
-          String.format(
-              "Failed to upload file {%s} with error {%s}", localFilePath, e.getMessage()));
+      LOGGER.error("Failed to upload file {%s} with error {%s}", localFilePath, e.getMessage());
       status = VolumeOperationStatus.FAILED;
       errorMessage = "Failed to upload file: " + e.getMessage();
     }
   }

   private boolean localFileHasErrorForPutOperation(File file) {
     if (!file.exists() || file.isDirectory()) {
-      LOGGER.error(
-          String.format("Local file does not exist or is a directory {%s}", localFilePath));
+      LOGGER.error("Local file does not exist or is a directory {%s}", localFilePath);
       status = VolumeOperationStatus.ABORTED;
       errorMessage = "Local file does not exist or is a directory";
       return true;
     }
     if (file.length() == 0) {
-      LOGGER.error(String.format("Local file is empty {%s}", localFilePath));
+      LOGGER.error("Local file is empty {%s}", localFilePath);
       status = VolumeOperationStatus.ABORTED;
       errorMessage = "Local file is empty";
       return true;
     }

     if (file.length() > PUT_SIZE_LIMITS) {
-      LOGGER.error(String.format("Local file too large {%s}", localFilePath));
+      LOGGER.error("Local file too large {%s}", localFilePath);
       status = VolumeOperationStatus.ABORTED;
       errorMessage = "Local file too large";
       return true;

@@ -326,14 +320,13 @@ private void executeDeleteOperation() {
         status = VolumeOperationStatus.SUCCEEDED;
       } else {
         LOGGER.error(
-            String.format(
-                "Failed to delete volume with error code: {%s}",
-                response.getStatusLine().getStatusCode()));
+            "Failed to delete volume with error code: {%s}",
+            response.getStatusLine().getStatusCode());
         status = VolumeOperationStatus.FAILED;
         errorMessage = "Failed to delete volume";
       }
     } catch (DatabricksHttpException | IOException e) {
-      LOGGER.error(String.format("Failed to delete volume with error {%s}", e.getMessage()), e);
+      LOGGER.error(e, "Failed to delete volume with error {%s}", e.getMessage());
       status = VolumeOperationStatus.FAILED;
       errorMessage = "Failed to delete volume: " + e.getMessage();
     }

@@ -344,11 +337,11 @@ private boolean isSuccessfulHttpResponse(CloseableHttpResponse response) {
         && response.getStatusLine().getStatusCode() < 300;
   }

-  static enum VolumeOperationStatus {
+  enum VolumeOperationStatus {
     PENDING,
     RUNNING,
     ABORTED,
     SUCCEEDED,
-    FAILED;
+    FAILED
   }
 }

src/main/java/com/databricks/jdbc/api/impl/volume/VolumeOperationResult.java

Lines changed: 5 additions & 5 deletions
@@ -9,15 +9,14 @@
 import com.databricks.jdbc.common.ErrorCodes;
 import com.databricks.jdbc.common.ErrorTypes;
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
-import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClient;
+import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
 import com.databricks.jdbc.exception.DatabricksSQLException;
 import com.databricks.jdbc.model.core.ResultManifest;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import java.io.IOException;
-import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.http.HttpEntity;

@@ -49,7 +48,8 @@ public VolumeOperationResult(
     this.session = session;
     this.resultHandler = resultHandler;
     this.statement = statement;
-    this.httpClient = DatabricksHttpClient.getInstance(session.getConnectionContext());
+    this.httpClient =
+        DatabricksHttpClientFactory.getInstance().getClient(session.getConnectionContext());
     this.currentRowIndex = -1;
   }

@@ -133,7 +133,7 @@ private Map<String, String> getHeaders(String headersVal) throws DatabricksSQLException {
   }

   private void validateMetadata() throws DatabricksSQLException {
-    // For now we only support one row for Volume operation
+    // For now, we only support one row for Volume operation
     if (rowCount > 1) {
       throw new DatabricksSQLException("Too many rows for Volume Operation");
     }

@@ -192,7 +192,7 @@ public void setVolumeOperationEntityStream(HttpEntity httpEntity) throws IOException {
     this.volumeStreamContentLength = httpEntity.getContentLength();
   }

-  public InputStreamEntity getVolumeOperationInputStream() throws SQLException {
+  public InputStreamEntity getVolumeOperationInputStream() {
     return new InputStreamEntity(this.volumeInputStream, this.volumeStreamContentLength);
   }


src/main/java/com/databricks/jdbc/auth/OAuthEndpointResolver.java

Lines changed: 6 additions & 5 deletions
@@ -1,7 +1,8 @@
 package com.databricks.jdbc.auth;

 import com.databricks.jdbc.api.IDatabricksConnectionContext;
-import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClient;
+import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
+import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
 import com.databricks.jdbc.exception.DatabricksHttpException;
 import com.databricks.jdbc.exception.DatabricksParsingException;
 import com.databricks.jdbc.log.JdbcLogger;

@@ -46,9 +47,8 @@ private String getTokenEndpointWithDiscovery() {
     try {
       return getTokenEndpointFromDiscoveryEndpoint();
     } catch (DatabricksException e) {
-      String errorMessage =
-          "Failed to get token endpoint from discovery endpoint. Falling back to default token endpoint.";
-      LOGGER.error(errorMessage);
+      LOGGER.error(
+          "Failed to get token endpoint from discovery endpoint. Falling back to default token endpoint.");
       return getDefaultTokenEndpoint();
     }
   }

@@ -77,7 +77,8 @@ DatabricksConfig getBarebonesDatabricksConfig() throws DatabricksParsingException {
   private String getTokenEndpointFromDiscoveryEndpoint() {
     try {
       URIBuilder uriBuilder = new URIBuilder(context.getOAuthDiscoveryURL());
-      DatabricksHttpClient httpClient = DatabricksHttpClient.getInstance(context);
+      IDatabricksHttpClient httpClient =
+          DatabricksHttpClientFactory.getInstance().getClient(context);
       HttpGet getRequest = new HttpGet(uriBuilder.build());
       try (CloseableHttpResponse response = httpClient.execute(getRequest)) {
         if (response.getStatusLine().getStatusCode() != 200) {
