Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions internal/orchestrator/reorg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,15 @@ func (rh *ReorgHandler) Start() {
select {}
}

func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
toBlock := new(big.Int).Add(fromBlock, big.NewInt(int64(rh.blocksPerScan)))
func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
fromBlock, toBlock, err := rh.getReorgCheckRange(latestCheckedBlock)
if err != nil {
return nil, err
}
if toBlock.Cmp(latestCheckedBlock) == 0 {
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", toBlock.String(), latestCheckedBlock.String())
return nil, nil
}
log.Debug().Msgf("Checking for reorgs from block %s to %s", fromBlock.String(), toBlock.String())
blockHeaders, err := rh.storage.MainStorage.GetBlockHeadersDescending(rh.rpc.GetChainID(), fromBlock, toBlock)
if err != nil {
Expand All @@ -104,11 +111,6 @@ func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.
return nil, nil
}
mostRecentBlockHeader := blockHeaders[0]
lastBlockHeader := blockHeaders[len(blockHeaders)-1]
if mostRecentBlockHeader.Number.Cmp(lastBlockHeader.Number) == 0 {
log.Debug().Msgf("Most recent (%s) and last checked (%s) block numbers are equal, skipping reorg check", mostRecentBlockHeader.Number.String(), lastBlockHeader.Number.String())
return nil, nil
}

firstMismatchIndex, err := findIndexOfFirstHashMismatch(blockHeaders)
if err != nil {
Expand Down Expand Up @@ -138,6 +140,27 @@ func (rh *ReorgHandler) RunFromBlock(fromBlock *big.Int) (lastCheckedBlock *big.
return mostRecentBlockHeader.Number, nil
}

func (rh *ReorgHandler) getReorgCheckRange(latestCheckedBlock *big.Int) (*big.Int, *big.Int, error) {
latestCommittedBlock, err := rh.storage.MainStorage.GetMaxBlockNumber(rh.rpc.GetChainID())
if err != nil {
return nil, nil, fmt.Errorf("error getting latest committed block: %w", err)
}
if new(big.Int).Sub(latestCommittedBlock, latestCheckedBlock).Cmp(big.NewInt(int64(rh.blocksPerScan))) < 0 {
// diff between latest committed and latest checked is less than blocksPerScan, so we will look back from the latest committed block
fromBlock := new(big.Int).Sub(latestCommittedBlock, big.NewInt(int64(rh.blocksPerScan)))
if fromBlock.Cmp(big.NewInt(0)) < 0 {
fromBlock = big.NewInt(0)
}
toBlock := new(big.Int).Set(latestCommittedBlock)
return fromBlock, toBlock, nil
} else {
// diff between latest committed and latest checked is greater or equal to blocksPerScan, so we will look forward from the latest checked block
fromBlock := new(big.Int).Set(latestCheckedBlock)
toBlock := new(big.Int).Add(fromBlock, big.NewInt(int64(rh.blocksPerScan)))
return fromBlock, toBlock, nil
}
}

func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (int, error) {
for i := 0; i < len(blockHeadersDescending)-1; i++ {
currentBlock := blockHeadersDescending[i]
Expand Down
103 changes: 102 additions & 1 deletion internal/orchestrator/reorg_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,77 @@ func TestNewReorgHandlerStartsFromConfiguredBlock(t *testing.T) {
assert.Equal(t, big.NewInt(1000), handler.lastCheckedBlock)
}

func TestReorgHandlerRangeIsForwardLookingWhenItIsCatchingUp(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.ReorgHandler.BlocksPerScan = 50

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
OrchestratorStorage: mockOrchestratorStorage,
}

mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(0), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)
handler := NewReorgHandler(mockRPC, mockStorage)

fromBlock, toBlock, err := handler.getReorgCheckRange(big.NewInt(100))
assert.NoError(t, err)
assert.Equal(t, big.NewInt(100), fromBlock)
assert.Equal(t, big.NewInt(150), toBlock)
}
func TestReorgHandlerRangeIsBackwardLookingWhenItIsCaughtUp(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.ReorgHandler.BlocksPerScan = 50

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
OrchestratorStorage: mockOrchestratorStorage,
}

mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(0), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)
handler := NewReorgHandler(mockRPC, mockStorage)

