Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func RunApi(cmd *cobra.Command, args []string) {
// token holder queries
root.GET("/holders/:address", handlers.GetTokenHoldersByType)

// token transfers queries
root.GET("/transfers", handlers.GetTokenTransfers)

// search
root.GET("/search/:input", handlers.Search)
}
Expand Down
22 changes: 22 additions & 0 deletions internal/common/transfers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package common

import (
"math/big"
"time"
)

type TokenTransfer struct {
TokenType string `json:"token_type" ch:"token_type"`
ChainID *big.Int `json:"chain_id" ch:"chain_id"`
TokenAddress string `json:"token_address" ch:"token_address"`
FromAddress string `json:"from_address" ch:"from_address"`
ToAddress string `json:"to_address" ch:"to_address"`
BlockNumber *big.Int `json:"block_number" ch:"block_number"`
BlockTimestamp time.Time `json:"block_timestamp" ch:"block_timestamp"`
TransactionHash string `json:"transaction_hash" ch:"transaction_hash"`
TokenID *big.Int `json:"token_id" ch:"token_id"`
Amount *big.Int `json:"amount" ch:"amount"`
LogIndex uint64 `json:"log_index" ch:"log_index"`
Sign int8 `json:"sign" ch:"sign"`
InsertTimestamp time.Time `json:"insert_timestamp" ch:"insert_timestamp"`
}
189 changes: 189 additions & 0 deletions internal/handlers/transfer_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package handlers

import (
"fmt"
"math/big"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"github.com/thirdweb-dev/indexer/api"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/storage"
)

// TransferModel return type for Swagger documentation
type TransferModel struct {
TokenType string `json:"token_type" ch:"token_type"`
TokenAddress string `json:"token_address" ch:"token_address"`
FromAddress string `json:"from_address" ch:"from_address"`
ToAddress string `json:"to_address" ch:"to_address"`
TokenId string `json:"token_id" ch:"token_id"`
Amount string `json:"amount" ch:"amount"`
BlockNumber string `json:"block_number" ch:"block_number"`
BlockTimestamp string `json:"block_timestamp" ch:"block_timestamp"`
TransactionHash string `json:"transaction_hash" ch:"transaction_hash"`
LogIndex uint64 `json:"log_index" ch:"log_index"`
}

