Skip to content

Commit c0d6438

Browse files
authored
Merge pull request #249 from thirdweb-dev/np/wallet_address_query_split
wallet address query split
2 parents 8d15fa9 + c96d5f4 commit c0d6438

File tree

3 files changed

+289
-86
lines changed

3 files changed

+289
-86
lines changed

indexer

50.3 MB
Binary file not shown.

internal/storage/clickhouse.go

Lines changed: 146 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -467,69 +467,11 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter, fields ...string) (Query
467467
}
468468

469469
func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) {
470-
tableName := c.getTableName(qf.ChainId, table)
471470
// Build the SELECT clause with aggregates
472471
selectColumns := strings.Join(append(qf.GroupBy, qf.Aggregates...), ", ")
473-
query := fmt.Sprintf("SELECT %s FROM %s.%s", selectColumns, c.cfg.Database, tableName)
474-
if qf.ForceConsistentData {
475-
query += " FINAL"
476-
}
477-
478-
whereClauses := []string{}
479-
// Apply filters
480-
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
481-
whereClauses = append(whereClauses, createFilterClause("chain_id", qf.ChainId.String()))
482-
}
483-
blockNumbersClause := createBlockNumbersClause(qf.BlockNumbers)
484-
if blockNumbersClause != "" {
485-
whereClauses = append(whereClauses, blockNumbersClause)
486-
}
487-
contractAddressClause := createContractAddressClause(table, qf.ContractAddress)
488-
if contractAddressClause != "" {
489-
whereClauses = append(whereClauses, contractAddressClause)
490-
}
491-
walletAddressClause := createWalletAddressClause(table, qf.WalletAddress)
492-
if walletAddressClause != "" {
493-
whereClauses = append(whereClauses, walletAddressClause)
494-
}
495-
fromAddressClause := createFromAddressClause(table, qf.FromAddress)
496-
if fromAddressClause != "" {
497-
whereClauses = append(whereClauses, fromAddressClause)
498-
}
499-
signatureClause := createSignatureClause(table, qf.Signature)
500-
if signatureClause != "" {
501-
whereClauses = append(whereClauses, signatureClause)
502-
}
503-
for key, value := range qf.FilterParams {
504-
whereClauses = append(whereClauses, createFilterClause(key, strings.ToLower(value)))
505-
}
506472

507-
// Add WHERE clause to query if there are any conditions
508-
if len(whereClauses) > 0 {
509-
query += " WHERE " + strings.Join(whereClauses, " AND ")
510-
}
511-
512-
if len(qf.GroupBy) > 0 {
513-
groupByColumns := strings.Join(qf.GroupBy, ", ")
514-
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
515-
}
516-
517-
// Add ORDER BY clause
518-
if qf.SortBy != "" {
519-
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
520-
}
521-
522-
// Add limit clause
523-
if qf.Page >= 0 && qf.Limit > 0 {
524-
offset := qf.Page * qf.Limit
525-
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
526-
} else if qf.Limit > 0 {
527-
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
528-
}
529-
530-
if c.cfg.MaxQueryTime > 0 {
531-
query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime)
532-
}
473+
// Use the new query building logic
474+
query := c.buildQuery(table, selectColumns, qf)
533475

534476
if err := common.ValidateQuery(query); err != nil {
535477
return QueryResult[interface{}]{}, err
@@ -611,64 +553,177 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query
611553
}
612554

613555
func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) string {
556+
var query string
557+
558+
// Check if we need to handle wallet address with UNION for transactions
559+
if table == "transactions" && qf.WalletAddress != "" {
560+
query = c.buildUnionQuery(table, columns, qf)
561+
} else {
562+
query = c.buildStandardQuery(table, columns, qf)
563+
}
564+
565+
// Apply post-query clauses to ALL queries
566+
query = c.addPostQueryClauses(query, qf)
567+
568+
return query
569+
}
570+
571+
func (c *ClickHouseConnector) buildStandardQuery(table, columns string, qf QueryFilter) string {
614572
tableName := c.getTableName(qf.ChainId, table)
615573
query := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName)
616574
if qf.ForceConsistentData {
617575
query += " FINAL"
618576
}
619577

