Skip to content

Commit 7325d08

Browse files
authored
block headers (#283)
* add chainid, block_number to kafka headers
1 parent e9f7efc commit 7325d08

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

internal/orchestrator/committer_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package orchestrator
22

33
import (
4+
"context"
45
"math/big"
56
"testing"
67

78
"github.com/stretchr/testify/assert"
89
"github.com/stretchr/testify/mock"
10+
"github.com/thirdweb-dev/indexer/internal/rpc"
911
"github.com/thirdweb-dev/indexer/internal/storage"
1012
mocks "github.com/thirdweb-dev/indexer/test/mocks"
1113
)
@@ -19,6 +21,10 @@ func TestNewCommitter(t *testing.T) {
1921
MainStorage: mockMainStorage,
2022
StagingStorage: mockStagingStorage,
2123
}
24+
25+
// Mock the GetBlocksPerRequest call that happens in NewWorker
26+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
27+
2228
poller := &Poller{}
2329
committer := NewCommitter(mockRPC, mockStorage, poller)
2430

@@ -61,20 +67,25 @@ func TestCleanupProcessedStagingBlocks(t *testing.T) {
6167
StagingStorage: mockStagingStorage,
6268
OrchestratorStorage: mockOrchestratorStorage,
6369
}
70+
71+
// Mock the GetBlocksPerRequest call that happens in NewWorker
72+
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
73+
6474
poller := &Poller{}
6575
committer := NewCommitter(mockRPC, mockStorage, poller)
6676

6777
chainID := big.NewInt(1)
6878
committer.lastCommittedBlock.Store(100)
6979
committer.lastPublishedBlock.Store(0)
7080

71-
committer.cleanupProcessedStagingBlocks()
81+
ctx := context.Background()
82+
committer.cleanupProcessedStagingBlocks(ctx)
7283
mockStagingStorage.AssertNotCalled(t, "DeleteStagingDataOlderThan", mock.Anything, mock.Anything)
7384

7485
committer.lastPublishedBlock.Store(90)
7586
mockRPC.EXPECT().GetChainID().Return(chainID)
7687
mockStagingStorage.EXPECT().DeleteStagingDataOlderThan(chainID, big.NewInt(90)).Return(nil)
77-
committer.cleanupProcessedStagingBlocks()
88+
committer.cleanupProcessedStagingBlocks(ctx)
7889
}
7990

8091
func TestStartCommitter(t *testing.T) {

internal/publisher/publisher.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,16 @@ func (p *Publisher) createBlockMessage(block common.Block, status string) (*kgo.
271271
Topic: p.getTopicName("blocks"),
272272
Key: []byte(fmt.Sprintf("block-%s-%s-%s", status, block.ChainId.String(), block.Hash)),
273273
Value: msgJson,
274+
Headers: []kgo.RecordHeader{
275+
{
276+
Key: "chain_id",
277+
Value: []byte(block.ChainId.String()),
278+
},
279+
{
280+
Key: "block_number",
281+
Value: []byte(block.Number.String()),
282+
},
283+
},
274284
}, nil
275285
}
276286

0 commit comments

Comments
 (0)