Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,43 @@ rpc:
batchDelay: 100
```

#### RPC Block Receipts Enabled
If this is `true`, will use `eth_getBlockReceipts` instead of `eth_getLogs` if the RPC supports it. Allows getting receipt data for transactions, but is not supported by every RPC. Default is `false`.

cmd: `--rpc-block-receipts-enabled`
env: `RPC_BLOCKRECEIPTS_ENABLED`
yaml:
```yaml
rpc:
blockReceipts:
enabled: true
```

#### RPC Block Receipts Blocks Per Request
How many blocks at a time to fetch block receipts for from the RPC. Default is 250.
Has no effect if it's larger than RPC blocks per request.

cmd: `--rpc-block-receipts-blocksPerRequest`
env: `RPC_BLOCKRECEIPTS_BLOCKSPERREQUEST`
yaml:
```yaml
rpc:
blockReceipts:
blocksPerRequest: 100
```

#### RPC Block Receipts Batch Delay
Milliseconds to wait between batches of block receipts when fetching from the RPC. Default is 0.

cmd: `--rpc-block-receipts-batchDelay`
env: `RPC_BLOCKRECEIPTS_BATCHDELAY`
yaml:
```yaml
rpc:
blockReceipts:
batchDelay: 100
```

#### RPC Traces Enabled
Whether to enable fetching traces from the RPC. Default is `true`, but it will try to detect if the RPC supports traces automatically.

Expand Down
6 changes: 6 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func init() {
rootCmd.PersistentFlags().Int("rpc-blocks-batchDelay", 0, "Milliseconds to wait between batches of blocks when fetching from the RPC")
rootCmd.PersistentFlags().Int("rpc-logs-blocksPerRequest", 0, "How many blocks to fetch logs per request")
rootCmd.PersistentFlags().Int("rpc-logs-batchDelay", 0, "Milliseconds to wait between batches of logs when fetching from the RPC")
rootCmd.PersistentFlags().Bool("rpc-blockReceipts-enabled", false, "Whether to enable fetching block receipts from the RPC")
rootCmd.PersistentFlags().Int("rpc-blockReceipts-blocksPerRequest", 0, "How many blocks to fetch receipts for per request")
rootCmd.PersistentFlags().Int("rpc-blockReceipts-batchDelay", 0, "Milliseconds to wait between batches of receipts when fetching from the RPC")
rootCmd.PersistentFlags().Bool("rpc-traces-enabled", true, "Whether to enable fetching traces from the RPC")
rootCmd.PersistentFlags().Int("rpc-traces-blocksPerRequest", 0, "How many blocks to fetch traces per request")
rootCmd.PersistentFlags().Int("rpc-traces-batchDelay", 0, "Milliseconds to wait between batches of traces when fetching from the RPC")
Expand Down Expand Up @@ -89,6 +92,9 @@ func init() {
viper.BindPFlag("rpc.blocks.batchDelay", rootCmd.PersistentFlags().Lookup("rpc-blocks-batchDelay"))
viper.BindPFlag("rpc.logs.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-logs-blocksPerRequest"))
viper.BindPFlag("rpc.logs.batchDelay", rootCmd.PersistentFlags().Lookup("rpc-logs-batchDelay"))
viper.BindPFlag("rpc.blockReceipts.enabled", rootCmd.PersistentFlags().Lookup("rpc-blockReceipts-enabled"))
viper.BindPFlag("rpc.blockReceipts.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blockReceipts-blocksPerRequest"))
viper.BindPFlag("rpc.blockReceipts.batchDelay", rootCmd.PersistentFlags().Lookup("rpc-blockReceipts-batchDelay"))
viper.BindPFlag("rpc.traces.enabled", rootCmd.PersistentFlags().Lookup("rpc-traces-enabled"))
viper.BindPFlag("rpc.traces.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-traces-blocksPerRequest"))
viper.BindPFlag("rpc.traces.batchDelay", rootCmd.PersistentFlags().Lookup("rpc-traces-batchDelay"))
Expand Down
9 changes: 9 additions & 0 deletions configs/config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ rpc:
logs:
blocksPerRequest: 400
batchDelay: 100
blockReceipts:
enabled: true
blocksPerRequest: 500
batchDelay: 100
traces:
enabled: true
blocksPerRequest: 200
Expand All @@ -29,6 +33,11 @@ failureRecoverer:
interval: 10000
blocksPerRun: 100

reorgHandler:
enabled: true
interval: 1000
blocksPerScan: 50

storage:
main:
clickhouse:
Expand Down
18 changes: 9 additions & 9 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,22 @@ type RedisConfig struct {
DB int `mapstructure:"db"`
}

type RPCBatchSizeConfig struct {
type RPCBatchRequestConfig struct {
BlocksPerRequest int `mapstructure:"blocksPerRequest"`
BatchDelay int `mapstructure:"batchDelay"`
}

type RPCTracesConfig struct {
Enabled bool `mapstructure:"enabled"`
BlocksPerRequest int `mapstructure:"blocksPerRequest"`
BatchDelay int `mapstructure:"batchDelay"`
type ToggleableRPCBatchRequestConfig struct {
Enabled bool `mapstructure:"enabled"`
RPCBatchRequestConfig
}

type RPCConfig struct {
URL string `mapstructure:"url"`
Blocks RPCBatchSizeConfig `mapstructure:"blocks"`
Logs RPCBatchSizeConfig `mapstructure:"logs"`
Traces RPCTracesConfig `mapstructure:"traces"`
URL string `mapstructure:"url"`
Blocks RPCBatchRequestConfig `mapstructure:"blocks"`
Logs RPCBatchRequestConfig `mapstructure:"logs"`
BlockReceipts ToggleableRPCBatchRequestConfig `mapstructure:"blockReceipts"`
Traces ToggleableRPCBatchRequestConfig `mapstructure:"traces"`
}

type APIConfig struct {
Expand Down
2 changes: 2 additions & 0 deletions internal/common/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ type Log struct {
}

type RawLogs = []map[string]interface{}
type RawReceipts = []RawReceipt
type RawReceipt = map[string]interface{}
10 changes: 9 additions & 1 deletion internal/common/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,13 @@ type Transaction struct {
R *big.Int `json:"r"`
S *big.Int `json:"s"`
V *big.Int `json:"v"`
AccessListJson string `json:"access_list_json"`
AccessListJson *string `json:"access_list_json"`
ContractAddress *string `json:"contract_address"`
GasUsed *uint64 `json:"gas_used"`
CumulativeGasUsed *uint64 `json:"cumulative_gas_used"`
EffectiveGasPrice *big.Int `json:"effective_gas_price"`
BlobGasUsed *uint64 `json:"blob_gas_used"`
BlobGasPrice *big.Int `json:"blob_gas_price"`
LogsBloom *string `json:"logs_bloom"`
Status *uint64 `json:"status"`
}
4 changes: 4 additions & 0 deletions internal/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ func GetLogsParams(blockNum *big.Int) []interface{} {
func TraceBlockParams(blockNum *big.Int) []interface{} {
return []interface{}{hexutil.EncodeBig(blockNum)}
}

func GetBlockReceiptsParams(blockNum *big.Int) []interface{} {
return []interface{}{hexutil.EncodeBig(blockNum)}
}
108 changes: 84 additions & 24 deletions internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ type GetBlocksResult struct {
}

type BlocksPerRequestConfig struct {
Blocks int
Logs int
Traces int
Blocks int
Logs int
Traces int
Receipts int
}

type IRPCClient interface {
Expand All @@ -44,13 +45,14 @@ type IRPCClient interface {
}

type Client struct {
RPCClient *gethRpc.Client
EthClient *ethclient.Client
supportsTraceBlock bool
isWebsocket bool
url string
chainID *big.Int
blocksPerRequest BlocksPerRequestConfig
RPCClient *gethRpc.Client
EthClient *ethclient.Client
supportsTraceBlock bool
supportsBlockReceipts bool
isWebsocket bool
url string
chainID *big.Int
blocksPerRequest BlocksPerRequestConfig
}

func Initialize() (IRPCClient, error) {
Expand Down Expand Up @@ -111,28 +113,75 @@ func (rpc *Client) Close() {
}

func (rpc *Client) checkSupportedMethods() error {
if err := rpc.checkGetBlockByNumberSupport(); err != nil {
return err
}
if err := rpc.checkGetBlockReceiptsSupport(); err != nil {
return err
}
if err := rpc.checkGetLogsSupport(); err != nil {
return err
}
if err := rpc.checkTraceBlockSupport(); err != nil {
return err
}
return nil
}

func (rpc *Client) checkGetBlockByNumberSupport() error {
var blockByNumberResult interface{}
err := rpc.RPCClient.Call(&blockByNumberResult, "eth_getBlockByNumber", "latest", true)
if err != nil {
return fmt.Errorf("eth_getBlockByNumber method not supported: %v", err)
}
log.Debug().Msg("eth_getBlockByNumber method supported")
return nil
}

func (rpc *Client) checkGetBlockReceiptsSupport() error {
if config.Cfg.RPC.BlockReceipts.Enabled {
var getBlockReceiptsResult interface{}
receiptsErr := rpc.RPCClient.Call(&getBlockReceiptsResult, "eth_getBlockReceipts", "latest")
if receiptsErr != nil {
log.Warn().Err(receiptsErr).Msg("eth_getBlockReceipts method not supported")
return fmt.Errorf("eth_getBlockReceipts method not supported: %v", receiptsErr)
} else {
rpc.supportsBlockReceipts = true
log.Debug().Msg("eth_getBlockReceipts method supported")
}
} else {
rpc.supportsBlockReceipts = false
log.Debug().Msg("eth_getBlockReceipts method disabled")
}
return nil
}

func (rpc *Client) checkGetLogsSupport() error {
if rpc.supportsBlockReceipts {
return nil
}
var getLogsResult interface{}
logsErr := rpc.RPCClient.Call(&getLogsResult, "eth_getLogs", map[string]string{"fromBlock": "0x0", "toBlock": "0x0"})
if logsErr != nil {
return fmt.Errorf("eth_getLogs method not supported: %v", logsErr)
}
log.Debug().Msg("eth_getLogs method supported")
return nil
}

var traceBlockResult interface{}
func (rpc *Client) checkTraceBlockSupport() error {
if config.Cfg.RPC.Traces.Enabled {
var traceBlockResult interface{}
if traceBlockErr := rpc.RPCClient.Call(&traceBlockResult, "trace_block", "latest"); traceBlockErr != nil {
log.Warn().Err(traceBlockErr).Msg("Optional method trace_block not supported")
} else {
rpc.supportsTraceBlock = true
log.Debug().Msg("trace_block method supported")
}
} else {
rpc.supportsTraceBlock = false
log.Debug().Msg("trace_block method disabled")
}
rpc.supportsTraceBlock = traceBlockResult != nil
log.Debug().Msgf("trace_block method supported: %v", rpc.supportsTraceBlock)
return nil
}

Expand All @@ -147,33 +196,44 @@ func (rpc *Client) setChainID() error {

func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
var wg sync.WaitGroup
var blocks []RPCFetchBatchResult[common.RawBlock]
var logs []RPCFetchBatchResult[common.RawLogs]
var traces []RPCFetchBatchResult[common.RawTraces]

var blocks *[]RPCFetchBatchResult[common.RawBlock]
var logs *[]RPCFetchBatchResult[common.RawLogs]
var traces *[]RPCFetchBatchResult[common.RawTraces]
var receipts *[]RPCFetchBatchResult[common.RawReceipts]
wg.Add(2)

go func() {
defer wg.Done()
blocks = RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
result := RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
blocks = &result
}()

go func() {
defer wg.Done()
logs = RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
}()
if rpc.supportsBlockReceipts {
go func() {
defer wg.Done()
result := RPCFetchInBatches[common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams)
receipts = &result
}()
} else {
go func() {
defer wg.Done()
result := RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
logs = &result
}()
}

if rpc.supportsTraceBlock {
wg.Add(1)
go func() {
defer wg.Done()
traces = RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
result := RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
traces = &result
}()
}

wg.Wait()

return SerializeFullBlocks(rpc.chainID, blocks, logs, traces)
return SerializeFullBlocks(rpc.chainID, blocks, logs, traces, receipts)
}

func (rpc *Client) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult {
Expand Down
Loading
Loading