Commit 0f70f2e

create new storage functions for handling reorgs
1 parent 5f3c0f7 commit 0f70f2e

7 files changed (+221, -4 lines)

internal/common/block.go

Lines changed: 6 additions & 0 deletions
@@ -36,4 +36,10 @@ type BlockData struct {
     Traces []Trace
 }
 
+type BlockHeader struct {
+    Number     *big.Int `json:"number"`
+    Hash       string   `json:"hash"`
+    ParentHash string   `json:"parent_hash"`
+}
+
 type RawBlock = map[string]interface{}

internal/storage/clickhouse.go

Lines changed: 114 additions & 3 deletions
@@ -609,7 +609,7 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
     return batch.Send()
 }
 
-func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
+func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (*[]common.BlockData, error) {
     query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
         c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))
 
@@ -625,6 +625,7 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]c
     }
     defer rows.Close()
 
+    blockDataList := make([]common.BlockData, 0)
     for rows.Next() {
         var blockDataJson string
         err := rows.Scan(
@@ -639,9 +640,9 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]c
         if err != nil {
             return nil, err
         }
-        *blockDataList = append(*blockDataList, blockData)
+        blockDataList = append(blockDataList, blockData)
     }
-    return blockDataList, nil
+    return &blockDataList, nil
 }
 
 func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error {
@@ -763,6 +764,116 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
     return traces, nil
 }
 
+func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+    query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
+    if chainId.Sign() > 0 {
+        query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
+    }
+    var blockNumberString string
+    err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
+    if err != nil {
+        return nil, err
+    }
+    blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
+    if !ok {
+        return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
+    }
+    return blockNumber, nil
+}
+
+func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+    query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')", c.cfg.Database, chainId, blockNumber.String())
+    err := c.conn.Exec(context.Background(), query)
+    return err
+}
+
+func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error) {
+    query := fmt.Sprintf("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC", c.cfg.Database, chainId.String(), lookbackStart.String())
+    query += getLimitClause(limit)
+
+    rows, err := c.conn.Query(context.Background(), query)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+
+    for rows.Next() {
+        var blockHeader common.BlockHeader
+        err := rows.Scan(&blockHeader.Number, &blockHeader.Hash, &blockHeader.ParentHash)
+        if err != nil {
+            return nil, err
+        }
+        blockHeaders = append(blockHeaders, blockHeader)
+    }
+    return blockHeaders, nil
+}
+
+func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
+    var saveErr error
+    var saveErrMutex sync.Mutex
+    var wg sync.WaitGroup
+    wg.Add(4)
+
+    go func() {
+        defer wg.Done()
+        if err := c.deleteBatch(chainId, blockNumbers, "blocks", "number"); err != nil {
+            saveErrMutex.Lock()
+            saveErr = fmt.Errorf("error deleting blocks: %v", err)
+            saveErrMutex.Unlock()
+        }
+    }()
+
+    go func() {
+        defer wg.Done()
+        if err := c.deleteBatch(chainId, blockNumbers, "logs", "block_number"); err != nil {
+            saveErrMutex.Lock()
+            saveErr = fmt.Errorf("error deleting logs: %v", err)
+            saveErrMutex.Unlock()
+        }
+    }()
+
+    go func() {
+        defer wg.Done()
+        if err := c.deleteBatch(chainId, blockNumbers, "transactions", "block_number"); err != nil {
+            saveErrMutex.Lock()
+            saveErr = fmt.Errorf("error deleting transactions: %v", err)
+            saveErrMutex.Unlock()
+        }
+    }()
+
+    go func() {
+        defer wg.Done()
+        if err := c.deleteBatch(chainId, blockNumbers, "traces", "block_number"); err != nil {
+            saveErrMutex.Lock()
+            saveErr = fmt.Errorf("error deleting traces: %v", err)
+            saveErrMutex.Unlock()
+        }
+    }()
+
+    wg.Wait()
+
+    if saveErr != nil {
+        return saveErr
+    }
+    return nil
+}
+
+func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, blockNumberColumn string) error {
+    query := fmt.Sprintf("ALTER TABLE %s.%s DELETE WHERE chain_id = ? AND %s IN (?)", c.cfg.Database, table, blockNumberColumn)
+
+    blockNumbersStr := make([]string, len(blockNumbers))
+    for i, bn := range blockNumbers {
+        blockNumbersStr[i] = bn.String()
+    }
+
+    err := c.conn.Exec(context.Background(), query, chainId, blockNumbersStr)
+    if err != nil {
+        return fmt.Errorf("error deleting from %s: %w", table, err)
+    }
+
+    return nil
+}
+
 // TODO make this atomic
 func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
     blocks := make([]common.Block, 0, len(*data))
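DeleteBlockData fans out four concurrent ALTER TABLE ... DELETE mutations (blocks, logs, transactions, traces) behind a WaitGroup and keeps only the last error seen under the mutex; ClickHouse applies such mutations asynchronously. A minimal usage sketch under those assumptions is below — the helper name and wiring are illustrative, not part of this commit.

package storage

import "math/big"

// replaceReorgedRange is an illustrative helper, not part of the commit.
// It assumes the caller has already determined which block numbers were
// invalidated by a reorg.
func replaceReorgedRange(c *ClickHouseConnector, chainId *big.Int, invalidated []*big.Int, newCursor *big.Int) error {
    // Issues four ALTER TABLE ... DELETE mutations concurrently; ClickHouse
    // executes mutations asynchronously, so rows may linger briefly.
    if err := c.DeleteBlockData(chainId, invalidated); err != nil {
        return err
    }
    // Corrected block data would be re-fetched and written back via
    // InsertBlockData before the cursor is advanced.
    return c.SetLastReorgCheckedBlockNumber(chainId, newCursor)
}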

internal/storage/connector.go

Lines changed: 7 additions & 0 deletions
@@ -38,6 +38,8 @@ type IOrchestratorStorage interface {
     GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
     StoreBlockFailures(failures []common.BlockFailure) error
     DeleteBlockFailures(failures []common.BlockFailure) error
+    GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
+    SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
 }
 
 type IStagingStorage interface {
@@ -55,6 +57,11 @@ type IMainStorage interface {
     GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
     GetTraces(qf QueryFilter) (traces []common.Trace, err error)
     GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
+    /**
+     * Get block headers ordered from latest to oldest.
+     */
+    LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error)
+    DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error
 }
 
 func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
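Taken together, the new interface methods are enough to drive a periodic reorg check. The sketch below is a hedged illustration of how a handler might combine them, not code from this commit; the function name, the lookback depth of 100, and the fork-detection rule are assumptions. Since LookbackBlockHeaders returns headers from latest to oldest, each header's ParentHash should match the Hash of the next (older) entry.

package storage

import "math/big"

// checkReorg is an illustrative sketch, not part of this commit: it walks
// recent headers from main storage and compares parent hashes to spot a fork.
func checkReorg(orchestrator IOrchestratorStorage, mainStorage IMainStorage, chainId *big.Int) error {
    lastChecked, err := orchestrator.GetLastReorgCheckedBlockNumber(chainId)
    if err != nil {
        return err
    }
    // Headers come back ordered from latest to oldest (see the doc comment above).
    headers, err := mainStorage.LookbackBlockHeaders(chainId, 100, lastChecked)
    if err != nil || len(headers) == 0 {
        return err
    }
    forked := []*big.Int{}
    for i := 0; i < len(headers)-1; i++ {
        // headers[i+1] is the parent of headers[i]; a mismatch means the stored
        // chain no longer links up and the newer block is suspect.
        if headers[i].ParentHash != headers[i+1].Hash {
            forked = append(forked, headers[i].Number)
        }
    }
    if len(forked) > 0 {
        // Invalidated rows are removed; re-fetching correct data is a separate
        // concern not shown here.
        if err := mainStorage.DeleteBlockData(chainId, forked); err != nil {
            return err
        }
    }
    return orchestrator.SetLastReorgCheckedBlockNumber(chainId, headers[0].Number)
}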

internal/storage/memory.go

Lines changed: 66 additions & 0 deletions
@@ -218,6 +218,15 @@ func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *b
     return maxBlockNumber, nil
 }
 
+func isKeyForSomeBlock(key string, prefixes []string, blocksFilter map[string]uint8) bool {
+    for _, prefix := range prefixes {
+        if isKeyForBlock(key, prefix, blocksFilter) {
+            return true
+        }
+    }
+    return false
+}
+
 func isKeyForBlock(key string, prefix string, blocksFilter map[string]uint8) bool {
     if !strings.HasPrefix(key, prefix) {
         return false
@@ -332,6 +341,24 @@ func traceAddressToString(traceAddress []uint64) string {
     return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
 }
 
+func (m *MemoryConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+    key := fmt.Sprintf("reorg_check:%s", chainId.String())
+    value, ok := m.cache.Get(key)
+    if !ok {
+        return nil, fmt.Errorf("no reorg check block number found for chain %s", chainId.String())
+    }
+    blockNumber, ok := new(big.Int).SetString(value, 10)
+    if !ok {
+        return nil, fmt.Errorf("failed to parse block number: %s", value)
+    }
+    return blockNumber, nil
+}
+
+func (m *MemoryConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+    m.cache.Add(fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String())
+    return nil
+}
+
 func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
     blocks := make([]common.Block, 0, len(*data))
     logs := make([]common.Log, 0)
@@ -359,3 +386,42 @@ func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
     }
     return nil
 }
+
+func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
+    blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers})
+    for _, key := range m.cache.Keys() {
+        prefixes := []string{fmt.Sprintf("block:%s:", chainId.String()), fmt.Sprintf("log:%s:", chainId.String()), fmt.Sprintf("transaction:%s:", chainId.String()), fmt.Sprintf("trace:%s:", chainId.String())}
+        shouldDelete := isKeyForSomeBlock(key, prefixes, blockNumbersToCheck)
+        if shouldDelete {
+            m.cache.Remove(key)
+        }
+    }
+    return nil
+}
+
+func (m *MemoryConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) ([]common.BlockHeader, error) {
+    blockHeaders := []common.BlockHeader{}
+    for _, key := range m.cache.Keys() {
+        if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) {
+            blockNumberStr := strings.Split(key, ":")[2]
+            blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10)
+            if !ok {
+                return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
+            }
+            if blockNumber.Cmp(lookbackStart) <= 0 {
+                value, _ := m.cache.Get(key)
+                block := common.Block{}
+                err := json.Unmarshal([]byte(value), &block)
+                if err != nil {
+                    return nil, err
+                }
+                blockHeaders = append(blockHeaders, common.BlockHeader{
+                    Number:     blockNumber,
+                    Hash:       block.Hash,
+                    ParentHash: block.ParentHash,
+                })
+            }
+        }
+    }
+    return blockHeaders, nil
+}

internal/storage/redis.go

Lines changed: 20 additions & 0 deletions
@@ -4,6 +4,7 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "math/big"
 
     "github.com/go-redis/redis/v8"
     "github.com/rs/zerolog/log"
@@ -115,3 +116,22 @@ func (r *RedisConnector) DeleteBlockFailures(failures []common.BlockFailure) err
     }
     return nil
 }
+
+func (r *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+    ctx := context.Background()
+    blockNumberString, err := r.client.Get(ctx, fmt.Sprintf("reorg_check:%s", chainId.String())).Result()
+    if err != nil {
+        return nil, err
+    }
+    blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
+    if !ok {
+        return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
+    }
+    return blockNumber, nil
+}
+
+func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+    ctx := context.Background()
+    r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0)
+    return nil
+}

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+CREATE TABLE cursors (
+    `chain_id` UInt256,
+    `cursor_type` String,
+    `cursor_value` String,
+    `insert_timestamp` DateTime DEFAULT now(),
+) ENGINE = ReplacingMergeTree(insert_timestamp)
+ORDER BY (chain_id, cursor_type);
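Because cursors is a ReplacingMergeTree ordered by (chain_id, cursor_type), SetLastReorgCheckedBlockNumber can simply INSERT a fresh row on every call; background merges and the FINAL modifier used in GetLastReorgCheckedBlockNumber collapse duplicates so the row with the newest insert_timestamp wins. A small round-trip sketch under that assumption (illustrative only, not part of the commit):

package storage

import "math/big"

// advanceReorgCursor is an illustrative round-trip, not part of the commit.
// Each call appends a new cursors row; reads with FINAL see only the latest
// value per (chain_id, cursor_type).
func advanceReorgCursor(c *ClickHouseConnector, chainId *big.Int, checked *big.Int) (*big.Int, error) {
    if err := c.SetLastReorgCheckedBlockNumber(chainId, checked); err != nil {
        return nil, err
    }
    // Re-reading returns the most recently written cursor_value for this chain.
    return c.GetLastReorgCheckedBlockNumber(chainId)
}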

internal/tools/clickhouse_create_staging_table.sql

Lines changed: 1 addition & 1 deletion
@@ -6,5 +6,5 @@ CREATE TABLE block_data (
     `is_deleted` UInt8 DEFAULT 0,
     INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
 ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
-ORDER BY (chain_id, block_number) PRIMARY KEY (chain_id, block_number)
+ORDER BY (chain_id, block_number)
 SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
