@@ -71,45 +71,55 @@ func (rh *ReorgHandler) Start() {
7171 log .Debug ().Msgf ("Reorg handler running" )
7272 go func () {
7373 for range ticker .C {
74- lookbackFrom := new (big.Int ).Add (rh .lastCheckedBlock , big .NewInt (int64 (rh .blocksPerScan )))
75- blockHeaders , err := rh .storage .MainStorage .LookbackBlockHeaders (rh .rpc .GetChainID (), rh .blocksPerScan , lookbackFrom )
74+ // need to include lastCheckedBlock to check if next block's parent matches
75+ lookbackFrom := new (big.Int ).Add (rh .lastCheckedBlock , big .NewInt (int64 (rh .blocksPerScan - 1 )))
76+ mostRecentBlockChecked , err := rh .RunFromBlock (lookbackFrom )
7677 if err != nil {
77- log .Error ().Err (err ).Msg ("Error getting recent block headers " )
78+ log .Error ().Err (err ).Msg ("Error during reorg handling " )
7879 continue
7980 }
80- if len (blockHeaders ) == 0 {
81- log .Warn ().Msg ("No block headers found" )
81+ if mostRecentBlockChecked == nil {
8282 continue
8383 }
84- mostRecentBlockHeader := blockHeaders [0 ]
85- reorgEndIndex := findReorgEndIndex (blockHeaders )
86- if reorgEndIndex == - 1 {
87- rh .lastCheckedBlock = mostRecentBlockHeader .Number
88- rh .storage .OrchestratorStorage .SetLastReorgCheckedBlockNumber (rh .rpc .GetChainID (), mostRecentBlockHeader .Number )
89- metrics .ReorgHandlerLastCheckedBlock .Set (float64 (mostRecentBlockHeader .Number .Int64 ()))
90- continue
91- }
92- metrics .ReorgCounter .Inc ()
93- forkPoint , err := rh .findForkPoint (blockHeaders [reorgEndIndex :])
94- if err != nil {
95- log .Error ().Err (err ).Msg ("Error while finding fork point" )
96- continue
97- }
98- err = rh .handleReorg (forkPoint , lookbackFrom )
99- if err != nil {
100- log .Error ().Err (err ).Msg ("Error while handling reorg" )
101- continue
102- }
103- rh .lastCheckedBlock = mostRecentBlockHeader .Number
104- rh .storage .OrchestratorStorage .SetLastReorgCheckedBlockNumber (rh .rpc .GetChainID (), mostRecentBlockHeader .Number )
105- metrics .ReorgHandlerLastCheckedBlock .Set (float64 (mostRecentBlockHeader .Number .Int64 ()))
84+
85+ rh .lastCheckedBlock = mostRecentBlockChecked
86+ rh .storage .OrchestratorStorage .SetLastReorgCheckedBlockNumber (rh .rpc .GetChainID (), mostRecentBlockChecked )
87+ metrics .ReorgHandlerLastCheckedBlock .Set (float64 (mostRecentBlockChecked .Int64 ()))
10688 }
10789 }()
10890
10991 // Keep the program running (otherwise it will exit)
11092 select {}
11193}
11294
95+ func (rh * ReorgHandler ) RunFromBlock (lookbackFrom * big.Int ) (lastCheckedBlock * big.Int , err error ) {
96+ blockHeaders , err := rh .storage .MainStorage .LookbackBlockHeaders (rh .rpc .GetChainID (), rh .blocksPerScan , lookbackFrom )
97+ if err != nil {
98+ return nil , fmt .Errorf ("error getting recent block headers: %w" , err )
99+ }
100+ if len (blockHeaders ) == 0 {
101+ log .Warn ().Msg ("No block headers found during reorg handling" )
102+ return nil , nil
103+ }
104+ mostRecentBlockHeader := blockHeaders [0 ]
105+ log .Debug ().Msgf ("Checking for reorgs from block %s to %s" , mostRecentBlockHeader .Number .String (), blockHeaders [len (blockHeaders )- 1 ].Number .String ())
106+ reorgEndIndex := findReorgEndIndex (blockHeaders )
107+ if reorgEndIndex == - 1 {
108+ return mostRecentBlockHeader .Number , nil
109+ }
110+ reorgEndBlock := blockHeaders [reorgEndIndex ].Number
111+ metrics .ReorgCounter .Inc ()
112+ forkPoint , err := rh .findFirstForkedBlockNumber (blockHeaders [reorgEndIndex :])
113+ if err != nil {
114+ return nil , fmt .Errorf ("error while finding fork point: %w" , err )
115+ }
116+ err = rh .handleReorg (forkPoint , reorgEndBlock )
117+ if err != nil {
118+ return nil , fmt .Errorf ("error while handling reorg: %w" , err )
119+ }
120+ return mostRecentBlockHeader .Number , nil
121+ }
122+
113123func findReorgEndIndex (reversedBlockHeaders []common.BlockHeader ) (index int ) {
114124 for i := 0 ; i < len (reversedBlockHeaders )- 1 ; i ++ {
115125 currentBlock := reversedBlockHeaders [i ]
@@ -129,20 +139,21 @@ func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
129139 return - 1
130140}
131141
132- func (rh * ReorgHandler ) findForkPoint (reversedBlockHeaders []common.BlockHeader ) (forkPoint * big.Int , err error ) {
142+ func (rh * ReorgHandler ) findFirstForkedBlockNumber (reversedBlockHeaders []common.BlockHeader ) (forkPoint * big.Int , err error ) {
133143 newBlocksByNumber , err := rh .getNewBlocksByNumber (reversedBlockHeaders )
134144 if err != nil {
135145 return nil , err
136146 }
137147
138- for i := 0 ; i < len (reversedBlockHeaders )- 1 ; i ++ {
148+ // skip first because that is the reorg end block
149+ for i := 1 ; i < len (reversedBlockHeaders ); i ++ {
139150 blockHeader := reversedBlockHeaders [i ]
140151 block , ok := (* newBlocksByNumber )[blockHeader .Number .String ()]
141152 if ! ok {
142153 return nil , fmt .Errorf ("block not found: %s" , blockHeader .Number .String ())
143154 }
144- if block .Hash == blockHeader .Hash {
145- previousBlock := reversedBlockHeaders [i + 1 ]
155+ if blockHeader . ParentHash == block .ParentHash && blockHeader . Hash == block .Hash {
156+ previousBlock := reversedBlockHeaders [i - 1 ]
146157 return previousBlock .Number , nil
147158 }
148159 }
@@ -151,7 +162,7 @@ func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader)
151162 if err != nil {
152163 return nil , fmt .Errorf ("error getting next headers batch: %w" , err )
153164 }
154- return rh .findForkPoint (nextHeadersBatch )
165+ return rh .findFirstForkedBlockNumber (nextHeadersBatch )
155166}
156167
157168func (rh * ReorgHandler ) getNewBlocksByNumber (reversedBlockHeaders []common.BlockHeader ) (* map [string ]common.Block , error ) {
@@ -171,13 +182,15 @@ func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.Block
171182}
172183
173184func (rh * ReorgHandler ) handleReorg (reorgStart * big.Int , reorgEnd * big.Int ) error {
185+ log .Debug ().Msgf ("Handling reorg from block %s to %s" , reorgStart .String (), reorgEnd .String ())
174186 blockRange := make ([]* big.Int , 0 , new (big.Int ).Sub (reorgEnd , reorgStart ).Int64 ())
175187 for i := new (big.Int ).Set (reorgStart ); i .Cmp (reorgEnd ) <= 0 ; i .Add (i , big .NewInt (1 )) {
176188 blockRange = append (blockRange , new (big.Int ).Set (i ))
177189 }
178190
179191 results := rh .worker .Run (blockRange )
180192 data := make ([]common.BlockData , 0 , len (results ))
193+ blocksToDelete := make ([]* big.Int , 0 , len (results ))
181194 for _ , result := range results {
182195 if result .Error != nil {
183196 return fmt .Errorf ("cannot fix reorg: failed block %s: %w" , result .BlockNumber .String (), result .Error )
@@ -188,10 +201,11 @@ func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) erro
188201 Transactions : result .Data .Transactions ,
189202 Traces : result .Data .Traces ,
190203 })
204+ blocksToDelete = append (blocksToDelete , result .BlockNumber )
191205 }
192206 // TODO make delete and insert atomic
193- if err := rh .storage .MainStorage .DeleteBlockData (rh .rpc .GetChainID (), blockRange ); err != nil {
194- return fmt .Errorf ("error deleting data for blocks %v: %w" , blockRange , err )
207+ if err := rh .storage .MainStorage .DeleteBlockData (rh .rpc .GetChainID (), blocksToDelete ); err != nil {
208+ return fmt .Errorf ("error deleting data for blocks %v: %w" , blocksToDelete , err )
195209 }
196210 if err := rh .storage .MainStorage .InsertBlockData (& data ); err != nil {
197211 return fmt .Errorf ("error saving data to main storage: %w" , err )
0 commit comments