@@ -64,8 +64,9 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
6464 Settings : func () clickhouse.Settings {
6565 if cfg .AsyncInsert {
6666 return clickhouse.Settings {
67- "async_insert" : "1" ,
68- "wait_for_async_insert" : "1" ,
67+ "async_insert" : "1" ,
68+ "wait_for_async_insert" : "1" ,
69+ "lightweight_deletes_sync" : "0" ,
6970 }
7071 }
7172 return clickhouse.Settings {}
@@ -954,68 +955,209 @@ func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int,
954955}
955956
956957func (c * ClickHouseConnector ) DeleteBlockData (chainId * big.Int , blockNumbers []* big.Int ) error {
957- var saveErr error
958- var saveErrMutex sync.Mutex
958+ var deleteErr error
959+ var deleteErrMutex sync.Mutex
959960 var wg sync.WaitGroup
960961 wg .Add (4 )
961962
962963 go func () {
963964 defer wg .Done ()
964- if err := c .deleteBatch (chainId , blockNumbers , "blocks" , "number" ); err != nil {
965- saveErrMutex .Lock ()
966- saveErr = fmt .Errorf ("error deleting blocks: %v" , err )
967- saveErrMutex .Unlock ()
965+ if err := c .deleteBlocksByNumbers (chainId , blockNumbers ); err != nil {
966+ deleteErrMutex .Lock ()
967+ deleteErr = fmt .Errorf ("error deleting blocks: %v" , err )
968+ deleteErrMutex .Unlock ()
968969 }
969970 }()
970971
971972 go func () {
972973 defer wg .Done ()
973- if err := c .deleteBatch (chainId , blockNumbers , "logs" , "block_number" ); err != nil {
974- saveErrMutex .Lock ()
975- saveErr = fmt .Errorf ("error deleting logs: %v" , err )
976- saveErrMutex .Unlock ()
974+ if err := c .deleteLogsByNumbers (chainId , blockNumbers ); err != nil {
975+ deleteErrMutex .Lock ()
976+ deleteErr = fmt .Errorf ("error deleting logs: %v" , err )
977+ deleteErrMutex .Unlock ()
977978 }
978979 }()
979980
980981 go func () {
981982 defer wg .Done ()
982- if err := c .deleteBatch (chainId , blockNumbers , "transactions" , "block_number" ); err != nil {
983- saveErrMutex .Lock ()
984- saveErr = fmt .Errorf ("error deleting transactions: %v" , err )
985- saveErrMutex .Unlock ()
983+ if err := c .deleteTransactionsByNumbers (chainId , blockNumbers ); err != nil {
984+ deleteErrMutex .Lock ()
985+ deleteErr = fmt .Errorf ("error deleting transactions: %v" , err )
986+ deleteErrMutex .Unlock ()
986987 }
987988 }()
988989
989990 go func () {
990991 defer wg .Done ()
991- if err := c .deleteBatch (chainId , blockNumbers , "traces" , "block_number" ); err != nil {
992- saveErrMutex .Lock ()
993- saveErr = fmt .Errorf ("error deleting traces: %v" , err )
994- saveErrMutex .Unlock ()
992+ if err := c .deleteTracesByNumbers (chainId , blockNumbers ); err != nil {
993+ deleteErrMutex .Lock ()
994+ deleteErr = fmt .Errorf ("error deleting traces: %v" , err )
995+ deleteErrMutex .Unlock ()
995996 }
996997 }()
997998
998999 wg .Wait ()
9991000
1000- if saveErr != nil {
1001- return saveErr
1001+ if deleteErr != nil {
1002+ return deleteErr
10021003 }
10031004 return nil
10041005}
10051006
1006- func (c * ClickHouseConnector ) deleteBatch (chainId * big.Int , blockNumbers []* big.Int , table string , blockNumberColumn string ) error {
1007- query := fmt .Sprintf ("DELETE FROM %s.%s WHERE chain_id = ? AND %s IN (?)" , c .cfg .Database , table , blockNumberColumn )
1007+ func (c * ClickHouseConnector ) deleteBlocksByNumbers (chainId * big.Int , blockNumbers []* big.Int ) error {
1008+ query := fmt .Sprintf ("DELETE FROM %s.blocks WHERE _partition_value.1 = ? AND chain_id = ? AND number IN (?)" , c .cfg .Database )
10081009
10091010 blockNumbersStr := make ([]string , len (blockNumbers ))
10101011 for i , bn := range blockNumbers {
10111012 blockNumbersStr [i ] = bn .String ()
10121013 }
1013-
10141014 err := c .conn .Exec (context .Background (), query , chainId , blockNumbersStr )
10151015 if err != nil {
1016- return fmt .Errorf ("error deleting from %s: %w" , table , err )
1016+ return fmt .Errorf ("error deleting blocks: %w" , err )
1017+ }
1018+ return nil
1019+ }
1020+
1021+ func (c * ClickHouseConnector ) deleteLogsByNumbers (chainId * big.Int , blockNumbers []* big.Int ) error {
1022+ blockNumbersStr := make ([]string , len (blockNumbers ))
1023+ for i , bn := range blockNumbers {
1024+ blockNumbersStr [i ] = bn .String ()
1025+ }
1026+ getQuery := fmt .Sprintf ("SELECT block_number, transaction_hash, log_index FROM %s.logs WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0" , c .cfg .Database , chainId .String ())
1027+
1028+ rows , getErr := c .conn .Query (context .Background (), getQuery )
1029+ if getErr != nil {
1030+ return getErr
10171031 }
1032+ defer rows .Close ()
1033+
1034+ logsToDelete := make ([]common.Log , 0 )
1035+ for rows .Next () {
1036+ var logToDelete common.Log
1037+ err := rows .ScanStruct (& logToDelete )
1038+ if err != nil {
1039+ return err
1040+ }
1041+ logsToDelete = append (logsToDelete , logToDelete )
1042+ }
1043+
1044+ deleteQuery := fmt .Sprintf ("DELETE FROM %s.logs WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND transaction_hash = ? AND log_index = ?" , c .cfg .Database )
10181045
1046+ batch , err := c .conn .PrepareBatch (context .Background (), deleteQuery )
1047+ if err != nil {
1048+ return fmt .Errorf ("error preparing batch for deleting logs: %w" , err )
1049+ }
1050+
1051+ for _ , log := range logsToDelete {
1052+ err := batch .Append (
1053+ chainId ,
1054+ chainId ,
1055+ log .BlockNumber ,
1056+ log .TransactionHash ,
1057+ log .LogIndex ,
1058+ )
1059+ if err != nil {
1060+ return fmt .Errorf ("error appending log to delete batch: %w" , err )
1061+ }
1062+ }
1063+ if err := batch .Send (); err != nil {
1064+ return fmt .Errorf ("error deleting logs: %w" , err )
1065+ }
1066+ return nil
1067+ }
1068+
1069+ func (c * ClickHouseConnector ) deleteTransactionsByNumbers (chainId * big.Int , blockNumbers []* big.Int ) error {
1070+ blockNumbersStr := make ([]string , len (blockNumbers ))
1071+ for i , bn := range blockNumbers {
1072+ blockNumbersStr [i ] = bn .String ()
1073+ }
1074+ getQuery := fmt .Sprintf ("SELECT block_number, hash FROM %s.transactions WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0" , c .cfg .Database , chainId .String ())
1075+
1076+ rows , getErr := c .conn .Query (context .Background (), getQuery )
1077+ if getErr != nil {
1078+ return getErr
1079+ }
1080+ defer rows .Close ()
1081+
1082+ txsToDelete := make ([]common.Transaction , 0 )
1083+ for rows .Next () {
1084+ var txToDelete common.Transaction
1085+ err := rows .ScanStruct (& txToDelete )
1086+ if err != nil {
1087+ return err
1088+ }
1089+ txsToDelete = append (txsToDelete , txToDelete )
1090+ }
1091+
1092+ deleteQuery := fmt .Sprintf ("DELETE FROM %s.transactions WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND hash = ?" , c .cfg .Database )
1093+
1094+ batch , err := c .conn .PrepareBatch (context .Background (), deleteQuery )
1095+ if err != nil {
1096+ return fmt .Errorf ("error preparing batch for deleting transactions: %w" , err )
1097+ }
1098+
1099+ for _ , tx := range txsToDelete {
1100+ err := batch .Append (
1101+ chainId ,
1102+ chainId ,
1103+ tx .BlockNumber ,
1104+ tx .Hash ,
1105+ )
1106+ if err != nil {
1107+ return fmt .Errorf ("error appending transaction to delete batch: %w" , err )
1108+ }
1109+ }
1110+ if err := batch .Send (); err != nil {
1111+ return fmt .Errorf ("error deleting transactions: %w" , err )
1112+ }
1113+ return nil
1114+ }
1115+
1116+ func (c * ClickHouseConnector ) deleteTracesByNumbers (chainId * big.Int , blockNumbers []* big.Int ) error {
1117+ blockNumbersStr := make ([]string , len (blockNumbers ))
1118+ for i , bn := range blockNumbers {
1119+ blockNumbersStr [i ] = bn .String ()
1120+ }
1121+ getQuery := fmt .Sprintf ("SELECT block_number, transaction_hash, trace_address FROM %s.traces WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0" , c .cfg .Database , chainId .String ())
1122+
1123+ rows , getErr := c .conn .Query (context .Background (), getQuery )
1124+ if getErr != nil {
1125+ return getErr
1126+ }
1127+ defer rows .Close ()
1128+
1129+ tracesToDelete := make ([]common.Trace , 0 )
1130+ for rows .Next () {
1131+ var traceToDelete common.Trace
1132+ err := rows .ScanStruct (& traceToDelete )
1133+ if err != nil {
1134+ return err
1135+ }
1136+ tracesToDelete = append (tracesToDelete , traceToDelete )
1137+ }
1138+
1139+ deleteQuery := fmt .Sprintf ("DELETE FROM %s.traces WHERE _partition_value.1 = ? AND chain_id = ? AND block_number = ? AND transaction_hash = ? AND trace_address = ?" , c .cfg .Database )
1140+
1141+ batch , err := c .conn .PrepareBatch (context .Background (), deleteQuery )
1142+ if err != nil {
1143+ return fmt .Errorf ("error preparing batch for deleting traces: %w" , err )
1144+ }
1145+
1146+ for _ , trace := range tracesToDelete {
1147+ err := batch .Append (
1148+ chainId ,
1149+ chainId ,
1150+ trace .BlockNumber ,
1151+ trace .TransactionHash ,
1152+ trace .TraceAddress ,
1153+ )
1154+ if err != nil {
1155+ return fmt .Errorf ("error appending trace to delete batch: %w" , err )
1156+ }
1157+ }
1158+ if err := batch .Send (); err != nil {
1159+ return fmt .Errorf ("error deleting traces: %w" , err )
1160+ }
10191161 return nil
10201162}
10211163
0 commit comments