@@ -758,6 +758,122 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
758758 return traces , nil
759759}
760760
761+ func (c * ClickHouseConnector ) GetLastReorgCheckedBlockNumber (chainId * big.Int ) (* big.Int , error ) {
762+ query := fmt .Sprintf ("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'" , c .cfg .Database )
763+ if chainId .Sign () > 0 {
764+ query += fmt .Sprintf (" AND chain_id = %s" , chainId .String ())
765+ }
766+ var blockNumberString string
767+ err := c .conn .QueryRow (context .Background (), query ).Scan (& blockNumberString )
768+ if err != nil {
769+ return nil , err
770+ }
771+ blockNumber , ok := new (big.Int ).SetString (blockNumberString , 10 )
772+ if ! ok {
773+ return nil , fmt .Errorf ("failed to parse block number: %s" , blockNumberString )
774+ }
775+ return blockNumber , nil
776+ }
777+
778+ func (c * ClickHouseConnector ) SetLastReorgCheckedBlockNumber (chainId * big.Int , blockNumber * big.Int ) error {
779+ query := fmt .Sprintf ("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')" , c .cfg .Database , chainId , blockNumber .String ())
780+ err := c .conn .Exec (context .Background (), query )
781+ return err
782+ }
783+
784+ func (c * ClickHouseConnector ) LookbackBlockHeaders (chainId * big.Int , limit int , lookbackStart * big.Int ) (blockHeaders []common.BlockHeader , err error ) {
785+ query := fmt .Sprintf ("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC" , c .cfg .Database , chainId .String (), lookbackStart .String ())
786+ if chainId .Sign () > 0 {
787+ query += fmt .Sprintf (" AND chain_id = %s" , chainId .String ())
788+ }
789+ query += getLimitClause (limit )
790+
791+ rows , err := c .conn .Query (context .Background (), query )
792+ if err != nil {
793+ return nil , err
794+ }
795+ defer rows .Close ()
796+
797+ for rows .Next () {
798+ var blockHeader common.BlockHeader
799+ err := rows .Scan (& blockHeader .Number , & blockHeader .Hash , & blockHeader .ParentHash )
800+ if err != nil {
801+ return nil , err
802+ }
803+ blockHeaders = append (blockHeaders , blockHeader )
804+ }
805+ return blockHeaders , nil
806+ }
807+
808+ func (c * ClickHouseConnector ) DeleteDataForBlocks (chainId * big.Int , blockNumbers []* big.Int ) error {
809+ var saveErr error
810+ var saveErrMutex sync.Mutex
811+ var wg sync.WaitGroup
812+ wg .Add (4 )
813+
814+ go func () {
815+ defer wg .Done ()
816+ if err := c .deleteBatch (chainId , blockNumbers , "blocks" , "chain_id, number, is_deleted" ); err != nil {
817+ saveErrMutex .Lock ()
818+ saveErr = fmt .Errorf ("error deleting blocks: %v" , err )
819+ saveErrMutex .Unlock ()
820+ }
821+ }()
822+
823+ go func () {
824+ defer wg .Done ()
825+ if err := c .deleteBatch (chainId , blockNumbers , "logs" , "chain_id, block_number, is_deleted" ); err != nil {
826+ saveErrMutex .Lock ()
827+ saveErr = fmt .Errorf ("error deleting logs: %v" , err )
828+ saveErrMutex .Unlock ()
829+ }
830+ }()
831+
832+ go func () {
833+ defer wg .Done ()
834+ if err := c .deleteBatch (chainId , blockNumbers , "transactions" , "chain_id, block_number, is_deleted" ); err != nil {
835+ saveErrMutex .Lock ()
836+ saveErr = fmt .Errorf ("error deleting transactions: %v" , err )
837+ saveErrMutex .Unlock ()
838+ }
839+ }()
840+
841+ go func () {
842+ defer wg .Done ()
843+ if err := c .deleteBatch (chainId , blockNumbers , "traces" , "chain_id, block_number, is_deleted" ); err != nil {
844+ saveErrMutex .Lock ()
845+ saveErr = fmt .Errorf ("error deleting traces: %v" , err )
846+ saveErrMutex .Unlock ()
847+ }
848+ }()
849+
850+ wg .Wait ()
851+
852+ if saveErr != nil {
853+ return saveErr
854+ }
855+ return nil
856+ }
857+
858+ func (c * ClickHouseConnector ) deleteBatch (chainId * big.Int , blockNumbers []* big.Int , table string , columns string ) error {
859+ query := fmt .Sprintf ("INSERT INTO %s.%s (%s)" , c .cfg .Database , table , columns )
860+ batch , err := c .conn .PrepareBatch (context .Background (), query )
861+ if err != nil {
862+ return err
863+ }
864+ for _ , blockNumber := range blockNumbers {
865+ err = batch .Append (
866+ chainId ,
867+ blockNumber ,
868+ 1 ,
869+ )
870+ if err != nil {
871+ return err
872+ }
873+ }
874+ return batch .Send ()
875+ }
876+
761877// TODO make this atomic
762878func (c * ClickHouseConnector ) InsertBlockData (data * []common.BlockData ) error {
763879 blocks := make ([]common.Block , 0 , len (* data ))
0 commit comments