Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func RunApi(cmd *cobra.Command, args []string) {
// token holder queries
root.GET("/holders/:address", handlers.GetTokenHoldersByType)

// token transfers queries
root.GET("/transfers", handlers.GetTokenTransfers)
// token ID queries
root.GET("/tokens/:address", handlers.GetTokenIdsByType)

Expand Down
22 changes: 22 additions & 0 deletions internal/common/transfers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package common

import (
"math/big"
"time"
)

// TokenTransfer is one token transfer event row as stored in ClickHouse.
// The `ch` tags map struct fields to token_transfers columns for
// rows.ScanStruct; the `json` tags control API serialization.
type TokenTransfer struct {
	TokenType       string    `json:"token_type" ch:"token_type"`             // token standard, e.g. "erc20", "erc721", "erc1155" (values per handler docs)
	ChainID         *big.Int  `json:"chain_id" ch:"chain_id"`                 // chain the transfer occurred on
	TokenAddress    string    `json:"token_address" ch:"token_address"`       // token contract address
	FromAddress     string    `json:"from_address" ch:"from_address"`         // sender address
	ToAddress       string    `json:"to_address" ch:"to_address"`             // recipient address
	BlockNumber     *big.Int  `json:"block_number" ch:"block_number"`         // block containing the transfer
	BlockTimestamp  time.Time `json:"block_timestamp" ch:"block_timestamp"`   // timestamp of that block
	TransactionHash string    `json:"transaction_hash" ch:"transaction_hash"` // transaction that emitted the event
	TokenID         *big.Int  `json:"token_id" ch:"token_id"`                 // NFT token id; presumably zero for fungible (erc20) transfers — confirm against writer
	Amount          *big.Int  `json:"amount" ch:"amount"`                     // amount transferred (erc20/erc1155); presumably 1 for erc721 — confirm
	LogIndex        uint64    `json:"log_index" ch:"log_index"`               // event log index within the transaction
	Sign            int8      `json:"sign" ch:"sign"`                         // NOTE(review): looks like a CollapsingMergeTree sign column (+1/-1) — confirm table engine
	InsertTimestamp time.Time `json:"insert_timestamp" ch:"insert_timestamp"` // when the row was inserted by the indexer
}
189 changes: 189 additions & 0 deletions internal/handlers/transfer_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package handlers

import (
"fmt"
"math/big"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"github.com/thirdweb-dev/indexer/api"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/storage"
)

// TransferModel is the API response shape for a single token transfer,
// used in Swagger documentation. Numeric fields (token_id, amount,
// block_number) are serialized as strings to avoid precision loss for
// 256-bit values; block_timestamp is RFC3339-formatted.
// NOTE(review): the `ch` tags appear unused here — this model is built by
// serializeTransfer, not scanned from ClickHouse; confirm and consider removing.
type TransferModel struct {
	TokenType       string `json:"token_type" ch:"token_type"`
	TokenAddress    string `json:"token_address" ch:"token_address"`
	FromAddress     string `json:"from_address" ch:"from_address"`
	ToAddress       string `json:"to_address" ch:"to_address"`
	TokenId         string `json:"token_id" ch:"token_id"`
	Amount          string `json:"amount" ch:"amount"`
	BlockNumber     string `json:"block_number" ch:"block_number"`
	BlockTimestamp  string `json:"block_timestamp" ch:"block_timestamp"`
	TransactionHash string `json:"transaction_hash" ch:"transaction_hash"`
	LogIndex        uint64 `json:"log_index" ch:"log_index"`
}

// @Summary Get token transfers
// @Description Retrieve token transfers by various filters
// @Tags transfers
// @Accept json
// @Produce json
// @Security BasicAuth
// @Param chainId path string true "Chain ID"
// @Param token_type query []string false "Token types (erc721, erc1155, erc20)"
// @Param token_address query string false "Token contract address"
// @Param wallet_address query string false "Wallet address (matches either sender or recipient)"
// @Param start_block query string false "Start block number"
// @Param end_block query string false "End block number"
// @Param token_id query []string false "Token IDs"
// @Param transaction_hash query string false "Transaction hash"
// @Param sort_by query string false "Column to sort results by"
// @Param sort_order query string false "Sort order (ASC or DESC)"
// @Param page query int false "Page number for pagination"
// @Param limit query int false "Number of items per page" default(20)
// @Success 200 {object} api.QueryResponse{data=[]TransferModel}
// @Failure 400 {object} api.Error
// @Failure 401 {object} api.Error
// @Failure 500 {object} api.Error
// @Router /{chainId}/transfers [get]
//
// GetTokenTransfers handles GET /transfers: it validates the query
// parameters, builds a TransfersQueryFilter, queries main storage, and
// returns the serialized transfers.
//
// Fixed: the Swagger doc previously advertised the wallet filter as
// "wallet" while the code reads "wallet_address", and omitted the
// sort_by/sort_order parameters the code consumes.
// NOTE(review): start_timestamp/end_timestamp were documented but never
// parsed anywhere in this handler — removed from the docs; re-add once
// the filter actually supports them.
func GetTokenTransfers(c *gin.Context) {
	chainId, err := api.GetChainId(c)
	if err != nil {
		api.BadRequestErrorHandler(c, err)
		return
	}

	tokenTypes, err := getTokenTypesFromReq(c)
	if err != nil {
		api.BadRequestErrorHandler(c, err)
		return
	}

	// Optional address-like filters; when present they must be 0x-prefixed.
	// Each helper writes the 400 response itself on failure.
	walletAddress, ok := getHexQueryParam(c, "wallet_address")
	if !ok {
		return
	}
	tokenAddress, ok := getHexQueryParam(c, "token_address")
	if !ok {
		return
	}
	transactionHash, ok := getHexQueryParam(c, "transaction_hash")
	if !ok {
		return
	}

	tokenIds, err := getTokenIdsFromReq(c)
	if err != nil {
		api.BadRequestErrorHandler(c, fmt.Errorf("invalid token_id: %s", err))
		return
	}

	// Optional block-range bounds; nil means "no bound".
	startBlockNumber, ok := getBlockNumberQueryParam(c, "start_block")
	if !ok {
		return
	}
	endBlockNumber, ok := getBlockNumberQueryParam(c, "end_block")
	if !ok {
		return
	}

	// Define query filter
	qf := storage.TransfersQueryFilter{
		ChainId:          chainId,
		TokenTypes:       tokenTypes,
		WalletAddress:    walletAddress,
		TokenAddress:     tokenAddress,
		TokenIds:         tokenIds,
		TransactionHash:  transactionHash,
		StartBlockNumber: startBlockNumber,
		EndBlockNumber:   endBlockNumber,
		Page:             api.ParseIntQueryParam(c.Query("page"), 0),
		Limit:            api.ParseIntQueryParam(c.Query("limit"), 20),
		SortBy:           c.Query("sort_by"),
		SortOrder:        c.Query("sort_order"),
	}

	// Columns to select; must match the TransferModel fields.
	columns := []string{
		"token_type",
		"token_address",
		"from_address",
		"to_address",
		"token_id",
		"amount",
		"block_number",
		"block_timestamp",
		"transaction_hash",
		"log_index",
	}

	queryResult := api.QueryResponse{
		Meta: api.Meta{
			ChainId: chainId.Uint64(),
			Page:    qf.Page,
			Limit:   qf.Limit,
		},
	}

	mainStorage, err = getMainStorage()
	if err != nil {
		log.Error().Err(err).Msg("Error getting main storage")
		api.InternalErrorHandler(c)
		return
	}

	transfersResult, err := mainStorage.GetTokenTransfers(qf, columns...)
	if err != nil {
		log.Error().Err(err).Msg("Error querying token transfers")
		api.InternalErrorHandler(c)
		return
	}

	queryResult.Data = serializeTransfers(transfersResult.Data)
	sendJSONResponse(c, queryResult)
}

// getHexQueryParam reads an optional query parameter that, when present,
// must start with "0x". The value is lowercased. On violation it writes a
// 400 response and returns ok=false.
func getHexQueryParam(c *gin.Context, name string) (string, bool) {
	value := strings.ToLower(c.Query(name))
	if value != "" && !strings.HasPrefix(value, "0x") {
		api.BadRequestErrorHandler(c, fmt.Errorf("invalid %s '%s'", name, value))
		return "", false
	}
	return value, true
}

// getBlockNumberQueryParam reads an optional decimal block number query
// parameter. An absent parameter yields (nil, true). On a malformed value
// it writes a 400 response and returns ok=false.
func getBlockNumberQueryParam(c *gin.Context, name string) (*big.Int, bool) {
	raw := c.Query(name)
	if raw == "" {
		return nil, true
	}
	num, ok := new(big.Int).SetString(raw, 10)
	if !ok {
		api.BadRequestErrorHandler(c, fmt.Errorf("invalid %s '%s'", name, raw))
		return nil, false
	}
	return num, true
}

// serializeTransfers converts storage-layer transfer rows into their API
// response representation, preserving input order.
func serializeTransfers(transfers []common.TokenTransfer) []TransferModel {
	models := make([]TransferModel, 0, len(transfers))
	for _, t := range transfers {
		models = append(models, serializeTransfer(t))
	}
	return models
}

// serializeTransfer maps a single storage-layer transfer row to its API
// response shape. Big integers are rendered as decimal strings (256-bit
// values don't fit JSON numbers) and the block timestamp as RFC3339.
//
// Fixed: TokenID, Amount, and BlockNumber are *big.Int and were
// dereferenced unconditionally; a nil pointer (e.g. a column omitted from
// the selected field list) would panic the handler. Nil now renders as "0".
func serializeTransfer(transfer common.TokenTransfer) TransferModel {
	return TransferModel{
		TokenType:       transfer.TokenType,
		TokenAddress:    transfer.TokenAddress,
		FromAddress:     transfer.FromAddress,
		ToAddress:       transfer.ToAddress,
		TokenId:         bigIntToString(transfer.TokenID),
		Amount:          bigIntToString(transfer.Amount),
		BlockNumber:     bigIntToString(transfer.BlockNumber),
		BlockTimestamp:  transfer.BlockTimestamp.Format(time.RFC3339),
		TransactionHash: transfer.TransactionHash,
		LogIndex:        transfer.LogIndex,
	}
}

// bigIntToString renders a possibly-nil *big.Int as a decimal string,
// treating nil as zero.
func bigIntToString(n *big.Int) string {
	if n == nil {
		return "0"
	}
	return n.String()
}
82 changes: 82 additions & 0 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,88 @@ func (c *ClickHouseConnector) getTableName(chainId *big.Int, defaultTable string
return defaultTable
}

// GetTokenTransfers fetches token transfer rows matching qf from the
// token_transfers table. fields, when non-empty, narrows the selected
// columns (callers pass trusted, code-defined column names).
//
// Fixed: user-controlled filter values (wallet/token address, transaction
// hash, token types) were spliced into the SQL with fmt.Sprintf — the
// handler only checks a "0x" prefix, so a crafted value such as
// "0x' OR 1=1 --" could inject SQL. Values are now bound as query
// parameters; ORDER BY / GROUP BY identifiers, which cannot be bound, are
// validated against a strict [a-z0-9_] pattern instead.
func (c *ClickHouseConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error) {
	columns := "token_type, chain_id, token_address, from_address, to_address, block_number, block_timestamp, transaction_hash, token_id, amount, log_index, sign, insert_timestamp"
	if len(fields) > 0 {
		columns = strings.Join(fields, ", ")
	}
	query := fmt.Sprintf("SELECT %s FROM %s.token_transfers WHERE chain_id = ?", columns, c.cfg.Database)
	args := []interface{}{qf.ChainId}

	if len(qf.TokenTypes) > 0 {
		// Bind each token type instead of inlining quoted strings.
		placeholders := strings.TrimSuffix(strings.Repeat("?,", len(qf.TokenTypes)), ",")
		query += fmt.Sprintf(" AND token_type IN (%s)", placeholders)
		for _, tokenType := range qf.TokenTypes {
			args = append(args, tokenType)
		}
	}

	if qf.WalletAddress != "" {
		query += " AND (from_address = ? OR to_address = ?)"
		args = append(args, qf.WalletAddress, qf.WalletAddress)
	}
	if qf.TokenAddress != "" {
		query += " AND token_address = ?"
		args = append(args, qf.TokenAddress)
	}
	if qf.TransactionHash != "" {
		query += " AND transaction_hash = ?"
		args = append(args, qf.TransactionHash)
	}

	if len(qf.TokenIds) > 0 {
		// big.Int renders as bare decimal digits, which are safe to inline.
		tokenIds := make([]string, len(qf.TokenIds))
		for i, tokenId := range qf.TokenIds {
			tokenIds[i] = tokenId.String()
		}
		query += fmt.Sprintf(" AND token_id IN (%s)", strings.Join(tokenIds, ","))
	}

	if qf.StartBlockNumber != nil {
		query += fmt.Sprintf(" AND block_number >= %s", qf.StartBlockNumber.String())
	}
	if qf.EndBlockNumber != nil {
		query += fmt.Sprintf(" AND block_number <= %s", qf.EndBlockNumber.String())
	}

	// Identifiers cannot be bound as parameters; whitelist their shape
	// before splicing them into the query.
	for _, col := range qf.GroupBy {
		if !isSafeColumnName(col) {
			return QueryResult[common.TokenTransfer]{}, fmt.Errorf("invalid group by column %q", col)
		}
	}
	if len(qf.GroupBy) > 0 {
		query += fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", "))
	}

	// Add ORDER BY clause; sort order is normalized to ASC/DESC only.
	if qf.SortBy != "" {
		if !isSafeColumnName(qf.SortBy) {
			return QueryResult[common.TokenTransfer]{}, fmt.Errorf("invalid sort column %q", qf.SortBy)
		}
		sortOrder := "ASC"
		if strings.EqualFold(qf.SortOrder, "DESC") {
			sortOrder = "DESC"
		}
		query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, sortOrder)
	}

	// Add limit/offset clause.
	if qf.Page > 0 && qf.Limit > 0 {
		offset := (qf.Page - 1) * qf.Limit
		query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
	} else if qf.Limit > 0 {
		query += fmt.Sprintf(" LIMIT %d", qf.Limit)
	}

	rows, err := c.conn.Query(context.Background(), query, args...)
	if err != nil {
		return QueryResult[common.TokenTransfer]{}, err
	}
	defer rows.Close()

	queryResult := QueryResult[common.TokenTransfer]{
		Data: []common.TokenTransfer{},
	}
	for rows.Next() {
		var transfer common.TokenTransfer
		if err := rows.ScanStruct(&transfer); err != nil {
			return QueryResult[common.TokenTransfer]{}, err
		}
		queryResult.Data = append(queryResult.Data, transfer)
	}

	return queryResult, nil
}

