Skip to content

Commit 1c7900c

Browse files
committed
lookahead download
1 parent 7e1f0ac commit 1c7900c

File tree

1 file changed

+60
-7
lines changed

1 file changed

+60
-7
lines changed

internal/committer/committer.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ import (
2929

3030
// BlockRange represents a range of blocks in an S3 parquet file
3131
type BlockRange struct {
32-
StartBlock *big.Int `json:"start_block"`
33-
EndBlock *big.Int `json:"end_block"`
34-
S3Key string `json:"s3_key"`
35-
IsDownloaded bool `json:"is_downloaded"`
36-
LocalPath string `json:"local_path,omitempty"`
37-
BlockData []common.BlockData `json:"block_data,omitempty"`
32+
StartBlock *big.Int `json:"start_block"`
33+
EndBlock *big.Int `json:"end_block"`
34+
S3Key string `json:"s3_key"`
35+
IsDownloaded bool `json:"is_downloaded"`
36+
IsDownloading bool `json:"is_downloading"`
37+
LocalPath string `json:"local_path,omitempty"`
38+
BlockData []common.BlockData `json:"block_data,omitempty"`
3839
}
3940

4041
// ParquetBlockData represents the block data structure in parquet files
@@ -167,7 +168,20 @@ func Commit(chainId *big.Int) error {
167168
Str("end_block", blockRange.EndBlock.String()).
168169
Msg("Processing file")
169170

170-
// Download and parse the file synchronously
171+
// Start downloading the next file in background (lookahead)
172+
if i+1 < len(blockRanges) {
173+
nextBlockRange := &blockRanges[i+1]
174+
log.Debug().
175+
Str("next_file", nextBlockRange.S3Key).
176+
Msg("Starting lookahead download")
177+
go func(br *BlockRange) {
178+
if err := downloadFile(br); err != nil {
179+
log.Error().Err(err).Str("file", br.S3Key).Msg("Failed to download file in background")
180+
}
181+
}(nextBlockRange)
182+
}
183+
184+
// Download and parse the current file
171185
log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download")
172186
if err := downloadFile(&blockRange); err != nil {
173187
log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file")
@@ -449,6 +463,45 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR
449463
func downloadFile(blockRange *BlockRange) error {
450464
log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download")
451465

466+
// Check if file is already downloaded
467+
mu.RLock()
468+
if blockRange.IsDownloaded {
469+
mu.RUnlock()
470+
log.Debug().Str("file", blockRange.S3Key).Msg("File already downloaded, skipping")
471+
return nil
472+
}
473+
mu.RUnlock()
474+
475+
// Check if file is already being downloaded by another goroutine
476+
mu.Lock()
477+
if blockRange.IsDownloading {
478+
mu.Unlock()
479+
log.Debug().Str("file", blockRange.S3Key).Msg("File is already being downloaded, waiting...")
480+
481+
// Poll every 250ms until download is complete
482+
for {
483+
time.Sleep(250 * time.Millisecond)
484+
mu.RLock()
485+
if blockRange.IsDownloaded {
486+
mu.RUnlock()
487+
log.Debug().Str("file", blockRange.S3Key).Msg("Download completed by another goroutine")
488+
return nil
489+
}
490+
mu.RUnlock()
491+
}
492+
}
493+
494+
// Mark as downloading
495+
blockRange.IsDownloading = true
496+
mu.Unlock()
497+
498+
// Always clear the downloading flag when this function returns (on success or error)
499+
defer func() {
500+
mu.Lock()
501+
blockRange.IsDownloading = false
502+
mu.Unlock()
503+
}()
504+
452505
// Ensure temp directory exists
453506
if err := os.MkdirAll(tempDir, 0755); err != nil {
454507
return fmt.Errorf("failed to create temp directory: %w", err)

0 commit comments

Comments
 (0)