Skip to content

Commit b591820

Browse files
committed
create new storage functions for handling reorgs
1 parent b11b079 commit b591820

File tree

7 files changed

+223
-1
lines changed

7 files changed

+223
-1
lines changed

internal/common/block.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,10 @@ type BlockData struct {
3636
Traces []Trace
3737
}
3838

39+
type BlockHeader struct {
40+
Number *big.Int `json:"number"`
41+
Hash string `json:"hash"`
42+
ParentHash string `json:"parent_hash"`
43+
}
44+
3945
type RawBlock = map[string]interface{}

internal/storage/clickhouse.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,122 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
758758
return traces, nil
759759
}
760760

761+
func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
762+
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
763+
if chainId.Sign() > 0 {
764+
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
765+
}
766+
var blockNumberString string
767+
err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
768+
if err != nil {
769+
return nil, err
770+
}
771+
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
772+
if !ok {
773+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
774+
}
775+
return blockNumber, nil
776+
}
777+
778+
func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
779+
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')", c.cfg.Database, chainId, blockNumber.String())
780+
err := c.conn.Exec(context.Background(), query)
781+
return err
782+
}
783+
784+
func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error) {
785+
query := fmt.Sprintf("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC", c.cfg.Database, chainId.String(), lookbackStart.String())
786+
if chainId.Sign() > 0 {
787+
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
788+
}
789+
query += getLimitClause(limit)
790+
791+
rows, err := c.conn.Query(context.Background(), query)
792+
if err != nil {
793+
return nil, err
794+
}
795+
defer rows.Close()
796+
797+
for rows.Next() {
798+
var blockHeader common.BlockHeader
799+
err := rows.Scan(&blockHeader.Number, &blockHeader.Hash, &blockHeader.ParentHash)
800+
if err != nil {
801+
return nil, err
802+
}
803+
blockHeaders = append(blockHeaders, blockHeader)
804+
}
805+
return blockHeaders, nil
806+
}
807+
808+
func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
809+
var saveErr error
810+
var saveErrMutex sync.Mutex
811+
var wg sync.WaitGroup
812+
wg.Add(4)
813+
814+
go func() {
815+
defer wg.Done()
816+
if err := c.deleteBatch(chainId, blockNumbers, "blocks", "chain_id, number, is_deleted"); err != nil {
817+
saveErrMutex.Lock()
818+
saveErr = fmt.Errorf("error deleting blocks: %v", err)
819+
saveErrMutex.Unlock()
820+
}
821+
}()
822+
823+
go func() {
824+
defer wg.Done()
825+
if err := c.deleteBatch(chainId, blockNumbers, "logs", "chain_id, block_number, is_deleted"); err != nil {
826+
saveErrMutex.Lock()
827+
saveErr = fmt.Errorf("error deleting logs: %v", err)
828+
saveErrMutex.Unlock()
829+
}
830+
}()
831+
832+
go func() {
833+
defer wg.Done()
834+
if err := c.deleteBatch(chainId, blockNumbers, "transactions", "chain_id, block_number, is_deleted"); err != nil {
835+
saveErrMutex.Lock()
836+
saveErr = fmt.Errorf("error deleting transactions: %v", err)
837+
saveErrMutex.Unlock()
838+
}
839+
}()
840+
841+
go func() {
842+
defer wg.Done()
843+
if err := c.deleteBatch(chainId, blockNumbers, "traces", "chain_id, block_number, is_deleted"); err != nil {
844+
saveErrMutex.Lock()
845+
saveErr = fmt.Errorf("error deleting traces: %v", err)
846+
saveErrMutex.Unlock()
847+
}
848+
}()
849+
850+
wg.Wait()
851+
852+
if saveErr != nil {
853+
return saveErr
854+
}
855+
return nil
856+
}
857+
858+
func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, columns string) error {
859+
query := fmt.Sprintf("INSERT INTO %s.%s (%s)", c.cfg.Database, table, columns)
860+
batch, err := c.conn.PrepareBatch(context.Background(), query)
861+
if err != nil {
862+
return err
863+
}
864+
for _, blockNumber := range blockNumbers {
865+
err = batch.Append(
866+
chainId,
867+
blockNumber,
868+
1,
869+
)
870+
if err != nil {
871+
return err
872+
}
873+
}
874+
return batch.Send()
875+
}
876+
761877
// TODO make this atomic
762878
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
763879
blocks := make([]common.Block, 0, len(*data))

internal/storage/connector.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type IOrchestratorStorage interface {
3838
GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
3939
StoreBlockFailures(failures []common.BlockFailure) error
4040
DeleteBlockFailures(failures []common.BlockFailure) error
41+
GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
42+
SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
4143
}
4244

