Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,15 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {

func (c *Committer) Start(ctx context.Context) {
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
ticker := time.NewTicker(interval)
defer ticker.Stop()

log.Debug().Msgf("Committer running")
for {
select {
case <-ctx.Done():
log.Info().Msg("Committer shutting down")
return
case <-ticker.C:
default:
time.Sleep(interval)
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
if err != nil {
log.Error().Err(err).Msg("Error getting block data to commit")
Expand Down
18 changes: 12 additions & 6 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ func TestCommit(t *testing.T) {
mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
OrchestratorStorage: mockOrchestratorStorage,
}
committer := NewCommitter(mockRPC, mockStorage)

Expand Down Expand Up @@ -186,10 +188,12 @@ func TestStartCommitter(t *testing.T) {
mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
OrchestratorStorage: mockOrchestratorStorage,
}

committer := NewCommitter(mockRPC, mockStorage)
Expand Down Expand Up @@ -218,9 +222,11 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
OrchestratorStorage: mockOrchestratorStorage,
}

committer := NewCommitter(mockRPC, mockStorage)
Expand Down
3 changes: 3 additions & 0 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,9 @@ func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big

// TODO make this atomic
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
if len(*data) == 0 {
return nil
}
blocks := make([]common.Block, 0, len(*data))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
Expand Down