diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index f116937..14183ae 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -47,8 +47,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer { func (c *Committer) Start(ctx context.Context) { interval := time.Duration(c.triggerIntervalMs) * time.Millisecond - ticker := time.NewTicker(interval) - defer ticker.Stop() log.Debug().Msgf("Committer running") for { @@ -56,7 +54,8 @@ func (c *Committer) Start(ctx context.Context) { case <-ctx.Done(): log.Info().Msg("Committer shutting down") return - case <-ticker.C: + default: + time.Sleep(interval) blockDataToCommit, err := c.getSequentialBlockDataToCommit() if err != nil { log.Error().Err(err).Msg("Error getting block data to commit") diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index a5eb131..6a79061 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -130,9 +130,11 @@ func TestCommit(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, } committer := NewCommitter(mockRPC, mockStorage) @@ -186,10 +188,12 @@ func TestStartCommitter(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, } committer := NewCommitter(mockRPC, mockStorage) @@ -218,9 +222,11 @@ func TestCommitterRespectsSIGTERM(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, } committer := NewCommitter(mockRPC, mockStorage) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index a967bf6..c672a91 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1084,6 +1084,9 @@ func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big // TODO make this atomic func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { + if len(*data) == 0 { + return nil + } blocks := make([]common.Block, 0, len(*data)) logs := make([]common.Log, 0) transactions := make([]common.Transaction, 0)