578+
whereClauses := c.buildWhereClauses(table, qf)
579+
580+
// Add WHERE clause to query if there are any conditions
581+
if len(whereClauses) > 0 {
582+
query += " WHERE " + strings.Join(whereClauses, " AND ")
583+
}
584+
585+
return query
586+
}
587+
588+
func (c *ClickHouseConnector) buildUnionQuery(table, columns string, qf QueryFilter) string {
589+
tableName := c.getTableName(qf.ChainId, table)
590+
591+
// Build base where clauses (excluding wallet address)
592+
baseWhereClauses := c.buildWhereClauses(table, qf)
593+
594+
// Create two separate queries for from_address and to_address
595+
fromQuery := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName)
596+
if qf.ForceConsistentData {
597+
fromQuery += " FINAL"
598+
}
599+
600+
toQuery := fmt.Sprintf("SELECT %s FROM %s.%s", columns, c.cfg.Database, tableName)
601+
if qf.ForceConsistentData {
602+
toQuery += " FINAL"
603+
}
604+
605+
// Add base where clauses to both queries
606+
if len(baseWhereClauses) > 0 {
607+
baseWhereClause := strings.Join(baseWhereClauses, " AND ")
608+
fromQuery += " WHERE " + baseWhereClause + " AND from_address = '" + strings.ToLower(qf.WalletAddress) + "'"
609+
toQuery += " WHERE " + baseWhereClause + " AND to_address = '" + strings.ToLower(qf.WalletAddress) + "'"
610+
} else {
611+
fromQuery += " WHERE from_address = '" + strings.ToLower(qf.WalletAddress) + "'"
612+
toQuery += " WHERE to_address = '" + strings.ToLower(qf.WalletAddress) + "'"
613+
}
614+
615+
// Apply ORDER BY to both queries for consistent results
616+
if qf.SortBy != "" {
617+
fromQuery += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
618+
toQuery += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
619+
}
620+
621+
// Apply LIMIT to each individual query to avoid loading too much data
622+
// We use a higher limit to ensure we get enough results after UNION
623+
individualLimit := qf.Limit * 2 // Double the limit to account for potential duplicates
624+
if qf.Page >= 0 && qf.Limit > 0 {
625+
offset := qf.Page * qf.Limit
626+
fromQuery += fmt.Sprintf(" LIMIT %d OFFSET %d", individualLimit, offset)
627+
toQuery += fmt.Sprintf(" LIMIT %d OFFSET %d", individualLimit, offset)
628+
} else if qf.Limit > 0 {
629+
fromQuery += fmt.Sprintf(" LIMIT %d", individualLimit)
630+
toQuery += fmt.Sprintf(" LIMIT %d", individualLimit)
631+
}
632+
633+
// Combine with UNION
634+
unionQuery := fmt.Sprintf("(%s) UNION ALL (%s)", fromQuery, toQuery)
635+
636+
return unionQuery
637+
}
638+
639+
func (c *ClickHouseConnector) addPostQueryClauses(query string, qf QueryFilter) string {
640+
// Add GROUP BY clause if needed (for aggregations)
641+
if len(qf.GroupBy) > 0 {
642+
groupByClause := fmt.Sprintf(" GROUP BY %s", strings.Join(qf.GroupBy, ", "))
643+
// For UNION queries, we need to wrap the entire query in a subquery to apply GROUP BY
644+
if strings.Contains(query, "UNION ALL") {
645+
query = fmt.Sprintf("SELECT * FROM (%s) %s", query, groupByClause)
646+
} else {
647+
// For standard queries, just append GROUP BY
648+
query += groupByClause
649+
}
650+
}
651+
652+
// For UNION queries, ORDER BY and LIMIT are already applied to individual queries
653+
// For standard queries, apply ORDER BY and LIMIT
654+
if !strings.Contains(query, "UNION ALL") {
655+
// Add ORDER BY clause
656+
if qf.SortBy != "" {
657+
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
658+
}
659+
660+
// Add limit clause
661+
if qf.Page >= 0 && qf.Limit > 0 {
662+
offset := qf.Page * qf.Limit
663+
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
664+
} else if qf.Limit > 0 {
665+
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
666+
}
667+
} else {
668+
// For UNION queries, we need to apply final LIMIT after the UNION
669+
// This ensures we get exactly the requested number of results
670+
if qf.Page >= 0 && qf.Limit > 0 {
671+
offset := qf.Page * qf.Limit
672+
query = fmt.Sprintf("SELECT * FROM (%s) LIMIT %d OFFSET %d", query, qf.Limit, offset)
673+
} else if qf.Limit > 0 {
674+
query = fmt.Sprintf("SELECT * FROM (%s) LIMIT %d", query, qf.Limit)
675+
}
676+
}
677+
678+
// Add settings at the very end
679+
if c.cfg.MaxQueryTime > 0 {
680+
query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime)
681+
}
682+
683+
return query
684+
}
685+
686+
func (c *ClickHouseConnector) buildWhereClauses(table string, qf QueryFilter) []string {
620687
whereClauses := []string{}
688+
621689
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
622690
whereClauses = append(whereClauses, createFilterClause("chain_id", qf.ChainId.String()))
623691
}
692+
624693
blockNumbersClause := createBlockNumbersClause(qf.BlockNumbers)
625694
if blockNumbersClause != "" {
626695
whereClauses = append(whereClauses, blockNumbersClause)
627696
}
697+
628698
contractAddressClause := createContractAddressClause(table, qf.ContractAddress)
629699
if contractAddressClause != "" {
630700
whereClauses = append(whereClauses, contractAddressClause)
631701
}
632-
walletAddressClause := createWalletAddressClause(table, qf.WalletAddress)
633-
if walletAddressClause != "" {
634-
whereClauses = append(whereClauses, walletAddressClause)
702+
703+
// Skip wallet address clause for UNION queries as it's handled separately
704+
if table != "transactions" && qf.WalletAddress != "" {
705+
walletAddressClause := createWalletAddressClause(table, qf.WalletAddress)
706+
if walletAddressClause != "" {
707+
whereClauses = append(whereClauses, walletAddressClause)
708+
}
635709
}
710+
636711
fromAddressClause := createFromAddressClause(table, qf.FromAddress)
637712
if fromAddressClause != "" {
638713
whereClauses = append(whereClauses, fromAddressClause)
639714
}
715+
640716
signatureClause := createSignatureClause(table, qf.Signature)
641717
if signatureClause != "" {
642718
whereClauses = append(whereClauses, signatureClause)
643719
}
720+
644721
// Add filter params
645722
for key, value := range qf.FilterParams {
646723
whereClauses = append(whereClauses, createFilterClause(key, strings.ToLower(value)))
647724
}
648725

649-
// Add WHERE clause to query if there are any conditions
650-
if len(whereClauses) > 0 {
651-
query += " WHERE " + strings.Join(whereClauses, " AND ")
652-
}
653-
654-
// Add ORDER BY clause
655-
if qf.SortBy != "" {
656-
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
657-
}
658-
659-
// Add limit clause
660-
if qf.Page >= 0 && qf.Limit > 0 {
661-
offset := qf.Page * qf.Limit
662-
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
663-
} else if qf.Limit > 0 {
664-
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
665-
}
666-
667-
if c.cfg.MaxQueryTime > 0 {
668-
query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime)
669-
}
670-
671-
return query
726+
return whereClauses
672727
}
673728

674729
func createFilterClause(key, value string) string {
@@ -2064,3 +2119,8 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
20642119

20652120
return blockData, nil
20662121
}
2122+
2123+
// Helper function to test query generation
2124+
func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string {
2125+
return c.buildQuery(table, columns, qf)
2126+
}

0 commit comments

Comments
 (0)