Skip to content

Commit 7020377

Browse files
committed
minor change
1 parent 5ded7dc commit 7020377

File tree

1 file changed

+14
-15
lines changed

1 file changed

+14
-15
lines changed

internal/committer/committer.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,28 +61,27 @@ func CommitStreaming() error {
6161
}
6262

6363
// if nothing to process, return
64-
if len(blockRanges) == 0 {
64+
if len(blockRanges) != 0 {
6565
log.Info().
6666
Int64("maxBlockNumber", maxBlockNumber).
6767
Msg("No files to process - all blocks are up to date from S3")
68-
return nil
69-
}
7068

71-
// Initialize nextBlockNumber for streaming processing
72-
nextBlockNumber = uint64(maxBlockNumber + 1)
73-
log.Info().Uint64("next_commit_block", nextBlockNumber).Msg("Starting streaming producer-consumer processing")
69+
// Initialize nextBlockNumber for streaming processing
70+
nextBlockNumber = uint64(maxBlockNumber + 1)
71+
log.Info().Uint64("next_commit_block", nextBlockNumber).Msg("Starting streaming producer-consumer processing")
7472

75-
blockParserDone := make(chan struct{})
76-
blockProcessorDone := make(chan struct{})
77-
go blockParserRoutine(blockParserDone)
78-
go blockProcessorRoutine(blockProcessorDone)
73+
blockParserDone := make(chan struct{})
74+
blockProcessorDone := make(chan struct{})
75+
go blockParserRoutine(blockParserDone)
76+
go blockProcessorRoutine(blockProcessorDone)
7977

80-
downloadFilesForBlockRange(blockRanges)
81-
close(downloadedFilePathChannel)
78+
downloadFilesForBlockRange(blockRanges)
79+
close(downloadedFilePathChannel)
8280

83-
<-blockParserDone
84-
close(blockDataChannel)
85-
<-blockProcessorDone
81+
<-blockParserDone
82+
close(blockDataChannel)
83+
<-blockProcessorDone
84+
}
8685

8786
log.Info().Msg("Consuming latest blocks from RPC")
8887
pollLatest()

0 commit comments

Comments
 (0)