Skip to content

Commit 74accd7

Browse files
committed
use inserts instead of lightweight deletes to remove reorged data
1 parent 9538ef4 commit 74accd7

File tree

1 file changed

+80
-61
lines changed

1 file changed

+80
-61
lines changed

internal/storage/clickhouse.go

Lines changed: 80 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -64,9 +64,8 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
6464
Settings: func() clickhouse.Settings {
6565
if cfg.AsyncInsert {
6666
return clickhouse.Settings{
67-
"async_insert": "1",
68-
"wait_for_async_insert": "1",
69-
"lightweight_deletes_sync": "0",
67+
"async_insert": "1",
68+
"wait_for_async_insert": "1",
7069
}
7170
}
7271
return clickhouse.Settings{}
@@ -962,7 +961,7 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*
962961

963962
go func() {
964963
defer wg.Done()
965-
if err := c.deleteBlocksByNumbers(chainId, blockNumbers); err != nil {
964+
if err := c.deleteBlocks(chainId, blockNumbers); err != nil {
966965
deleteErrMutex.Lock()
967966
deleteErr = fmt.Errorf("error deleting blocks: %v", err)
968967
deleteErrMutex.Unlock()
@@ -971,7 +970,7 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*
971970

972971
go func() {
973972
defer wg.Done()
974-
if err := c.deleteLogsByNumbers(chainId, blockNumbers); err != nil {
973+
if err := c.deleteLogs(chainId, blockNumbers); err != nil {
975974
deleteErrMutex.Lock()
976975
deleteErr = fmt.Errorf("error deleting logs: %v", err)
977976
deleteErrMutex.Unlock()
@@ -980,7 +979,7 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*
980979

981980
go func() {
982981
defer wg.Done()
983-
if err := c.deleteTransactionsByNumbers(chainId, blockNumbers); err != nil {
982+
if err := c.deleteTransactions(chainId, blockNumbers); err != nil {
984983
deleteErrMutex.Lock()
985984
deleteErr = fmt.Errorf("error deleting transactions: %v", err)
986985
deleteErrMutex.Unlock()
@@ -989,7 +988,7 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*
989988

990989
go func() {
991990
defer wg.Done()
992-
if err := c.deleteTracesByNumbers(chainId, blockNumbers); err != nil {
991+
if err := c.deleteTraces(chainId, blockNumbers); err != nil {
993992
deleteErrMutex.Lock()
994993
deleteErr = fmt.Errorf("error deleting traces: %v", err)
995994
deleteErrMutex.Unlock()
@@ -1004,21 +1003,33 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*
10041003
return nil
10051004
}
10061005

1007-
func (c *ClickHouseConnector) deleteBlocksByNumbers(chainId *big.Int, blockNumbers []*big.Int) error {
1008-
query := fmt.Sprintf("DELETE FROM %s.blocks WHERE _partition_value.1 = ? AND chain_id = ? AND number IN (?)", c.cfg.Database)
1006+
func (c *ClickHouseConnector) deleteBlocks(chainId *big.Int, blockNumbers []*big.Int) error {
1007+
query := fmt.Sprintf("INSERT INTO %s.blocks (chain_id, number, is_deleted)", c.cfg.Database)
1008+
1009+
batch, err := c.conn.PrepareBatch(context.Background(), query)
1010+
if err != nil {
1011+
return err
1012+
}
10091013

10101014
blockNumbersStr := make([]string, len(blockNumbers))
10111015
for i, bn := range blockNumbers {
10121016
blockNumbersStr[i] = bn.String()
1017+
err := batch.Append(
1018+
chainId,
1019+
bn,
1020+
1,
1021+
)
1022+
if err != nil {
1023+
return err
1024+
}
10131025
}
1014-
err := c.conn.Exec(context.Background(), query, chainId, chainId, blockNumbersStr)
1015-
if err != nil {
1016-
return fmt.Errorf("error deleting blocks: %w", err)
1026+
if batch.Rows() < 1 {
1027+
return nil // No blocks to delete
10171028
}
1018-
return nil
1029+
return batch.Send()
10191030
}
10201031

1021-
func (c *ClickHouseConnector) deleteLogsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error {
1032+
func (c *ClickHouseConnector) deleteLogs(chainId *big.Int, blockNumbers []*big.Int) error {
10221033
blockNumbersStr := make([]string, len(blockNumbers))
10231034
for i, bn := range blockNumbers {
10241035
blockNumbersStr[i] = bn.String()
@@ -1031,34 +1042,36 @@ func (c *ClickHouseConnector) deleteLogsByNumbers(chainId *big.Int, blockNumbers
10311042
}
10321043
defer rows.Close()
10331044

1034-
blockNumbersToDelete := common.NewSet[string]()
1035-
txHashesToDelete := common.NewSet[string]()
1036-
logIndexesToDelete := common.NewSet[uint64]()
1045+
query := fmt.Sprintf("INSERT INTO %s.logs (chain_id, block_number, transaction_hash, log_index, is_deleted)", c.cfg.Database)
1046+
batch, err := c.conn.PrepareBatch(context.Background(), query)
1047+
if err != nil {
1048+
return err
1049+
}
1050+
10371051
for rows.Next() {
10381052
var logToDelete common.Log
10391053
err := rows.ScanStruct(&logToDelete)
10401054
if err != nil {
10411055
return err
10421056
}
1043-
blockNumbersToDelete.Add(logToDelete.BlockNumber.String())
1044-
txHashesToDelete.Add(logToDelete.TransactionHash)
1045-
logIndexesToDelete.Add(logToDelete.LogIndex)
1057+
err = batch.Append(
1058+
chainId,
1059+
logToDelete.BlockNumber,
1060+
logToDelete.TransactionHash,
1061+
logToDelete.LogIndex,
1062+
1,
1063+
)
1064+
if err != nil {
1065+
return err
1066+
}
10461067
}
1047-
1048-
if txHashesToDelete.Size() == 0 {
1068+
if batch.Rows() < 1 {
10491069
return nil // No logs to delete
10501070
}
1051-
1052-
deleteQuery := fmt.Sprintf("DELETE FROM %s.logs WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) AND transaction_hash IN (?) AND log_index IN (?)", c.cfg.Database)
1053-
1054-
err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), txHashesToDelete.List(), logIndexesToDelete.List())
1055-
if err != nil {
1056-
return err
1057-
}
1058-
return nil
1071+
return batch.Send()
10591072
}
10601073

1061-
func (c *ClickHouseConnector) deleteTransactionsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error {
1074+
func (c *ClickHouseConnector) deleteTransactions(chainId *big.Int, blockNumbers []*big.Int) error {
10621075
blockNumbersStr := make([]string, len(blockNumbers))
10631076
for i, bn := range blockNumbers {
10641077
blockNumbersStr[i] = bn.String()
@@ -1071,68 +1084,74 @@ func (c *ClickHouseConnector) deleteTransactionsByNumbers(chainId *big.Int, bloc
10711084
}
10721085
defer rows.Close()
10731086

1074-
blockNumbersToDelete := common.NewSet[string]()
1075-
hashesToDelete := common.NewSet[string]()
1087+
query := fmt.Sprintf("INSERT INTO %s.transactions (chain_id, block_number, hash, is_deleted)", c.cfg.Database)
1088+
batch, err := c.conn.PrepareBatch(context.Background(), query)
1089+
if err != nil {
1090+
return err
1091+
}
10761092

10771093
for rows.Next() {
10781094
var txToDelete common.Transaction
10791095
err := rows.ScanStruct(&txToDelete)
10801096
if err != nil {
10811097
return err
10821098
}
1083-
blockNumbersToDelete.Add(txToDelete.BlockNumber.String())
1084-
hashesToDelete.Add(txToDelete.Hash)
1099+
err = batch.Append(
1100+
chainId,
1101+
txToDelete.BlockNumber,
1102+
txToDelete.Hash,
1103+
1,
1104+
)
1105+
if err != nil {
1106+
return err
1107+
}
10851108
}
1086-
1087-
if hashesToDelete.Size() == 0 {
1109+
if batch.Rows() < 1 {
10881110
return nil // No transactions to delete
10891111
}
1090-
1091-
deleteQuery := fmt.Sprintf("DELETE FROM %s.transactions WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) AND hash IN (?)", c.cfg.Database)
1092-
1093-
err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), hashesToDelete.List())
1094-
if err != nil {
1095-
return err
1096-
}
1097-
return nil
1112+
return batch.Send()
10981113
}
10991114

1100-
func (c *ClickHouseConnector) deleteTracesByNumbers(chainId *big.Int, blockNumbers []*big.Int) error {
1115+
func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big.Int) error {
11011116
blockNumbersStr := make([]string, len(blockNumbers))
11021117
for i, bn := range blockNumbers {
11031118
blockNumbersStr[i] = bn.String()
11041119
}
1105-
getQuery := fmt.Sprintf("SELECT block_number, transaction_hash FROM %s.traces WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String())
1120+
getQuery := fmt.Sprintf("SELECT block_number, transaction_hash, trace_address FROM %s.traces WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String())
11061121

11071122
rows, getErr := c.conn.Query(context.Background(), getQuery, blockNumbersStr)
11081123
if getErr != nil {
11091124
return getErr
11101125
}
11111126
defer rows.Close()
11121127

1113-
blockNumbersToDelete := common.NewSet[string]()
1114-
txHashesToDelete := common.NewSet[string]()
1128+
query := fmt.Sprintf("INSERT INTO %s.traces (chain_id, block_number, transaction_hash, trace_address, is_deleted)", c.cfg.Database)
1129+
batch, err := c.conn.PrepareBatch(context.Background(), query)
1130+
if err != nil {
1131+
return err
1132+
}
1133+
11151134
for rows.Next() {
11161135
var traceToDelete common.Trace
11171136
err := rows.ScanStruct(&traceToDelete)
11181137
if err != nil {
11191138
return err
11201139
}
1121-
blockNumbersToDelete.Add(traceToDelete.BlockNumber.String())
1122-
txHashesToDelete.Add(traceToDelete.TransactionHash)
1140+
err = batch.Append(
1141+
chainId,
1142+
traceToDelete.BlockNumber,
1143+
traceToDelete.TransactionHash,
1144+
traceToDelete.TraceAddress,
1145+
1,
1146+
)
1147+
if err != nil {
1148+
return err
1149+
}
11231150
}
1124-
1125-
if txHashesToDelete.Size() == 0 {
1151+
if batch.Rows() < 1 {
11261152
return nil // No traces to delete
11271153
}
1128-
1129-
deleteQuery := fmt.Sprintf("DELETE FROM %s.traces WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) AND transaction_hash IN (?)", c.cfg.Database)
1130-
1131-
err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), txHashesToDelete.List())
1132-
if err != nil {
1133-
return err
1134-
}
1135-
return nil
1154+
return batch.Send()
11361155
}
11371156

11381157
// TODO make this atomic

0 commit comments

Comments
 (0)