From 3da45bee595cdcd570ea58fd1c759dcbc140e0d6 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Wed, 16 Apr 2025 17:02:09 +0300 Subject: [PATCH] remove redis and memory storage connectors --- README.md | 3 - cmd/root.go | 10 - configs/config.example.yml | 6 +- configs/config.go | 13 - go.mod | 3 - go.sum | 14 - internal/storage/connector.go | 6 - internal/storage/memory.go | 475 ---------------------------------- internal/storage/redis.go | 137 ---------- 9 files changed, 4 insertions(+), 663 deletions(-) delete mode 100644 internal/storage/memory.go delete mode 100644 internal/storage/redis.go diff --git a/README.md b/README.md index cc25b4c..2779fcb 100644 --- a/README.md +++ b/README.md @@ -447,9 +447,6 @@ storage: clickhouse: port: 3000 database: "staging" - orchestrator: - memory: - maxItems: 10000 ``` With the corresponding `secrets.yml`: ```yaml diff --git a/cmd/root.go b/cmd/root.go index 61ea029..f47a2e9 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -82,11 +82,6 @@ func init() { rootCmd.PersistentFlags().Int("storage-orchestrator-clickhouse-maxOpenConns", 30, "Clickhouse max open connections for orchestrator storage") rootCmd.PersistentFlags().Int("storage-orchestrator-clickhouse-maxIdleConns", 30, "Clickhouse max idle connections for orchestrator storage") rootCmd.PersistentFlags().Bool("storage-orchestrator-clickhouse-disableTLS", false, "Clickhouse disableTLS for orchestrator storage") - rootCmd.PersistentFlags().Int("storage-orchestrator-memory-maxItems", 0, "Max items for orchestrator memory storage") - rootCmd.PersistentFlags().Int("storage-orchestrator-redis-poolSize", 0, "Redis pool size for orchestrator storage") - rootCmd.PersistentFlags().String("storage-orchestrator-redis-addr", "", "Redis address for orchestrator storage") - rootCmd.PersistentFlags().String("storage-orchestrator-redis-password", "", "Redis password for orchestrator storage") - rootCmd.PersistentFlags().Int("storage-orchestrator-redis-db", 0, "Redis db for orchestrator storage") rootCmd.PersistentFlags().String("storage-staging-clickhouse-host", "", "Clickhouse host for staging storage") rootCmd.PersistentFlags().String("storage-main-clickhouse-host", "", "Clickhouse host for main storage") rootCmd.PersistentFlags().String("storage-main-clickhouse-username", "", "Clickhouse username for main storage") @@ -183,11 +178,6 @@ func init() { viper.BindPFlag("storage.orchestrator.clickhouse.maxOpenConns", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-maxOpenConns")) viper.BindPFlag("storage.orchestrator.clickhouse.maxIdleConns", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-maxIdleConns")) viper.BindPFlag("storage.orchestrator.clickhouse.disableTLS", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-disableTLS")) - viper.BindPFlag("storage.orchestrator.memory.maxItems", rootCmd.PersistentFlags().Lookup("storage-orchestrator-memory-maxItems")) - viper.BindPFlag("storage.orchestrator.redis.poolSize", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-poolSize")) - viper.BindPFlag("storage.orchestrator.redis.addr", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-addr")) - viper.BindPFlag("storage.orchestrator.redis.password", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-password")) - viper.BindPFlag("storage.orchestrator.redis.db", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-db")) viper.BindPFlag("api.host", rootCmd.PersistentFlags().Lookup("api-host")) viper.BindPFlag("api.basicAuth.username", rootCmd.PersistentFlags().Lookup("api-basicAuth-username")) viper.BindPFlag("api.basicAuth.password", rootCmd.PersistentFlags().Lookup("api-basicAuth-password")) diff --git a/configs/config.example.yml b/configs/config.example.yml index 0c3eb9b..5fd5902 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -50,5 +50,7 @@ storage: database: "default" disableTLS: true orchestrator: - memory: - maxItems: 10000 \ No newline at end of file + clickhouse: + port: 9440 + database: "default" + disableTLS: true \ No newline at end of file diff --git a/configs/config.go b/configs/config.go index 1ee1f2d..efe1955 100644 --- a/configs/config.go +++ b/configs/config.go @@ -61,8 +61,6 @@ const ( type StorageConnectionConfig struct { Clickhouse *ClickhouseConfig `mapstructure:"clickhouse"` - Memory *MemoryConfig `mapstructure:"memory"` - Redis *RedisConfig `mapstructure:"redis"` } type TableConfig struct { @@ -86,17 +84,6 @@ type ClickhouseConfig struct { ChainBasedConfig map[string]TableOverrideConfig `mapstructure:"chainBasedConfig"` } -type MemoryConfig struct { - MaxItems int `mapstructure:"maxItems"` -} - -type RedisConfig struct { - PoolSize int `mapstructure:"poolSize"` - Addr string `mapstructure:"addr"` - Password string `mapstructure:"password"` - DB int `mapstructure:"db"` -} - type RPCBatchRequestConfig struct { BlocksPerRequest int `mapstructure:"blocksPerRequest"` BatchDelay int `mapstructure:"batchDelay"` diff --git a/go.mod b/go.mod index b3a487d..ac1ff73 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,7 @@ require ( github.com/ClickHouse/clickhouse-go/v2 v2.30.1 github.com/ethereum/go-ethereum v1.14.8 github.com/gin-gonic/gin v1.10.0 - github.com/go-redis/redis/v8 v8.11.5 github.com/gorilla/schema v1.4.1 - github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/prometheus/client_golang v1.20.4 github.com/rs/zerolog v1.33.0 github.com/spf13/cobra v1.8.1 @@ -40,7 +38,6 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/deckarep/golang-set/v2 v2.6.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/ethereum/c-kzg-4844 v1.0.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.7 // indirect diff --git a/go.sum b/go.sum index c8f89a0..18c8cf2 100644 --- a/go.sum +++ b/go.sum @@ -65,8 +65,6 @@ github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/ethereum/c-kzg-4844 v1.0.0 h1:0X1LBXxaEtYD9xsyj9B9ctQEZIpnvVDeoBx8aHEwTNA= github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= github.com/ethereum/go-ethereum v1.14.8 h1:NgOWvXS+lauK+zFukEvi85UmmsS/OkV0N23UZ1VTIig= @@ -112,8 +110,6 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL8sThn8IHr/sO+o= github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= -github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= -github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM= github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -143,8 +139,6 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4= @@ -212,14 +206,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= -github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= -github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= @@ -423,8 +411,6 @@ gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 312429d..863727a 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -131,15 +131,9 @@ func NewConnector[T any](cfg *config.StorageConnectionConfig) (T, error) { var err error if cfg.Clickhouse != nil { conn, err = NewClickHouseConnector(cfg.Clickhouse) - } else if cfg.Memory != nil { - conn, err = NewMemoryConnector(cfg.Memory) } else { return *new(T), fmt.Errorf("no storage driver configured") } - /* - else if cfg.Redis != nil { - conn, err = NewRedisConnector(cfg.Redis) - } */ if err != nil { return *new(T), err diff --git a/internal/storage/memory.go b/internal/storage/memory.go deleted file mode 100644 index f8bb92a..0000000 --- a/internal/storage/memory.go +++ /dev/null @@ -1,475 +0,0 @@ -package storage - -import ( - "encoding/json" - "fmt" - "math" - "math/big" - "strings" - - lru "github.com/hashicorp/golang-lru/v2" - config "github.com/thirdweb-dev/indexer/configs" - "github.com/thirdweb-dev/indexer/internal/common" -) - -type MemoryConnector struct { - cache *lru.Cache[string, string] -} - -func NewMemoryConnector(cfg *config.MemoryConfig) (*MemoryConnector, error) { - maxItems := 1000 - if cfg.MaxItems > 0 { - maxItems = cfg.MaxItems - } - - cache, err := lru.New[string, string](maxItems) - if err != nil { - return nil, fmt.Errorf("failed to create LRU cache: %w", err) - } - - return &MemoryConnector{ - cache: cache, - }, nil -} - -func (m *MemoryConnector) StoreBlockFailures(failures []common.BlockFailure) error { - for _, failure := range failures { - failureJson, err := json.Marshal(failure) - if err != nil { - return err - } - m.cache.Add(fmt.Sprintf("block_failure:%s:%s", failure.ChainId.String(), failure.BlockNumber.String()), string(failureJson)) - } - return nil -} - -func (m *MemoryConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error) { - blockFailures := []common.BlockFailure{} - limit := getLimit(qf) - for _, key := range m.cache.Keys() { - if len(blockFailures) >= limit { - break - } - if strings.HasPrefix(key, fmt.Sprintf("block_failure:%s:", qf.ChainId.String())) { - value, ok := m.cache.Get(key) - if ok { - blockFailure := common.BlockFailure{} - err := json.Unmarshal([]byte(value), &blockFailure) - if err != nil { - return nil, err - } - blockFailures = append(blockFailures, blockFailure) - } - } - } - return blockFailures, nil -} - -func (m *MemoryConnector) DeleteBlockFailures(failures []common.BlockFailure) error { - for _, failure := range failures { - key := fmt.Sprintf("block_failure:%s:%s", failure.ChainId.String(), failure.BlockNumber.String()) - m.cache.Remove(key) - } - return nil -} - -func (m *MemoryConnector) insertBlocks(blocks []common.Block) error { - for _, block := range blocks { - blockJson, err := json.Marshal(block) - if err != nil { - return err - } - key := fmt.Sprintf("block:%s:%s", block.ChainId.String(), block.Number.String()) - m.cache.Add(key, string(blockJson)) - } - return nil -} - -func (m *MemoryConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error) { - blocks := []common.Block{} - limit := getLimit(qf) - blockNumbersToCheck := getBlockNumbersToCheck(qf) - fieldsSet := createFieldsSet(fields) - for _, key := range m.cache.Keys() { - if len(blocks) >= int(limit) { - break - } - if isKeyForBlock(key, fmt.Sprintf("block:%s:", qf.ChainId.String()), blockNumbersToCheck) { - value, ok := m.cache.Get(key) - if ok { - block, err := extractFields[common.Block](fieldsSet, value) - if err != nil { - return QueryResult[common.Block]{}, err - } - blocks = append(blocks, *block) - } - } - } - return QueryResult[common.Block]{Data: blocks}, nil -} - -func (m *MemoryConnector) insertTransactions(txs []common.Transaction) error { - for _, tx := range txs { - txJson, err := json.Marshal(tx) - if err != nil { - return err - } - m.cache.Add(fmt.Sprintf("transaction:%s:%s:%s", tx.ChainId.String(), tx.BlockNumber.String(), tx.Hash), string(txJson)) - } - return nil -} - -func (m *MemoryConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error) { - txs := []common.Transaction{} - limit := getLimit(qf) - blockNumbersToCheck := getBlockNumbersToCheck(qf) - fieldsSet := createFieldsSet(fields) - for _, key := range m.cache.Keys() { - if len(txs) >= limit { - break - } - if isKeyForBlock(key, fmt.Sprintf("transaction:%s:", qf.ChainId.String()), blockNumbersToCheck) { - value, ok := m.cache.Get(key) - if ok { - tx, err := extractFields[common.Transaction](fieldsSet, value) - if err != nil { - return QueryResult[common.Transaction]{}, err - } - txs = append(txs, *tx) - } - } - } - return QueryResult[common.Transaction]{Data: txs}, nil -} - -func (m *MemoryConnector) insertLogs(logs []common.Log) error { - for _, log := range logs { - logJson, err := json.Marshal(log) - if err != nil { - return err - } - m.cache.Add(fmt.Sprintf("log:%s:%s:%s-%d", log.ChainId.String(), log.BlockNumber.String(), log.TransactionHash, log.LogIndex), string(logJson)) - } - return nil -} - -func (m *MemoryConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error) { - logs := []common.Log{} - limit := getLimit(qf) - blockNumbersToCheck := getBlockNumbersToCheck(qf) - fieldsSet := createFieldsSet(fields) - for _, key := range m.cache.Keys() { - if len(logs) >= limit { - break - } - if isKeyForBlock(key, fmt.Sprintf("log:%s:", qf.ChainId.String()), blockNumbersToCheck) { - value, ok := m.cache.Get(key) - if ok { - log, err := extractFields[common.Log](fieldsSet, value) - if err != nil { - return QueryResult[common.Log]{}, err - } - logs = append(logs, *log) - } - } - } - return QueryResult[common.Log]{Data: logs}, nil -} - -func (m *MemoryConnector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error) { - maxBlockNumber := new(big.Int) - for _, key := range m.cache.Keys() { - if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) { - blockNumberStr := strings.Split(key, ":")[2] - blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10) - if !ok { - return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr) - } - if blockNumber.Cmp(maxBlockNumber) > 0 { - maxBlockNumber = blockNumber - } - } - } - return maxBlockNumber, nil -} - -func IsInRange(num *big.Int, rangeStart *big.Int, rangeEnd *big.Int) bool { - if rangeEnd.Sign() == 0 { - return true - } - return num.Cmp(rangeStart) >= 0 && num.Cmp(rangeEnd) <= 0 -} - -func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { - maxBlockNumber := new(big.Int) - for _, key := range m.cache.Keys() { - if strings.HasPrefix(key, fmt.Sprintf("blockData:%s:", chainId.String())) { - blockNumberStr := strings.Split(key, ":")[2] - blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10) - if !ok { - return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr) - } - if blockNumber.Cmp(maxBlockNumber) > 0 && IsInRange(blockNumber, rangeStart, rangeEnd) { - maxBlockNumber = blockNumber - } - } - } - return maxBlockNumber, nil -} - -func isKeyForSomeBlock(key string, prefixes []string, blocksFilter map[string]uint8) bool { - for _, prefix := range prefixes { - if isKeyForBlock(key, prefix, blocksFilter) { - return true - } - } - return false -} - -func isKeyForBlock(key string, prefix string, blocksFilter map[string]uint8) bool { - if !strings.HasPrefix(key, prefix) { - return false - } - parts := strings.Split(key, ":") - if len(parts) < 2 { - return false - } - blockNumber := parts[2] - if len(blocksFilter) == 0 { - return true - } - _, ok := blocksFilter[blockNumber] - return ok -} - -func getLimit(qf QueryFilter) int { - limit := qf.Limit - if limit == 0 { - limit = math.MaxUint16 - } - return int(limit) -} - -func getBlockNumbersToCheck(qf QueryFilter) map[string]uint8 { - blockNumbersToCheck := make(map[string]uint8, len(qf.BlockNumbers)) - for _, num := range qf.BlockNumbers { - key := fmt.Sprintf("%d", num) - blockNumbersToCheck[key] = 1 - } - return blockNumbersToCheck -} - -func (m *MemoryConnector) InsertStagingData(data []common.BlockData) error { - for _, blockData := range data { - dataJson, err := json.Marshal(blockData) - if err != nil { - return err - } - m.cache.Add(fmt.Sprintf("blockData:%s:%s", blockData.Block.ChainId.String(), blockData.Block.Number.String()), string(dataJson)) - } - return nil -} - -func (m *MemoryConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) { - blockData := []common.BlockData{} - limit := getLimit(qf) - blockNumbersToCheck := getBlockNumbersToCheck(qf) - - for _, key := range m.cache.Keys() { - if len(blockData) >= int(limit) { - break - } - if isKeyForBlock(key, fmt.Sprintf("blockData:%s:", qf.ChainId.String()), blockNumbersToCheck) { - value, ok := m.cache.Get(key) - if ok { - bd := common.BlockData{} - err := json.Unmarshal([]byte(value), &bd) - if err != nil { - return nil, err - } - blockData = append(blockData, bd) - } - } - } - return blockData, nil -} - -func (m *MemoryConnector) DeleteStagingData(data []common.BlockData) error { - for _, blockData := range data { - key := fmt.Sprintf("blockData:%s:%s", blockData.Block.ChainId.String(), blockData.Block.Number.String()) - m.cache.Remove(key) - } - return nil -} - -func (m *MemoryConnector) insertTraces(traces []common.Trace) error { - for _, trace := range traces { - traceJson, err := json.Marshal(trace) - if err != nil { - return err - } - m.cache.Add(fmt.Sprintf("trace:%s:%s:%s:%s", trace.ChainID.String(), trace.BlockNumber.String(), trace.TransactionHash, traceAddressToString(trace.TraceAddress)), string(traceJson)) - } - return nil -} - -func (m *MemoryConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error) { - traces := []common.Trace{} - limit := getLimit(qf) - blockNumbersToCheck := getBlockNumbersToCheck(qf) - fieldsSet := createFieldsSet(fields) - for _, key := range m.cache.Keys() { - if len(traces) >= limit { - break - } - if isKeyForBlock(key, fmt.Sprintf("trace:%s:", qf.ChainId.String()), blockNumbersToCheck) { - value, ok := m.cache.Get(key) - if ok { - trace, err := extractFields[common.Trace](fieldsSet, value) - if err != nil { - return QueryResult[common.Trace]{}, err - } - traces = append(traces, *trace) - } - } - } - return QueryResult[common.Trace]{Data: traces}, nil -} - -func createFieldsSet(fields []string) map[string]bool { - fieldsSet := make(map[string]bool) - if len(fields) == 0 { - fieldsSet["*"] = true - } else { - for _, field := range fields { - fieldsSet[field] = true - } - } - return fieldsSet -} - -func extractFields[T common.Block | common.Transaction | common.Log | common.Trace](fields map[string]bool, data string) (*T, error) { - if fields["*"] { - result := new(T) - err := json.Unmarshal([]byte(data), result) - if err != nil { - return nil, err - } - return result, nil - } - - // For specific fields, first unmarshal into a map - var resultMap map[string]interface{} - if err := json.Unmarshal([]byte(data), &resultMap); err != nil { - return nil, err - } - - // Then create a new map with only requested fields - filteredMap := make(map[string]interface{}) - for field := range fields { - if val, exists := resultMap[field]; exists { - filteredMap[field] = val - } - } - - // Marshal and unmarshal to convert to type T - jsonBytes, err := json.Marshal(filteredMap) - if err != nil { - return nil, err - } - - result := new(T) - if err := json.Unmarshal(jsonBytes, result); err != nil { - return nil, err - } - return result, nil -} - -func traceAddressToString(traceAddress []uint64) string { - return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]") -} - -func (m *MemoryConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { - key := fmt.Sprintf("reorg_check:%s", chainId.String()) - value, ok := m.cache.Get(key) - if !ok { - return nil, fmt.Errorf("no reorg check block number found for chain %s", chainId.String()) - } - blockNumber, ok := new(big.Int).SetString(value, 10) - if !ok { - return nil, fmt.Errorf("failed to parse block number: %s", value) - } - return blockNumber, nil -} - -func (m *MemoryConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { - m.cache.Add(fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String()) - return nil -} - -func (m *MemoryConnector) InsertBlockData(data []common.BlockData) error { - blocks := make([]common.Block, 0, len(data)) - logs := make([]common.Log, 0) - transactions := make([]common.Transaction, 0) - traces := make([]common.Trace, 0) - - for _, blockData := range data { - blocks = append(blocks, blockData.Block) - logs = append(logs, blockData.Logs...) - transactions = append(transactions, blockData.Transactions...) - traces = append(traces, blockData.Traces...) - } - - if err := m.insertBlocks(blocks); err != nil { - return err - } - if err := m.insertLogs(logs); err != nil { - return err - } - if err := m.insertTransactions(transactions); err != nil { - return err - } - if err := m.insertTraces(traces); err != nil { - return err - } - return nil -} - -func (m *MemoryConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error) { - blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers}) - for _, key := range m.cache.Keys() { - prefixes := []string{fmt.Sprintf("block:%s:", chainId.String()), fmt.Sprintf("log:%s:", chainId.String()), fmt.Sprintf("transaction:%s:", chainId.String()), fmt.Sprintf("trace:%s:", chainId.String())} - shouldDelete := isKeyForSomeBlock(key, prefixes, blockNumbersToCheck) - if shouldDelete { - m.cache.Remove(key) - } - } - return []common.BlockData{}, nil // TODO implement -} - -func (m *MemoryConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error) { - blockHeaders := []common.BlockHeader{} - for _, key := range m.cache.Keys() { - if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) { - blockNumberStr := strings.Split(key, ":")[2] - blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10) - if !ok { - return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr) - } - if blockNumber.Cmp(from) >= 0 && blockNumber.Cmp(to) <= 0 { - value, _ := m.cache.Get(key) - block := common.Block{} - err := json.Unmarshal([]byte(value), &block) - if err != nil { - return nil, err - } - blockHeaders = append(blockHeaders, common.BlockHeader{ - Number: blockNumber, - Hash: block.Hash, - ParentHash: block.ParentHash, - }) - } - } - } - return blockHeaders, nil -} diff --git a/internal/storage/redis.go b/internal/storage/redis.go deleted file mode 100644 index 3e11236..0000000 --- a/internal/storage/redis.go +++ /dev/null @@ -1,137 +0,0 @@ -package storage - -import ( - "context" - "encoding/json" - "fmt" - "math/big" - - "github.com/go-redis/redis/v8" - "github.com/rs/zerolog/log" - config "github.com/thirdweb-dev/indexer/configs" - "github.com/thirdweb-dev/indexer/internal/common" -) - -type RedisConnector struct { - client *redis.Client - cfg *config.RedisConfig -} - -var DEFAULT_REDIS_POOL_SIZE = 20 - -func NewRedisConnector(cfg *config.RedisConfig) (*RedisConnector, error) { - poolSize := cfg.PoolSize - if poolSize <= 0 { - poolSize = DEFAULT_REDIS_POOL_SIZE - } - - options := &redis.Options{ - Addr: cfg.Addr, - Password: cfg.Password, - DB: cfg.DB, - PoolSize: poolSize, - } - - client := redis.NewClient(options) - - ctx := context.Background() - _, err := client.Ping(ctx).Result() - if err != nil { - return nil, fmt.Errorf("failed to connect to Redis: %w", err) - } - - log.Warn().Msgf("Connected to Redis") - return &RedisConnector{ - client: client, - cfg: cfg, - }, nil -} - -func (r *RedisConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error) { - ctx := context.Background() - var blockFailures []common.BlockFailure - var cursor uint64 - var keys []string - var err error - - keyPrefix := "block_failure:*:*" - if qf.ChainId.Sign() != 0 { - keyPrefix = fmt.Sprintf("block_failure:%s:*", qf.ChainId.String()) - } - - limit := qf.Limit - if limit == 0 { - limit = 100 - } - - for { - keys, cursor, err = r.client.Scan(ctx, cursor, keyPrefix, int64(limit-len(blockFailures))).Result() - if err != nil { - return nil, fmt.Errorf("failed to scan block failures: %w", err) - } - - for _, key := range keys { - value, err := r.client.Get(ctx, key).Result() - if err != nil { - return nil, fmt.Errorf("failed to get block failure: %w", err) - } - - var failure common.BlockFailure - err = json.Unmarshal([]byte(value), &failure) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal block failure: %w", err) - } - - blockFailures = append(blockFailures, failure) - - if len(blockFailures) >= limit { - return blockFailures, nil - } - } - - if cursor == 0 { - break - } - } - - return blockFailures, nil -} - -func (r *RedisConnector) StoreBlockFailures(failures []common.BlockFailure) error { - ctx := context.Background() - for _, failure := range failures { - failureJson, err := json.Marshal(failure) - if err != nil { - return err - } - r.client.Set(ctx, fmt.Sprintf("block_failure:%s:%s", failure.ChainId.String(), failure.BlockNumber.String()), string(failureJson), 0) - } - return nil -} - -func (r *RedisConnector) DeleteBlockFailures(failures []common.BlockFailure) error { - ctx := context.Background() - for _, failure := range failures { - r.client.Del(ctx, fmt.Sprintf("block_failure:%s:%s", failure.ChainId.String(), failure.BlockNumber.String())) - } - return nil -} - -func (r *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { - ctx := context.Background() - blockNumberString, err := r.client.Get(ctx, fmt.Sprintf("reorg_check:%s", chainId.String())).Result() - if err != nil { - return nil, err - } - blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) - if !ok { - return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) - } - return blockNumber, nil -} - -func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { - ctx := context.Background() - r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0) - return nil -}