Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/common/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Transaction struct {
Gas uint64 `json:"gas"`
GasPrice *big.Int `json:"gas_price"`
Data string `json:"data"`
FunctionSelector string `json:"function_selector"`
MaxFeePerGas *big.Int `json:"max_fee_per_gas"`
MaxPriorityFeePerGas *big.Int `json:"max_priority_fee_per_gas"`
TransactionType uint8 `json:"transaction_type"`
Expand Down
11 changes: 11 additions & 0 deletions internal/rpc/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func serializeTransaction(chainId *big.Int, rawTx interface{}, blockTimestamp ui
Gas: hexToUint64(tx["gas"]),
GasPrice: hexToBigInt(tx["gasPrice"]),
Data: interfaceToString(tx["input"]),
FunctionSelector: extractFunctionSelector(interfaceToString(tx["input"])),
MaxFeePerGas: hexToBigInt(tx["maxFeePerGas"]),
MaxPriorityFeePerGas: hexToBigInt(tx["maxPriorityFeePerGas"]),
TransactionType: uint8(hexToUint64(tx["type"])),
Expand All @@ -169,6 +170,16 @@ func serializeTransaction(chainId *big.Int, rawTx interface{}, blockTimestamp ui
}
}

/**
* Extracts the function selector (first 4 bytes) from a transaction input.
*/
func extractFunctionSelector(s string) string {
if len(s) < 10 {
return ""
}
return s[0:10]
}

func serializeLogs(chainId *big.Int, rawLogs []map[string]interface{}, block common.Block) []common.Log {
serializedLogs := make([]common.Log, len(rawLogs))
for i, rawLog := range rawLogs {
Expand Down
16 changes: 13 additions & 3 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"crypto/tls"
"database/sql"
"encoding/json"
"fmt"
"math/big"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) erro
query := `
INSERT INTO ` + c.cfg.Database + `.transactions (
chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index,
from_address, to_address, value, gas, gas_price, data, max_fee_per_gas, max_priority_fee_per_gas,
from_address, to_address, value, gas, gas_price, data, function_selector, max_fee_per_gas, max_priority_fee_per_gas,
transaction_type, r, s, v, access_list
)
`
Expand All @@ -133,6 +134,7 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) erro
tx.Gas,
tx.GasPrice,
tx.Data,
tx.FunctionSelector,
tx.MaxFeePerGas,
tx.MaxPriorityFeePerGas,
tx.TransactionType,
Expand Down Expand Up @@ -490,28 +492,36 @@ func scanLog(rows driver.Rows) (common.Log, error) {
}

func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
query := fmt.Sprintf("SELECT max(number) FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database)
query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database)
if chainId.Sign() > 0 {
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
}
query += " ORDER BY number DESC LIMIT 1"
err = c.conn.QueryRow(context.Background(), query).Scan(&maxBlockNumber)
if err != nil {
if err == sql.ErrNoRows {
return big.NewInt(0), nil
}
return nil, err
}
zLog.Debug().Msgf("Max block number in main storage is: %s", maxBlockNumber.String())
return maxBlockNumber, nil
}

func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) {
query := fmt.Sprintf("SELECT max(block_number) FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database)
query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database)
if chainId.Sign() > 0 {
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
}
if rangeEnd.Sign() > 0 {
query += fmt.Sprintf(" AND block_number <= %s", rangeEnd.String())
}
query += " ORDER BY block_number DESC LIMIT 1"
err = c.conn.QueryRow(context.Background(), query).Scan(&maxBlockNumber)
if err != nil {
if err == sql.ErrNoRows {
return big.NewInt(0), nil
}
return nil, err
}
return maxBlockNumber, nil
Expand Down
4 changes: 2 additions & 2 deletions internal/tools/clickhouse_create_blocks_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ CREATE TABLE blocks (
`insert_timestamp` DateTime DEFAULT now(),
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 1,
INDEX idx_number number TYPE minmax GRANULARITY 1,
INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, hash)
ORDER BY (chain_id, number)
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
7 changes: 5 additions & 2 deletions internal/tools/clickhouse_create_logs_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ CREATE TABLE logs (
`insert_timestamp` DateTime DEFAULT now(),
`is_deleted` UInt8 DEFAULT 0,
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_address address TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic0 topic_0 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic1 topic_1 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic2 topic_2 TYPE bloom_filter GRANULARITY 1,
INDEX idx_topic3 topic_3 TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, transaction_hash, log_index, block_hash)
ORDER BY (chain_id, block_number, transaction_hash, log_index)
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
4 changes: 2 additions & 2 deletions internal/tools/clickhouse_create_traces_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ CREATE TABLE traces (
`is_deleted` UInt8 DEFAULT 0,
`insert_timestamp` DateTime DEFAULT now(),
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_type type TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, transaction_hash, trace_address, block_hash)
ORDER BY (chain_id, block_number, transaction_hash, trace_address)
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
6 changes: 4 additions & 2 deletions internal/tools/clickhouse_create_transactions_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ CREATE TABLE transactions (
`gas` UInt64,
`gas_price` UInt256,
`data` String,
`function_selector` FixedString(10),
`max_fee_per_gas` UInt128,
`max_priority_fee_per_gas` UInt128,
`transaction_type` UInt8,
Expand All @@ -22,9 +23,10 @@ CREATE TABLE transactions (
`is_deleted` UInt8 DEFAULT 0,
`insert_timestamp` DateTime DEFAULT now(),
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_hash hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1,
INDEX idx_function_selector function_selector TYPE bloom_filter GRANULARITY 1,
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
ORDER BY (chain_id, hash) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
ORDER BY (chain_id, block_number, hash) SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
Loading