From 4c5320e1a4a4712e6385ad6d4360887199972dbd Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sat, 9 Aug 2025 02:20:00 -0400 Subject: [PATCH 1/3] fix: refine publisher pre-commit flag handling --- cmd/root.go | 2 + configs/config.example.yml | 4 +- configs/config.go | 1 + configs/test_config.yml | 1 + internal/orchestrator/committer.go | 47 ++++++++--- internal/orchestrator/committer_test.go | 81 +++++++++++++++++- internal/storage/clickhouse.go | 25 ++++++ internal/storage/connector.go | 2 + internal/storage/postgres.go | 29 +++++++ test/mocks/MockIStagingStorage.go | 105 ++++++++++++++++++++++++ 10 files changed, 282 insertions(+), 15 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 1cce104..f152a87 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -135,6 +135,7 @@ func init() { rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request") rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request") rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher") + rootCmd.PersistentFlags().String("publisher-mode", "post-commit", "Publisher mode: pre-commit or post-commit") rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers") rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher") rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks") @@ -250,6 +251,7 @@ func init() { viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression")) viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout")) viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled")) + viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode")) viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers")) viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled")) viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName")) diff --git a/configs/config.example.yml b/configs/config.example.yml index 33c46d6..21fc18a 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -190,9 +190,11 @@ api: publisher: # Whether the publisher is enabled enabled: true + # Publisher mode: "pre-commit" publishes before writing to storage, "post-commit" publishes after commit + mode: post-commit # Kafka broker addresses (comma-separated) brokers: localhost:9092 - + # Block publishing configuration blocks: # Whether to publish block data diff --git a/configs/config.go b/configs/config.go index d2d1017..0be0feb 100644 --- a/configs/config.go +++ b/configs/config.go @@ -172,6 +172,7 @@ type EventPublisherConfig struct { type PublisherConfig struct { Enabled bool `mapstructure:"enabled"` + Mode string `mapstructure:"mode"` Brokers string `mapstructure:"brokers"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` diff --git a/configs/test_config.yml b/configs/test_config.yml index 09c29bd..6504d32 100644 --- a/configs/test_config.yml +++ b/configs/test_config.yml @@ -64,6 +64,7 @@ api: publisher: enabled: false + mode: post-commit validation: mode: minimal diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index e95978e..6337647 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -295,11 +295,39 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error { blockNumbers := make([]*big.Int, len(blockData)) + highestBlock := blockData[0].Block for i, block := range blockData { blockNumbers[i] = block.Block.Number + if block.Block.Number.Cmp(highestBlock.Number) > 0 { + highestBlock = block.Block + } } log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) + shouldPostCommitPublish := true + + if config.Cfg.Publisher.Mode == "pre-commit" { + chainID := c.rpc.GetChainID() + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + log.Error().Err(err).Msg("Failed to get last published block number, falling back to post-commit") + } else if lastPublished == nil || lastPublished.Cmp(highestBlock.Number) < 0 { + go func() { + if err := c.publisher.PublishBlockData(blockData); err != nil { + log.Error().Err(err).Msg("Failed to publish block data to kafka") + return + } + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil { + log.Error().Err(err).Msg("Failed to set last published block number") + } + }() + shouldPostCommitPublish = false + } else { + log.Debug().Msgf("Skipping publish, latest published block %s >= current %s", lastPublished.String(), highestBlock.Number.String()) + shouldPostCommitPublish = false + } + } + mainStorageStart := time.Now() if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) @@ -308,11 +336,13 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) - go func() { - if err := c.publisher.PublishBlockData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to publish block data to kafka") - } - }() + if shouldPostCommitPublish { + go func() { + if err := c.publisher.PublishBlockData(blockData); err != nil { + log.Error().Err(err).Msg("Failed to publish block data to kafka") + } + }() + } if c.workMode == WorkModeBackfill { go func() { @@ -325,13 +355,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er }() } - // Find highest block number from committed blocks - highestBlock := blockData[0].Block - for _, block := range blockData { - if block.Block.Number.Cmp(highestBlock.Number) > 0 { - highestBlock = block.Block - } - } c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number) // Update metrics for successful commits diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index e1c4f16..bf2e1d9 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "errors" "math/big" "testing" "time" @@ -324,9 +325,10 @@ func TestCommit(t *testing.T) { committer := NewCommitter(mockRPC, mockStorage) committer.workMode = WorkModeBackfill + chainID := big.NewInt(1) blockData := []common.BlockData{ - {Block: common.Block{Number: big.NewInt(101)}}, - {Block: common.Block{Number: big.NewInt(102)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } // Create a channel to signal when DeleteStagingData is called @@ -350,6 +352,81 @@ func TestCommit(t *testing.T) { } } +func TestCommitPreCommitPublisherMode(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "pre-commit" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + + chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } + + publishDone := make(chan struct{}) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + return nil + }) + mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) + + err := committer.commit(context.Background(), blockData) + assert.NoError(t, err) + + select { + case <-publishDone: + case <-time.After(2 * time.Second): + t.Fatal("SetLastPublishedBlockNumber was not called") + } +} + +func TestCommitPreCommitPublisherModeFallback(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "pre-commit" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + + chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(nil, errors.New("boom")) + mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) + + err := committer.commit(context.Background(), blockData) + assert.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) +} + func TestHandleGap(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 26b5fea..37e6664 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1075,6 +1075,31 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error { return batch.Send() } +func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database) + if chainId.Sign() > 0 { + query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) + } + var blockNumberString string + err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String()) + return c.conn.Exec(context.Background(), query) +} + func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database) if chainId.Sign() > 0 { diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 66c1d90..6286713 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -83,6 +83,8 @@ type IStagingStorage interface { GetStagingData(qf QueryFilter) (data []common.BlockData, err error) DeleteStagingData(data []common.BlockData) error GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) + GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) + SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error } diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 3dc773d..c28c016 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -344,6 +344,35 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error { return err } +func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := `SELECT cursor_value FROM cursors WHERE cursor_type = 'publish' AND chain_id = $1` + + var blockNumberString string + err := p.db.QueryRow(query, chainId.String()).Scan(&blockNumberString) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := `INSERT INTO cursors (chain_id, cursor_type, cursor_value) + VALUES ($1, 'publish', $2) + ON CONFLICT (chain_id, cursor_type) + DO UPDATE SET cursor_value = EXCLUDED.cursor_value, updated_at = NOW()` + + _, err := p.db.Exec(query, chainId.String(), blockNumber.String()) + return err +} + func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { query := `SELECT MAX(block_number) FROM block_data WHERE 1=1` diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 5931f59..14f8e68 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -72,6 +72,111 @@ func (_c *MockIStagingStorage_DeleteStagingData_Call) RunAndReturn(run func([]co return _c } +// GetLastPublishedBlockNumber provides a mock function with given fields: chainId +func (_m *MockIStagingStorage) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + ret := _m.Called(chainId) + + if len(ret) == 0 { + panic("no return value specified for GetLastPublishedBlockNumber") + } + + var r0 *big.Int + var r1 error + if rf, ok := ret.Get(0).(func(*big.Int) (*big.Int, error)); ok { + return rf(chainId) + } + if rf, ok := ret.Get(0).(func(*big.Int) *big.Int); ok { + r0 = rf(chainId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*big.Int) + } + } + + if rf, ok := ret.Get(1).(func(*big.Int) error); ok { + r1 = rf(chainId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIStagingStorage_GetLastPublishedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastPublishedBlockNumber' +type MockIStagingStorage_GetLastPublishedBlockNumber_Call struct { + *mock.Call +} + +// GetLastPublishedBlockNumber is a helper method to define mock.On call +// - chainId *big.Int +func (_e *MockIStagingStorage_Expecter) GetLastPublishedBlockNumber(chainId interface{}) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + return &MockIStagingStorage_GetLastPublishedBlockNumber_Call{Call: _e.mock.On("GetLastPublishedBlockNumber", chainId)} +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) Run(run func(chainId *big.Int)) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) Return(maxBlockNumber *big.Int, err error) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Return(maxBlockNumber, err) + return _c +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) RunAndReturn(run func(*big.Int) (*big.Int, error)) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Return(run) + return _c +} + +// SetLastPublishedBlockNumber provides a mock function with given fields: chainId, blockNumber +func (_m *MockIStagingStorage) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + ret := _m.Called(chainId, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for SetLastPublishedBlockNumber") + } + + var r0 error + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) error); ok { + r0 = rf(chainId, blockNumber) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIStagingStorage_SetLastPublishedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetLastPublishedBlockNumber' +type MockIStagingStorage_SetLastPublishedBlockNumber_Call struct { + *mock.Call +} + +// SetLastPublishedBlockNumber is a helper method to define mock.On call +// - chainId *big.Int +// - blockNumber *big.Int +func (_e *MockIStagingStorage_Expecter) SetLastPublishedBlockNumber(chainId interface{}, blockNumber interface{}) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + return &MockIStagingStorage_SetLastPublishedBlockNumber_Call{Call: _e.mock.On("SetLastPublishedBlockNumber", chainId, blockNumber)} +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) Run(run func(chainId *big.Int, blockNumber *big.Int)) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) Return(_a0 error) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) RunAndReturn(run func(*big.Int, *big.Int) error) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Return(run) + return _c +} + // GetLastStagedBlockNumber provides a mock function with given fields: chainId, rangeStart, rangeEnd func (_m *MockIStagingStorage) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { ret := _m.Called(chainId, rangeStart, rangeEnd) From 7fd4e932732dea32e9c4171e24519f9ba1d9d8c4 Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sat, 9 Aug 2025 03:22:56 -0400 Subject: [PATCH 2/3] feat: add parallel publisher mode --- cmd/root.go | 2 +- configs/config.example.yml | 4 +- configs/test_config.yml | 2 +- internal/orchestrator/committer.go | 144 +++++++++++++++++++----- internal/orchestrator/committer_test.go | 44 ++++---- 5 files changed, 141 insertions(+), 55 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index f152a87..6ba9702 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -135,7 +135,7 @@ func init() { rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request") rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request") rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher") - rootCmd.PersistentFlags().String("publisher-mode", "post-commit", "Publisher mode: pre-commit or post-commit") + rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel") rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers") rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher") rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks") diff --git a/configs/config.example.yml b/configs/config.example.yml index 21fc18a..9636521 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -190,8 +190,8 @@ api: publisher: # Whether the publisher is enabled enabled: true - # Publisher mode: "pre-commit" publishes before writing to storage, "post-commit" publishes after commit - mode: post-commit + # Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing + mode: default # Kafka broker addresses (comma-separated) brokers: localhost:9092 diff --git a/configs/test_config.yml b/configs/test_config.yml index 6504d32..a817c6d 100644 --- a/configs/test_config.yml +++ b/configs/test_config.yml @@ -64,7 +64,7 @@ api: publisher: enabled: false - mode: post-commit + mode: default validation: mode: minimal diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 6337647..f977eb2 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "sort" + "sync" "time" "github.com/rs/zerolog/log" @@ -83,11 +84,33 @@ func (c *Committer) Start(ctx context.Context) { // Clean up staging data before starting the committer c.cleanupStagingData() + if config.Cfg.Publisher.Mode == "parallel" { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + c.runCommitLoop(ctx, interval) + }() + go func() { + defer wg.Done() + c.runPublishLoop(ctx, interval) + }() + <-ctx.Done() + wg.Wait() + log.Info().Msg("Committer shutting down") + c.publisher.Close() + return + } + + c.runCommitLoop(ctx, interval) + log.Info().Msg("Committer shutting down") + c.publisher.Close() +} + +func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { for { select { case <-ctx.Done(): - log.Info().Msg("Committer shutting down") - c.publisher.Close() return case workMode := <-c.workModeChan: if workMode != c.workMode && workMode != "" { @@ -116,6 +139,24 @@ func (c *Committer) Start(ctx context.Context) { } } +func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) { + for { + select { + case <-ctx.Done(): + return + default: + time.Sleep(interval) + if c.workMode == "" { + log.Debug().Msg("Committer work mode not set, skipping publish") + continue + } + if err := c.publish(ctx); err != nil { + log.Error().Err(err).Msg("Error publishing blocks") + } + } + } +} + func (c *Committer) cleanupStagingData() { // Get the last committed block number from main storage latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) @@ -293,6 +334,78 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo return sequentialBlockData, nil } +func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) { + chainID := c.rpc.GetChainID() + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + return nil, fmt.Errorf("failed to get last published block number: %v", err) + } + + startBlock := new(big.Int).Set(c.commitFromBlock) + if lastPublished != nil && lastPublished.Sign() > 0 { + startBlock = new(big.Int).Add(lastPublished, big.NewInt(1)) + } + + endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1))) + blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 + blockNumbers := make([]*big.Int, blockCount) + for i := int64(0); i < blockCount; i++ { + blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i)) + } + + blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers}) + if err != nil { + return nil, fmt.Errorf("error fetching blocks to publish: %v", err) + } + if len(blocksData) == 0 { + return nil, nil + } + + sort.Slice(blocksData, func(i, j int) bool { + return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0 + }) + if blocksData[0].Block.Number.Cmp(startBlock) != 0 { + log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String()) + return nil, nil + } + + sequential := []common.BlockData{blocksData[0]} + expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1)) + for i := 1; i < len(blocksData); i++ { + if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 { + continue + } + if blocksData[i].Block.Number.Cmp(expected) != 0 { + break + } + sequential = append(sequential, blocksData[i]) + expected.Add(expected, big.NewInt(1)) + } + + return sequential, nil +} + +func (c *Committer) publish(ctx context.Context) error { + blockData, err := c.getSequentialBlockDataToPublish(ctx) + if err != nil { + return err + } + if len(blockData) == 0 { + return nil + } + + if err := c.publisher.PublishBlockData(blockData); err != nil { + return err + } + + chainID := c.rpc.GetChainID() + highest := blockData[len(blockData)-1].Block.Number + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil { + return err + } + return nil +} + func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error { blockNumbers := make([]*big.Int, len(blockData)) highestBlock := blockData[0].Block @@ -303,31 +416,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er } } log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) - - shouldPostCommitPublish := true - - if config.Cfg.Publisher.Mode == "pre-commit" { - chainID := c.rpc.GetChainID() - lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) - if err != nil { - log.Error().Err(err).Msg("Failed to get last published block number, falling back to post-commit") - } else if lastPublished == nil || lastPublished.Cmp(highestBlock.Number) < 0 { - go func() { - if err := c.publisher.PublishBlockData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to publish block data to kafka") - return - } - if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil { - log.Error().Err(err).Msg("Failed to set last published block number") - } - }() - shouldPostCommitPublish = false - } else { - log.Debug().Msgf("Skipping publish, latest published block %s >= current %s", lastPublished.String(), highestBlock.Number.String()) - shouldPostCommitPublish = false - } - } - mainStorageStart := time.Now() if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) @@ -336,7 +424,7 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) - if shouldPostCommitPublish { + if config.Cfg.Publisher.Mode == "default" { go func() { if err := c.publisher.PublishBlockData(blockData); err != nil { log.Error().Err(err).Msg("Failed to publish block data to kafka") diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index bf2e1d9..57af836 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -2,7 +2,6 @@ package orchestrator import ( "context" - "errors" "math/big" "testing" "time" @@ -352,9 +351,9 @@ func TestCommit(t *testing.T) { } } -func TestCommitPreCommitPublisherMode(t *testing.T) { +func TestCommitParallelPublisherMode(t *testing.T) { defer func() { config.Cfg = config.Config{} }() - config.Cfg.Publisher.Mode = "pre-commit" + config.Cfg.Publisher.Mode = "parallel" mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) @@ -374,29 +373,18 @@ func TestCommitPreCommitPublisherMode(t *testing.T) { {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } - publishDone := make(chan struct{}) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) - mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { - close(publishDone) - return nil - }) mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) err := committer.commit(context.Background(), blockData) assert.NoError(t, err) - select { - case <-publishDone: - case <-time.After(2 * time.Second): - t.Fatal("SetLastPublishedBlockNumber was not called") - } + mockStagingStorage.AssertNotCalled(t, "GetLastPublishedBlockNumber", mock.Anything) + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) } -func TestCommitPreCommitPublisherModeFallback(t *testing.T) { +func TestPublishParallelMode(t *testing.T) { defer func() { config.Cfg = config.Config{} }() - config.Cfg.Publisher.Mode = "pre-commit" + config.Cfg.Publisher.Mode = "parallel" mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) @@ -416,15 +404,25 @@ func TestCommitPreCommitPublisherModeFallback(t *testing.T) { {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } + publishDone := make(chan struct{}) + mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(nil, errors.New("boom")) - mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + return nil + }) - err := committer.commit(context.Background(), blockData) + err := committer.publish(context.Background()) assert.NoError(t, err) - time.Sleep(100 * time.Millisecond) - mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) + select { + case <-publishDone: + case <-time.After(2 * time.Second): + t.Fatal("SetLastPublishedBlockNumber was not called") + } } func TestHandleGap(t *testing.T) { From d6362fea30380e0133592cce7fdb09d5ffbe4e29 Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sat, 9 Aug 2025 03:24:04 -0400 Subject: [PATCH 3/3] Update internal/storage/clickhouse.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: Jake Loo <2171134+jakeloo@users.noreply.github.com> --- internal/storage/clickhouse.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 37e6664..2edcf48 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1077,7 +1077,7 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error { func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database) - if chainId.Sign() > 0 { + if chainId != nil && chainId.Sign() > 0 { query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) } var blockNumberString string