Skip to content

Commit e7b3bae

Browse files
committed
feat: separate storage method for aggregates
1 parent e782547 commit e7b3bae

File tree

6 files changed

+170
-110
lines changed

6 files changed

+170
-110
lines changed

api/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type QueryResponse struct {
7070
// @Description Query result data
7171
Data interface{} `json:"data,omitempty"`
7272
// @Description Aggregation results
73-
Aggregations map[string]string `json:"aggregations,omitempty"`
73+
Aggregations []map[string]interface{} `json:"aggregations,omitempty"`
7474
}
7575

7676
func writeError(w http.ResponseWriter, message string, code int) {

internal/handlers/logs_handlers.go

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -133,39 +133,59 @@ func handleLogsRequest(c *gin.Context, contractAddress, signature string) {
133133
return
134134
}
135135

136-
logs, err := mainStorage.GetLogs(storage.QueryFilter{
136+
// Prepare the QueryFilter
137+
qf := storage.QueryFilter{
137138
FilterParams: queryParams.FilterParams,
138-
GroupBy: queryParams.GroupBy,
139+
ContractAddress: contractAddress,
140+
Signature: signatureHash,
141+
ChainId: chainId,
139142
SortBy: queryParams.SortBy,
140143
SortOrder: queryParams.SortOrder,
141144
Page: queryParams.Page,
142145
Limit: queryParams.Limit,
143-
Aggregates: queryParams.Aggregates,
144-
ContractAddress: contractAddress,
145-
Signature: signatureHash,
146-
ChainId: chainId,
147-
})
148-
if err != nil {
149-
log.Error().Err(err).Msg("Error querying logs")
150-
api.InternalErrorHandler(c)
151-
return
152146
}
153147

154-
response := api.QueryResponse{
148+
// Initialize the QueryResult
149+
queryResult := api.QueryResponse{
155150
Meta: api.Meta{
156151
ChainId: chainId.Uint64(),
157152
ContractAddress: contractAddress,
158153
Signature: signatureHash,
159154
Page: queryParams.Page,
160155
Limit: queryParams.Limit,
161-
TotalItems: len(logs.Data),
162-
TotalPages: 0, // TODO: Implement total pages count
156+
TotalItems: 0,
157+
TotalPages: 0, // Implement total pages count if needed
163158
},
164-
Data: logs.Data,
165-
Aggregations: logs.Aggregates,
159+
Data: nil,
160+
Aggregations: nil,
161+
}
162+
163+
// If aggregates are specified, retrieve them
164+
if len(queryParams.Aggregates) > 0 {
165+
qf.Aggregates = queryParams.Aggregates
166+
qf.GroupBy = queryParams.GroupBy
167+
168+
aggregatesResult, err := mainStorage.GetAggregations("logs", qf)
169+
if err != nil {
170+
log.Error().Err(err).Msg("Error querying aggregates")
171+
api.InternalErrorHandler(c)
172+
return
173+
}
174+
queryResult.Aggregations = aggregatesResult.Aggregates
175+
queryResult.Meta.TotalItems = len(aggregatesResult.Aggregates)
176+
} else {
177+
// Retrieve logs data
178+
logsResult, err := mainStorage.GetLogs(qf)
179+
if err != nil {
180+
log.Error().Err(err).Msg("Error querying logs")
181+
api.InternalErrorHandler(c)
182+
return
183+
}
184+
queryResult.Data = logsResult.Data
185+
queryResult.Meta.TotalItems = len(logsResult.Data)
166186
}
167187

168-
sendJSONResponse(c, response)
188+
sendJSONResponse(c, queryResult)
169189
}
170190

171191
func getMainStorage() (storage.IMainStorage, error) {

internal/handlers/transactions_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func handleTransactionsRequest(c *gin.Context, contractAddress, signature string
163163
TotalPages: 0, // TODO: Implement total pages count
164164
},
165165
Data: result.Data,
166-
Aggregations: result.Aggregates,
166+
Aggregations: nil,
167167
}
168168

169169
c.JSON(http.StatusOK, response)

internal/storage/clickhouse.go

Lines changed: 61 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -342,19 +342,66 @@ func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (QueryResult[commo
342342
return executeQuery[common.Transaction](c, "transactions", columns, qf, scanTransaction)
343343
}
344344

345-
func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[map[string]interface{}], error) {
346-
var columns string
345+
func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], error) {
346+
columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
347+
return executeQuery[common.Log](c, "logs", columns, qf, scanLog)
348+
}
349+
350+
func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) {
351+
// Build the SELECT clause with aggregates
352+
columns := strings.Join(append(qf.GroupBy, qf.Aggregates...), ", ")
353+
query := fmt.Sprintf("SELECT %s FROM %s.%s WHERE is_deleted = 0", columns, c.cfg.Database, table)
354+
355+
// Apply filters
356+
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
357+
query = addFilterParams("chain_id", qf.ChainId.String(), query)
358+
}
359+
query = addContractAddress(table, query, qf.ContractAddress)
360+
361+
if qf.Signature != "" {
362+
query += fmt.Sprintf(" AND topic_0 = '%s'", qf.Signature)
363+
}
347364

