@@ -609,7 +609,7 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
609609 return batch .Send ()
610610}
611611
612- func (c * ClickHouseConnector ) GetStagingData (qf QueryFilter ) (blockDataList * []common.BlockData , err error ) {
612+ func (c * ClickHouseConnector ) GetStagingData (qf QueryFilter ) (* []common.BlockData , error ) {
613613 query := fmt .Sprintf ("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0" ,
614614 c .cfg .Database , getBlockNumbersStringArray (qf .BlockNumbers ))
615615
@@ -625,6 +625,7 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]c
625625 }
626626 defer rows .Close ()
627627
628+ blockDataList := make ([]common.BlockData , 0 )
628629 for rows .Next () {
629630 var blockDataJson string
630631 err := rows .Scan (
@@ -639,9 +640,9 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]c
639640 if err != nil {
640641 return nil , err
641642 }
642- * blockDataList = append (* blockDataList , blockData )
643+ blockDataList = append (blockDataList , blockData )
643644 }
644- return blockDataList , nil
645+ return & blockDataList , nil
645646}
646647
647648func (c * ClickHouseConnector ) DeleteStagingData (data * []common.BlockData ) error {
@@ -763,6 +764,116 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
763764 return traces , nil
764765}
765766
767+ func (c * ClickHouseConnector ) GetLastReorgCheckedBlockNumber (chainId * big.Int ) (* big.Int , error ) {
768+ query := fmt .Sprintf ("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'" , c .cfg .Database )
769+ if chainId .Sign () > 0 {
770+ query += fmt .Sprintf (" AND chain_id = %s" , chainId .String ())
771+ }
772+ var blockNumberString string
773+ err := c .conn .QueryRow (context .Background (), query ).Scan (& blockNumberString )
774+ if err != nil {
775+ return nil , err
776+ }
777+ blockNumber , ok := new (big.Int ).SetString (blockNumberString , 10 )
778+ if ! ok {
779+ return nil , fmt .Errorf ("failed to parse block number: %s" , blockNumberString )
780+ }
781+ return blockNumber , nil
782+ }
783+
784+ func (c * ClickHouseConnector ) SetLastReorgCheckedBlockNumber (chainId * big.Int , blockNumber * big.Int ) error {
785+ query := fmt .Sprintf ("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')" , c .cfg .Database , chainId , blockNumber .String ())
786+ err := c .conn .Exec (context .Background (), query )
787+ return err
788+ }
789+
790+ func (c * ClickHouseConnector ) LookbackBlockHeaders (chainId * big.Int , limit int , lookbackStart * big.Int ) (blockHeaders []common.BlockHeader , err error ) {
791+ query := fmt .Sprintf ("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC" , c .cfg .Database , chainId .String (), lookbackStart .String ())
792+ query += getLimitClause (limit )
793+
794+ rows , err := c .conn .Query (context .Background (), query )
795+ if err != nil {
796+ return nil , err
797+ }
798+ defer rows .Close ()
799+
800+ for rows .Next () {
801+ var blockHeader common.BlockHeader
802+ err := rows .Scan (& blockHeader .Number , & blockHeader .Hash , & blockHeader .ParentHash )
803+ if err != nil {
804+ return nil , err
805+ }
806+ blockHeaders = append (blockHeaders , blockHeader )
807+ }
808+ return blockHeaders , nil
809+ }
810+
811+ func (c * ClickHouseConnector ) DeleteBlockData (chainId * big.Int , blockNumbers []* big.Int ) error {
812+ var saveErr error
813+ var saveErrMutex sync.Mutex
814+ var wg sync.WaitGroup
815+ wg .Add (4 )
816+
817+ go func () {
818+ defer wg .Done ()
819+ if err := c .deleteBatch (chainId , blockNumbers , "blocks" , "number" ); err != nil {
820+ saveErrMutex .Lock ()
821+ saveErr = fmt .Errorf ("error deleting blocks: %v" , err )
822+ saveErrMutex .Unlock ()
823+ }
824+ }()
825+
826+ go func () {
827+ defer wg .Done ()
828+ if err := c .deleteBatch (chainId , blockNumbers , "logs" , "block_number" ); err != nil {
829+ saveErrMutex .Lock ()
830+ saveErr = fmt .Errorf ("error deleting logs: %v" , err )
831+ saveErrMutex .Unlock ()
832+ }
833+ }()
834+
835+ go func () {
836+ defer wg .Done ()
837+ if err := c .deleteBatch (chainId , blockNumbers , "transactions" , "block_number" ); err != nil {
838+ saveErrMutex .Lock ()
839+ saveErr = fmt .Errorf ("error deleting transactions: %v" , err )
840+ saveErrMutex .Unlock ()
841+ }
842+ }()
843+
844+ go func () {
845+ defer wg .Done ()
846+ if err := c .deleteBatch (chainId , blockNumbers , "traces" , "block_number" ); err != nil {
847+ saveErrMutex .Lock ()
848+ saveErr = fmt .Errorf ("error deleting traces: %v" , err )
849+ saveErrMutex .Unlock ()
850+ }
851+ }()
852+
853+ wg .Wait ()
854+
855+ if saveErr != nil {
856+ return saveErr
857+ }
858+ return nil
859+ }
860+
861+ func (c * ClickHouseConnector ) deleteBatch (chainId * big.Int , blockNumbers []* big.Int , table string , blockNumberColumn string ) error {
862+ query := fmt .Sprintf ("ALTER TABLE %s.%s DELETE WHERE chain_id = ? AND %s IN (?)" , c .cfg .Database , table , blockNumberColumn )
863+
864+ blockNumbersStr := make ([]string , len (blockNumbers ))
865+ for i , bn := range blockNumbers {
866+ blockNumbersStr [i ] = bn .String ()
867+ }
868+
869+ err := c .conn .Exec (context .Background (), query , chainId , blockNumbersStr )
870+ if err != nil {
871+ return fmt .Errorf ("error deleting from %s: %w" , table , err )
872+ }
873+
874+ return nil
875+ }
876+
766877// TODO make this atomic
767878func (c * ClickHouseConnector ) InsertBlockData (data * []common.BlockData ) error {
768879 blocks := make ([]common.Block , 0 , len (* data ))
0 commit comments