Skip to content

Commit 1744c2b

Browse files
committed
parallel commit + wh
1 parent 609f752 commit 1744c2b

File tree

1 file changed

+14
-40
lines changed

1 file changed

+14
-40
lines changed

internal/orchestrator/committer.go

Lines changed: 14 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"math/big"
77
"sort"
8-
"sync"
98
"sync/atomic"
109
"time"
1110

@@ -108,30 +107,6 @@ func (c *Committer) Start(ctx context.Context) {
108107

109108
c.cleanupProcessedStagingBlocks()
110109

111-
if config.Cfg.Publisher.Mode == "parallel" {
112-
var wg sync.WaitGroup
113-
publishInterval := interval / 2
114-
if publishInterval <= 0 {
115-
publishInterval = interval
116-
}
117-
wg.Add(2)
118-
go func() {
119-
defer wg.Done()
120-
c.runPublishLoop(ctx, publishInterval)
121-
}()
122-
// allow the publisher to start before the committer
123-
time.Sleep(publishInterval)
124-
go func() {
125-
defer wg.Done()
126-
c.runCommitLoop(ctx, interval)
127-
}()
128-
<-ctx.Done()
129-
wg.Wait()
130-
log.Info().Msg("Committer shutting down")
131-
c.publisher.Close()
132-
return
133-
}
134-
135110
c.runCommitLoop(ctx, interval)
136111
log.Info().Msg("Committer shutting down")
137112
c.publisher.Close()
@@ -162,9 +137,20 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
162137
log.Debug().Msg("No block data to commit")
163138
continue
164139
}
165-
if err := c.commit(ctx, blockDataToCommit); err != nil {
166-
log.Error().Err(err).Msg("Error committing blocks")
167-
}
140+
go func() {
141+
highest := blockDataToCommit[len(blockDataToCommit)-1].Block.Number.Uint64()
142+
if err := c.publisher.PublishBlockData(blockDataToCommit); err != nil {
143+
log.Error().Err(err).Msg("Failed to publish block data to kafka")
144+
return
145+
}
146+
c.lastPublishedBlock.Store(highest)
147+
}()
148+
go func() {
149+
if err := c.commit(ctx, blockDataToCommit); err != nil {
150+
log.Error().Err(err).Msg("Error committing blocks")
151+
}
152+
c.cleanupProcessedStagingBlocks()
153+
}()
168154
}
169155
}
170156
}
@@ -458,18 +444,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
458444
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
459445
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
460446

461-
if config.Cfg.Publisher.Mode == "default" {
462-
highest := highestBlock.Number.Uint64()
463-
go func() {
464-
if err := c.publisher.PublishBlockData(blockData); err != nil {
465-
log.Error().Err(err).Msg("Failed to publish block data to kafka")
466-
return
467-
}
468-
c.lastPublishedBlock.Store(highest)
469-
c.cleanupProcessedStagingBlocks()
470-
}()
471-
}
472-
473447
c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
474448
go c.cleanupProcessedStagingBlocks()
475449

0 commit comments

Comments
 (0)