Skip to content

Commit 8768acd

Browse files
committed
backfill working
1 parent d196ead commit 8768acd

File tree

3 files changed

+103
-11
lines changed

3 files changed

+103
-11
lines changed

cmd/migrate_valid.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func createMigratedBlockRangesTable(psql *storage.PostgresConnector) {
181181
chain_id BIGINT NOT NULL,
182182
block_number BIGINT NOT NULL,
183183
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
184-
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
184+
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
185185
) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05)
186186
`
187187

@@ -242,7 +242,7 @@ func (m *Migrator) DetermineMigrationBoundaries() (*big.Int, *big.Int) {
242242

243243
func (m *Migrator) getAbsStartAndEndBlock() (*big.Int, *big.Int) {
244244
// get latest block from main storage
245-
latestBlockStored, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpcClient.GetChainID())
245+
latestBlockStored, err := m.rpcClient.GetLatestBlockNumber(context.Background())
246246
if err != nil {
247247
log.Fatal().Err(err).Msg("Failed to get latest block from main storage")
248248
}
@@ -304,13 +304,14 @@ func (m *Migrator) GetMaxBlockNumberInRange(startBlock *big.Int, endBlock *big.I
304304

305305
// Get the maximum end_block for the given chain_id
306306
maxBlock, err := m.getMaxMigratedBlock(chainID, startBlock, endBlock)
307-
if err != nil {
308-
log.Warn().Err(err).Msg("Failed to get last migrated block, returning start block")
309-
return startBlock, err
307+
if err != nil || maxBlock.Cmp(startBlock) < 0 {
308+
log.Warn().Err(err).Msg("Failed to get last migrated block, returning start block - 1")
309+
// Return startBlock - 1 so that the next block to migrate is startBlock
310+
return new(big.Int).Sub(startBlock, big.NewInt(1)), nil
310311
}
311312

312-
// Return maxBlock + 1 as the next block to migrate
313-
return new(big.Int).Add(maxBlock, big.NewInt(1)), nil
313+
// Return the actual maxBlock (not +1) since this represents the last migrated block
314+
return maxBlock, nil
314315
}
315316

316317
// upsertMigratedBlockRange upserts a row for the given chain_id and block range

configs/kafka_config.yml

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
rpc:
2+
url: https://eth.llamarpc.com
3+
chainId: "1"
4+
blockReceipts:
5+
enabled: true
6+
7+
log:
8+
level: debug
9+
prettify: true
10+
11+
poller:
12+
enabled: true
13+
interval: 2000
14+
blocksPerPoll: 100
15+
16+
committer:
17+
enabled: true
18+
interval: 2000
19+
blocksPerCommit: 100
20+
21+
storage:
22+
main:
23+
clickhouse:
24+
host: localhost
25+
port: 9440
26+
username: admin
27+
password: password
28+
database: default
29+
disableTLS: true
30+
asyncInsert: true
31+
maxRowsPerInsert: 1000
32+
maxOpenConns: 50
33+
maxIdleConns: 10
34+
35+
staging:
36+
postgres:
37+
host: localhost
38+
port: 5432
39+
username: admin
40+
password: password
41+
database: insight
42+
sslMode: disable
43+
maxOpenConns: 50
44+
maxIdleConns: 10
45+
maxConnLifetime: 300
46+
connectTimeout: 10
47+
48+
orchestrator:
49+
postgres:
50+
host: localhost
51+
port: 5432
52+
username: admin
53+
password: password
54+
database: insight
55+
sslMode: disable
56+
maxOpenConns: 50
57+
maxIdleConns: 10
58+
maxConnLifetime: 300
59+
connectTimeout: 10
60+
61+
api:
62+
host: localhost:3000
63+
basicAuth:
64+
username: admin
65+
password: admin
66+
67+
publisher:
68+
enabled: false
69+
mode: default
70+
71+
# New Kafka configuration
72+
newKafka:
73+
brokers: "localhost:9092"
74+
username: ""
75+
password: ""
76+
77+
validation:
78+
mode: minimal
79+
80+
# Work mode configuration - Controls system behavior based on blockchain state
81+
workMode:
82+
# Interval in minutes to check if system should switch between live/historical mode
83+
checkIntervalMinutes: 10
84+
# Block number threshold to determine if system is in "live mode" (near chain head)
85+
# Setting this very high forces backfill mode for testing
86+
liveModeThreshold: 1000000

internal/storage/clickhouse.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2016,6 +2016,11 @@ func (c *ClickHouseConnector) FindMissingBlockNumbers(chainId *big.Int, startBlo
20162016
}
20172017

20182018
func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error) {
2019+
// For migration purposes, we don't need ForceConsistentData which causes FINAL keyword issues
2020+
return c.GetFullBlockDataWithOptions(chainId, blockNumbers, true)
2021+
}
2022+
2023+
func (c *ClickHouseConnector) GetFullBlockDataWithOptions(chainId *big.Int, blockNumbers []*big.Int, forceConsistentData bool) (blocks []common.BlockData, err error) {
20192024
// Get blocks, logs and transactions concurrently
20202025
type blockResult struct {
20212026
blocks []common.Block
@@ -2047,7 +2052,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
20472052
blocksResult, err := c.GetBlocks(QueryFilter{
20482053
ChainId: chainId,
20492054
BlockNumbers: blockNumbers,
2050-
ForceConsistentData: true,
2055+
ForceConsistentData: forceConsistentData,
20512056
})
20522057
blocksChan <- blockResult{blocks: blocksResult.Data, err: err}
20532058
}()
@@ -2056,7 +2061,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
20562061
logsResult, err := c.GetLogs(QueryFilter{
20572062
ChainId: chainId,
20582063
BlockNumbers: blockNumbers,
2059-
ForceConsistentData: true,
2064+
ForceConsistentData: forceConsistentData,
20602065
})
20612066
if err != nil {
20622067
logsChan <- logResult{err: err}
@@ -2076,7 +2081,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
20762081
transactionsResult, err := c.GetTransactions(QueryFilter{
20772082
ChainId: chainId,
20782083
BlockNumbers: blockNumbers,
2079-
ForceConsistentData: true,
2084+
ForceConsistentData: forceConsistentData,
20802085
})
20812086
if err != nil {
20822087
txsChan <- txResult{err: err}
@@ -2096,7 +2101,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
20962101
tracesResult, err := c.GetTraces(QueryFilter{
20972102
ChainId: chainId,
20982103
BlockNumbers: blockNumbers,
2099-
ForceConsistentData: true,
2104+
ForceConsistentData: forceConsistentData,
21002105
})
21012106
if err != nil {
21022107
tracesChan <- traceResult{err: err}

0 commit comments

Comments
 (0)