Skip to content

Commit 0215476

Browse files
committed
validationMigration: publish block data to the new Kafka stream
1 parent 9ca9e74 commit 0215476

File tree

4 files changed

+547
-11
lines changed

4 files changed

+547
-11
lines changed

cmd/migrate_valid.go

Lines changed: 179 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cmd
22

33
import (
44
"context"
5+
"fmt"
56
"math/big"
67
"os"
78
"strconv"
@@ -11,6 +12,7 @@ import (
1112
config "github.com/thirdweb-dev/indexer/configs"
1213
"github.com/thirdweb-dev/indexer/internal/common"
1314
"github.com/thirdweb-dev/indexer/internal/orchestrator"
15+
"github.com/thirdweb-dev/indexer/internal/publisher/newkafka"
1416
"github.com/thirdweb-dev/indexer/internal/rpc"
1517
"github.com/thirdweb-dev/indexer/internal/storage"
1618
)
@@ -36,6 +38,9 @@ func RunValidationMigration(cmd *cobra.Command, args []string) {
3638
migrator := NewMigrator()
3739
defer migrator.Close()
3840

41+
// get absolute start and end block for the migration. eg, 0-10M or 10M-20M
42+
absStartBlock, absEndBlock := migrator.getAbsStartAndEndBlock()
43+
3944
rangeStartBlock, rangeEndBlock := migrator.DetermineMigrationBoundaries()
4045

4146
log.Info().Msgf("Migrating blocks from %s to %s (both ends inclusive)", rangeStartBlock.String(), rangeEndBlock.String())
@@ -75,9 +80,14 @@ func RunValidationMigration(cmd *cobra.Command, args []string) {
7580
blocksToInsert = append(blocksToInsert, blockData)
7681
}
7782

78-
err := migrator.targetConn.InsertBlockData(blocksToInsert)
83+
err := migrator.newkafka.PublishBlockData(blocksToInsert)
7984
if err != nil {
80-
log.Fatal().Err(err).Msg("Failed to insert blocks to target storage")
85+
log.Fatal().Err(err).Msg("Failed to publish block data")
86+
}
87+
88+
err = migrator.UpdateMigratedBlock(absStartBlock, absEndBlock, blocksToInsert)
89+
if err != nil {
90+
log.Fatal().Err(err).Msg("Failed to update migrated block range")
8191
}
8292

8393
currentBlock = new(big.Int).Add(endBlock, big.NewInt(1))
@@ -94,6 +104,8 @@ type Migrator struct {
94104
targetConn *storage.ClickHouseConnector
95105
migrationBatchSize int
96106
rpcBatchSize int
107+
newkafka *newkafka.Publisher
108+
psql *storage.PostgresConnector
97109
}
98110

99111
func NewMigrator() *Migrator {
@@ -138,21 +150,97 @@ func NewMigrator() *Migrator {
138150
log.Fatal().Err(err).Msg("Failed to initialize target storage")
139151
}
140152

153+
// publish to new kafka stream i.e new clickhouse database
154+
newpublisher := newkafka.GetInstance()
155+
156+
// psql cursor for new kafka
157+
psql, err := storage.NewPostgresConnector(config.Cfg.Storage.Main.Postgres)
158+
if err != nil {
159+
log.Fatal().Err(err).Msg("Failed to initialize psql cursor")
160+
}
161+
162+
// Create migrated_block_ranges table if it doesn't exist
163+
createMigratedBlockRangesTable(psql)
164+
141165
return &Migrator{
142166
migrationBatchSize: batchSize,
143167
rpcBatchSize: rpcBatchSize,
144168
rpcClient: rpcClient,
145169
storage: s,
146170
validator: validator,
147171
targetConn: targetConn,
172+
newkafka: newpublisher,
173+
psql: psql,
174+
}
175+
}
176+
177+
// createMigratedBlockRangesTable creates the migrated_block_ranges table if it doesn't exist
178+
func createMigratedBlockRangesTable(psql *storage.PostgresConnector) {
179+
createTableSQL := `
180+
CREATE TABLE IF NOT EXISTS migrated_block_ranges (
181+
chain_id BIGINT NOT NULL,
182+
block_number BIGINT NOT NULL,
183+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
184+
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
185+
) WITH (fillfactor = 80, autovacuum_vacuum_scale_factor = 0.1, autovacuum_analyze_scale_factor = 0.05)
186+
`
187+
188+
// Execute the CREATE TABLE statement
189+
_, err := psql.ExecRaw(createTableSQL)
190+
if err != nil {
191+
log.Warn().Err(err).Msg("Failed to create migrated_block_ranges table")
192+
}
193+
194+
// Create index if it doesn't exist
195+
createIndexSQL := `
196+
CREATE INDEX IF NOT EXISTS idx_migrated_block_ranges_chain_block
197+
ON migrated_block_ranges(chain_id, block_number DESC)
198+
`
199+
_, err = psql.ExecRaw(createIndexSQL)
200+
if err != nil {
201+
log.Warn().Err(err).Msg("Failed to create index on migrated_block_ranges table")
202+
}
203+
204+
// Create trigger if it doesn't exist
205+
createTriggerSQL := `
206+
CREATE TRIGGER update_migrated_block_ranges_updated_at
207+
BEFORE UPDATE ON migrated_block_ranges
208+
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column()
209+
`
210+
_, err = psql.ExecRaw(createTriggerSQL)
211+
if err != nil {
212+
log.Warn().Err(err).Msg("Failed to create trigger on migrated_block_ranges table")
148213
}
149214
}
150215

151216
// Close releases all external resources held by the Migrator: the RPC client,
// the new-Kafka publisher, and the Postgres connection used for the migration
// cursor. Intended to be deferred right after NewMigrator succeeds.
func (m *Migrator) Close() {
	m.rpcClient.Close()
	m.newkafka.Close()
	m.psql.Close()
}
154221

155222
func (m *Migrator) DetermineMigrationBoundaries() (*big.Int, *big.Int) {
223+
startBlock, endBlock := m.getAbsStartAndEndBlock()
224+
225+
latestMigratedBlock, err := m.GetMaxBlockNumberInRange(startBlock, endBlock)
226+
if err != nil {
227+
log.Fatal().Err(err).Msg("Failed to get latest block from target storage")
228+
}
229+
log.Info().Msgf("Latest block in target storage: %d", latestMigratedBlock)
230+
231+
if latestMigratedBlock.Cmp(endBlock) >= 0 {
232+
log.Fatal().Msgf("Full range is already migrated")
233+
}
234+
235+
// if configured start block is less than or equal to already migrated and migrated block is not 0, start from last migrated + 1
236+
if startBlock.Cmp(latestMigratedBlock) <= 0 && latestMigratedBlock.Sign() > 0 {
237+
startBlock = new(big.Int).Add(latestMigratedBlock, big.NewInt(1))
238+
}
239+
240+
return startBlock, endBlock
241+
}
242+
243+
func (m *Migrator) getAbsStartAndEndBlock() (*big.Int, *big.Int) {
156244
// get latest block from main storage
157245
latestBlockStored, err := m.storage.MainStorage.GetMaxBlockNumber(m.rpcClient.GetChainID())
158246
if err != nil {
@@ -187,22 +275,102 @@ func (m *Migrator) DetermineMigrationBoundaries() (*big.Int, *big.Int) {
187275
startBlock = configuredStartBlock
188276
}
189277

190-
latestMigratedBlock, err := m.targetConn.GetMaxBlockNumberInRange(m.rpcClient.GetChainID(), startBlock, endBlock)
278+
return startBlock, endBlock
279+
}
280+
281+
func (m *Migrator) UpdateMigratedBlock(startBlock *big.Int, endBlock *big.Int, blockData []common.BlockData) error {
282+
if len(blockData) == 0 {
283+
return nil
284+
}
285+
286+
maxBlockNumber := big.NewInt(0)
287+
for _, block := range blockData {
288+
if block.Block.Number.Cmp(maxBlockNumber) > 0 {
289+
maxBlockNumber = block.Block.Number
290+
}
291+
}
292+
293+
chainID := blockData[0].Block.ChainId
294+
err := m.upsertMigratedBlockRange(chainID, maxBlockNumber, startBlock, endBlock)
191295
if err != nil {
192-
log.Fatal().Err(err).Msg("Failed to get latest block from target storage")
296+
return fmt.Errorf("failed to update migrated block range: %w", err)
193297
}
194-
log.Info().Msgf("Latest block in target storage: %d", latestMigratedBlock)
298+
return nil
299+
}
195300

196-
if latestMigratedBlock.Cmp(endBlock) >= 0 {
197-
log.Fatal().Msgf("Full range is already migrated")
301+
func (m *Migrator) GetMaxBlockNumberInRange(startBlock *big.Int, endBlock *big.Int) (*big.Int, error) {
302+
// Get chain ID from RPC client
303+
chainID := m.rpcClient.GetChainID()
304+
305+
// Get the maximum end_block for the given chain_id
306+
maxBlock, err := m.getMaxMigratedBlock(chainID, startBlock, endBlock)
307+
if err != nil {
308+
log.Warn().Err(err).Msg("Failed to get last migrated block, returning start block")
309+
return startBlock, err
198310
}
199311

200-
// if configured start block is less than or equal to already migrated and migrated block is not 0, start from last migrated + 1
201-
if startBlock.Cmp(latestMigratedBlock) <= 0 && latestMigratedBlock.Sign() > 0 {
202-
startBlock = new(big.Int).Add(latestMigratedBlock, big.NewInt(1))
312+
// Return maxBlock + 1 as the next block to migrate
313+
return new(big.Int).Add(maxBlock, big.NewInt(1)), nil
314+
}
315+
316+
// upsertMigratedBlockRange upserts a row for the given chain_id and block range
317+
func (m *Migrator) upsertMigratedBlockRange(chainID, blockNumber, startBlock, endBlock *big.Int) error {
318+
// First, try to update existing rows that overlap with this range
319+
updateSQL := `
320+
UPDATE migrated_block_ranges
321+
SET block_number = $1, updated_at = CURRENT_TIMESTAMP
322+
WHERE chain_id = $2 AND block_number >= $3 AND block_number <= $4
323+
`
324+
325+
result, err := m.psql.ExecRaw(updateSQL, blockNumber.String(), chainID.String(), startBlock.String(), endBlock.String())
326+
if err != nil {
327+
return fmt.Errorf("failed to update migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
203328
}
204329

205-
return startBlock, endBlock
330+
// Check if any rows were updated
331+
rowsAffected, err := result.RowsAffected()
332+
if err != nil {
333+
log.Warn().Err(err).Msgf("Failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
334+
return fmt.Errorf("failed to get rows affected for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
335+
}
336+
337+
// If no rows were updated, insert a new row
338+
if rowsAffected == 0 {
339+
insertSQL := `
340+
INSERT INTO migrated_block_ranges (chain_id, block_number, created_at, updated_at)
341+
VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
342+
`
343+
344+
_, err := m.psql.ExecRaw(insertSQL, chainID.String(), blockNumber.String())
345+
if err != nil {
346+
return fmt.Errorf("failed to insert migrated block range for chain %s, range %s-%s", chainID.String(), startBlock.String(), endBlock.String())
347+
}
348+
}
349+
return nil
350+
}
351+
352+
// getMaxMigratedBlock gets the maximum block number within the given range for the given chain_id
353+
func (m *Migrator) getMaxMigratedBlock(chainID, startBlock, endBlock *big.Int) (*big.Int, error) {
354+
querySQL := `
355+
SELECT COALESCE(MAX(block_number), 0) as max_block
356+
FROM migrated_block_ranges
357+
WHERE chain_id = $1
358+
AND block_number >= $2
359+
AND block_number <= $3
360+
`
361+
362+
var maxBlockStr string
363+
err := m.psql.QueryRowRaw(querySQL, chainID.String(), startBlock.String(), endBlock.String()).Scan(&maxBlockStr)
364+
if err != nil {
365+
return nil, fmt.Errorf("failed to query migrated block ranges: %w", err)
366+
}
367+
368+
maxBlock, ok := new(big.Int).SetString(maxBlockStr, 10)
369+
if !ok {
370+
return nil, fmt.Errorf("failed to parse block number: %s", maxBlockStr)
371+
}
372+
373+
return maxBlock, nil
206374
}
207375

208376
func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockData, error) {

0 commit comments

Comments
 (0)