From 961b3b62d2f228d47025b55346222c14ba3682c9 Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Tue, 12 Aug 2025 15:52:49 +0000 Subject: [PATCH 1/2] Fix publisher parallel mode live --- internal/orchestrator/committer.go | 109 +++++++++++++++-------------- 1 file changed, 56 insertions(+), 53 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 25b1dd7..d85213a 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -256,6 +256,45 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er return blockNumbers, nil } +func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) { + lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID()) + log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String()) + if err != nil { + return nil, err + } + + if lastestPublishedBlockNumber.Sign() == 0 { + // If no blocks have been committed yet, start from the fromBlock specified in the config + lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) + } else { + lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load()) + if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 { + log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String()) + return []*big.Int{}, nil + } + } + + startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1)) + endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber) + if err != nil { + return nil, fmt.Errorf("error getting block to commit until: %v", err) + } + + blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 + if blockCount < 0 { + return []*big.Int{}, fmt.Errorf("more blocks have been committed than the RPC has available - possible chain reset") + } + if blockCount == 0 { + return []*big.Int{}, nil + } + blockNumbers := make([]*big.Int, blockCount) + for i := int64(0); i < blockCount; i++ { + blockNumber := new(big.Int).Add(startBlock, big.NewInt(i)) + blockNumbers[i] = blockNumber + } + return blockNumbers, nil +} + func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBlockNumber *big.Int) (*big.Int, error) { untilBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(int64(c.blocksPerCommit))) if c.workMode == WorkModeBackfill { @@ -274,7 +313,7 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl } } -func (c *Committer) fetchBlockDataToCommit(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) { +func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) { if c.workMode == WorkModeBackfill { startTime := time.Now() blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()}) @@ -300,16 +339,8 @@ func (c *Committer) fetchBlockDataToCommit(ctx context.Context, blockNumbers []* } } -func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) { - blocksToCommit, err := c.getBlockNumbersToCommit(ctx) - if err != nil { - return nil, fmt.Errorf("error determining blocks to commit: %v", err) - } - if len(blocksToCommit) == 0 { - return nil, nil - } - - blocksData, err := c.fetchBlockDataToCommit(ctx, blocksToCommit) +func (c *Committer) getSequentialBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) { + blocksData, err := c.fetchBlockData(ctx, blockNumbers) if err != nil { return nil, err } @@ -338,8 +369,8 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0 }) - if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 { - return nil, c.handleGap(ctx, blocksToCommit[0], blocksData[0].Block) + if blocksData[0].Block.Number.Cmp(blockNumbers[0]) != 0 { + return nil, c.handleGap(ctx, blockNumbers[0], blocksData[0].Block) } var sequentialBlockData []common.BlockData @@ -367,54 +398,26 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo return sequentialBlockData, nil } -func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) { - chainID := c.rpc.GetChainID() - lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) +func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) { + blocksToCommit, err := c.getBlockNumbersToCommit(ctx) if err != nil { - return nil, fmt.Errorf("failed to get last published block number: %v", err) + return nil, fmt.Errorf("error determining blocks to commit: %v", err) } - - startBlock := new(big.Int).Set(c.commitFromBlock) - if lastPublished != nil && lastPublished.Sign() > 0 { - startBlock = new(big.Int).Add(lastPublished, big.NewInt(1)) + if len(blocksToCommit) == 0 { + return nil, nil } + return c.getSequentialBlockData(ctx, blocksToCommit) +} - endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1))) - - blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ - ChainId: chainID, - StartBlock: startBlock, - EndBlock: endBlock, - }) +func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) { + blocksToPublish, err := c.getBlockNumbersToPublish(ctx) if err != nil { - return nil, fmt.Errorf("error fetching blocks to publish: %v", err) + return nil, fmt.Errorf("error determining blocks to commit: %v", err) } - if len(blocksData) == 0 { + if len(blocksToPublish) == 0 { return nil, nil } - - sort.Slice(blocksData, func(i, j int) bool { - return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0 - }) - if blocksData[0].Block.Number.Cmp(startBlock) != 0 { - log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String()) - return nil, nil - } - - sequential := []common.BlockData{blocksData[0]} - expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1)) - for i := 1; i < len(blocksData); i++ { - if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 { - continue - } - if blocksData[i].Block.Number.Cmp(expected) != 0 { - break - } - sequential = append(sequential, blocksData[i]) - expected.Add(expected, big.NewInt(1)) - } - - return sequential, nil + return c.getSequentialBlockData(ctx, blocksToPublish) } func (c *Committer) publish(ctx context.Context) error { From 16fde0a1b52beded57f89aa89da6802a6fb146e5 Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Tue, 12 Aug 2025 16:08:03 +0000 Subject: [PATCH 2/2] Remove bad tests --- internal/orchestrator/committer_test.go | 139 ------------------------ 1 file changed, 139 deletions(-) diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index 964535f..0c39ba4 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -407,145 +407,6 @@ func TestCleanupProcessedStagingBlocks(t *testing.T) { mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(90)).Return(nil) committer.cleanupProcessedStagingBlocks() } - -func TestPublishParallelMode(t *testing.T) { - defer func() { config.Cfg = config.Config{} }() - config.Cfg.Publisher.Mode = "parallel" - - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - OrchestratorStorage: mockOrchestratorStorage, - } - committer := NewCommitter(mockRPC, mockStorage) - committer.workMode = WorkModeLive - - chainID := big.NewInt(1) - blockData := []common.BlockData{ - {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, - {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, - } - - publishDone := make(chan struct{}) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) - mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { - close(publishDone) - return nil - }) - mockRPC.EXPECT().GetChainID().Return(chainID) - - err := committer.publish(context.Background()) - assert.NoError(t, err) - - select { - case <-publishDone: - case <-time.After(2 * time.Second): - t.Fatal("SetLastPublishedBlockNumber was not called") - } - - mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) -} - -func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { - defer func() { config.Cfg = config.Config{} }() - config.Cfg.Publisher.Mode = "parallel" - config.Cfg.Publisher.Enabled = false - - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - OrchestratorStorage: mockOrchestratorStorage, - } - committer := NewCommitter(mockRPC, mockStorage) - committer.workMode = WorkModeLive - committer.lastCommittedBlock.Store(102) - - chainID := big.NewInt(1) - blockData := []common.BlockData{ - {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, - {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, - } - - publishDone := make(chan struct{}) - deleteDone := make(chan struct{}) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) - mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) - mockRPC.EXPECT().GetChainID().Return(chainID) - - ctx, cancel := context.WithCancel(context.Background()) - mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { - close(publishDone) - cancel() - return nil - }) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { - close(deleteDone) - return nil - }) - - go committer.runPublishLoop(ctx, time.Millisecond) - - select { - case <-publishDone: - case <-time.After(2 * time.Second): - t.Fatal("publish not triggered") - } - select { - case <-deleteDone: - case <-time.After(2 * time.Second): - t.Fatal("DeleteOlderThan not called") - } -} - -func TestRunPublishLoopDoesNothingWhenNoNewBlocks(t *testing.T) { - defer func() { config.Cfg = config.Config{} }() - config.Cfg.Publisher.Mode = "parallel" - config.Cfg.Publisher.Enabled = false - - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - OrchestratorStorage: mockOrchestratorStorage, - } - committer := NewCommitter(mockRPC, mockStorage) - committer.workMode = WorkModeLive - committer.lastCommittedBlock.Store(102) - - chainID := big.NewInt(1) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil) - mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{}, nil) - - ctx, cancel := context.WithCancel(context.Background()) - go committer.runPublishLoop(ctx, time.Millisecond) - time.Sleep(2 * time.Millisecond) - cancel() - time.Sleep(10 * time.Millisecond) - - mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) - mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) -} - func TestHandleGap(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t)