Skip to content

Commit 81f8a60

Browse files
committed
create new storage functions for handling reorgs
1 parent d0e0f6f commit 81f8a60

File tree

7 files changed

+214
-1
lines changed

7 files changed

+214
-1
lines changed

internal/common/block.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,10 @@ type BlockData struct {
3636
Traces []Trace
3737
}
3838

39+
type BlockHeader struct {
40+
Number *big.Int `json:"number"`
41+
Hash string `json:"hash"`
42+
ParentHash string `json:"parent_hash"`
43+
}
44+
3945
type RawBlock = map[string]interface{}

internal/storage/clickhouse.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,121 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
758758
return traces, nil
759759
}
760760

761+
func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
762+
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
763+
if chainId.Sign() > 0 {
764+
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
765+
}
766+
var blockNumberString string
767+
err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
768+
if err != nil {
769+
return nil, err
770+
}
771+
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
772+
if !ok {
773+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
774+
}
775+
return blockNumber, nil
776+
}
777+
778+
func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
779+
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')", c.cfg.Database, chainId, blockNumber.String())
780+
err := c.conn.Exec(context.Background(), query)
781+
return err
782+
}
783+
784+
func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error) {
785+
query := fmt.Sprintf("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC", c.cfg.Database, chainId.String(), lookbackStart.String())
786+
if chainId.Sign() > 0 {
787+
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
788+
}
789+
query += getLimitClause(limit)
790+
791+
rows, err := c.conn.Query(context.Background(), query)
792+
if err != nil {
793+
return nil, err
794+
}
795+
defer rows.Close()
796+
797+
for rows.Next() {
798+
var blockHeader common.BlockHeader
799+
err := rows.Scan(&blockHeader.Number, &blockHeader.Hash, &blockHeader.ParentHash)
800+
if err != nil {
801+
return nil, err
802+
}
803+
blockHeaders = append(blockHeaders, blockHeader)
804+
}
805+
return blockHeaders, nil
806+
}
807+
808+
func (c *ClickHouseConnector) DeleteDataForBlocks(chainId *big.Int, blockNumbers []*big.Int) error {
809+
var saveErr error
810+
var saveErrMutex sync.Mutex
811+
var wg sync.WaitGroup
812+
wg.Add(4)
813+
814+
go func() {
815+
defer wg.Done()
816+
if err := c.deleteBatch(chainId, blockNumbers, "blocks", "chain_id, number, is_deleted"); err != nil {
817+
saveErrMutex.Lock()
818+
saveErr = fmt.Errorf("error deleting blocks: %v", err)
819+
saveErrMutex.Unlock()
820+
}
821+
}()
822+
823+
go func() {
824+
defer wg.Done()
825+
if err := c.deleteBatch(chainId, blockNumbers, "logs", "chain_id, block_number, is_deleted"); err != nil {
826+
saveErrMutex.Lock()
827+
saveErr = fmt.Errorf("error deleting logs: %v", err)
828+
saveErrMutex.Unlock()
829+
}
830+
}()
831+
832+
go func() {
833+
defer wg.Done()
834+
if err := c.deleteBatch(chainId, blockNumbers, "transactions", "chain_id, block_number, is_deleted"); err != nil {
835+
saveErrMutex.Lock()
836+
saveErr = fmt.Errorf("error deleting transactions: %v", err)
837+
saveErrMutex.Unlock()
838+
}
839+
}()
840+
841+
go func() {
842+
defer wg.Done()
843+
if err := c.deleteBatch(chainId, blockNumbers, "traces", "chain_id, block_number, is_deleted"); err != nil {
844+
saveErrMutex.Lock()
845+
saveErr = fmt.Errorf("error deleting traces: %v", err)
846+
saveErrMutex.Unlock()
847+
}
848+
}()
849+
850+
wg.Wait()
851+
852+
if saveErr != nil {
853+
return saveErr
854+
}
855+
return nil
856+
}
857+
858+
func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, columns string) error {
859+
query := fmt.Sprintf("INSERT INTO %s.%s (%s)", c.cfg.Database, table, columns)
860+
batch, err := c.conn.PrepareBatch(context.Background(), query)
861+
if err != nil {
862+
return err
863+
}
864+
for _, blockNumber := range blockNumbers {
865+
err = batch.Append(
866+
chainId,
867+
blockNumber,
868+
1,
869+
)
870+
if err != nil {
871+
return err
872+
}
873+
}
874+
return batch.Send()
875+
}
761876

