diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index d6988b6f..2aa19a2a 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -1,11 +1,13 @@ package orchestrator import ( + "context" "math/big" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/thirdweb-dev/indexer/internal/rpc" "github.com/thirdweb-dev/indexer/internal/storage" mocks "github.com/thirdweb-dev/indexer/test/mocks" ) @@ -19,6 +21,10 @@ func TestNewCommitter(t *testing.T) { MainStorage: mockMainStorage, StagingStorage: mockStagingStorage, } + + // Mock the GetBlocksPerRequest call that happens in NewWorker + mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100}) + poller := &Poller{} committer := NewCommitter(mockRPC, mockStorage, poller) @@ -61,6 +67,10 @@ func TestCleanupProcessedStagingBlocks(t *testing.T) { StagingStorage: mockStagingStorage, OrchestratorStorage: mockOrchestratorStorage, } + + // Mock the GetBlocksPerRequest call that happens in NewWorker + mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100}) + poller := &Poller{} committer := NewCommitter(mockRPC, mockStorage, poller) @@ -68,13 +78,14 @@ func TestCleanupProcessedStagingBlocks(t *testing.T) { committer.lastCommittedBlock.Store(100) committer.lastPublishedBlock.Store(0) - committer.cleanupProcessedStagingBlocks() + ctx := context.Background() + committer.cleanupProcessedStagingBlocks(ctx) mockStagingStorage.AssertNotCalled(t, "DeleteStagingDataOlderThan", mock.Anything, mock.Anything) committer.lastPublishedBlock.Store(90) mockRPC.EXPECT().GetChainID().Return(chainID) mockStagingStorage.EXPECT().DeleteStagingDataOlderThan(chainID, big.NewInt(90)).Return(nil) - committer.cleanupProcessedStagingBlocks() + committer.cleanupProcessedStagingBlocks(ctx) } func TestStartCommitter(t *testing.T) { diff --git a/internal/publisher/publisher.go b/internal/publisher/publisher.go index 0f8a761e..68b97ce6 100644 --- a/internal/publisher/publisher.go +++ b/internal/publisher/publisher.go @@ -271,6 +271,16 @@ func (p *Publisher) createBlockMessage(block common.Block, status string) (*kgo. Topic: p.getTopicName("blocks"), Key: []byte(fmt.Sprintf("block-%s-%s-%s", status, block.ChainId.String(), block.Hash)), Value: msgJson, + Headers: []kgo.RecordHeader{ + { + Key: "chain_id", + Value: []byte(block.ChainId.String()), + }, + { + Key: "block_number", + Value: []byte(block.Number.String()), + }, + }, }, nil }