// isSafeColumnName reports whether s is a non-empty, plain lowercase
// identifier ([a-z0-9_]) and therefore safe to splice into SQL.
func isSafeColumnName(s string) bool {
	if s == "" {
		return false
	}
	for _, r := range s {
		if (r < 'a' || r > 'z') && (r < '0' || r > '9') && r != '_' {
			return false
		}
	}
	return true
}

func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) {
columns := "chain_id, token_type, address, owner, token_id, balance"
if len(fields) > 0 {
Expand Down
18 changes: 18 additions & 0 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ type QueryFilter struct {
ForceConsistentData bool
}

// TransfersQueryFilter describes the filters, sorting, and pagination
// applied when querying token transfers from main storage. Zero values
// (nil pointers, empty strings/slices, 0) mean "no filter".
type TransfersQueryFilter struct {
	ChainId          *big.Int // chain to query (required)
	TokenTypes       []string // e.g. "erc20", "erc721", "erc1155"; empty = all types
	TokenAddress     string   // token contract address filter
	WalletAddress    string   // matches either from_address or to_address
	TokenIds         []*big.Int // specific token IDs to include
	TransactionHash  string   // exact transaction hash filter
	StartBlockNumber *big.Int // inclusive lower block bound
	EndBlockNumber   *big.Int // inclusive upper block bound
	GroupBy          []string // columns for a GROUP BY clause
	SortBy           string   // column for ORDER BY; empty = no ordering
	SortOrder        string // "ASC" or "DESC"
	Page             int // 1-based page number; with Limit > 0 produces an OFFSET
	Limit            int // max rows to return; 0 = no limit
	Offset           int // NOTE(review): not consumed by the ClickHouse GetTokenTransfers implementation — confirm whether Page/Limit supersede it
}

type BalancesQueryFilter struct {
ChainId *big.Int
TokenTypes []string
Expand Down Expand Up @@ -83,6 +100,7 @@ type IMainStorage interface {
DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) error

GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
}

func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
Expand Down
Loading