Skip to content

Commit 36874b6

Browse files
s1nafjl
andauthored
eth/filters: add global block logs cache (#25459)
This adds a cache for block logs which is shared by all filters. The cache size of is configurable using the `--cache.blocklogs` flag. Co-authored-by: Felix Lange <[email protected]>
1 parent 77308cd commit 36874b6

File tree

21 files changed

+310
-200
lines changed

21 files changed

+310
-200
lines changed

accounts/abi/bind/backends/simulated.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ type SimulatedBackend struct {
6868
pendingState *state.StateDB // Currently pending state that will be the active on request
6969
pendingReceipts types.Receipts // Currently receipts for the pending block
7070

71-
events *filters.EventSystem // Event system for filtering log events live
71+
events *filters.EventSystem // for filtering log events live
72+
filterSystem *filters.FilterSystem // for filtering database logs
7273

7374
config *params.ChainConfig
7475
}
@@ -86,7 +87,11 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
8687
blockchain: blockchain,
8788
config: genesis.Config,
8889
}
89-
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
90+
91+
filterBackend := &filterBackend{database, blockchain, backend}
92+
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
93+
backend.events = filters.NewEventSystem(backend.filterSystem, false)
94+
9095
backend.rollback(blockchain.CurrentBlock())
9196
return backend
9297
}
@@ -689,7 +694,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
689694
var filter *filters.Filter
690695
if query.BlockHash != nil {
691696
// Block filter requested, construct a single-shot filter
692-
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
697+
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
693698
} else {
694699
// Initialize unset filter boundaries to run from genesis to chain head
695700
from := int64(0)
@@ -701,7 +706,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
701706
to = query.ToBlock.Int64()
702707
}
703708
// Construct the range filter
704-
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
709+
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
705710
}
706711
// Run the filter and return all the logs
707712
logs, err := filter.Logs(ctx)
@@ -827,7 +832,8 @@ type filterBackend struct {
827832
backend *SimulatedBackend
828833
}
829834

830-
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
835+
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
836+
831837
func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }
832838

833839
func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) {
@@ -853,19 +859,8 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
853859
return rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()), nil
854860
}
855861

856-
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
857-
number := rawdb.ReadHeaderNumber(fb.db, hash)
858-
if number == nil {
859-
return nil, nil
860-
}
861-
receipts := rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config())
862-
if receipts == nil {
863-
return nil, nil
864-
}
865-
logs := make([][]*types.Log, len(receipts))
866-
for i, receipt := range receipts {
867-
logs[i] = receipt.Logs
868-
}
862+
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
863+
logs := rawdb.ReadLogs(fb.db, hash, number, fb.bc.Config())
869864
return logs, nil
870865
}
871866

