From fb754f2e52f2782968979957efbd398eefcb57d7 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Thu, 20 Feb 2025 16:06:57 +0200 Subject: [PATCH] improve token balances endpoint performance --- internal/handlers/token_handlers.go | 40 +++++++++++++++++++++++---- internal/storage/clickhouse.go | 30 +++++++++++++++++--- internal/storage/connector.go | 3 +- test/mocks/MockIMainStorage.go | 43 +++++++++++++++++++---------- 4 files changed, 91 insertions(+), 25 deletions(-) diff --git a/internal/handlers/token_handlers.go b/internal/handlers/token_handlers.go index b771169..3ebe86f 100644 --- a/internal/handlers/token_handlers.go +++ b/internal/handlers/token_handlers.go @@ -8,15 +8,13 @@ import ( "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" "github.com/thirdweb-dev/indexer/api" + "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/storage" ) // BalanceModel return type for Swagger documentation type BalanceModel struct { - ChainId string `json:"chain_id" ch:"chain_id"` - TokenType string `json:"token_type" ch:"token_type"` TokenAddress string `json:"token_address" ch:"address"` - Owner string `json:"owner" ch:"owner"` TokenId string `json:"token_id" ch:"token_id"` Balance *big.Int `json:"balance" ch:"balance"` } @@ -37,7 +35,7 @@ type BalanceModel struct { // @Failure 400 {object} api.Error // @Failure 401 {object} api.Error // @Failure 500 {object} api.Error -// @Router /{chainId}/events [get] +// @Router /{chainId}/balances/{owner}/{type} [get] func GetTokenBalancesByType(c *gin.Context) { chainId, err := api.GetChainId(c) if err != nil { @@ -60,12 +58,21 @@ func GetTokenBalancesByType(c *gin.Context) { return } hideZeroBalances := c.Query("hide_zero_balances") != "false" + + columns := []string{"address", "sum(balance) as balance"} + groupBy := []string{"address"} + if tokenType != "erc20" { + columns = []string{"address", "token_id", "sum(balance) as balance"} + groupBy = []string{"address", "token_id"} + } + qf := storage.BalancesQueryFilter{ ChainId: chainId, Owner: owner, TokenType: tokenType, TokenAddress: tokenAddress, ZeroBalance: hideZeroBalances, + GroupBy: groupBy, SortBy: c.Query("sort_by"), SortOrder: c.Query("sort_order"), Page: api.ParseIntQueryParam(c.Query("page"), 0), @@ -87,13 +94,34 @@ func GetTokenBalancesByType(c *gin.Context) { return } - balancesResult, err := mainStorage.GetTokenBalances(qf) + balancesResult, err := mainStorage.GetTokenBalances(qf, columns...) if err != nil { log.Error().Err(err).Msg("Error querying balances") // TODO: might want to choose BadRequestError if it's due to not-allowed functions api.InternalErrorHandler(c) return } - queryResult.Data = balancesResult.Data + queryResult.Data = serializeBalances(balancesResult.Data) sendJSONResponse(c, queryResult) } + +func serializeBalances(balances []common.TokenBalance) []BalanceModel { + balanceModels := make([]BalanceModel, len(balances)) + for i, balance := range balances { + balanceModels[i] = serializeBalance(balance) + } + return balanceModels +} + +func serializeBalance(balance common.TokenBalance) BalanceModel { + return BalanceModel{ + TokenAddress: balance.TokenAddress, + Balance: balance.Balance, + TokenId: func() string { + if balance.TokenId != nil { + return balance.TokenId.String() + } + return "" + }(), + } +} diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index c672a91..647891b 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1373,16 +1373,38 @@ func (c *ClickHouseConnector) getTableName(chainId *big.Int, defaultTable string return defaultTable } -func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter) (QueryResult[common.TokenBalance], error) { +func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) { columns := "chain_id, token_type, address, owner, token_id, balance" + if len(fields) > 0 { + columns = strings.Join(fields, ", ") + } + query := fmt.Sprintf("SELECT %s FROM %s.token_balances WHERE chain_id = ? AND token_type = ? AND owner = ?", columns, c.cfg.Database) + + if qf.TokenAddress != "" { + query += fmt.Sprintf(" AND address = '%s'", qf.TokenAddress) + } + + isBalanceAggregated := false + for _, field := range fields { + if strings.Contains(field, "balance") && strings.TrimSpace(field) != "balance" { + isBalanceAggregated = true + break + } + } balanceCondition := ">=" if qf.ZeroBalance { balanceCondition = ">" } - query := fmt.Sprintf("SELECT %s FROM %s.token_balances FINAL WHERE chain_id = ? AND token_type = ? AND owner = ? AND balance %s 0", columns, c.cfg.Database, balanceCondition) + if !isBalanceAggregated { + query += fmt.Sprintf(" AND balance %s 0", balanceCondition) + } - if qf.TokenAddress != "" { - query += fmt.Sprintf(" AND address = '%s'", qf.TokenAddress) + if len(qf.GroupBy) > 0 { + query += fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", ")) + + if isBalanceAggregated { + query += fmt.Sprintf(" HAVING balance %s 0", balanceCondition) + } } // Add ORDER BY clause diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 23b7fd2..dae4330 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -31,6 +31,7 @@ type BalancesQueryFilter struct { TokenAddress string Owner string ZeroBalance bool + GroupBy []string SortBy string SortOrder string Page int @@ -80,7 +81,7 @@ type IMainStorage interface { GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error - GetTokenBalances(qf BalancesQueryFilter) (QueryResult[common.TokenBalance], error) + GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) } func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) { diff --git a/test/mocks/MockIMainStorage.go b/test/mocks/MockIMainStorage.go index efe4ead..73ff691 100644 --- a/test/mocks/MockIMainStorage.go +++ b/test/mocks/MockIMainStorage.go @@ -390,9 +390,16 @@ func (_c *MockIMainStorage_GetMaxBlockNumber_Call) RunAndReturn(run func(*big.In return _c } -// GetTokenBalances provides a mock function with given fields: qf -func (_m *MockIMainStorage) GetTokenBalances(qf storage.BalancesQueryFilter) (storage.QueryResult[common.TokenBalance], error) { - ret := _m.Called(qf) +// GetTokenBalances provides a mock function with given fields: qf, fields +func (_m *MockIMainStorage) GetTokenBalances(qf storage.BalancesQueryFilter, fields ...string) (storage.QueryResult[common.TokenBalance], error) { + _va := make([]interface{}, len(fields)) + for _i := range fields { + _va[_i] = fields[_i] + } + var _ca []interface{} + _ca = append(_ca, qf) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) if len(ret) == 0 { panic("no return value specified for GetTokenBalances") @@ -400,17 +407,17 @@ func (_m *MockIMainStorage) GetTokenBalances(qf storage.BalancesQueryFilter) (st var r0 storage.QueryResult[common.TokenBalance] var r1 error - if rf, ok := ret.Get(0).(func(storage.BalancesQueryFilter) (storage.QueryResult[common.TokenBalance], error)); ok { - return rf(qf) + if rf, ok := ret.Get(0).(func(storage.BalancesQueryFilter, ...string) (storage.QueryResult[common.TokenBalance], error)); ok { + return rf(qf, fields...) } - if rf, ok := ret.Get(0).(func(storage.BalancesQueryFilter) storage.QueryResult[common.TokenBalance]); ok { - r0 = rf(qf) + if rf, ok := ret.Get(0).(func(storage.BalancesQueryFilter, ...string) storage.QueryResult[common.TokenBalance]); ok { + r0 = rf(qf, fields...) } else { r0 = ret.Get(0).(storage.QueryResult[common.TokenBalance]) } - if rf, ok := ret.Get(1).(func(storage.BalancesQueryFilter) error); ok { - r1 = rf(qf) + if rf, ok := ret.Get(1).(func(storage.BalancesQueryFilter, ...string) error); ok { + r1 = rf(qf, fields...) } else { r1 = ret.Error(1) } @@ -425,13 +432,21 @@ type MockIMainStorage_GetTokenBalances_Call struct { // GetTokenBalances is a helper method to define mock.On call // - qf storage.BalancesQueryFilter -func (_e *MockIMainStorage_Expecter) GetTokenBalances(qf interface{}) *MockIMainStorage_GetTokenBalances_Call { - return &MockIMainStorage_GetTokenBalances_Call{Call: _e.mock.On("GetTokenBalances", qf)} +// - fields ...string +func (_e *MockIMainStorage_Expecter) GetTokenBalances(qf interface{}, fields ...interface{}) *MockIMainStorage_GetTokenBalances_Call { + return &MockIMainStorage_GetTokenBalances_Call{Call: _e.mock.On("GetTokenBalances", + append([]interface{}{qf}, fields...)...)} } -func (_c *MockIMainStorage_GetTokenBalances_Call) Run(run func(qf storage.BalancesQueryFilter)) *MockIMainStorage_GetTokenBalances_Call { +func (_c *MockIMainStorage_GetTokenBalances_Call) Run(run func(qf storage.BalancesQueryFilter, fields ...string)) *MockIMainStorage_GetTokenBalances_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(storage.BalancesQueryFilter)) + variadicArgs := make([]string, len(args)-1) + for i, a := range args[1:] { + if a != nil { + variadicArgs[i] = a.(string) + } + } + run(args[0].(storage.BalancesQueryFilter), variadicArgs...) }) return _c } @@ -441,7 +456,7 @@ func (_c *MockIMainStorage_GetTokenBalances_Call) Return(_a0 storage.QueryResult return _c } -func (_c *MockIMainStorage_GetTokenBalances_Call) RunAndReturn(run func(storage.BalancesQueryFilter) (storage.QueryResult[common.TokenBalance], error)) *MockIMainStorage_GetTokenBalances_Call { +func (_c *MockIMainStorage_GetTokenBalances_Call) RunAndReturn(run func(storage.BalancesQueryFilter, ...string) (storage.QueryResult[common.TokenBalance], error)) *MockIMainStorage_GetTokenBalances_Call { _c.Call.Return(run) return _c }