4345
type IStagingStorage interface {
@@ -55,6 +57,11 @@ type IMainStorage interface {
5557
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
5658
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
5759
GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
60+
/**
61+
* Get block headers ordered from latest to oldest.
62+
*/
63+
LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error)
64+
DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error
5865
}
5966

6067
func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {

internal/storage/memory.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,15 @@ func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *b
218218
return maxBlockNumber, nil
219219
}
220220

221+
func isKeyForSomeBlock(key string, prefixes []string, blocksFilter map[string]uint8) bool {
222+
for _, prefix := range prefixes {
223+
if isKeyForBlock(key, prefix, blocksFilter) {
224+
return true
225+
}
226+
}
227+
return false
228+
}
229+
221230
func isKeyForBlock(key string, prefix string, blocksFilter map[string]uint8) bool {
222231
if !strings.HasPrefix(key, prefix) {
223232
return false
@@ -332,6 +341,24 @@ func traceAddressToString(traceAddress []uint64) string {
332341
return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
333342
}
334343

344+
func (m *MemoryConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
345+
key := fmt.Sprintf("reorg_check:%s", chainId.String())
346+
value, ok := m.cache.Get(key)
347+
if !ok {
348+
return nil, fmt.Errorf("no reorg check block number found for chain %s", chainId.String())
349+
}
350+
blockNumber, ok := new(big.Int).SetString(value, 10)
351+
if !ok {
352+
return nil, fmt.Errorf("failed to parse block number: %s", value)
353+
}
354+
return blockNumber, nil
355+
}
356+
357+
func (m *MemoryConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
358+
m.cache.Add(fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String())
359+
return nil
360+
}
361+
335362
func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
336363
blocks := make([]common.Block, 0, len(*data))
337364
logs := make([]common.Log, 0)
@@ -359,3 +386,42 @@ func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error {
359386
}
360387
return nil
361388
}
389+
390+
func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error {
391+
blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers})
392+
for _, key := range m.cache.Keys() {
393+
prefixes := []string{fmt.Sprintf("block:%s:", chainId.String()), fmt.Sprintf("log:%s:", chainId.String()), fmt.Sprintf("transaction:%s:", chainId.String()), fmt.Sprintf("trace:%s:", chainId.String())}
394+
shouldDelete := isKeyForSomeBlock(key, prefixes, blockNumbersToCheck)
395+
if shouldDelete {
396+
m.cache.Remove(key)
397+
}
398+
}
399+
return nil
400+
}
401+
402+
func (m *MemoryConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) ([]common.BlockHeader, error) {
403+
blockHeaders := []common.BlockHeader{}
404+
for _, key := range m.cache.Keys() {
405+
if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) {
406+
blockNumberStr := strings.Split(key, ":")[2]
407+
blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10)
408+
if !ok {
409+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
410+
}
411+
if blockNumber.Cmp(lookbackStart) <= 0 {
412+
value, _ := m.cache.Get(key)
413+
block := common.Block{}
414+
err := json.Unmarshal([]byte(value), &block)
415+
if err != nil {
416+
return nil, err
417+
}
418+
blockHeaders = append(blockHeaders, common.BlockHeader{
419+
Number: blockNumber,
420+
Hash: block.Hash,
421+
ParentHash: block.ParentHash,
422+
})
423+
}
424+
}
425+
}
426+
return blockHeaders, nil
427+
}

internal/storage/redis.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"math/big"
78

89
"github.com/go-redis/redis/v8"
910
"github.com/rs/zerolog/log"
@@ -115,3 +116,22 @@ func (r *RedisConnector) DeleteBlockFailures(failures []common.BlockFailure) err
115116
}
116117
return nil
117118
}
119+
120+
func (r *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
121+
ctx := context.Background()
122+
blockNumberString, err := r.client.Get(ctx, fmt.Sprintf("reorg_check:%s", chainId.String())).Result()
123+
if err != nil {
124+
return nil, err
125+
}
126+
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
127+
if !ok {
128+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
129+
}
130+
return blockNumber, nil
131+
}
132+
133+
func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
134+
ctx := context.Background()
135+
r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0)
136+
return nil
137+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE TABLE cursors (
2+
`chain_id` UInt256,
3+
`cursor_type` String,
4+
`cursor_value` String,
5+
`insert_timestamp` DateTime DEFAULT now(),
6+
) ENGINE = ReplacingMergeTree(insert_timestamp)
7+
ORDER BY (chain_id, cursor_type);

internal/tools/clickhouse_create_staging_table.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ CREATE TABLE block_data (
66
`is_deleted` UInt8 DEFAULT 0,
77
INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
88
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
9-
ORDER BY (chain_id, block_number) PRIMARY KEY (chain_id, block_number)
9+
ORDER BY (chain_id, block_number)
1010
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;

0 commit comments

Comments
 (0)