348-
if len(qf.GroupBy) > 0 || len(qf.Aggregates) > 0 {
349-
// Build columns for SELECT when grouping or aggregating
350-
selectColumns := append(qf.GroupBy, qf.Aggregates...)
351-
columns = strings.Join(selectColumns, ", ")
352-
} else {
353-
// Default columns when not grouping
354-
columns = "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
365+
for key, value := range qf.FilterParams {
366+
query = addFilterParams(key, strings.ToLower(value), query)
355367
}
356368

357-
return executeQuery[map[string]interface{}](c, "logs", columns, qf, scanRowToMap)
369+
// Add GROUP BY clause if specified
370+
if len(qf.GroupBy) > 0 {
371+
groupByColumns := strings.Join(qf.GroupBy, ", ")
372+
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
373+
}
374+
375+
// Execute the query
376+
rows, err := c.conn.Query(context.Background(), query)
377+
if err != nil {
378+
return QueryResult[interface{}]{}, err
379+
}
380+
defer rows.Close()
381+
382+
// Collect results
383+
var aggregates []map[string]interface{}
384+
for rows.Next() {
385+
columns := rows.Columns()
386+
values := make([]interface{}, len(columns))
387+
valuePtrs := make([]interface{}, len(columns))
388+
for i := range columns {
389+
valuePtrs[i] = &values[i]
390+
}
391+
392+
if err := rows.Scan(valuePtrs...); err != nil {
393+
return QueryResult[interface{}]{}, err
394+
}
395+
396+
result := make(map[string]interface{})
397+
for i, col := range columns {
398+
result[col] = values[i]
399+
}
400+
401+
aggregates = append(aggregates, result)
402+
}
403+
404+
return QueryResult[interface{}]{Data: nil, Aggregates: aggregates}, nil
358405
}
359406

360407
func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf QueryFilter, scanFunc func(driver.Rows) (T, error)) (QueryResult[T], error) {
@@ -367,8 +414,7 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query
367414
defer rows.Close()
368415

369416
queryResult := QueryResult[T]{
370-
Data: []T{},
371-
Aggregates: map[string]string{},
417+
Data: []T{},
372418
}
373419

374420
for rows.Next() {
@@ -379,21 +425,13 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query
379425
queryResult.Data = append(queryResult.Data, item)
380426
}
381427

382-
if len(qf.Aggregates) > 0 {
383-
aggregates, err := c.executeAggregateQuery(table, qf)
384-
if err != nil {
385-
return queryResult, err
386-
}
387-
queryResult.Aggregates = aggregates
388-
}
389-
390428
return queryResult, nil
391429
}
392430

393431
func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) string {
394432
query := fmt.Sprintf("SELECT %s FROM %s.%s WHERE is_deleted = 0", columns, c.cfg.Database, table)
395433

396-
if qf.ChainId.Sign() > 0 {
434+
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
397435
query = addFilterParams("chain_id", qf.ChainId.String(), query)
398436
}
399437
query = addContractAddress(table, query, qf.ContractAddress)
@@ -407,12 +445,6 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter)
407445
query = addFilterParams(key, strings.ToLower(value), query)
408446
}
409447

