Skip to content

Commit d22e74d

Browse files
committed
feat: separate storage method for aggregates
1 parent 2969e99 commit d22e74d

File tree

6 files changed

+169
-109
lines changed

6 files changed

+169
-109
lines changed

api/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type QueryResponse struct {
7070
// @Description Query result data
7171
Data interface{} `json:"data,omitempty"`
7272
// @Description Aggregation results
73-
Aggregations map[string]string `json:"aggregations,omitempty"`
73+
Aggregations []map[string]interface{} `json:"aggregations,omitempty"`
7474
}
7575

7676
func writeError(w http.ResponseWriter, message string, code int) {

internal/handlers/logs_handlers.go

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -133,39 +133,59 @@ func handleLogsRequest(c *gin.Context, contractAddress, signature string) {
133133
return
134134
}
135135

136-
logs, err := mainStorage.GetLogs(storage.QueryFilter{
136+
// Prepare the QueryFilter
137+
qf := storage.QueryFilter{
137138
FilterParams: queryParams.FilterParams,
138-
GroupBy: queryParams.GroupBy,
139+
ContractAddress: contractAddress,
140+
Signature: signatureHash,
141+
ChainId: chainId,
139142
SortBy: queryParams.SortBy,
140143
SortOrder: queryParams.SortOrder,
141144
Page: queryParams.Page,
142145
Limit: queryParams.Limit,
143-
Aggregates: queryParams.Aggregates,
144-
ContractAddress: contractAddress,
145-
Signature: signatureHash,
146-
ChainId: chainId,
147-
})
148-
if err != nil {
149-
log.Error().Err(err).Msg("Error querying logs")
150-
api.InternalErrorHandler(c)
151-
return
152146
}
153147

154-
response := api.QueryResponse{
148+
// Initialize the QueryResult
149+
queryResult := api.QueryResponse{
155150
Meta: api.Meta{
156151
ChainId: chainId.Uint64(),
157152
ContractAddress: contractAddress,
158153
Signature: signatureHash,
159154
Page: queryParams.Page,
160155
Limit: queryParams.Limit,
161-
TotalItems: len(logs.Data),
162-
TotalPages: 0, // TODO: Implement total pages count
156+
TotalItems: 0,
157+
TotalPages: 0, // Implement total pages count if needed
163158
},
164-
Data: logs.Data,
165-
Aggregations: logs.Aggregates,
159+
Data: nil,
160+
Aggregations: nil,
161+
}
162+
163+
// If aggregates are specified, retrieve them
164+
if len(queryParams.Aggregates) > 0 {
165+
qf.Aggregates = queryParams.Aggregates
166+
qf.GroupBy = queryParams.GroupBy
167+
168+
aggregatesResult, err := mainStorage.GetAggregations("logs", qf)
169+
if err != nil {
170+
log.Error().Err(err).Msg("Error querying aggregates")
171+
api.InternalErrorHandler(c)
172+
return
173+
}
174+
queryResult.Aggregations = aggregatesResult.Aggregates
175+
queryResult.Meta.TotalItems = len(aggregatesResult.Aggregates)
176+
} else {
177+
// Retrieve logs data
178+
logsResult, err := mainStorage.GetLogs(qf)
179+
if err != nil {
180+
log.Error().Err(err).Msg("Error querying logs")
181+
api.InternalErrorHandler(c)
182+
return
183+
}
184+
queryResult.Data = logsResult.Data
185+
queryResult.Meta.TotalItems = len(logsResult.Data)
166186
}
167187

168-
sendJSONResponse(c, response)
188+
sendJSONResponse(c, queryResult)
169189
}
170190

171191
func getMainStorage() (storage.IMainStorage, error) {

internal/handlers/transactions_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func handleTransactionsRequest(c *gin.Context, contractAddress, signature string
163163
TotalPages: 0, // TODO: Implement total pages count
164164
},
165165
Data: result.Data,
166-
Aggregations: result.Aggregates,
166+
Aggregations: nil,
167167
}
168168

169169
c.JSON(http.StatusOK, response)

internal/storage/clickhouse.go

Lines changed: 60 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -291,19 +291,66 @@ func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (QueryResult[commo
291291
return executeQuery[common.Transaction](c, "transactions", columns, qf, scanTransaction)
292292
}
293293

294-
func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[map[string]interface{}], error) {
295-
var columns string
294+
func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], error) {
295+
columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
296+
return executeQuery[common.Log](c, "logs", columns, qf, scanLog)
297+
}
298+
299+
func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) {
300+
// Build the SELECT clause with aggregates
301+
columns := strings.Join(append(qf.GroupBy, qf.Aggregates...), ", ")
302+
query := fmt.Sprintf("SELECT %s FROM %s.%s WHERE is_deleted = 0", columns, c.cfg.Database, table)
303+
304+
// Apply filters
305+
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
306+
query = addFilterParams("chain_id", qf.ChainId.String(), query)
307+
}
308+
query = addContractAddress(table, query, qf.ContractAddress)
309+
310+
if qf.Signature != "" {
311+
query += fmt.Sprintf(" AND topic_0 = '%s'", qf.Signature)
312+
}
296313

