diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 599ea9a..5aaf6c2 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -29,6 +29,7 @@ type Committer struct { publisher *publisher.Publisher workMode WorkMode workModeChan chan WorkMode + validator *Validator } type CommitterOption func(*Committer) @@ -39,6 +40,12 @@ func WithCommitterWorkModeChan(ch chan WorkMode) CommitterOption { } } +func WithValidator(validator *Validator) CommitterOption { + return func(c *Committer) { + c.validator = validator + } +} + func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...CommitterOption) *Committer { triggerInterval := config.Cfg.Committer.Interval if triggerInterval == 0 { @@ -210,6 +217,18 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo return nil, nil } + if c.validator != nil { + validBlocks, invalidBlocks, err := c.validator.ValidateBlocks(blocksData) + if err != nil { + return nil, err + } + if len(invalidBlocks) > 0 { + log.Warn().Msgf("Found %d invalid blocks in commit batch, continuing with %d valid blocks", len(invalidBlocks), len(validBlocks)) + // continue with valid blocks only + blocksData = validBlocks + } + } + // Sort blocks by block number sort.Slice(blocksData, func(i, j int) bool { return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0 diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index d2e4bb4..154dc89 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -85,7 +85,8 @@ func (o *Orchestrator) Start() { committerWorkModeChan := make(chan WorkMode, 1) workModeMonitor.RegisterChannel(committerWorkModeChan) defer workModeMonitor.UnregisterChannel(committerWorkModeChan) - committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan)) + validator := NewValidator(o.rpc, o.storage) + committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan), WithValidator(validator)) committer.Start(ctx) }() }