diff --git a/cmd/api.go b/cmd/api.go
index c557190..ea2f2c3 100644
--- a/cmd/api.go
+++ b/cmd/api.go
@@ -92,6 +92,8 @@ func RunApi(cmd *cobra.Command, args []string) {
 	// token holder queries
 	root.GET("/holders/:address", handlers.GetTokenHoldersByType)
 
+	// token transfer queries
+	root.GET("/transfers", handlers.GetTokenTransfers)
 	// token ID queries
 	root.GET("/tokens/:address", handlers.GetTokenIdsByType)
 
diff --git a/internal/common/transfers.go b/internal/common/transfers.go
new file mode 100644
index 0000000..931ee1c
--- /dev/null
+++ b/internal/common/transfers.go
@@ -0,0 +1,22 @@
+package common
+
+import (
+	"math/big"
+	"time"
+)
+
+type TokenTransfer struct {
+	TokenType       string    `json:"token_type" ch:"token_type"`
+	ChainID         *big.Int  `json:"chain_id" ch:"chain_id"`
+	TokenAddress    string    `json:"token_address" ch:"token_address"`
+	FromAddress     string    `json:"from_address" ch:"from_address"`
+	ToAddress       string    `json:"to_address" ch:"to_address"`
+	BlockNumber     *big.Int  `json:"block_number" ch:"block_number"`
+	BlockTimestamp  time.Time `json:"block_timestamp" ch:"block_timestamp"`
+	TransactionHash string    `json:"transaction_hash" ch:"transaction_hash"`
+	TokenID         *big.Int  `json:"token_id" ch:"token_id"`
+	Amount          *big.Int  `json:"amount" ch:"amount"`
+	LogIndex        uint64    `json:"log_index" ch:"log_index"`
+	Sign            int8      `json:"sign" ch:"sign"`
+	InsertTimestamp time.Time `json:"insert_timestamp" ch:"insert_timestamp"`
+}
diff --git a/internal/handlers/transfer_handlers.go b/internal/handlers/transfer_handlers.go
new file mode 100644
index 0000000..24cd07b
--- /dev/null
+++ b/internal/handlers/transfer_handlers.go
@@ -0,0 +1,189 @@
+package handlers
+
+import (
+	"fmt"
+	"math/big"
+	"strings"
+	"time"
+
+	"github.com/gin-gonic/gin"
+	"github.com/rs/zerolog/log"
+	"github.com/thirdweb-dev/indexer/api"
+	"github.com/thirdweb-dev/indexer/internal/common"
+	"github.com/thirdweb-dev/indexer/internal/storage"
+)
+
+// TransferModel is the return type for Swagger documentation
+type TransferModel struct {
+	TokenType       string `json:"token_type" ch:"token_type"`
+	TokenAddress    string `json:"token_address" ch:"token_address"`
+	FromAddress     string `json:"from_address" ch:"from_address"`
+	ToAddress       string `json:"to_address" ch:"to_address"`
+	TokenId         string `json:"token_id" ch:"token_id"`
+	Amount          string `json:"amount" ch:"amount"`
+	BlockNumber     string `json:"block_number" ch:"block_number"`
+	BlockTimestamp  string `json:"block_timestamp" ch:"block_timestamp"`
+	TransactionHash string `json:"transaction_hash" ch:"transaction_hash"`
+	LogIndex        uint64 `json:"log_index" ch:"log_index"`
+}
+
+// @Summary Get token transfers
+// @Description Retrieve token transfers by various filters
+// @Tags transfers
+// @Accept json
+// @Produce json
+// @Security BasicAuth
+// @Param chainId path string true "Chain ID"
+// @Param token_type query []string false "Token types (erc721, erc1155, erc20)"
+// @Param token_address query string false "Token contract address"
+// @Param wallet_address query string false "Wallet address"
+// @Param start_block query string false "Start block number"
+// @Param end_block query string false "End block number"
+// @Param start_timestamp query string false "Start timestamp (RFC3339 format)"
+// @Param end_timestamp query string false "End timestamp (RFC3339 format)"
+// @Param token_id query []string false "Token IDs"
+// @Param transaction_hash query string false "Transaction hash"
+// @Param page query int false "Page number for pagination"
+// @Param limit query int false "Number of items per page" default(20)
+// @Success 200 {object} api.QueryResponse{data=[]TransferModel}
+// @Failure 400 {object} api.Error
+// @Failure 401 {object} api.Error
+// @Failure 500 {object} api.Error
+// @Router /{chainId}/transfers [get]
+func GetTokenTransfers(c *gin.Context) {
+	chainId, err := api.GetChainId(c)
+	if err != nil {
+		api.BadRequestErrorHandler(c, err)
+		return
+	}
+
+	tokenTypes, err := getTokenTypesFromReq(c)
+	if err != nil {
+		api.BadRequestErrorHandler(c, err)
+		return
+	}
+
+	walletAddress := strings.ToLower(c.Query("wallet_address"))
+	if walletAddress != "" && !strings.HasPrefix(walletAddress, "0x") {
+		api.BadRequestErrorHandler(c, fmt.Errorf("invalid wallet_address '%s'", walletAddress))
+		return
+	}
+
+	tokenAddress := strings.ToLower(c.Query("token_address"))
+	if tokenAddress != "" && !strings.HasPrefix(tokenAddress, "0x") {
+		api.BadRequestErrorHandler(c, fmt.Errorf("invalid token_address '%s'", tokenAddress))
+		return
+	}
+
+	transactionHash := strings.ToLower(c.Query("transaction_hash"))
+	if transactionHash != "" && !strings.HasPrefix(transactionHash, "0x") {
+		api.BadRequestErrorHandler(c, fmt.Errorf("invalid transaction_hash '%s'", transactionHash))
+		return
+	}
+
+	tokenIds, err := getTokenIdsFromReq(c)
+	if err != nil {
+		api.BadRequestErrorHandler(c, fmt.Errorf("invalid token_id: %s", err))
+		return
+	}
+
+	// Parse block number parameters
+	var startBlockNumber, endBlockNumber *big.Int
+	startBlockStr := c.Query("start_block")
+	if startBlockStr != "" {
+		startBlockNumber = new(big.Int)
+		_, ok := startBlockNumber.SetString(startBlockStr, 10)
+		if !ok {
+			api.BadRequestErrorHandler(c, fmt.Errorf("invalid start_block '%s'", startBlockStr))
+			return
+		}
+	}
+
+	endBlockStr := c.Query("end_block")
+	if endBlockStr != "" {
+		endBlockNumber = new(big.Int)
+		_, ok := endBlockNumber.SetString(endBlockStr, 10)
+		if !ok {
+			api.BadRequestErrorHandler(c, fmt.Errorf("invalid end_block '%s'", endBlockStr))
+			return
+		}
+	}
+
+	// Define query filter
+	qf := storage.TransfersQueryFilter{
+		ChainId:          chainId,
+		TokenTypes:       tokenTypes,
+		WalletAddress:    walletAddress,
+		TokenAddress:     tokenAddress,
+		TokenIds:         tokenIds,
+		TransactionHash:  transactionHash,
+		StartBlockNumber: startBlockNumber,
+		EndBlockNumber:   endBlockNumber,
+		Page:             api.ParseIntQueryParam(c.Query("page"), 0),
+		Limit:            api.ParseIntQueryParam(c.Query("limit"), 20),
+		SortBy:           c.Query("sort_by"),
+		SortOrder:        c.Query("sort_order"),
+	}
+
+	// Define columns for query
+	columns := []string{
+		"token_type",
+		"token_address",
+		"from_address",
+		"to_address",
+		"token_id",
+		"amount",
+		"block_number",
+		"block_timestamp",
+		"transaction_hash",
+		"log_index",
+	}
+
+	queryResult := api.QueryResponse{
+		Meta: api.Meta{
+			ChainId: chainId.Uint64(),
+			Page:    qf.Page,
+			Limit:   qf.Limit,
+		},
+	}
+
+	mainStorage, err = getMainStorage()
+	if err != nil {
+		log.Error().Err(err).Msg("Error getting main storage")
+		api.InternalErrorHandler(c)
+		return
+	}
+
+	transfersResult, err := mainStorage.GetTokenTransfers(qf, columns...)
+	if err != nil {
+		log.Error().Err(err).Msg("Error querying token transfers")
+		api.InternalErrorHandler(c)
+		return
+	}
+
+	queryResult.Data = serializeTransfers(transfersResult.Data)
+	sendJSONResponse(c, queryResult)
+}
+
+func serializeTransfers(transfers []common.TokenTransfer) []TransferModel {
+	transferModels := make([]TransferModel, len(transfers))
+	for i, transfer := range transfers {
+		transferModels[i] = serializeTransfer(transfer)
+	}
+	return transferModels
+}
+
+func serializeTransfer(transfer common.TokenTransfer) TransferModel {
+	return TransferModel{
+		TokenType:       transfer.TokenType,
+		TokenAddress:    transfer.TokenAddress,
+		FromAddress:     transfer.FromAddress,
+		ToAddress:       transfer.ToAddress,
+		TokenId:         transfer.TokenID.String(),
+		Amount:          transfer.Amount.String(),
+		BlockNumber:     transfer.BlockNumber.String(),
+		BlockTimestamp:  transfer.BlockTimestamp.Format(time.RFC3339),
+		TransactionHash: transfer.TransactionHash,
+		LogIndex:        transfer.LogIndex,
+	}
+}
diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go
index 54ab76c..0bab40d 100644
--- a/internal/storage/clickhouse.go
+++ b/internal/storage/clickhouse.go
@@ -1396,6 +1396,88 @@ func (c *ClickHouseConnector) getTableName(chainId *big.Int, defaultTable string
 	return defaultTable
 }
 
+func (c *ClickHouseConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error) {
+	columns := "token_type, chain_id, token_address, from_address, to_address, block_number, block_timestamp, transaction_hash, token_id, amount, log_index, sign, insert_timestamp"
+	if len(fields) > 0 {
+		columns = strings.Join(fields, ", ")
+	}
+	query := fmt.Sprintf("SELECT %s FROM %s.token_transfers WHERE chain_id = ?", columns, c.cfg.Database)
+
+	if len(qf.TokenTypes) > 0 {
+		tokenTypesStr := ""
+		tokenTypesLen := len(qf.TokenTypes)
+		for i := 0; i < tokenTypesLen-1; i++ {
+			tokenTypesStr += fmt.Sprintf("'%s',", qf.TokenTypes[i])
+		}
+		tokenTypesStr += fmt.Sprintf("'%s'", qf.TokenTypes[tokenTypesLen-1])
+		query += fmt.Sprintf(" AND token_type in (%s)", tokenTypesStr)
+	}
+
+	if qf.WalletAddress != "" {
+		query += fmt.Sprintf(" AND (from_address = '%s' OR to_address = '%s')", qf.WalletAddress, qf.WalletAddress)
+	}
+	if qf.TokenAddress != "" {
+		query += fmt.Sprintf(" AND token_address = '%s'", qf.TokenAddress)
+	}
+	if qf.TransactionHash != "" {
+		query += fmt.Sprintf(" AND transaction_hash = '%s'", qf.TransactionHash)
+	}
+
+	if len(qf.TokenIds) > 0 {
+		tokenIdsStr := ""
+		tokenIdsLen := len(qf.TokenIds)
+		for i := 0; i < tokenIdsLen-1; i++ {
+			tokenIdsStr += fmt.Sprintf("%s,", qf.TokenIds[i].String())
+		}
+		tokenIdsStr += qf.TokenIds[tokenIdsLen-1].String()
+		query += fmt.Sprintf(" AND token_id in (%s)", tokenIdsStr)
+	}
+
+	if qf.StartBlockNumber != nil {
+		query += fmt.Sprintf(" AND block_number >= %s", qf.StartBlockNumber.String())
+	}
+	if qf.EndBlockNumber != nil {
+		query += fmt.Sprintf(" AND block_number <= %s", qf.EndBlockNumber.String())
+	}
+
+	if len(qf.GroupBy) > 0 {
+		query += fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", "))
+	}
+
+	// Add ORDER BY clause
+	if qf.SortBy != "" {
+		query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
+	}
+
+	// Add limit clause
+	if qf.Page > 0 && qf.Limit > 0 {
+		offset := (qf.Page - 1) * qf.Limit
+		query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
+	} else if qf.Limit > 0 {
+		query += fmt.Sprintf(" LIMIT %d", qf.Limit)
+	}
+	rows, err := c.conn.Query(context.Background(), query, qf.ChainId)
+	if err != nil {
+		return QueryResult[common.TokenTransfer]{}, err
+	}
+	defer rows.Close()
+
+	queryResult := QueryResult[common.TokenTransfer]{
+		Data: []common.TokenTransfer{},
+	}
+
+	for rows.Next() {
+		var tt common.TokenTransfer
+		err := rows.ScanStruct(&tt)
+		if err != nil {
+			return QueryResult[common.TokenTransfer]{}, err
+		}
+		queryResult.Data = append(queryResult.Data, tt)
+	}
+
+	return queryResult, nil
+}
+
 func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) {
 	columns := "chain_id, token_type, address, owner, token_id, balance"
 	if len(fields) > 0 {
diff --git a/internal/storage/connector.go b/internal/storage/connector.go
index 9a564ac..0f4f14e 100644
--- a/internal/storage/connector.go
+++ b/internal/storage/connector.go
@@ -25,6 +25,23 @@ type QueryFilter struct {
 	ForceConsistentData bool
 }
 
+type TransfersQueryFilter struct {
+	ChainId          *big.Int
+	TokenTypes       []string
+	TokenAddress     string
+	WalletAddress    string
+	TokenIds         []*big.Int
+	TransactionHash  string
+	StartBlockNumber *big.Int
+	EndBlockNumber   *big.Int
+	GroupBy          []string
+	SortBy           string
+	SortOrder        string // "ASC" or "DESC"
+	Page             int
+	Limit            int
+	Offset           int
+}
+
 type BalancesQueryFilter struct {
 	ChainId    *big.Int
 	TokenTypes []string
@@ -83,6 +100,7 @@ type IMainStorage interface {
 	DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error
 
 	GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
+	GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
 }
 
 func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
diff --git a/test/mocks/MockIMainStorage.go b/test/mocks/MockIMainStorage.go
index 73ff691..50be87f 100644
--- a/test/mocks/MockIMainStorage.go
+++ b/test/mocks/MockIMainStorage.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.50.4. DO NOT EDIT.
+// Code generated by mockery v2.53.2. DO NOT EDIT.
 
 //go:build !production
 
@@ -461,6 +461,77 @@ func (_c *MockIMainStorage_GetTokenBalances_Call) RunAndReturn(run func(storage.
 	return _c
 }
 
+// GetTokenTransfers provides a mock function with given fields: qf, fields
+func (_m *MockIMainStorage) GetTokenTransfers(qf storage.TransfersQueryFilter, fields ...string) (storage.QueryResult[common.TokenTransfer], error) {
+	_va := make([]interface{}, len(fields))
+	for _i := range fields {
+		_va[_i] = fields[_i]
+	}
+	var _ca []interface{}
+	_ca = append(_ca, qf)
+	_ca = append(_ca, _va...)
+	ret := _m.Called(_ca...)
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetTokenTransfers")
+	}
+
+	var r0 storage.QueryResult[common.TokenTransfer]
+	var r1 error
+	if rf, ok := ret.Get(0).(func(storage.TransfersQueryFilter, ...string) (storage.QueryResult[common.TokenTransfer], error)); ok {
+		return rf(qf, fields...)
+	}
+	if rf, ok := ret.Get(0).(func(storage.TransfersQueryFilter, ...string) storage.QueryResult[common.TokenTransfer]); ok {
+		r0 = rf(qf, fields...)
+	} else {
+		r0 = ret.Get(0).(storage.QueryResult[common.TokenTransfer])
+	}
+
+	if rf, ok := ret.Get(1).(func(storage.TransfersQueryFilter, ...string) error); ok {
+		r1 = rf(qf, fields...)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockIMainStorage_GetTokenTransfers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTokenTransfers'
+type MockIMainStorage_GetTokenTransfers_Call struct {
+	*mock.Call
+}
+
+// GetTokenTransfers is a helper method to define mock.On call
+//   - qf storage.TransfersQueryFilter
+//   - fields ...string
+func (_e *MockIMainStorage_Expecter) GetTokenTransfers(qf interface{}, fields ...interface{}) *MockIMainStorage_GetTokenTransfers_Call {
+	return &MockIMainStorage_GetTokenTransfers_Call{Call: _e.mock.On("GetTokenTransfers",
+		append([]interface{}{qf}, fields...)...)}
+}
+
+func (_c *MockIMainStorage_GetTokenTransfers_Call) Run(run func(qf storage.TransfersQueryFilter, fields ...string)) *MockIMainStorage_GetTokenTransfers_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		variadicArgs := make([]string, len(args)-1)
+		for i, a := range args[1:] {
+			if a != nil {
+				variadicArgs[i] = a.(string)
+			}
+		}
+		run(args[0].(storage.TransfersQueryFilter), variadicArgs...)
+	})
+	return _c
+}
+
+func (_c *MockIMainStorage_GetTokenTransfers_Call) Return(_a0 storage.QueryResult[common.TokenTransfer], _a1 error) *MockIMainStorage_GetTokenTransfers_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockIMainStorage_GetTokenTransfers_Call) RunAndReturn(run func(storage.TransfersQueryFilter, ...string) (storage.QueryResult[common.TokenTransfer], error)) *MockIMainStorage_GetTokenTransfers_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetTraces provides a mock function with given fields: qf, fields
 func (_m *MockIMainStorage) GetTraces(qf storage.QueryFilter, fields ...string) (storage.QueryResult[common.Trace], error) {
 	_va := make([]interface{}, len(fields))
diff --git a/test/mocks/MockIOrchestratorStorage.go b/test/mocks/MockIOrchestratorStorage.go
index fe382f0..97cf435 100644
--- a/test/mocks/MockIOrchestratorStorage.go
+++ b/test/mocks/MockIOrchestratorStorage.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.50.4. DO NOT EDIT.
+// Code generated by mockery v2.53.2. DO NOT EDIT.
 
 //go:build !production
 
diff --git a/test/mocks/MockIRPCClient.go b/test/mocks/MockIRPCClient.go
index 816b205..4737815 100644
--- a/test/mocks/MockIRPCClient.go
+++ b/test/mocks/MockIRPCClient.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.50.4. DO NOT EDIT.
+// Code generated by mockery v2.53.2. DO NOT EDIT.
 
 //go:build !production
 
diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go
index dc4b958..5921049 100644
--- a/test/mocks/MockIStagingStorage.go
+++ b/test/mocks/MockIStagingStorage.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.50.4. DO NOT EDIT.
+// Code generated by mockery v2.53.2. DO NOT EDIT.
 
 //go:build !production
 
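For reviewers, a minimal sketch of how the new `IMainStorage.GetTokenTransfers` method and `TransfersQueryFilter` added in this diff might be exercised from calling code. Only the types, field names, and method signature come from the patch; the helper name, chain ID, and filter values are illustrative assumptions.

```go
package example

import (
	"fmt"
	"math/big"

	"github.com/thirdweb-dev/indexer/internal/storage"
)

// listRecentTransfers is a hypothetical helper: it builds a TransfersQueryFilter
// the same way the new handler does and prints the returned rows.
func listRecentTransfers(mainStorage storage.IMainStorage, wallet string) error {
	qf := storage.TransfersQueryFilter{
		ChainId:       big.NewInt(1),     // placeholder chain ID
		TokenTypes:    []string{"erc20"}, // same values the handler accepts
		WalletAddress: wallet,            // matches from_address OR to_address
		SortBy:        "block_number",
		SortOrder:     "DESC",
		Page:          1,
		Limit:         20,
	}

	// Request only the columns the API handler serializes into TransferModel.
	result, err := mainStorage.GetTokenTransfers(qf,
		"token_type", "token_address", "from_address", "to_address",
		"token_id", "amount", "block_number", "block_timestamp",
		"transaction_hash", "log_index")
	if err != nil {
		return err
	}

	for _, t := range result.Data {
		fmt.Printf("%s %s -> %s amount=%s block=%s\n",
			t.TokenType, t.FromAddress, t.ToAddress, t.Amount.String(), t.BlockNumber.String())
	}
	return nil
}
```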
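And a sketch of how a client might call the route registered in cmd/api.go, assuming the `/{chainId}/transfers` path and BasicAuth scheme stated in the handler's Swagger annotations. The base URL, credentials, and filter values are placeholders, not part of this diff.

```go
package example

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// fetchTransfers demonstrates the query parameters the new handler reads
// (token_type, token_address, wallet_address, start_block, end_block,
// token_id, transaction_hash, page, limit, sort_by, sort_order).
func fetchTransfers(baseURL, user, pass string) (string, error) {
	q := url.Values{}
	q.Set("token_type", "erc721")
	q.Set("wallet_address", "0x0000000000000000000000000000000000000000") // placeholder
	q.Set("sort_by", "block_number")
	q.Set("sort_order", "DESC")
	q.Set("limit", "20")

	// @Router /{chainId}/transfers [get] — chain ID 1 used as an example
	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/1/transfers?%s", baseURL, q.Encode()), nil)
	if err != nil {
		return "", err
	}
	req.SetBasicAuth(user, pass) // @Security BasicAuth

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	return string(body), err
}
```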