297-
if len(qf.GroupBy) > 0 || len(qf.Aggregates) > 0 {
298-
// Build columns for SELECT when grouping or aggregating
299-
selectColumns := append(qf.GroupBy, qf.Aggregates...)
300-
columns = strings.Join(selectColumns, ", ")
301-
} else {
302-
// Default columns when not grouping
303-
columns = "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
314+
for key, value := range qf.FilterParams {
315+
query = addFilterParams(key, strings.ToLower(value), query)
304316
}
305317

306-
return executeQuery[map[string]interface{}](c, "logs", columns, qf, scanRowToMap)
318+
// Add GROUP BY clause if specified
319+
if len(qf.GroupBy) > 0 {
320+
groupByColumns := strings.Join(qf.GroupBy, ", ")
321+
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
322+
}
323+
324+
// Execute the query
325+
rows, err := c.conn.Query(context.Background(), query)
326+
if err != nil {
327+
return QueryResult[interface{}]{}, err
328+
}
329+
defer rows.Close()
330+
331+
// Collect results
332+
var aggregates []map[string]interface{}
333+
for rows.Next() {
334+
columns := rows.Columns()
335+
values := make([]interface{}, len(columns))
336+
valuePtrs := make([]interface{}, len(columns))
337+
for i := range columns {
338+
valuePtrs[i] = &values[i]
339+
}
340+
341+
if err := rows.Scan(valuePtrs...); err != nil {
342+
return QueryResult[interface{}]{}, err
343+
}
344+
345+
result := make(map[string]interface{})
346+
for i, col := range columns {
347+
result[col] = values[i]
348+
}
349+
350+
aggregates = append(aggregates, result)
351+
}
352+
353+
return QueryResult[interface{}]{Data: nil, Aggregates: aggregates}, nil
307354
}
308355

309356
func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf QueryFilter, scanFunc func(driver.Rows) (T, error)) (QueryResult[T], error) {
@@ -317,7 +364,6 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query
317364

318365
queryResult := QueryResult[T]{
319366
Data: []T{},
320-
Aggregates: map[string]string{},
321367
}
322368

323369
for rows.Next() {
@@ -328,21 +374,13 @@ func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf Query
328374
queryResult.Data = append(queryResult.Data, item)
329375
}
330376

331-
if len(qf.Aggregates) > 0 {
332-
aggregates, err := c.executeAggregateQuery(table, qf)
333-
if err != nil {
334-
return queryResult, err
335-
}
336-
queryResult.Aggregates = aggregates
337-
}
338-
339377
return queryResult, nil
340378
}
341379

342380
func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) string {
343381
query := fmt.Sprintf("SELECT %s FROM %s.%s WHERE is_deleted = 0", columns, c.cfg.Database, table)
344382

345-
if qf.ChainId.Sign() > 0 {
383+
if qf.ChainId != nil && qf.ChainId.Sign() > 0 {
346384
query = addFilterParams("chain_id", qf.ChainId.String(), query)
347385
}
348386
query = addContractAddress(table, query, qf.ContractAddress)
@@ -356,12 +394,6 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter)
356394
query = addFilterParams(key, strings.ToLower(value), query)
357395
}
358396

