Skip to content

Commit b054a1e

Browse files
committed
feat: gate staging cleanup on publish and commit
1 parent 95db159 commit b054a1e

File tree

2 files changed

+107
-136
lines changed

2 files changed

+107
-136
lines changed

internal/orchestrator/committer.go

Lines changed: 60 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"math/big"
77
"sort"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/rs/zerolog/log"
@@ -26,8 +27,8 @@ type Committer struct {
2627
storage storage.IStorage
2728
commitFromBlock *big.Int
2829
rpc rpc.IRPCClient
29-
lastCommittedBlock *big.Int
30-
lastCommittedLock sync.RWMutex
30+
lastCommittedBlock atomic.Int64
31+
lastPublishedBlock atomic.Int64
3132
publisher *publisher.Publisher
3233
workMode WorkMode
3334
workModeChan chan WorkMode
@@ -60,15 +61,15 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
6061

6162
commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
6263
committer := &Committer{
63-
triggerIntervalMs: triggerInterval,
64-
blocksPerCommit: blocksPerCommit,
65-
storage: storage,
66-
commitFromBlock: commitFromBlock,
67-
rpc: rpc,
68-
lastCommittedBlock: commitFromBlock,
69-
publisher: publisher.GetInstance(),
70-
workMode: "",
64+
triggerIntervalMs: triggerInterval,
65+
blocksPerCommit: blocksPerCommit,
66+
storage: storage,
67+
commitFromBlock: commitFromBlock,
68+
rpc: rpc,
69+
publisher: publisher.GetInstance(),
70+
workMode: "",
7171
}
72+
committer.lastCommittedBlock.Store(commitFromBlock.Int64())
7273

7374
for _, opt := range opts {
7475
opt(committer)
@@ -78,15 +79,11 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
7879
}
7980

8081
func (c *Committer) getLastCommittedBlock() *big.Int {
81-
c.lastCommittedLock.RLock()
82-
defer c.lastCommittedLock.RUnlock()
83-
return new(big.Int).Set(c.lastCommittedBlock)
82+
return big.NewInt(c.lastCommittedBlock.Load())
8483
}
8584

8685
func (c *Committer) setLastCommittedBlock(b *big.Int) {
87-
c.lastCommittedLock.Lock()
88-
c.lastCommittedBlock = new(big.Int).Set(b)
89-
c.lastCommittedLock.Unlock()
86+
c.lastCommittedBlock.Store(b.Int64())
9087
}
9188

9289
func (c *Committer) initializeParallelPublisher() {
@@ -106,6 +103,7 @@ func (c *Committer) initializeParallelPublisher() {
106103
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil {
107104
log.Error().Err(err).Msg("failed to set last published block number")
108105
}
106+
c.lastPublishedBlock.Store(mainMax.Int64())
109107
}
110108
return
111109
}
@@ -114,7 +112,10 @@ func (c *Committer) initializeParallelPublisher() {
114112
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil {
115113
log.Error().Err(err).Msg("failed to set last published block number")
116114
}
115+
c.lastPublishedBlock.Store(mainMax.Int64())
116+
return
117117
}
118+
c.lastPublishedBlock.Store(lastPublished.Int64())
118119
}
119120

120121
func (c *Committer) Start(ctx context.Context) {
@@ -204,8 +205,8 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration)
204205
log.Error().Err(err).Msg("failed to get last published block number")
205206
continue
206207
}
207-
lastCommitted := c.getLastCommittedBlock()
208-
if lastPublished != nil && lastPublished.Cmp(lastCommitted) >= 0 {
208+
lastCommitted := c.lastCommittedBlock.Load()
209+
if lastPublished != nil && lastPublished.Int64() >= lastCommitted {
209210
continue
210211
}
211212
if err := c.publish(ctx); err != nil {
@@ -216,25 +217,48 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration)
216217
}
217218

218219
func (c *Committer) cleanupStagingData() {
219-
// Get the last committed block number from main storage
220-
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
220+
chainID := c.rpc.GetChainID()
221+
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
221222
if err != nil {
222223
log.Error().Msgf("Error getting latest committed block number: %v", err)
223224
return
224225
}
226+
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
227+
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Int64())
228+
}
225229

226-
if latestCommittedBlockNumber.Sign() == 0 {
227-
log.Debug().Msg("No blocks committed yet, skipping staging data cleanup")
228-
return
230+
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
231+
if err != nil {
232+
log.Error().Err(err).Msg("failed to get last published block number")
233+
} else if lastPublished != nil && lastPublished.Sign() > 0 {
234+
c.lastPublishedBlock.Store(lastPublished.Int64())
229235
}
230236

231-
// Delete all staging data older than the latest committed block number
232-
if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil {
233-
log.Error().Msgf("Error deleting staging data older than %v: %v", latestCommittedBlockNumber, err)
237+
c.cleanupProcessedStagingBlocks()
238+
}
239+
240+
func (c *Committer) cleanupProcessedStagingBlocks() {
241+
committed := c.lastCommittedBlock.Load()
242+
published := c.lastPublishedBlock.Load()
243+
if published == 0 || committed == 0 {
234244
return
235245
}
236-
237-
log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber)
246+
limit := committed
247+
if published < limit {
248+
limit = published
249+
}
250+
if limit <= 0 {
251+
return
252+
}
253+
chainID := c.rpc.GetChainID()
254+
blockNumber := big.NewInt(limit)
255+
stagingDeleteStart := time.Now()
256+
if err := c.storage.StagingStorage.DeleteOlderThan(chainID, blockNumber); err != nil {
257+
log.Error().Err(err).Msg("Failed to delete staging data")
258+
return
259+
}
260+
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds())
261+
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
238262
}
239263

240264
func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) {
@@ -462,6 +486,8 @@ func (c *Committer) publish(ctx context.Context) error {
462486
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
463487
return err
464488
}
489+
c.lastPublishedBlock.Store(highest.Int64())
490+
go c.cleanupProcessedStagingBlocks()
465491
return nil
466492
}
467493

@@ -484,25 +510,19 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
484510
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
485511

486512
if config.Cfg.Publisher.Mode == "default" {
513+
highest := highestBlock.Number.Int64()
487514
go func() {
488515
if err := c.publisher.PublishBlockData(blockData); err != nil {
489516
log.Error().Err(err).Msg("Failed to publish block data to kafka")
517+
return
490518
}
519+
c.lastPublishedBlock.Store(highest)
520+
c.cleanupProcessedStagingBlocks()
491521
}()
492522
}
493523

494-
if c.workMode == WorkModeBackfill {
495-
go func() {
496-
stagingDeleteStart := time.Now()
497-
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
498-
log.Error().Err(err).Msg("Failed to delete staging data")
499-
}
500-
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
501-
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
502-
}()
503-
}
504-
505-
c.setLastCommittedBlock(highestBlock.Number)
524+
c.lastCommittedBlock.Store(highestBlock.Number.Int64())
525+
go c.cleanupProcessedStagingBlocks()
506526

507527
// Update metrics for successful commits
508528
metrics.SuccessfulCommits.Add(float64(len(blockData)))

0 commit comments

Comments
 (0)