Skip to content

Commit 700befa

Browse files
committed
channel based download
1 parent 1c7900c commit 700befa

File tree

1 file changed

+60
-89
lines changed

1 file changed

+60
-89
lines changed

internal/committer/committer.go

Lines changed: 60 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ var kafkaPublisher *storage.KafkaPublisher
5656
var tempDir = filepath.Join(os.TempDir(), "committer")
5757
var parquetFilenameRegex = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`)
5858
var mu sync.RWMutex
59+
var downloadComplete chan *BlockRange
5960

6061
func Init(chainId *big.Int) {
6162
tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64()))
63+
downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload)
6264

6365
initClickHouse()
6466
initS3()
@@ -157,37 +159,74 @@ func Commit(chainId *big.Int) error {
157159
}
158160

159161
nextCommitBlockNumber := new(big.Int).Add(maxBlockNumber, big.NewInt(1))
160-
log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting sequential processing")
162+
log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting producer-consumer processing")
161163

164+
// Start the block range processor goroutine
165+
processorDone := make(chan struct{})
166+
go func() {
167+
blockRangeProcessor(nextCommitBlockNumber)
168+
close(processorDone)
169+
}()
170+
171+
// Download files synchronously and send to channel
162172
for i, blockRange := range blockRanges {
163173
log.Info().
164174
Int("processing", i+1).
165175
Int("total", len(blockRanges)).
166176
Str("file", blockRange.S3Key).
167177
Str("start_block", blockRange.StartBlock.String()).
168178
Str("end_block", blockRange.EndBlock.String()).
169-
Msg("Processing file")
170-
171-
// Start downloading the next file in background (lookahead)
172-
if i+1 < len(blockRanges) {
173-
nextBlockRange := &blockRanges[i+1]
174-
log.Debug().
175-
Str("next_file", nextBlockRange.S3Key).
176-
Msg("Starting lookahead download")
177-
go func(br *BlockRange) {
178-
if err := downloadFile(br); err != nil {
179-
log.Error().Err(err).Str("file", br.S3Key).Msg("Failed to download file in background")
180-
}
181-
}(nextBlockRange)
182-
}
179+
Msg("Starting download")
183180

184-
// Download and parse the current file
185-
log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download")
186181
if err := downloadFile(&blockRange); err != nil {
187182
log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file")
188183
}
189-
log.Debug().Str("file", blockRange.S3Key).Msg("File download completed")
190184

185+
log.Debug().Str("file", blockRange.S3Key).Msg("Download completed, sending to channel")
186+
downloadComplete <- &blockRange
187+
}
188+
189+
// Close channel to signal processor that all downloads are done
190+
log.Info().Msg("All downloads completed, waiting for processing to finish")
191+
close(downloadComplete)
192+
<-processorDone
193+
log.Info().Msg("All processing completed successfully")
194+
195+
return nil
196+
}
197+
198+
func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) {
199+
// Use toString() to force ClickHouse to return a string instead of UInt256
200+
query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d", chainId.Uint64())
201+
rows, err := clickhouseConn.Query(context.Background(), query)
202+
if err != nil {
203+
return nil, err
204+
}
205+
defer rows.Close()
206+
207+
if !rows.Next() {
208+
return big.NewInt(0), nil
209+
}
210+
211+
var maxBlockNumberStr string
212+
if err := rows.Scan(&maxBlockNumberStr); err != nil {
213+
return nil, err
214+
}
215+
216+
// Convert string to big.Int to handle UInt256 values
217+
maxBlockNumber, ok := new(big.Int).SetString(maxBlockNumberStr, 10)
218+
if !ok {
219+
return nil, fmt.Errorf("failed to parse block number: %s", maxBlockNumberStr)
220+
}
221+
222+
return maxBlockNumber, nil
223+
}
224+
225+
// blockRangeProcessor processes BlockRanges from the download channel and publishes to Kafka
226+
func blockRangeProcessor(nextCommitBlockNumber *big.Int) {
227+
log.Info().Str("next_commit_block", nextCommitBlockNumber.String()).Msg("Starting block range processor")
228+
229+
for blockRange := range downloadComplete {
191230
log.Info().
192231
Str("file", blockRange.S3Key).
193232
Str("next_commit_block", nextCommitBlockNumber.String()).
@@ -199,7 +238,7 @@ func Commit(chainId *big.Int) error {
199238
log.Warn().
200239
Str("file", blockRange.S3Key).
201240
Msg("No block data found in parquet file, skipping")
202-
return nil
241+
continue
203242
}
204243

205244
// Process block data sequentially
@@ -235,7 +274,7 @@ func Commit(chainId *big.Int) error {
235274
log.Panic().
236275
Str("file", blockRange.S3Key).
237276
Msg("All blocks already processed, skipping Kafka publish")
238-
return nil
277+
continue
239278
}
240279

241280
blocksToProcess := blockRange.BlockData[startIndex:]
@@ -330,40 +369,11 @@ func Commit(chainId *big.Int) error {
330369
}
331370

332371
log.Info().
333-
Int("processed", i+1).
334-
Int("total", len(blockRanges)).
335372
Str("file", blockRange.S3Key).
336373
Msg("Completed processing file")
337374
}
338375

339-
return nil
340-
}
341-
342-
func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) {
343-
// Use toString() to force ClickHouse to return a string instead of UInt256
344-
query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d", chainId.Uint64())
345-
rows, err := clickhouseConn.Query(context.Background(), query)
346-
if err != nil {
347-
return nil, err
348-
}
349-
defer rows.Close()
350-
351-
if !rows.Next() {
352-
return big.NewInt(0), nil
353-
}
354-
355-
var maxBlockNumberStr string
356-
if err := rows.Scan(&maxBlockNumberStr); err != nil {
357-
return nil, err
358-
}
359-
360-
// Convert string to big.Int to handle UInt256 values
361-
maxBlockNumber, ok := new(big.Int).SetString(maxBlockNumberStr, 10)
362-
if !ok {
363-
return nil, fmt.Errorf("failed to parse block number: %s", maxBlockNumberStr)
364-
}
365-
366-
return maxBlockNumber, nil
376+
log.Info().Msg("Block range processor finished")
367377
}
368378

369379
// listS3ParquetFiles lists all parquet files in S3 with the chain prefix
@@ -463,45 +473,6 @@ func filterAndSortBlockRanges(files []string, maxBlockNumber *big.Int) ([]BlockR
463473
func downloadFile(blockRange *BlockRange) error {
464474
log.Debug().Str("file", blockRange.S3Key).Msg("Starting file download")
465475

466-
// Check if file is already downloaded
467-
mu.RLock()
468-
if blockRange.IsDownloaded {
469-
mu.RUnlock()
470-
log.Debug().Str("file", blockRange.S3Key).Msg("File already downloaded, skipping")
471-
return nil
472-
}
473-
mu.RUnlock()
474-
475-
// Check if file is already being downloaded by another goroutine
476-
mu.Lock()
477-
if blockRange.IsDownloading {
478-
mu.Unlock()
479-
log.Debug().Str("file", blockRange.S3Key).Msg("File is already being downloaded, waiting...")
480-
481-
// Poll every 250ms until download is complete
482-
for {
483-
time.Sleep(250 * time.Millisecond)
484-
mu.RLock()
485-
if blockRange.IsDownloaded {
486-
mu.RUnlock()
487-
log.Debug().Str("file", blockRange.S3Key).Msg("Download completed by another goroutine")
488-
return nil
489-
}
490-
mu.RUnlock()
491-
}
492-
}
493-
494-
// Mark as downloading
495-
blockRange.IsDownloading = true
496-
mu.Unlock()
497-
498-
// Ensure downloading flag is cleared on error
499-
defer func() {
500-
mu.Lock()
501-
blockRange.IsDownloading = false
502-
mu.Unlock()
503-
}()
504-
505476
// Ensure temp directory exists
506477
if err := os.MkdirAll(tempDir, 0755); err != nil {
507478
return fmt.Errorf("failed to create temp directory: %w", err)

0 commit comments

Comments
 (0)