fromBlock, toBlock, err := handler.getReorgCheckRange(big.NewInt(990))
assert.NoError(t, err)
assert.Equal(t, big.NewInt(950), fromBlock)
assert.Equal(t, big.NewInt(1000), toBlock)
}

func TestReorgHandlerRangeStartIs0WhenRangeIsLargerThanProcessedBlocks(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.ReorgHandler.BlocksPerScan = 50

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
OrchestratorStorage: mockOrchestratorStorage,
}

mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(0), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(10), nil)
handler := NewReorgHandler(mockRPC, mockStorage)

fromBlock, toBlock, err := handler.getReorgCheckRange(big.NewInt(10))
assert.NoError(t, err)
assert.Equal(t, big.NewInt(0), fromBlock)
assert.Equal(t, big.NewInt(10), toBlock)
}

func TestFindReorgEndIndex(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -451,8 +522,9 @@ func TestStartReorgHandler(t *testing.T) {
OrchestratorStorage: mockOrchestratorStorage,
}

mockRPC.EXPECT().GetChainID().Return(big.NewInt(1)).Times(5)
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1)).Times(7)
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(2000), nil).Times(1)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(100000), nil)
handler := NewReorgHandler(mockRPC, mockStorage)
handler.triggerInterval = 100 // Set a short interval for testing

Expand All @@ -470,6 +542,30 @@ func TestStartReorgHandler(t *testing.T) {
time.Sleep(250 * time.Millisecond)
}

func TestReorgHandlingIsSkippedIfMostRecentAndLastCheckedBlockAreSame(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.ReorgHandler.BlocksPerScan = 10

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
OrchestratorStorage: mockOrchestratorStorage,
}

mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)

handler := NewReorgHandler(mockRPC, mockStorage)
mostRecentBlockChecked, err := handler.RunFromBlock(big.NewInt(100))

assert.NoError(t, err)
assert.Nil(t, mostRecentBlockChecked)
}

func TestHandleReorgWithSingleBlockReorg(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.ReorgHandler.BlocksPerScan = 10
Expand All @@ -486,6 +582,7 @@ func TestHandleReorgWithSingleBlockReorg(t *testing.T) {
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)

mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(99), big.NewInt(109)).Return([]common.BlockHeader{
{Number: big.NewInt(109), Hash: "hash109", ParentHash: "hash108"},
Expand Down Expand Up @@ -543,6 +640,7 @@ func TestHandleReorgWithLatestBlockReorged(t *testing.T) {
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)

mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(99), big.NewInt(109)).Return([]common.BlockHeader{
{Number: big.NewInt(109), Hash: "hash109", ParentHash: "hash108"}, // <-- fork starts here
Expand Down Expand Up @@ -610,6 +708,7 @@ func TestHandleReorgWithManyBlocks(t *testing.T) {
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(100), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(1000), nil)

mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(99), big.NewInt(109)).Return([]common.BlockHeader{
{Number: big.NewInt(109), Hash: "hash109", ParentHash: "hash108"},
Expand Down Expand Up @@ -672,6 +771,7 @@ func TestHandleReorgWithDuplicateBlocks(t *testing.T) {

mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(6268164), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(10000000), nil)

mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(6268162), big.NewInt(6268172)).Return([]common.BlockHeader{
{Number: big.NewInt(6268172), Hash: "0x69d2044d27d2879c309fd885eb0c7d915c9aeed9b28df460d3b52cb4ccf888d8", ParentHash: "0xbf44d12afe40ef30effa32ed45c8d26d854ffba1c8ad781117117e7d18ca157f"},
Expand Down Expand Up @@ -708,6 +808,7 @@ func TestNothingIsDoneForCorrectBlocks(t *testing.T) {

mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(6268164), nil)
mockMainStorage.EXPECT().GetMaxBlockNumber(big.NewInt(1)).Return(big.NewInt(10000000), nil)

mockMainStorage.EXPECT().GetBlockHeadersDescending(big.NewInt(1), big.NewInt(6268163), big.NewInt(6268173)).Return([]common.BlockHeader{
{Number: big.NewInt(6268173), Hash: "0xa281ed679e6f7d0ede5fffdd3528348f303bc456d8d83e6bbe7ad0708f8f9b10", ParentHash: "0x69d2044d27d2879c309fd885eb0c7d915c9aeed9b28df460d3b52cb4ccf888d8"},
Expand Down