diff --git a/cmd/root.go b/cmd/root.go index 1cce104..6ba9702 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -135,6 +135,7 @@ func init() { rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request") rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request") rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher") + rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel") rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers") rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher") rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks") @@ -250,6 +251,7 @@ func init() { viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression")) viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout")) viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled")) + viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode")) viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers")) viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled")) viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName")) diff --git a/configs/config.example.yml b/configs/config.example.yml index 33c46d6..9636521 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -190,9 +190,11 @@ api: publisher: # Whether the publisher is enabled enabled: true + # Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing + mode: default # Kafka broker addresses (comma-separated) brokers: localhost:9092 - + # Block publishing configuration blocks: # Whether to publish block data diff --git a/configs/config.go b/configs/config.go index d2d1017..0be0feb 100644 --- a/configs/config.go +++ b/configs/config.go @@ -172,6 +172,7 @@ type EventPublisherConfig struct { type PublisherConfig struct { Enabled bool `mapstructure:"enabled"` + Mode string `mapstructure:"mode"` Brokers string `mapstructure:"brokers"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` diff --git a/configs/test_config.yml b/configs/test_config.yml index 09c29bd..a817c6d 100644 --- a/configs/test_config.yml +++ b/configs/test_config.yml @@ -64,6 +64,7 @@ api: publisher: enabled: false + mode: default validation: mode: minimal diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index e95978e..f977eb2 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "sort" + "sync" "time" "github.com/rs/zerolog/log" @@ -83,11 +84,33 @@ func (c *Committer) Start(ctx context.Context) { // Clean up staging data before starting the committer c.cleanupStagingData() + if config.Cfg.Publisher.Mode == "parallel" { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + c.runCommitLoop(ctx, interval) + }() + go func() { + defer wg.Done() + c.runPublishLoop(ctx, interval) + }() + <-ctx.Done() + wg.Wait() + log.Info().Msg("Committer shutting down") + c.publisher.Close() + return + } + + c.runCommitLoop(ctx, interval) + log.Info().Msg("Committer shutting down") + c.publisher.Close() +} + +func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { for { select { case <-ctx.Done(): - log.Info().Msg("Committer shutting down") - c.publisher.Close() return case workMode := <-c.workModeChan: if workMode != c.workMode && workMode != "" { @@ -116,6 +139,24 @@ func (c *Committer) Start(ctx context.Context) { } } +func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) { + for { + select { + case <-ctx.Done(): + return + default: + time.Sleep(interval) + if c.workMode == "" { + log.Debug().Msg("Committer work mode not set, skipping publish") + continue + } + if err := c.publish(ctx); err != nil { + log.Error().Err(err).Msg("Error publishing blocks") + } + } + } +} + func (c *Committer) cleanupStagingData() { // Get the last committed block number from main storage latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) @@ -293,13 +334,88 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo return sequentialBlockData, nil } +func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) { + chainID := c.rpc.GetChainID() + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + return nil, fmt.Errorf("failed to get last published block number: %v", err) + } + + startBlock := new(big.Int).Set(c.commitFromBlock) + if lastPublished != nil && lastPublished.Sign() > 0 { + startBlock = new(big.Int).Add(lastPublished, big.NewInt(1)) + } + + endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1))) + blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 + blockNumbers := make([]*big.Int, blockCount) + for i := int64(0); i < blockCount; i++ { + blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i)) + } + + blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers}) + if err != nil { + return nil, fmt.Errorf("error fetching blocks to publish: %v", err) + } + if len(blocksData) == 0 { + return nil, nil + } + + sort.Slice(blocksData, func(i, j int) bool { + return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0 + }) + if blocksData[0].Block.Number.Cmp(startBlock) != 0 { + log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String()) + return nil, nil + } + + sequential := []common.BlockData{blocksData[0]} + expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1)) + for i := 1; i < len(blocksData); i++ { + if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 { + continue + } + if blocksData[i].Block.Number.Cmp(expected) != 0 { + break + } + sequential = append(sequential, blocksData[i]) + expected.Add(expected, big.NewInt(1)) + } + + return sequential, nil +} + +func (c *Committer) publish(ctx context.Context) error { + blockData, err := c.getSequentialBlockDataToPublish(ctx) + if err != nil { + return err + } + if len(blockData) == 0 { + return nil + } + + if err := c.publisher.PublishBlockData(blockData); err != nil { + return err + } + + chainID := c.rpc.GetChainID() + highest := blockData[len(blockData)-1].Block.Number + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil { + return err + } + return nil +} + func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error { blockNumbers := make([]*big.Int, len(blockData)) + highestBlock := blockData[0].Block for i, block := range blockData { blockNumbers[i] = block.Block.Number + if block.Block.Number.Cmp(highestBlock.Number) > 0 { + highestBlock = block.Block + } } log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) - mainStorageStart := time.Now() if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) @@ -308,11 +424,13 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) - go func() { - if err := c.publisher.PublishBlockData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to publish block data to kafka") - } - }() + if config.Cfg.Publisher.Mode == "default" { + go func() { + if err := c.publisher.PublishBlockData(blockData); err != nil { + log.Error().Err(err).Msg("Failed to publish block data to kafka") + } + }() + } if c.workMode == WorkModeBackfill { go func() { @@ -325,13 +443,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er }() } - // Find highest block number from committed blocks - highestBlock := blockData[0].Block - for _, block := range blockData { - if block.Block.Number.Cmp(highestBlock.Number) > 0 { - highestBlock = block.Block - } - } c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number) // Update metrics for successful commits diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index e1c4f16..57af836 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -324,9 +324,10 @@ func TestCommit(t *testing.T) { committer := NewCommitter(mockRPC, mockStorage) committer.workMode = WorkModeBackfill + chainID := big.NewInt(1) blockData := []common.BlockData{ - {Block: common.Block{Number: big.NewInt(101)}}, - {Block: common.Block{Number: big.NewInt(102)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } // Create a channel to signal when DeleteStagingData is called @@ -350,6 +351,80 @@ func TestCommit(t *testing.T) { } } +func TestCommitParallelPublisherMode(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + + chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } + + mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) + + err := committer.commit(context.Background(), blockData) + assert.NoError(t, err) + + mockStagingStorage.AssertNotCalled(t, "GetLastPublishedBlockNumber", mock.Anything) + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) +} + +func TestPublishParallelMode(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + + chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } + + publishDone := make(chan struct{}) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + return nil + }) + + err := committer.publish(context.Background()) + assert.NoError(t, err) + + select { + case <-publishDone: + case <-time.After(2 * time.Second): + t.Fatal("SetLastPublishedBlockNumber was not called") + } +} + func TestHandleGap(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 26b5fea..2edcf48 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1075,6 +1075,31 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error { return batch.Send() } +func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database) + if chainId != nil && chainId.Sign() > 0 { + query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) + } + var blockNumberString string + err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String()) + return c.conn.Exec(context.Background(), query) +} + func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database) if chainId.Sign() > 0 { diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 66c1d90..6286713 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -83,6 +83,8 @@ type IStagingStorage interface { GetStagingData(qf QueryFilter) (data []common.BlockData, err error) DeleteStagingData(data []common.BlockData) error GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) + GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) + SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error } diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 3dc773d..c28c016 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -344,6 +344,35 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error { return err } +func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := `SELECT cursor_value FROM cursors WHERE cursor_type = 'publish' AND chain_id = $1` + + var blockNumberString string + err := p.db.QueryRow(query, chainId.String()).Scan(&blockNumberString) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := `INSERT INTO cursors (chain_id, cursor_type, cursor_value) + VALUES ($1, 'publish', $2) + ON CONFLICT (chain_id, cursor_type) + DO UPDATE SET cursor_value = EXCLUDED.cursor_value, updated_at = NOW()` + + _, err := p.db.Exec(query, chainId.String(), blockNumber.String()) + return err +} + func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { query := `SELECT MAX(block_number) FROM block_data WHERE 1=1` diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 5931f59..14f8e68 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -72,6 +72,111 @@ func (_c *MockIStagingStorage_DeleteStagingData_Call) RunAndReturn(run func([]co return _c } +// GetLastPublishedBlockNumber provides a mock function with given fields: chainId +func (_m *MockIStagingStorage) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + ret := _m.Called(chainId) + + if len(ret) == 0 { + panic("no return value specified for GetLastPublishedBlockNumber") + } + + var r0 *big.Int + var r1 error + if rf, ok := ret.Get(0).(func(*big.Int) (*big.Int, error)); ok { + return rf(chainId) + } + if rf, ok := ret.Get(0).(func(*big.Int) *big.Int); ok { + r0 = rf(chainId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*big.Int) + } + } + + if rf, ok := ret.Get(1).(func(*big.Int) error); ok { + r1 = rf(chainId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIStagingStorage_GetLastPublishedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastPublishedBlockNumber' +type MockIStagingStorage_GetLastPublishedBlockNumber_Call struct { + *mock.Call +} + +// GetLastPublishedBlockNumber is a helper method to define mock.On call +// - chainId *big.Int +func (_e *MockIStagingStorage_Expecter) GetLastPublishedBlockNumber(chainId interface{}) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + return &MockIStagingStorage_GetLastPublishedBlockNumber_Call{Call: _e.mock.On("GetLastPublishedBlockNumber", chainId)} +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) Run(run func(chainId *big.Int)) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) Return(maxBlockNumber *big.Int, err error) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Return(maxBlockNumber, err) + return _c +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) RunAndReturn(run func(*big.Int) (*big.Int, error)) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Return(run) + return _c +} + +// SetLastPublishedBlockNumber provides a mock function with given fields: chainId, blockNumber +func (_m *MockIStagingStorage) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + ret := _m.Called(chainId, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for SetLastPublishedBlockNumber") + } + + var r0 error + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) error); ok { + r0 = rf(chainId, blockNumber) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIStagingStorage_SetLastPublishedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetLastPublishedBlockNumber' +type MockIStagingStorage_SetLastPublishedBlockNumber_Call struct { + *mock.Call +} + +// SetLastPublishedBlockNumber is a helper method to define mock.On call +// - chainId *big.Int +// - blockNumber *big.Int +func (_e *MockIStagingStorage_Expecter) SetLastPublishedBlockNumber(chainId interface{}, blockNumber interface{}) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + return &MockIStagingStorage_SetLastPublishedBlockNumber_Call{Call: _e.mock.On("SetLastPublishedBlockNumber", chainId, blockNumber)} +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) Run(run func(chainId *big.Int, blockNumber *big.Int)) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) Return(_a0 error) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) RunAndReturn(run func(*big.Int, *big.Int) error) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Return(run) + return _c +} + // GetLastStagedBlockNumber provides a mock function with given fields: chainId, rangeStart, rangeEnd func (_m *MockIStagingStorage) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { ret := _m.Called(chainId, rangeStart, rangeEnd)