Skip to content

Commit 5f89527

Browse files
authored
added logs for cloud fetch download speed (#913)
## Description Add logs for CloudFetch download speed. Relevant ticket: https://databricks.atlassian.net/browse/XTA-11037 ## Testing Verified that the logs are printed when the log level is set to INFO.
1 parent 70fac50 commit 5f89527

File tree

8 files changed

+51
-11
lines changed

8 files changed

+51
-11
lines changed

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- Provide an option to push telemetry logs (using the flag `ForceEnableTelemetry=1`). For more details see [documentation](https://docs.databricks.com/aws/en/integrations/jdbc-oss/properties#-telemetry-collection)
99
- Added putFiles methods in DBFSVolumeClient for async multi-file upload.
1010
- Added validation on UID param to ensure it is either not set or set to 'token'.
11+
- Added CloudFetch download speed logging at INFO level
1112
- Added vendor error codes to SQLExceptions raised for incorrect UID, host or token.
1213

1314
### Updated

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,11 @@ public int getCloudFetchThreadPoolSize() {
422422
return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.CLOUD_FETCH_THREAD_POOL_SIZE));
423423
}
424424

425+
@Override
426+
public double getCloudFetchSpeedThreshold() {
427+
return Double.parseDouble(getParameter(DatabricksJdbcUrlParams.CLOUD_FETCH_SPEED_THRESHOLD));
428+
}
429+
425430
@Override
426431
public String getCatalog() {
427432
return getParameter(DatabricksJdbcUrlParams.CONN_CATALOG);

src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractArrowResultChunk.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,12 @@ public boolean releaseChunk() {
147147
*
148148
* @param httpClient the HTTP client to use for downloading
149149
* @param compressionCodec the compression codec to use for decompression
150+
* @param speedThreshold the minimum expected download speed in MB/s for logging warnings
150151
* @throws DatabricksParsingException if there is an error parsing the data
151152
* @throws IOException if there is an error downloading or reading the data
152153
*/
153154
protected abstract void downloadData(
154-
IDatabricksHttpClient httpClient, CompressionCodec compressionCodec)
155+
IDatabricksHttpClient httpClient, CompressionCodec compressionCodec, double speedThreshold)
155156
throws DatabricksParsingException, IOException;
156157

157158
/** Handles a failure during the download or processing of this chunk. */

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowResultChunk.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public static Builder builder() {
6666
* @throws IOException if there is an error during download or data reading
6767
*/
6868
@Override
69-
protected void downloadData(IDatabricksHttpClient httpClient, CompressionCodec compressionCodec)
69+
protected void downloadData(
70+
IDatabricksHttpClient httpClient, CompressionCodec compressionCodec, double speedThreshold)
7071
throws DatabricksParsingException, IOException {
7172
CloseableHttpResponse response = null;
7273
long startTime = System.nanoTime();
@@ -77,11 +78,15 @@ protected void downloadData(IDatabricksHttpClient httpClient, CompressionCodec c
7778
// Retry would be done in http client, we should not bother about that here
7879
response = httpClient.execute(getRequest, true);
7980
checkHTTPError(response);
81+
82+
long downloadTimeMs = (System.nanoTime() - startTime) / 1_000_000;
83+
long contentLength = response.getEntity().getContentLength();
84+
logDownloadMetrics(
85+
downloadTimeMs, contentLength, chunkLink.getExternalLink(), speedThreshold);
86+
8087
TelemetryCollector.getInstance()
8188
.recordChunkDownloadLatency(
82-
getStatementIdString(statementId),
83-
chunkIndex,
84-
((System.nanoTime() - startTime) / 1000_000)); // Convert nano to millis
89+
getStatementIdString(statementId), chunkIndex, downloadTimeMs);
8590
setStatus(ChunkStatus.DOWNLOAD_SUCCEEDED);
8691
String decompressionContext =
8792
String.format(
@@ -133,6 +138,26 @@ private void addHeaders(HttpGet getRequest, Map<String, String> headers) {
133138
}
134139
}
135140

141+
private void logDownloadMetrics(
142+
long downloadTimeMs, long contentLength, String url, double speedThreshold) {
143+
if (downloadTimeMs > 0 && contentLength > 0) {
144+
double speedMBps = (contentLength / 1024.0 / 1024.0) / (downloadTimeMs / 1000.0);
145+
String baseUrl = url.split("\\?")[0];
146+
147+
LOGGER.info(
148+
String.format(
149+
"CloudFetch download: %.4f MB/s, %d bytes in %dms from %s",
150+
speedMBps, contentLength, downloadTimeMs, baseUrl));
151+
152+
if (speedMBps < speedThreshold) {
153+
LOGGER.warn(
154+
String.format(
155+
"CloudFetch download slower than threshold: %.4f MB/s < %.4f MB/s",
156+
speedMBps, speedThreshold));
157+
}
158+
}
159+
}
160+
136161
public static class Builder {
137162
private long chunkIndex;
138163
private long numRows;

src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ public Void call() throws DatabricksSQLException, ExecutionException, Interrupte
6060
chunk.setChunkLink(link);
6161
}
6262

63-
chunk.downloadData(httpClient, chunkDownloader.getCompressionCodec());
63+
chunk.downloadData(
64+
httpClient,
65+
chunkDownloader.getCompressionCodec(),
66+
connectionContext != null ? connectionContext.getCloudFetchSpeedThreshold() : 0.1);
6467
downloadSuccessful = true;
6568
} catch (IOException | DatabricksSQLException e) {
6669
retries++;

src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ public interface IDatabricksConnectionContext {
132132
/** Returns the number of threads to be used for fetching data from cloud storage */
133133
int getCloudFetchThreadPoolSize();
134134

135+
/** Returns the minimum expected download speed threshold in MB/s for CloudFetch operations */
136+
double getCloudFetchSpeedThreshold();
137+
135138
Boolean getDirectResultMode();
136139

137140
Boolean shouldRetryTemporarilyUnavailableError();

src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ public enum DatabricksJdbcUrlParams {
145145
MAX_CONCURRENT_PRESIGNED_REQUESTS(
146146
"MaxVolumeOperationConcurrentPresignedRequests",
147147
"Maximum number of concurrent presigned requests",
148-
"50");
148+
"50"),
149+
CLOUD_FETCH_SPEED_THRESHOLD(
150+
"CloudFetchSpeedThreshold", "Minimum expected download speed in MB/s", "0.1");
149151

150152
private final String paramName;
151153
private final String defaultValue;

src/test/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTaskTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ void testRetryLogicWithSocketException() throws Exception {
6565
.doThrow(throwableError)
6666
.doNothing()
6767
.when(chunk)
68-
.downloadData(httpClient, CompressionCodec.NONE);
68+
.downloadData(httpClient, CompressionCodec.NONE, 0.1);
6969

7070
chunkDownloadTask.call();
7171

72-
verify(chunk, times(3)).downloadData(httpClient, CompressionCodec.NONE);
72+
verify(chunk, times(3)).downloadData(httpClient, CompressionCodec.NONE, 0.1);
7373
assertTrue(downloadFuture.isDone());
7474
assertDoesNotThrow(() -> downloadFuture.get());
7575
}
@@ -88,11 +88,11 @@ void testRetryLogicExhaustedWithSocketException() throws Exception {
8888
new SocketException("Connection reset"),
8989
DatabricksDriverErrorCode.INVALID_STATE))
9090
.when(chunk)
91-
.downloadData(httpClient, CompressionCodec.NONE);
91+
.downloadData(httpClient, CompressionCodec.NONE, 0.1);
9292

9393
assertThrows(DatabricksSQLException.class, () -> chunkDownloadTask.call());
9494
verify(chunk, times(ChunkDownloadTask.MAX_RETRIES))
95-
.downloadData(httpClient, CompressionCodec.NONE);
95+
.downloadData(httpClient, CompressionCodec.NONE, 0.1);
9696
assertTrue(downloadFuture.isDone());
9797
ExecutionException executionException =
9898
assertThrows(ExecutionException.class, () -> downloadFuture.get());

0 commit comments

Comments
 (0)