Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,6 @@ type Config struct {
ZeetProjectName string `env:"ZEET_PROJECT_NAME" envDefault:"insight-indexer"`
ZeetDeploymentId string `env:"ZEET_DEPLOYMENT_ID"`
ZeetClusterId string `env:"ZEET_CLUSTER_ID"`
OldClickhouseDatabaseV1 string `env:"OLD_CLICKHOUSE_DATABASE_V1"`
OldClickhouseHostV1 string `env:"OLD_CLICKHOUSE_HOST_V1"`
OldClickhousePortV1 int `env:"OLD_CLICKHOUSE_PORT_V1"`
OldClickhouseUsernameV1 string `env:"OLD_CLICKHOUSE_USERNAME_V1"`
OldClickhousePasswordV1 string `env:"OLD_CLICKHOUSE_PASSWORD_V1"`
OldClickhouseEnableTLSV1 bool `env:"OLD_CLICKHOUSE_ENABLE_TLS_V1" envDefault:"true"`
CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"`
CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"`
CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"`
Expand Down
1 change: 0 additions & 1 deletion internal/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var blockdataChannel = make(chan []*common.BlockData, config.Cfg.RPCNumParallelC
var avgMemoryPerBlockChannel = make(chan int, 1)

func Init() {
libs.InitOldClickHouseV1()
libs.InitS3()
libs.InitRPCClient()
InitParquetWriter()
Expand Down
49 changes: 19 additions & 30 deletions internal/libs/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,6 @@ var ClickhouseConnV1 clickhouse.Conn
// use this for new current states and query
var ClickhouseConnV2 clickhouse.Conn

func InitOldClickHouseV1() {
ClickhouseConnV1 = initClickhouse(
config.Cfg.OldClickhouseHostV1,
config.Cfg.OldClickhousePortV1,
config.Cfg.OldClickhouseUsernameV1,
config.Cfg.OldClickhousePasswordV1,
config.Cfg.OldClickhouseDatabaseV1,
config.Cfg.OldClickhouseEnableTLSV1,
)
}

// This is a new clickhouse where data will be inserted into.
// All user queries will be done against this clickhouse.
func InitNewClickHouseV2() {
Expand Down Expand Up @@ -126,7 +115,7 @@ func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) {
return maxBlockNumber, nil
}

func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) {
func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) {
length := endBlockNumber - startBlockNumber + 1

blockData := make([]*common.BlockData, length)
Expand All @@ -139,22 +128,22 @@ func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBl
wg.Add(4)
go func() {
defer wg.Done()
blocksRaw, _ = getBlocksFromV1(chainId, startBlockNumber, endBlockNumber)
blocksRaw, _ = getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
}()

go func() {
defer wg.Done()
transactionsRaw, _ = getTransactionsFromV1(chainId, startBlockNumber, endBlockNumber)
transactionsRaw, _ = getTransactionsFromV2(chainId, startBlockNumber, endBlockNumber)
}()

go func() {
defer wg.Done()
logsRaw, _ = getLogsFromV1(chainId, startBlockNumber, endBlockNumber)
logsRaw, _ = getLogsFromV2(chainId, startBlockNumber, endBlockNumber)
}()

go func() {
defer wg.Done()
tracesRaw, _ = getTracesFromV1(chainId, startBlockNumber, endBlockNumber)
tracesRaw, _ = getTracesFromV2(chainId, startBlockNumber, endBlockNumber)
}()
wg.Wait()

Expand Down Expand Up @@ -189,19 +178,19 @@ func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBl
return blockData, nil
}

func getBlocksFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) {
func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) {
sb := startBlockNumber
length := endBlockNumber - startBlockNumber + 1
blocksRaw := make([]common.Block, length)

query := fmt.Sprintf("SELECT %s FROM %s.blocks FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number",
strings.Join(defaultBlockFields, ", "),
config.Cfg.OldClickhouseDatabaseV1,
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)
blocks, err := execQueryV1[common.Block](query)
blocks, err := execQueryV2[common.Block](query)
if err != nil {
return blocksRaw, err
}
Expand All @@ -218,19 +207,19 @@ func getBlocksFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uin
return blocksRaw, nil
}

func getTransactionsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Transaction, error) {
func getTransactionsFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Transaction, error) {
sb := startBlockNumber
length := endBlockNumber - startBlockNumber + 1
transactionsRaw := make([][]common.Transaction, length)

query := fmt.Sprintf("SELECT %s FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number, transaction_index",
strings.Join(defaultTransactionFields, ", "),
config.Cfg.OldClickhouseDatabaseV1,
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)
transactions, err := execQueryV1[common.Transaction](query)
transactions, err := execQueryV2[common.Transaction](query)
if err != nil {
return transactionsRaw, err
}
Expand All @@ -247,19 +236,19 @@ func getTransactionsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumb
return transactionsRaw, nil
}

func getLogsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Log, error) {
func getLogsFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Log, error) {
sb := startBlockNumber
length := endBlockNumber - startBlockNumber + 1
logsRaw := make([][]common.Log, length)

query := fmt.Sprintf("SELECT %s FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number, log_index",
strings.Join(defaultLogFields, ", "),
config.Cfg.OldClickhouseDatabaseV1,
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)
logs, err := execQueryV1[common.Log](query)
logs, err := execQueryV2[common.Log](query)
if err != nil {
return logsRaw, err
}
Expand All @@ -276,19 +265,19 @@ func getLogsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint6
return logsRaw, nil
}

func getTracesFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Trace, error) {
func getTracesFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Trace, error) {
sb := startBlockNumber
length := endBlockNumber - startBlockNumber + 1
tracesRaw := make([][]common.Trace, length)

query := fmt.Sprintf("SELECT %s FROM %s.traces FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number",
strings.Join(defaultTraceFields, ", "),
config.Cfg.OldClickhouseDatabaseV1,
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)
traces, err := execQueryV1[common.Trace](query)
traces, err := execQueryV2[common.Trace](query)
if err != nil {
return tracesRaw, err
}
Expand All @@ -305,9 +294,9 @@ func getTracesFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uin
return tracesRaw, nil
}

func execQueryV1[T any](query string) ([]T, error) {
func execQueryV2[T any](query string) ([]T, error) {
var out []T
if err := ClickhouseConnV1.Select(context.Background(), &out, query); err != nil {
if err := ClickhouseConnV2.Select(context.Background(), &out, query); err != nil {
return nil, err
}
return out, nil
Expand Down
29 changes: 1 addition & 28 deletions internal/libs/libblockdata/getblockdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,10 @@ func GetValidBlockDataForRange(startBlockNumber uint64, endBlockNumber uint64) [
length := endBlockNumber - startBlockNumber + 1
validBlockData := make([]*common.BlockData, length)

clickhouseBlockData := getValidBlockDataFromClickhouseV1(startBlockNumber, endBlockNumber)
missingBlockNumbers := make([]uint64, 0)
for i := range validBlockData {
bn := startBlockNumber + uint64(i)
if clickhouseBlockData[i] == nil || bn != clickhouseBlockData[i].Block.Number.Uint64() {
missingBlockNumbers = append(missingBlockNumbers, bn)
log.Debug().
Uint64("start_block", startBlockNumber).
Uint64("end_block", endBlockNumber).
Uint64("block_number", bn).
Msg("Missing block data from clickhouse")
continue
}

validBlockData[i] = clickhouseBlockData[i]
clickhouseBlockData[i] = nil // clear out duplicate memory
missingBlockNumbers = append(missingBlockNumbers, bn)
}

if len(missingBlockNumbers) == 0 {
Expand Down Expand Up @@ -150,21 +138,6 @@ func GetValidBlockDataForRange(startBlockNumber uint64, endBlockNumber uint64) [
return validBlockData
}

func getValidBlockDataFromClickhouseV1(startBlockNumber uint64, endBlockNumber uint64) []*common.BlockData {
blockData, err := libs.GetBlockDataFromClickHouseV1(libs.ChainId.Uint64(), startBlockNumber, endBlockNumber)

if err != nil {
log.Panic().Err(err).Msg("Failed to get block data from ClickHouseV1")
}

for i, block := range blockData {
if isValid, _ := Validate(block); !isValid {
blockData[i] = nil
}
}
return blockData
}

func GetValidBlockDataFromRpc(blockNumbers []uint64) []*common.BlockData {
rpcBatchSize := config.Cfg.RPCBatchSize
totalBlocks := len(blockNumbers)
Expand Down
Loading