Skip to content

Commit a8504f9

Browse files
committed
add log for cloud fetch speed
1 parent 746c05d commit a8504f9

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

internal/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ type CloudFetchConfig struct {
468468
MaxDownloadThreads int
469469
MaxFilesInMemory int
470470
MinTimeToExpiry time.Duration
471+
CloudFetchSpeedThresholdMbps float64 // Download speed below which a WARN is logged. Measured in MiB/s (bytes / 1024 / 1024 per second), not megabits, despite the "Mbps" name. Values <= 0 default to 0.1.
471472
}
472473

473474
func (cfg CloudFetchConfig) WithDefaults() CloudFetchConfig {
@@ -485,6 +486,10 @@ func (cfg CloudFetchConfig) WithDefaults() CloudFetchConfig {
485486
cfg.MinTimeToExpiry = 0 * time.Second
486487
}
487488

489+
if cfg.CloudFetchSpeedThresholdMbps <= 0 {
490+
cfg.CloudFetchSpeedThresholdMbps = 0.1
491+
}
492+
488493
return cfg
489494
}
490495

@@ -494,5 +499,6 @@ func (cfg CloudFetchConfig) DeepCopy() CloudFetchConfig {
494499
MaxDownloadThreads: cfg.MaxDownloadThreads,
495500
MaxFilesInMemory: cfg.MaxFilesInMemory,
496501
MinTimeToExpiry: cfg.MinTimeToExpiry,
502+
CloudFetchSpeedThresholdMbps: cfg.CloudFetchSpeedThresholdMbps,
497503
}
498504
}

internal/rows/arrowbased/batchloader.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"strings"
89
"time"
910

1011
"github.com/databricks/databricks-sql-go/internal/config"
@@ -143,6 +144,7 @@ func (bi *cloudBatchIterator) Next() (SparkArrowBatch, error) {
143144
link: link,
144145
resultChan: make(chan cloudFetchDownloadTaskResult),
145146
minTimeToExpiry: bi.cfg.MinTimeToExpiry,
147+
speedThresholdMbps: bi.cfg.CloudFetchSpeedThresholdMbps,
146148
}
147149
task.Run()
148150
bi.downloadTasks.Enqueue(task)
@@ -190,6 +192,7 @@ type cloudFetchDownloadTask struct {
190192
minTimeToExpiry time.Duration
191193
link *cli_service.TSparkArrowResultLink
192194
resultChan chan cloudFetchDownloadTaskResult
195+
speedThresholdMbps float64
193196
}
194197

195198
func (cft *cloudFetchDownloadTask) GetResult() (SparkArrowBatch, error) {
@@ -232,7 +235,7 @@ func (cft *cloudFetchDownloadTask) Run() {
232235
cft.link.StartRowOffset,
233236
cft.link.RowCount,
234237
)
235-
data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry)
238+
data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry, cft.speedThresholdMbps)
236239
if err != nil {
237240
cft.resultChan <- cloudFetchDownloadTaskResult{batch: nil, err: err}
238241
return
@@ -262,10 +265,30 @@ func (cft *cloudFetchDownloadTask) Run() {
262265
}()
263266
}
264267

268+
// logCloudFetchSpeed calculates and logs download speed metrics
269+
func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Duration, speedThresholdMbps float64) {
270+
if contentLength > 0 && duration.Seconds() > 0 {
271+
// Extract base URL (up to first ?)
272+
baseURL := fullURL
273+
if idx := strings.Index(baseURL, "?"); idx != -1 {
274+
baseURL = baseURL[:idx]
275+
}
276+
277+
speedMbps := float64(contentLength) / (1024 * 1024) / duration.Seconds()
278+
279+
logger.Info().Msgf("CloudFetch: Result File Download speed from cloud storage %s %.4f Mbps", baseURL, speedMbps)
280+
281+
if speedMbps < speedThresholdMbps {
282+
logger.Warn().Msgf("CloudFetch: Results download is slower than threshold speed of %.4f Mbps", speedThresholdMbps)
283+
}
284+
}
285+
}
286+
265287
func fetchBatchBytes(
266288
ctx context.Context,
267289
link *cli_service.TSparkArrowResultLink,
268290
minTimeToExpiry time.Duration,
291+
speedThresholdMbps float64,
269292
) (io.ReadCloser, error) {
270293
if isLinkExpired(link.ExpiryTime, minTimeToExpiry) {
271294
return nil, errors.New(dbsqlerr.ErrLinkExpired)
@@ -283,6 +306,7 @@ func fetchBatchBytes(
283306
}
284307
}
285308

309+
startTime := time.Now()
286310
client := http.DefaultClient
287311
res, err := client.Do(req)
288312
if err != nil {
@@ -293,6 +317,9 @@ func fetchBatchBytes(
293317
return nil, dbsqlerrint.NewDriverError(ctx, msg, err)
294318
}
295319

320+
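// Log download speed metrics. NOTE(review): the elapsed time is taken right after client.Do returns, before res.Body is read, so it may not cover the actual body transfer — confirm this is the intended measurement.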
321+
logCloudFetchSpeed(link.FileLink, res.ContentLength, time.Since(startTime), speedThresholdMbps)
322+
296323
return res.Body, nil
297324
}
298325

0 commit comments

Comments
 (0)