410-
// Add GROUP BY clause if specified
411-
if len(qf.GroupBy) > 0 {
412-
groupByColumns := strings.Join(qf.GroupBy, ", ")
413-
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
414-
}
415-
416448
// Add ORDER BY clause
417449
if qf.SortBy != "" {
418450
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
@@ -422,9 +454,8 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter)
422454
if qf.Page > 0 && qf.Limit > 0 {
423455
offset := (qf.Page - 1) * qf.Limit
424456
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
425-
} else {
426-
// Add limit clause
427-
query += getLimitClause(int(qf.Limit))
457+
} else if qf.Limit > 0 {
458+
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
428459
}
429460

430461
return query
@@ -484,35 +515,6 @@ func getTopicValueFormat(topic string) string {
484515
return result
485516
}
486517

487-
func (c *ClickHouseConnector) executeAggregateQuery(table string, qf QueryFilter) (map[string]string, error) {
488-
aggregateQuery := "SELECT " + strings.Join(qf.Aggregates, ", ") +
489-
fmt.Sprintf(" FROM %s.%s WHERE is_deleted = 0", c.cfg.Database, table)
490-
491-
if qf.ContractAddress != "" {
492-
aggregateQuery += fmt.Sprintf(" AND address = '%s'", qf.ContractAddress)
493-
}
494-
if qf.Signature != "" {
495-
aggregateQuery += fmt.Sprintf(" AND topic_0 = '%s'", qf.Signature)
496-
}
497-
for key, value := range qf.FilterParams {
498-
aggregateQuery += fmt.Sprintf(" AND %s = '%s'", key, value)
499-
}
500-
501-
row := c.conn.QueryRow(context.Background(), aggregateQuery)
502-
aggregateResultsJSON, err := json.Marshal(row)
503-
if err != nil {
504-
return nil, fmt.Errorf("error marshaling aggregate results to JSON: %w", err)
505-
}
506-
507-
var aggregateResultsMap map[string]string
508-
err = json.Unmarshal(aggregateResultsJSON, &aggregateResultsMap)
509-
if err != nil {
510-
return nil, fmt.Errorf("error unmarshaling aggregate results JSON to map: %w", err)
511-
}
512-
513-
return aggregateResultsMap, nil
514-
}
515-
516518
func scanTransaction(rows driver.Rows) (common.Transaction, error) {
517519
var tx common.Transaction
518520
err := rows.Scan(
@@ -572,27 +574,6 @@ func scanLog(rows driver.Rows) (common.Log, error) {
572574
return log, nil
573575
}
574576

575-
func scanRowToMap(rows driver.Rows) (map[string]interface{}, error) {
576-
columns := rows.Columns()
577-
values := make([]interface{}, len(columns))
578-
valuePtrs := make([]interface{}, len(columns))
579-
580-
for i := range columns {
581-
valuePtrs[i] = &values[i]
582-
}
583-
584-
if err := rows.Scan(valuePtrs...); err != nil {
585-
return nil, err
586-
}
587-
588-
result := make(map[string]interface{})
589-
for i, col := range columns {
590-
result[col] = values[i]
591-
}
592-
593-
return result, nil
594-
}
595-
596577
func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
597578
query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database)
598579
if chainId.Sign() > 0 {

internal/storage/connector.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@ type QueryFilter struct {
1818
Page int
1919
Limit int
2020
Offset int
21-
Aggregates []string
21+
Aggregates []string // e.g., ["COUNT(*) AS count", "SUM(amount) AS total_amount"]
2222
FromAddress string
2323
ContractAddress string
2424
Signature string
2525
}
2626
type QueryResult[T any] struct {
2727
// TODO: findout how to only allow Log/transaction arrays or split the result
28-
Data []T `json:"data"`
29-
Aggregates map[string]string `json:"aggregates"`
28+
Data []T `json:"data"`
29+
Aggregates []map[string]interface{} `json:"aggregates"`
3030
}
31+
3132
type IStorage struct {
3233
OrchestratorStorage IOrchestratorStorage
3334
MainStorage IMainStorage
@@ -54,7 +55,8 @@ type IMainStorage interface {
5455

5556
GetBlocks(qf QueryFilter) (blocks []common.Block, err error)
5657
GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error)
57-
GetLogs(qf QueryFilter) (logs QueryResult[map[string]interface{}], err error)
58+
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
59+
GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
5860
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
5961
GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
6062
/**

0 commit comments

Comments
 (0)