cmd/geth/config.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
163163
override := ctx.Bool(utils.OverrideTerminalTotalDifficultyPassed.Name)
164164
cfg.Eth.OverrideTerminalTotalDifficultyPassed = &override
165165
}
166+
166167
backend, eth := utils.RegisterEthService(stack, &cfg.Eth)
168+
167169
// Warn users to migrate if they have a legacy freezer format.
168170
if eth != nil && !ctx.IsSet(utils.IgnoreLegacyReceiptsFlag.Name) {
169171
firstIdx := uint64(0)
@@ -181,10 +183,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
181183
utils.Fatalf("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.")
182184
}
183185
}
184-
// Configure GraphQL if requested
186+
187+
// Configure log filter RPC API.
188+
filterSystem := utils.RegisterFilterAPI(stack, backend, &cfg.Eth)
189+
190+
// Configure GraphQL if requested.
185191
if ctx.IsSet(utils.GraphQLEnabledFlag.Name) {
186-
utils.RegisterGraphQLService(stack, backend, cfg.Node)
192+
utils.RegisterGraphQLService(stack, backend, filterSystem, &cfg.Node)
187193
}
194+
188195
// Add the Ethereum Stats daemon if requested.
189196
if cfg.Ethstats.URL != "" {
190197
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ var (
117117
utils.CacheSnapshotFlag,
118118
utils.CacheNoPrefetchFlag,
119119
utils.CachePreimagesFlag,
120+
utils.CacheLogSizeFlag,
120121
utils.FDLimitFlag,
121122
utils.ListenPortFlag,
122123
utils.DiscoveryPortFlag,

cmd/utils/flags.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
ethcatalyst "github.com/ethereum/go-ethereum/eth/catalyst"
4444
"github.com/ethereum/go-ethereum/eth/downloader"
4545
"github.com/ethereum/go-ethereum/eth/ethconfig"
46+
"github.com/ethereum/go-ethereum/eth/filters"
4647
"github.com/ethereum/go-ethereum/eth/gasprice"
4748
"github.com/ethereum/go-ethereum/eth/tracers"
4849
"github.com/ethereum/go-ethereum/ethdb"
@@ -64,6 +65,7 @@ import (
6465
"github.com/ethereum/go-ethereum/p2p/nat"
6566
"github.com/ethereum/go-ethereum/p2p/netutil"
6667
"github.com/ethereum/go-ethereum/params"
68+
"github.com/ethereum/go-ethereum/rpc"
6769
pcsclite "github.com/gballet/go-libpcsclite"
6870
gopsutil "github.com/shirou/gopsutil/mem"
6971
"github.com/urfave/cli/v2"
@@ -491,6 +493,12 @@ var (
491493
Usage: "Enable recording the SHA3/keccak preimages of trie keys",
492494
Category: flags.PerfCategory,
493495
}
496+
CacheLogSizeFlag = &cli.IntFlag{
497+
Name: "cache.blocklogs",
498+
Usage: "Size (in number of blocks) of the log cache for filtering",
499+
Category: flags.PerfCategory,
500+
Value: ethconfig.Defaults.FilterLogCacheSize,
501+
}
494502
FDLimitFlag = &cli.IntFlag{
495503
Name: "fdlimit",
496504
Usage: "Raise the open file descriptor resource limit (default = system fd limit)",
@@ -1808,6 +1816,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
18081816
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) {
18091817
cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100
18101818
}
1819+
if ctx.IsSet(CacheLogSizeFlag.Name) {
1820+
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
1821+
}
18111822
if !ctx.Bool(SnapshotFlag.Name) {
18121823
// If snap-sync is requested, this flag is also required
18131824
if cfg.SyncMode == downloader.SnapSync {
@@ -2005,21 +2016,34 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) (ethapi.Backend
20052016
return backend.APIBackend, backend
20062017
}
20072018

2008-
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to
2009-
// the given node.
2019+
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node.
20102020
func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url string) {
20112021
if err := ethstats.New(stack, backend, backend.Engine(), url); err != nil {
20122022
Fatalf("Failed to register the Ethereum Stats service: %v", err)
20132023
}
20142024
}
20152025

2016-
// RegisterGraphQLService is a utility function to construct a new service and register it against a node.
2017-
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.Config) {
2018-
if err := graphql.New(stack, backend, cfg.GraphQLCors, cfg.GraphQLVirtualHosts); err != nil {
2026+
// RegisterGraphQLService adds the GraphQL API to the node.
2027+
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg *node.Config) {
2028+
err := graphql.New(stack, backend, filterSystem, cfg.GraphQLCors, cfg.GraphQLVirtualHosts)
2029+
if err != nil {
20192030
Fatalf("Failed to register the GraphQL service: %v", err)
20202031
}
20212032
}
20222033

2034+
// RegisterFilterAPI adds the eth log filtering RPC API to the node.
2035+
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
2036+
isLightClient := ethcfg.SyncMode == downloader.LightSync
2037+
filterSystem := filters.NewFilterSystem(backend, filters.Config{
2038+
LogCacheSize: ethcfg.FilterLogCacheSize,
2039+
})
2040+
stack.RegisterAPIs([]rpc.API{{
2041+
Namespace: "eth",
2042+
Service: filters.NewFilterAPI(filterSystem, isLightClient),
2043+
}})
2044+
return filterSystem
2045+
}
2046+
20232047
func SetupMetrics(ctx *cli.Context) {
20242048
if metrics.Enabled {
20252049
log.Info("Enabling metrics collection")

eth/api_backend.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package eth
1919
import (
2020
"context"
2121
"errors"
22-
"fmt"
2322
"math/big"
2423
"time"
2524

@@ -202,17 +201,8 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type
202201
return b.eth.blockchain.GetReceiptsByHash(hash), nil
203202
}
204203

205-
func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
206-
db := b.eth.ChainDb()
207-
number := rawdb.ReadHeaderNumber(db, hash)
208-
if number == nil {
209-
return nil, fmt.Errorf("failed to get block number for hash %#x", hash)
210-
}
211-
logs := rawdb.ReadLogs(db, hash, *number, b.eth.blockchain.Config())
212-
if logs == nil {
213-
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", *number, hash.TerminalString())
214-
}
215-
return logs, nil
204+
func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
205+
return rawdb.ReadLogs(b.eth.chainDb, hash, number, b.ChainConfig()), nil
216206
}
217207

218208
func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {

eth/backend.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"strings"
2626
"sync"
2727
"sync/atomic"
28-
"time"
2928

3029
"github.com/ethereum/go-ethereum/accounts"
3130
"github.com/ethereum/go-ethereum/common"
@@ -41,7 +40,6 @@ import (
4140
"github.com/ethereum/go-ethereum/core/vm"
4241
"github.com/ethereum/go-ethereum/eth/downloader"
4342
"github.com/ethereum/go-ethereum/eth/ethconfig"
44-
"github.com/ethereum/go-ethereum/eth/filters"
4543
"github.com/ethereum/go-ethereum/eth/gasprice"
4644
"github.com/ethereum/go-ethereum/eth/protocols/eth"
4745
"github.com/ethereum/go-ethereum/eth/protocols/snap"
@@ -315,9 +313,6 @@ func (s *Ethereum) APIs() []rpc.API {
315313
}, {
316314
Namespace: "eth",
317315
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
318-
}, {
319-
Namespace: "eth",
320-
Service: filters.NewFilterAPI(s.APIBackend, false, 5*time.Minute),
321316
}, {
322317
Namespace: "admin",
323318
Service: NewAdminAPI(s),

eth/ethconfig/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ var Defaults = Config{
8383
TrieDirtyCache: 256,
8484
TrieTimeout: 60 * time.Minute,
8585
SnapshotCache: 102,
86+
FilterLogCacheSize: 32,
8687
Miner: miner.Config{
8788
GasCeil: 30000000,
8889
GasPrice: big.NewInt(params.GWei),
@@ -171,6 +172,9 @@ type Config struct {
171172
SnapshotCache int
172173
Preimages bool
173174

175+
// This is the number of blocks for which logs will be cached in the filter system.
176+
FilterLogCacheSize int
177+
174178
// Mining options
175179
Miner miner.Config
176180

eth/ethconfig/gen_config.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

eth/filters/api.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,22 +46,22 @@ type filter struct {
4646
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
4747
// information related to the Ethereum protocol such als blocks, transactions and logs.
4848
type FilterAPI struct {
49-
backend Backend
49+
sys *FilterSystem
5050
events *EventSystem
5151
filtersMu sync.Mutex
5252
filters map[rpc.ID]*filter
5353
timeout time.Duration
5454
}
5555

5656
// NewFilterAPI returns a new FilterAPI instance.
57-
func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI {
57+
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
5858
api := &FilterAPI{
59-
backend: backend,
60-
events: NewEventSystem(backend, lightMode),
59+
sys: system,
60+
events: NewEventSystem(system, lightMode),
6161
filters: make(map[rpc.ID]*filter),
62-
timeout: timeout,
62+
timeout: system.cfg.Timeout,
6363
}
64-
go api.timeoutLoop(timeout)
64+
go api.timeoutLoop(system.cfg.Timeout)
6565

6666
return api
6767
}
@@ -320,7 +320,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
320320
var filter *Filter
321321
if crit.BlockHash != nil {
322322
// Block filter requested, construct a single-shot filter
323-
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
323+
filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
324324
} else {
325325
// Convert the RPC block numbers into internal representations
326326
begin := rpc.LatestBlockNumber.Int64()
@@ -332,7 +332,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
332332
end = crit.ToBlock.Int64()
333333
}
334334
// Construct the range filter
335-
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
335+
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
336336
}
337337
// Run the filter and return all the logs
338338
logs, err := filter.Logs(ctx)
@@ -371,7 +371,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
371371
var filter *Filter
372372
if f.crit.BlockHash != nil {
373373
// Block filter requested, construct a single-shot filter
374-
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
374+
filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
375375
} else {
376376
// Convert the RPC block numbers into internal representations
377377
begin := rpc.LatestBlockNumber.Int64()
@@ -383,7 +383,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
383383
end = f.crit.ToBlock.Int64()
384384
}
385385
// Construct the range filter
386-
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
386+
filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
387387
}
388388
// Run the filter and return all the logs
389389
logs, err := filter.Logs(ctx)

0 commit comments

Comments
 (0)