Skip to content

Commit eea71f4

Browse files
committed
Until block for committer
1 parent 92a35ab commit eea71f4

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

configs/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type CommitterConfig struct {
3030
Interval int `mapstructure:"interval"`
3131
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
3232
FromBlock int `mapstructure:"fromBlock"`
33+
UntilBlock int `mapstructure:"untilBlock"`
3334
}
3435

3536
type ReorgHandlerConfig struct {

internal/orchestrator/committer.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Committer struct {
2626
blocksPerCommit int
2727
storage storage.IStorage
2828
commitFromBlock *big.Int
29+
commitUntilBlock *big.Int
2930
rpc rpc.IRPCClient
3031
lastCommittedBlock atomic.Uint64
3132
lastPublishedBlock atomic.Uint64
@@ -60,12 +61,23 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
6061
blocksPerCommit = DEFAULT_BLOCKS_PER_COMMIT
6162
}
6263

64+
commitUntilBlock := config.Cfg.Committer.UntilBlock
65+
if commitUntilBlock == 0 {
66+
// default to match the poller.untilBlock
67+
if config.Cfg.Poller.UntilBlock != 0 {
68+
commitUntilBlock = config.Cfg.Poller.UntilBlock
69+
} else {
70+
commitUntilBlock = -1
71+
}
72+
}
73+
6374
commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
6475
committer := &Committer{
6576
triggerIntervalMs: triggerInterval,
6677
blocksPerCommit: blocksPerCommit,
6778
storage: storage,
6879
commitFromBlock: commitFromBlock,
80+
commitUntilBlock: big.NewInt(int64(commitUntilBlock)),
6981
rpc: rpc,
7082
publisher: publisher.GetInstance(),
7183
workMode: "",
@@ -204,6 +216,7 @@ func (c *Committer) Start(ctx context.Context) {
204216
}
205217

206218
c.runCommitLoop(ctx, interval)
219+
207220
log.Info().Msg("Committer shutting down")
208221
c.publisher.Close()
209222
}
@@ -232,6 +245,11 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
232245
log.Debug().Msg("Committer work mode not set, skipping commit")
233246
continue
234247
}
248+
if c.commitUntilBlock.Sign() > 0 && c.lastCommittedBlock.Load() > c.commitUntilBlock.Uint64() {
249+
// Completing the commit loop if we've committed more than commit until block
250+
log.Info().Msgf("Committer reached configured untilBlock %s, the last commit block is %d, stopping commits", c.commitUntilBlock.String(), c.lastCommittedBlock.Load())
251+
return
252+
}
235253
blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx)
236254
if err != nil {
237255
log.Error().Err(err).Msg("Error getting block data to commit")

0 commit comments

Comments
 (0)