Skip to content

Commit a42cd15

Browse files
committed
fix duplicate inserts
1 parent 8322feb commit a42cd15

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

internal/orchestrator/committer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,15 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
4747

4848
func (c *Committer) Start(ctx context.Context) {
4949
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
50-
ticker := time.NewTicker(interval)
51-
defer ticker.Stop()
5250

5351
log.Debug().Msgf("Committer running")
5452
for {
5553
select {
5654
case <-ctx.Done():
5755
log.Info().Msg("Committer shutting down")
5856
return
59-
case <-ticker.C:
57+
default:
58+
time.Sleep(interval)
6059
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
6160
if err != nil {
6261
log.Error().Err(err).Msg("Error getting block data to commit")

internal/storage/clickhouse.go

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,18 +1082,59 @@ func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big
10821082
})
10831083
}
10841084

1085+
func (c *ClickHouseConnector) getBlockExistenceMap(data *[]common.BlockData) (map[string]bool, error) {
1086+
if len(*data) == 0 {
1087+
return nil, nil
1088+
}
1089+
blockNumbers := make([]*big.Int, 0, len(*data))
1090+
for _, blockData := range *data {
1091+
blockNumbers = append(blockNumbers, blockData.Block.Number)
1092+
}
1093+
chainId := (*data)[0].Block.ChainId
1094+
query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? AND block_number IN (?) GROUP BY block_number HAVING sum(sign) > 0", c.cfg.Database, c.getTableName(chainId, "blocks"))
1095+
rows, err := c.conn.Query(context.Background(), query, chainId, blockNumbers)
1096+
if err != nil {
1097+
return nil, err
1098+
}
1099+
defer rows.Close()
1100+
1101+
blockNumbersMap := make(map[string]bool)
1102+
for _, blockNumber := range blockNumbers {
1103+
blockNumbersMap[blockNumber.String()] = false
1104+
}
1105+
for rows.Next() {
1106+
var blockNumber string
1107+
err := rows.Scan(&blockNumber)
1108+
if err != nil {
1109+
return nil, err
1110+
}
1111+
blockNumbersMap[blockNumber] = true
1112+
}
1113+
return blockNumbersMap, nil
1114+
}
1115+
10851116
// TODO make this atomic
10861117
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
1118+
if len(*data) == 0 {
1119+
return nil
1120+
}
10871121
blocks := make([]common.Block, 0, len(*data))
10881122
logs := make([]common.Log, 0)
10891123
transactions := make([]common.Transaction, 0)
10901124
traces := make([]common.Trace, 0)
10911125

1126+
blockExistenceMap, err := c.getBlockExistenceMap(data)
1127+
if err != nil {
1128+
return err
1129+
}
1130+
10921131
for _, blockData := range *data {
1093-
blocks = append(blocks, blockData.Block)
1094-
logs = append(logs, blockData.Logs...)
1095-
transactions = append(transactions, blockData.Transactions...)
1096-
traces = append(traces, blockData.Traces...)
1132+
if !blockExistenceMap[blockData.Block.Number.String()] {
1133+
blocks = append(blocks, blockData.Block)
1134+
logs = append(logs, blockData.Logs...)
1135+
transactions = append(transactions, blockData.Transactions...)
1136+
traces = append(traces, blockData.Traces...)
1137+
}
10971138
}
10981139

10991140
var saveErr error

0 commit comments

Comments
 (0)