// @Summary Get token transfers
// @Description Retrieve token transfers by various filters
// @Tags transfers
// @Accept json
// @Produce json
// @Security BasicAuth
// @Param chainId path string true "Chain ID"
// @Param token_type query []string false "Token types (erc721, erc1155, erc20)"
// @Param token_address query string false "Token contract address"
// @Param wallet query string false "Wallet address"
// @Param start_block query string false "Start block number"
// @Param end_block query string false "End block number"
// @Param start_timestamp query string false "Start timestamp (RFC3339 format)"
// @Param end_timestamp query string false "End timestamp (RFC3339 format)"
// @Param token_id query []string false "Token IDs"
// @Param transaction_hash query string false "Transaction hash"
// @Param page query int false "Page number for pagination"
// @Param limit query int false "Number of items per page" default(20)
// @Success 200 {object} api.QueryResponse{data=[]TransferModel}
// @Failure 400 {object} api.Error
// @Failure 401 {object} api.Error
// @Failure 500 {object} api.Error
// @Router /{chainId}/transfers [get]
func GetTokenTransfers(c *gin.Context) {
chainId, err := api.GetChainId(c)
if err != nil {
api.BadRequestErrorHandler(c, err)
return
}

tokenTypes, err := getTokenTypesFromReq(c)
if err != nil {
api.BadRequestErrorHandler(c, err)
return
}

walletAddress := strings.ToLower(c.Query("wallet_address"))
if walletAddress != "" && !strings.HasPrefix(walletAddress, "0x") {
api.BadRequestErrorHandler(c, fmt.Errorf("invalid wallet_address '%s'", walletAddress))
return
}

tokenAddress := strings.ToLower(c.Query("token_address"))
if tokenAddress != "" && !strings.HasPrefix(tokenAddress, "0x") {
api.BadRequestErrorHandler(c, fmt.Errorf("invalid token_address '%s'", tokenAddress))
return
}

transactionHash := strings.ToLower(c.Query("transaction_hash"))
if transactionHash != "" && !strings.HasPrefix(transactionHash, "0x") {
api.BadRequestErrorHandler(c, fmt.Errorf("invalid transaction_hash '%s'", transactionHash))
return
}

tokenIds, err := getTokenIdsFromReq(c)
if err != nil {
api.BadRequestErrorHandler(c, fmt.Errorf("invalid token_id: %s", err))
return
}

// Parse block number parameters
var startBlockNumber, endBlockNumber *big.Int
startBlockStr := c.Query("start_block")
if startBlockStr != "" {
startBlockNumber = new(big.Int)
_, ok := startBlockNumber.SetString(startBlockStr, 10)
if !ok {
api.BadRequestErrorHandler(c, fmt.Errorf("invalid start_block '%s'", startBlockStr))
return
}
}

endBlockStr := c.Query("end_block")
if endBlockStr != "" {
endBlockNumber = new(big.Int)
_, ok := endBlockNumber.SetString(endBlockStr, 10)
if !ok {
api.BadRequestErrorHandler(c, fmt.Errorf("invalid end_block '%s'", endBlockStr))
return
}
}

// Define query filter
qf := storage.TransfersQueryFilter{
ChainId: chainId,
TokenTypes: tokenTypes,
WalletAddress: walletAddress,
TokenAddress: tokenAddress,
TokenIds: tokenIds,
TransactionHash: transactionHash,
StartBlockNumber: startBlockNumber,
EndBlockNumber: endBlockNumber,
Page: api.ParseIntQueryParam(c.Query("page"), 0),
Limit: api.ParseIntQueryParam(c.Query("limit"), 20),
SortBy: c.Query("sort_by"),
SortOrder: c.Query("sort_order"),
}

// Define columns for query
columns := []string{
"token_type",
"token_address",
"from_address",
"to_address",
"token_id",
"amount",
"block_number",
"block_timestamp",
"transaction_hash",
"log_index",
}

queryResult := api.QueryResponse{
Meta: api.Meta{
ChainId: chainId.Uint64(),
Page: qf.Page,
Limit: qf.Limit,
},
}

mainStorage, err = getMainStorage()
if err != nil {
log.Error().Err(err).Msg("Error getting main storage")
api.InternalErrorHandler(c)
return
}

transfersResult, err := mainStorage.GetTokenTransfers(qf, columns...)
if err != nil {
log.Error().Err(err).Msg("Error querying token transfers")
api.InternalErrorHandler(c)
return
}

queryResult.Data = serializeTransfers(transfersResult.Data)
sendJSONResponse(c, queryResult)
}

func serializeTransfers(transfers []common.TokenTransfer) []TransferModel {
transferModels := make([]TransferModel, len(transfers))
for i, transfer := range transfers {
transferModels[i] = serializeTransfer(transfer)
}
return transferModels
}

func serializeTransfer(transfer common.TokenTransfer) TransferModel {
return TransferModel{
TokenType: transfer.TokenType,
TokenAddress: transfer.TokenAddress,
FromAddress: transfer.FromAddress,
ToAddress: transfer.ToAddress,
TokenId: transfer.TokenID.String(),
Amount: transfer.Amount.String(),
BlockNumber: transfer.BlockNumber.String(),
BlockTimestamp: transfer.BlockTimestamp.Format(time.RFC3339),
TransactionHash: transfer.TransactionHash,
LogIndex: transfer.LogIndex,
}
}
82 changes: 82 additions & 0 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,88 @@ func (c *ClickHouseConnector) getTableName(chainId *big.Int, defaultTable string
return defaultTable
}

