Skip to content

Commit debc231

Browse files
committed
migrate with destination storage
1 parent 51f1398 commit debc231

File tree

14 files changed

+462
-242
lines changed

14 files changed

+462
-242
lines changed

cmd/migrate_valid.go

Lines changed: 195 additions & 109 deletions
Large diffs are not rendered by default.

cmd/root.go

Lines changed: 88 additions & 5 deletions
Large diffs are not rendered by default.

configs/config.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,6 @@ type StorageConfig struct {
5252
Main StorageConnectionConfig `mapstructure:"main"`
5353
Orchestrator StorageConnectionConfig `mapstructure:"orchestrator"`
5454
}
55-
type StorageType string
56-
57-
const (
58-
StorageTypeMain StorageType = "main"
59-
StorageTypeStaging StorageType = "staging"
60-
StorageTypeOrchestrator StorageType = "orchestrator"
61-
)
6255

6356
type StorageConnectionConfig struct {
6457
Type string `mapstructure:"type"` // "auto", "clickhouse", "postgres", "kafka", "badger", "s3"
@@ -116,6 +109,7 @@ type ClickhouseConfig struct {
116109
EnableParallelViewProcessing bool `mapstructure:"enableParallelViewProcessing"`
117110
MaxQueryTime int `mapstructure:"maxQueryTime"`
118111
MaxMemoryUsage int `mapstructure:"maxMemoryUsage"`
112+
EnableCompression bool `mapstructure:"enableCompression"`
119113
}
120114

121115
type PostgresConfig struct {
@@ -238,6 +232,14 @@ type ValidationConfig struct {
238232
Mode string `mapstructure:"mode"` // "disabled", "minimal", "strict"
239233
}
240234

235+
type MigratorConfig struct {
236+
Destination StorageConnectionConfig `mapstructure:"destination"`
237+
StartBlock uint `mapstructure:"startBlock"`
238+
EndBlock uint `mapstructure:"endBlock"`
239+
StorageBatchSize uint `mapstructure:"storageBatchSize"`
240+
RpcBatchSize uint `mapstructure:"rpcBatchSize"`
241+
}
242+
241243
type Config struct {
242244
RPC RPCConfig `mapstructure:"rpc"`
243245
Log LogConfig `mapstructure:"log"`
@@ -250,6 +252,7 @@ type Config struct {
250252
Publisher PublisherConfig `mapstructure:"publisher"`
251253
WorkMode WorkModeConfig `mapstructure:"workMode"`
252254
Validation ValidationConfig `mapstructure:"validation"`
255+
Migrator MigratorConfig `mapstructure:"migrator"`
253256
}
254257

255258
var Cfg Config

internal/common/block.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ type BlockModel struct {
5959
}
6060

6161
type BlockData struct {
62-
ChainId uint64 `json:"chain_id"`
6362
Block Block `json:"block"`
6463
Transactions []Transaction `json:"transactions"`
6564
Logs []Log `json:"logs"`
@@ -103,7 +102,6 @@ func (b *Block) Serialize() BlockModel {
103102

104103
func (b *BlockData) Serialize() BlockData {
105104
data := BlockData{
106-
ChainId: b.ChainId,
107105
Block: b.Block,
108106
Transactions: b.Transactions,
109107
Logs: b.Logs,

internal/orchestrator/failure_recoverer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
110110
})
111111
} else {
112112
successfulResults = append(successfulResults, common.BlockData{
113-
ChainId: fr.rpc.GetChainID().Uint64(),
114113
Block: result.Data.Block,
115114
Logs: result.Data.Logs,
116115
Transactions: result.Data.Transactions,

internal/orchestrator/poller.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,6 @@ func (p *Poller) convertPollResultsToBlockData(results []rpc.GetFullBlockResult)
261261
blockData := make([]common.BlockData, 0, len(successfulResults))
262262
for _, result := range successfulResults {
263263
blockData = append(blockData, common.BlockData{
264-
ChainId: p.rpc.GetChainID().Uint64(),
265264
Block: result.Data.Block,
266265
Logs: result.Data.Logs,
267266
Transactions: result.Data.Transactions,

internal/orchestrator/reorg_handler.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,6 @@ func (rh *ReorgHandler) handleReorg(ctx context.Context, reorgedBlockNumbers []*
274274
return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
275275
}
276276
data = append(data, common.BlockData{
277-
ChainId: rh.rpc.GetChainID().Uint64(),
278277
Block: result.Data.Block,
279278
Logs: result.Data.Logs,
280279
Transactions: result.Data.Transactions,

internal/storage/badger.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func (bc *BadgerConnector) InsertStagingData(data []common.BlockData) error {
237237

238238
return bc.db.Update(func(txn *badger.Txn) error {
239239
for _, blockData := range data {
240-
key := blockKey(big.NewInt(int64(blockData.ChainId)), blockData.Block.Number)
240+
key := blockKey(blockData.Block.ChainId, blockData.Block.Number)
241241

242242
var buf bytes.Buffer
243243
if err := gob.NewEncoder(&buf).Encode(blockData); err != nil {
@@ -348,7 +348,7 @@ func (bc *BadgerConnector) DeleteStagingData(data []common.BlockData) error {
348348

349349
return bc.db.Update(func(txn *badger.Txn) error {
350350
for _, blockData := range data {
351-
key := blockKey(big.NewInt(int64(blockData.ChainId)), blockData.Block.Number)
351+
key := blockKey(blockData.Block.ChainId, blockData.Block.Number)
352352
if err := txn.Delete(key); err != nil && err != badger.ErrKeyNotFound {
353353
return err
354354
}

internal/storage/clickhouse.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
107107
},
108108
MaxOpenConns: cfg.MaxOpenConns,
109109
MaxIdleConns: cfg.MaxIdleConns,
110+
Compression: func() *clickhouse.Compression {
111+
c := &clickhouse.Compression{}
112+
if cfg.EnableCompression {
113+
zLog.Debug().Msg("ClickHouse LZ4 compression is enabled")
114+
c.Method = clickhouse.CompressionLZ4
115+
}
116+
return c
117+
}(),
110118
Settings: func() clickhouse.Settings {
111119
settings := clickhouse.Settings{
112120
"do_not_merge_across_partitions_select_final": "1",
@@ -901,6 +909,19 @@ func (c *ClickHouseConnector) GetMaxBlockNumberInRange(chainId *big.Int, startBl
901909
return maxBlockNumber, nil
902910
}
903911

912+
func (c *ClickHouseConnector) GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockCount *big.Int, err error) {
913+
tableName := c.getTableName(chainId, "blocks")
914+
query := fmt.Sprintf("SELECT COUNT(DISTINCT block_number) FROM %s.%s WHERE chain_id = ? AND block_number >= ? AND block_number <= ?", c.cfg.Database, tableName)
915+
err = c.conn.QueryRow(context.Background(), query, chainId, startBlock, endBlock).Scan(&blockCount)
916+
if err != nil {
917+
if err == sql.ErrNoRows {
918+
return big.NewInt(0), nil
919+
}
920+
return nil, err
921+
}
922+
return blockCount, nil
923+
}
924+
904925
func (c *ClickHouseConnector) getMaxBlockNumberConsistent(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
905926
tableName := c.getTableName(chainId, "blocks")
906927
query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1 SETTINGS select_sequential_consistency = 1", c.cfg.Database, tableName)
@@ -1976,7 +1997,6 @@ func (c *ClickHouseConnector) GetValidationBlockData(chainId *big.Int, startBloc
19761997
for i, block := range blocksResult.blocks {
19771998
blockNum := block.Number.String()
19781999
blockData[i] = common.BlockData{
1979-
ChainId: chainId.Uint64(),
19802000
Block: block,
19812001
Logs: logsResult.logMap[blockNum],
19822002
Transactions: txsResult.txMap[blockNum],
@@ -2156,7 +2176,6 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
21562176
for i, block := range blocksResult.blocks {
21572177
blockNum := block.Number.String()
21582178
blockData[i] = common.BlockData{
2159-
ChainId: chainId.Uint64(),
21602179
Block: block,
21612180
Logs: logsResult.logMap[blockNum],
21622181
Transactions: txsResult.txMap[blockNum],

internal/storage/connector.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ type IMainStorage interface {
130130

131131
GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
132132
GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (maxBlockNumber *big.Int, err error)
133+
GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockCount *big.Int, err error)
134+
133135
/**
134136
* Get block headers ordered from latest to oldest.
135137
*/
@@ -175,13 +177,13 @@ func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
175177
func NewConnector[T any](cfg *config.StorageConnectionConfig) (T, error) {
176178
var conn interface{}
177179
var err error
178-
180+
179181
// Default to "auto" if Type is not specified
180182
storageType := cfg.Type
181183
if storageType == "" {
182184
storageType = "auto"
183185
}
184-
186+
185187
// Handle explicit type selection
186188
if storageType != "auto" {
187189
switch storageType {

0 commit comments

Comments
 (0)