359-
// Add GROUP BY clause if specified
360-
if len(qf.GroupBy) > 0 {
361-
groupByColumns := strings.Join(qf.GroupBy, ", ")
362-
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
363-
}
364-
365397
// Add ORDER BY clause
366398
if qf.SortBy != "" {
367399
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
@@ -371,9 +403,8 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter)
371403
if qf.Page > 0 && qf.Limit > 0 {
372404
offset := (qf.Page - 1) * qf.Limit
373405
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
374-
} else {
375-
// Add limit clause
376-
query += getLimitClause(int(qf.Limit))
406+
} else if qf.Limit > 0 {
407+
query += fmt.Sprintf(" LIMIT %d", qf.Limit)
377408
}
378409

379410
return query
@@ -433,35 +464,6 @@ func getTopicValueFormat(topic string) string {
433464
return result
434465
}
435466

436-
func (c *ClickHouseConnector) executeAggregateQuery(table string, qf QueryFilter) (map[string]string, error) {
437-
aggregateQuery := "SELECT " + strings.Join(qf.Aggregates, ", ") +
438-
fmt.Sprintf(" FROM %s.%s WHERE is_deleted = 0", c.cfg.Database, table)
439-
440-
if qf.ContractAddress != "" {
441-
aggregateQuery += fmt.Sprintf(" AND address = '%s'", qf.ContractAddress)
442-
}
443-
if qf.Signature != "" {
444-
aggregateQuery += fmt.Sprintf(" AND topic_0 = '%s'", qf.Signature)
445-
}
446-
for key, value := range qf.FilterParams {
447-
aggregateQuery += fmt.Sprintf(" AND %s = '%s'", key, value)
448-
}
449-
450-
row := c.conn.QueryRow(context.Background(), aggregateQuery)
451-
aggregateResultsJSON, err := json.Marshal(row)
452-
if err != nil {
453-
return nil, fmt.Errorf("error marshaling aggregate results to JSON: %w", err)
454-
}
455-
456-
var aggregateResultsMap map[string]string
457-
err = json.Unmarshal(aggregateResultsJSON, &aggregateResultsMap)
458-
if err != nil {
459-
return nil, fmt.Errorf("error unmarshaling aggregate results JSON to map: %w", err)
460-
}
461-
462-
return aggregateResultsMap, nil
463-
}
464-
465467
func scanTransaction(rows driver.Rows) (common.Transaction, error) {
466468
var tx common.Transaction
467469
err := rows.Scan(
@@ -521,27 +523,6 @@ func scanLog(rows driver.Rows) (common.Log, error) {
521523
return log, nil
522524
}
523525

524-
func scanRowToMap(rows driver.Rows) (map[string]interface{}, error) {
525-
columns := rows.Columns()
526-
values := make([]interface{}, len(columns))
527-
valuePtrs := make([]interface{}, len(columns))
528-
529-
for i := range columns {
530-
valuePtrs[i] = &values[i]
531-
}
532-
533-
if err := rows.Scan(valuePtrs...); err != nil {
534-
return nil, err
535-
}
536-
537-
result := make(map[string]interface{})
538-
for i, col := range columns {
539-
result[col] = values[i]
540-
}
541-
542-
return result, nil
543-
}
544-
545526
func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
546527
query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database)
547528
if chainId.Sign() > 0 {

internal/storage/connector.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@ type QueryFilter struct {
1818
Page int
1919
Limit int
2020
Offset int
21-
Aggregates []string
21+
Aggregates []string // e.g., ["COUNT(*) AS count", "SUM(amount) AS total_amount"]
2222
FromAddress string
2323
ContractAddress string
2424
Signature string
2525
}
2626
type QueryResult[T any] struct {
2727
// TODO: findout how to only allow Log/transaction arrays or split the result
28-
Data []T `json:"data"`
29-
Aggregates map[string]string `json:"aggregates"`
28+
Data []T `json:"data"`
29+
Aggregates []map[string]interface{} `json:"aggregates"`
3030
}
31+
3132
type IStorage struct {
3233
OrchestratorStorage IOrchestratorStorage
3334
MainStorage IMainStorage
@@ -54,7 +55,8 @@ type IMainStorage interface {
5455

5556
GetBlocks(qf QueryFilter) (blocks []common.Block, err error)
5657
GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error)
57-
GetLogs(qf QueryFilter) (logs QueryResult[map[string]interface{}], err error)
58+
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
59+
GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
5860
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
5961
GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
6062
/**

0 commit comments

Comments
 (0)