func (c *ClickHouseConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error) {
columns := "token_type, chain_id, token_address, from_address, to_address, block_number, block_timestamp, transaction_hash, token_id, amount, log_index, sign, insert_timestamp"
if len(fields) > 0 {
columns = strings.Join(fields, ", ")
}
query := fmt.Sprintf("SELECT %s FROM %s.token_transfers WHERE chain_id = ?", columns, c.cfg.Database)

if len(qf.TokenTypes) > 0 {
tokenTypesStr := ""
tokenTypesLen := len(qf.TokenTypes)
for i := 0; i < tokenTypesLen-1; i++ {
tokenTypesStr += fmt.Sprintf("'%s',", qf.TokenTypes[i])
}
tokenTypesStr += fmt.Sprintf("'%s'", qf.TokenTypes[tokenTypesLen-1])
query += fmt.Sprintf(" AND token_type in (%s)", tokenTypesStr)
}

if qf.WalletAddress != "" {
query += fmt.Sprintf(" AND (from_address = '%s' OR to_address = '%s')", qf.WalletAddress, qf.WalletAddress)
}
if qf.TokenAddress != "" {
query += fmt.Sprintf(" AND token_address = '%s'", qf.TokenAddress)
}
if qf.TransactionHash != "" {
query += fmt.Sprintf(" AND transaction_hash = '%s'", qf.TransactionHash)
}

if len(qf.TokenIds) > 0 {
tokenIdsStr := ""
tokenIdsLen := len(qf.TokenIds)
for i := 0; i < tokenIdsLen-1; i++ {
tokenIdsStr += fmt.Sprintf("%s,", qf.TokenIds[i].String())
}
tokenIdsStr += qf.TokenIds[tokenIdsLen-1].String()
query += fmt.Sprintf(" AND token_id in (%s)", tokenIdsStr)
}

if qf.StartBlockNumber != nil {
query += fmt.Sprintf(" AND block_number >= %s", qf.StartBlockNumber.String())
}
if qf.EndBlockNumber != nil {
query += fmt.Sprintf(" AND block_number <= %s", qf.EndBlockNumber.String())
}

if len(qf.GroupBy) > 0 {
query += fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", "))
}

// Add ORDER BY clause
if qf.SortBy != "" {
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
}

// Add limit clause
if qf.Page > 0 && qf.Limit > 0 {
offset := (qf.Page - 1) * qf.Limit
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
} else if qf.Limit > 0 {
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
}
rows, err := c.conn.Query(context.Background(), query, qf.ChainId)
if err != nil {
return QueryResult[common.TokenTransfer]{}, err
}
defer rows.Close()

queryResult := QueryResult[common.TokenTransfer]{
Data: []common.TokenTransfer{},
}

for rows.Next() {
var tt common.TokenTransfer
err := rows.ScanStruct(&tt)
if err != nil {
return QueryResult[common.TokenTransfer]{}, err
}
queryResult.Data = append(queryResult.Data, tt)
}

return queryResult, nil
}

func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) {
columns := "chain_id, token_type, address, owner, token_id, balance"
if len(fields) > 0 {
Expand Down
18 changes: 18 additions & 0 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ type QueryFilter struct {
ForceConsistentData bool
}

type TransfersQueryFilter struct {
ChainId *big.Int
TokenTypes []string
TokenAddress string
WalletAddress string
TokenIds []*big.Int
TransactionHash string
StartBlockNumber *big.Int
EndBlockNumber *big.Int
GroupBy []string
SortBy string
SortOrder string // "ASC" or "DESC"
Page int
Limit int
Offset int
}

type BalancesQueryFilter struct {
ChainId *big.Int
TokenTypes []string
Expand Down Expand Up @@ -83,6 +100,7 @@ type IMainStorage interface {
DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error

GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
}

func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
Expand Down
Loading