762877
// TODO make this atomic
763878
func (c *ClickHouseConnector) InsertDataForBlocks(data []common.BlockData) error {

internal/storage/connector.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type IOrchestratorStorage interface {
3838
GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
3939
StoreBlockFailures(failures []common.BlockFailure) error
4040
DeleteBlockFailures(failures []common.BlockFailure) error
41+
GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
42+
SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
4143
}
4244

4345
type IStagingStorage interface {
@@ -55,6 +57,11 @@ type IMainStorage interface {
5557
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
5658
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
5759
GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
60+
/**
61+
* Get block headers ordered from latest to oldest.
62+
*/
63+
LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error)
64+
DeleteDataForBlocks(chainId *big.Int, blockNumbers []*big.Int) error
5865
}
5966

6067
func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {

internal/storage/memory.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,24 @@ func traceAddressToString(traceAddress []uint64) string {
332332
return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
333333
}
334334

335+
func (m *MemoryConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
336+
key := fmt.Sprintf("reorg_check:%s", chainId.String())
337+
value, ok := m.cache.Get(key)
338+
if !ok {
339+
return nil, fmt.Errorf("no reorg check block number found for chain %s", chainId.String())
340+
}
341+
blockNumber, ok := new(big.Int).SetString(value, 10)
342+
if !ok {
343+
return nil, fmt.Errorf("failed to parse block number: %s", value)
344+
}
345+
return blockNumber, nil
346+
}
347+
348+
func (m *MemoryConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
349+
m.cache.Add(fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String())
350+
return nil
351+
}
352+
335353
func (m *MemoryConnector) InsertDataForBlocks(data []common.BlockData) error {
336354
blocks := make([]common.Block, 0, len(data))
337355
logs := make([]common.Log, 0)
@@ -360,3 +378,43 @@ func (m *MemoryConnector) InsertDataForBlocks(data []common.BlockData) error {
360378
return nil
361379
}
362380

381+
func (m *MemoryConnector) DeleteDataForBlocks(chainId *big.Int, blockNumbers []*big.Int) error {
382+
blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers})
383+
for _, key := range m.cache.Keys() {
384+
shouldDelete := isKeyForBlock(key, fmt.Sprintf("block:%s:", chainId), blockNumbersToCheck) ||
385+
isKeyForBlock(key, fmt.Sprintf("log:%s:", chainId), blockNumbersToCheck) ||
386+
isKeyForBlock(key, fmt.Sprintf("transaction:%s:", chainId), blockNumbersToCheck) ||
387+
isKeyForBlock(key, fmt.Sprintf("trace:%s:", chainId), blockNumbersToCheck)
388+
if shouldDelete {
389+
m.cache.Remove(key)
390+
}
391+
}
392+
return nil
393+
}
394+
395+
func (m *MemoryConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) ([]common.BlockHeader, error) {
396+
blockHeaders := []common.BlockHeader{}
397+
for _, key := range m.cache.Keys() {
398+
if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) {
399+
blockNumberStr := strings.Split(key, ":")[2]
400+
blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10)
401+
if !ok {
402+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
403+
}
404+
if blockNumber.Cmp(lookbackStart) <= 0 {
405+
value, _ := m.cache.Get(key)
406+
block := common.Block{}
407+
err := json.Unmarshal([]byte(value), &block)
408+
if err != nil {
409+
return nil, err
410+
}
411+
blockHeaders = append(blockHeaders, common.BlockHeader{
412+
Number: blockNumber,
413+
Hash: block.Hash,
414+
ParentHash: block.ParentHash,
415+
})
416+
}
417+
}
418+
}
419+
return blockHeaders, nil
420+
}

internal/storage/redis.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"math/big"
78

89
"github.com/go-redis/redis/v8"
910
"github.com/rs/zerolog/log"
@@ -115,3 +116,22 @@ func (r *RedisConnector) DeleteBlockFailures(failures []common.BlockFailure) err
115116
}
116117
return nil
117118
}
119+
120+
func (r *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
121+
ctx := context.Background()
122+
blockNumberString, err := r.client.Get(ctx, fmt.Sprintf("reorg_check:%s", chainId.String())).Result()
123+
if err != nil {
124+
return nil, err
125+
}
126+
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
127+
if !ok {
128+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
129+
}
130+
return blockNumber, nil
131+
}
132+
133+
func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
134+
ctx := context.Background()
135+
r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0)
136+
return nil
137+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE TABLE cursors (
2+
`chain_id` UInt256,
3+
`cursor_type` String,
4+
`cursor_value` String,
5+
`insert_timestamp` DateTime DEFAULT now(),
6+
) ENGINE = ReplacingMergeTree(insert_timestamp)
7+
ORDER BY (chain_id, cursor_type);

internal/tools/clickhouse_create_staging_table.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ CREATE TABLE block_data (
66
`is_deleted` UInt8 DEFAULT 0,
77
INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
88
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
9-
ORDER BY (chain_id, block_number) PRIMARY KEY (chain_id, block_number)
9+
ORDER BY (chain_id, block_number)
1010
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

0 commit comments

Comments
 (0)