diff --git a/configs/config.go b/configs/config.go index 3740089..691fb47 100644 --- a/configs/config.go +++ b/configs/config.go @@ -277,12 +277,6 @@ type Config struct { ZeetProjectName string `env:"ZEET_PROJECT_NAME" envDefault:"insight-indexer"` ZeetDeploymentId string `env:"ZEET_DEPLOYMENT_ID"` ZeetClusterId string `env:"ZEET_CLUSTER_ID"` - OldClickhouseDatabaseV1 string `env:"OLD_CLICKHOUSE_DATABASE_V1"` - OldClickhouseHostV1 string `env:"OLD_CLICKHOUSE_HOST_V1"` - OldClickhousePortV1 int `env:"OLD_CLICKHOUSE_PORT_V1"` - OldClickhouseUsernameV1 string `env:"OLD_CLICKHOUSE_USERNAME_V1"` - OldClickhousePasswordV1 string `env:"OLD_CLICKHOUSE_PASSWORD_V1"` - OldClickhouseEnableTLSV1 bool `env:"OLD_CLICKHOUSE_ENABLE_TLS_V1" envDefault:"true"` CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"` CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"` CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"` diff --git a/internal/backfill/backfill.go b/internal/backfill/backfill.go index cba5f57..f59e3df 100644 --- a/internal/backfill/backfill.go +++ b/internal/backfill/backfill.go @@ -15,7 +15,6 @@ var blockdataChannel = make(chan []*common.BlockData, config.Cfg.RPCNumParallelC var avgMemoryPerBlockChannel = make(chan int, 1) func Init() { - libs.InitOldClickHouseV1() libs.InitS3() libs.InitRPCClient() InitParquetWriter() diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index f377c97..8eb2e5d 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -50,17 +50,6 @@ var ClickhouseConnV1 clickhouse.Conn // use this for new current states and query var ClickhouseConnV2 clickhouse.Conn -func InitOldClickHouseV1() { - ClickhouseConnV1 = initClickhouse( - config.Cfg.OldClickhouseHostV1, - config.Cfg.OldClickhousePortV1, - config.Cfg.OldClickhouseUsernameV1, - config.Cfg.OldClickhousePasswordV1, - config.Cfg.OldClickhouseDatabaseV1, - config.Cfg.OldClickhouseEnableTLSV1, - ) -} - // This is a new clickhouse where data will be inserted into. // All user queries will be done against this clickhouse. func InitNewClickHouseV2() { @@ -126,7 +115,7 @@ func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) { return maxBlockNumber, nil } -func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) { +func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) { length := endBlockNumber - startBlockNumber + 1 blockData := make([]*common.BlockData, length) @@ -139,22 +128,22 @@ func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBl wg.Add(4) go func() { defer wg.Done() - blocksRaw, _ = getBlocksFromV1(chainId, startBlockNumber, endBlockNumber) + blocksRaw, _ = getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) }() go func() { defer wg.Done() - transactionsRaw, _ = getTransactionsFromV1(chainId, startBlockNumber, endBlockNumber) + transactionsRaw, _ = getTransactionsFromV2(chainId, startBlockNumber, endBlockNumber) }() go func() { defer wg.Done() - logsRaw, _ = getLogsFromV1(chainId, startBlockNumber, endBlockNumber) + logsRaw, _ = getLogsFromV2(chainId, startBlockNumber, endBlockNumber) }() go func() { defer wg.Done() - tracesRaw, _ = getTracesFromV1(chainId, startBlockNumber, endBlockNumber) + tracesRaw, _ = getTracesFromV2(chainId, startBlockNumber, endBlockNumber) }() wg.Wait() @@ -189,19 +178,19 @@ func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBl return blockData, nil } -func getBlocksFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) { +func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 blocksRaw := make([]common.Block, length) query := fmt.Sprintf("SELECT %s FROM %s.blocks FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number", strings.Join(defaultBlockFields, ", "), - config.Cfg.OldClickhouseDatabaseV1, + config.Cfg.CommitterClickhouseDatabase, chainId, startBlockNumber, endBlockNumber, ) - blocks, err := execQueryV1[common.Block](query) + blocks, err := execQueryV2[common.Block](query) if err != nil { return blocksRaw, err } @@ -218,19 +207,19 @@ func getBlocksFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uin return blocksRaw, nil } -func getTransactionsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Transaction, error) { +func getTransactionsFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Transaction, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 transactionsRaw := make([][]common.Transaction, length) query := fmt.Sprintf("SELECT %s FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number, transaction_index", strings.Join(defaultTransactionFields, ", "), - config.Cfg.OldClickhouseDatabaseV1, + config.Cfg.CommitterClickhouseDatabase, chainId, startBlockNumber, endBlockNumber, ) - transactions, err := execQueryV1[common.Transaction](query) + transactions, err := execQueryV2[common.Transaction](query) if err != nil { return transactionsRaw, err } @@ -247,19 +236,19 @@ func getTransactionsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumb return transactionsRaw, nil } -func getLogsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Log, error) { +func getLogsFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Log, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 logsRaw := make([][]common.Log, length) query := fmt.Sprintf("SELECT %s FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number, log_index", strings.Join(defaultLogFields, ", "), - config.Cfg.OldClickhouseDatabaseV1, + config.Cfg.CommitterClickhouseDatabase, chainId, startBlockNumber, endBlockNumber, ) - logs, err := execQueryV1[common.Log](query) + logs, err := execQueryV2[common.Log](query) if err != nil { return logsRaw, err } @@ -276,19 +265,19 @@ func getLogsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint6 return logsRaw, nil } -func getTracesFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Trace, error) { +func getTracesFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Trace, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 tracesRaw := make([][]common.Trace, length) query := fmt.Sprintf("SELECT %s FROM %s.traces FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number", strings.Join(defaultTraceFields, ", "), - config.Cfg.OldClickhouseDatabaseV1, + config.Cfg.CommitterClickhouseDatabase, chainId, startBlockNumber, endBlockNumber, ) - traces, err := execQueryV1[common.Trace](query) + traces, err := execQueryV2[common.Trace](query) if err != nil { return tracesRaw, err } @@ -305,9 +294,9 @@ func getTracesFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uin return tracesRaw, nil } -func execQueryV1[T any](query string) ([]T, error) { +func execQueryV2[T any](query string) ([]T, error) { var out []T - if err := ClickhouseConnV1.Select(context.Background(), &out, query); err != nil { + if err := ClickhouseConnV2.Select(context.Background(), &out, query); err != nil { return nil, err } return out, nil diff --git a/internal/libs/libblockdata/getblockdata.go b/internal/libs/libblockdata/getblockdata.go index 9db558f..a28df9c 100644 --- a/internal/libs/libblockdata/getblockdata.go +++ b/internal/libs/libblockdata/getblockdata.go @@ -95,22 +95,10 @@ func GetValidBlockDataForRange(startBlockNumber uint64, endBlockNumber uint64) [ length := endBlockNumber - startBlockNumber + 1 validBlockData := make([]*common.BlockData, length) - clickhouseBlockData := getValidBlockDataFromClickhouseV1(startBlockNumber, endBlockNumber) missingBlockNumbers := make([]uint64, 0) for i := range validBlockData { bn := startBlockNumber + uint64(i) - if clickhouseBlockData[i] == nil || bn != clickhouseBlockData[i].Block.Number.Uint64() { - missingBlockNumbers = append(missingBlockNumbers, bn) - log.Debug(). - Uint64("start_block", startBlockNumber). - Uint64("end_block", endBlockNumber). - Uint64("block_number", bn). - Msg("Missing block data from clickhouse") - continue - } - - validBlockData[i] = clickhouseBlockData[i] - clickhouseBlockData[i] = nil // clear out duplicate memory + missingBlockNumbers = append(missingBlockNumbers, bn) } if len(missingBlockNumbers) == 0 { @@ -150,21 +138,6 @@ func GetValidBlockDataForRange(startBlockNumber uint64, endBlockNumber uint64) [ return validBlockData } -func getValidBlockDataFromClickhouseV1(startBlockNumber uint64, endBlockNumber uint64) []*common.BlockData { - blockData, err := libs.GetBlockDataFromClickHouseV1(libs.ChainId.Uint64(), startBlockNumber, endBlockNumber) - - if err != nil { - log.Panic().Err(err).Msg("Failed to get block data from ClickHouseV1") - } - - for i, block := range blockData { - if isValid, _ := Validate(block); !isValid { - blockData[i] = nil - } - } - return blockData -} - func GetValidBlockDataFromRpc(blockNumbers []uint64) []*common.BlockData { rpcBatchSize := config.Cfg.RPCBatchSize totalBlocks := len(blockNumbers)