Skip to content

Commit b28241b

Browse files
authored
cmd/workload: filter fuzzer test (#31613)
This PR adds a `filterfuzz` subcommand to the workload tester that generates requests similarly to `filtergen` (though with a much smaller block length limit) and also verifies the results by retrieving all block receipts in the range and locally filtering out relevant results. Unlike `filtergen` that operates on the finalized chain range only, `filterfuzz` does check the head region, actually it seeds a new query at every new chain head.
1 parent 7b693ea commit b28241b

File tree

4 files changed

+378
-34
lines changed

4 files changed

+378
-34
lines changed

cmd/workload/filtertest.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,14 @@ func (s *filterTestSuite) loadQueries() error {
182182

183183
// filterQuery is a single query for testing.
184184
type filterQuery struct {
185-
FromBlock int64 `json:"fromBlock"`
186-
ToBlock int64 `json:"toBlock"`
187-
Address []common.Address `json:"address"`
188-
Topics [][]common.Hash `json:"topics"`
189-
ResultHash *common.Hash `json:"resultHash,omitempty"`
190-
results []types.Log
191-
Err error `json:"error,omitempty"`
185+
FromBlock int64 `json:"fromBlock"`
186+
ToBlock int64 `json:"toBlock"`
187+
lastBlockHash common.Hash
188+
Address []common.Address `json:"address"`
189+
Topics [][]common.Hash `json:"topics"`
190+
ResultHash *common.Hash `json:"resultHash,omitempty"`
191+
results []types.Log
192+
Err error `json:"error,omitempty"`
192193
}
193194

194195
func (fq *filterQuery) isWildcard() bool {

cmd/workload/filtertestfuzz.go

Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
// Copyright 2025 The go-ethereum Authors
2+
// This file is part of go-ethereum.
3+
//
4+
// go-ethereum is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// go-ethereum is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU General Public License
15+
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"math/big"
23+
"reflect"
24+
"slices"
25+
"time"
26+
27+
"github.com/ethereum/go-ethereum/common"
28+
"github.com/ethereum/go-ethereum/common/lru"
29+
"github.com/ethereum/go-ethereum/core/types"
30+
"github.com/ethereum/go-ethereum/rpc"
31+
"github.com/urfave/cli/v2"
32+
)
33+
34+
const maxFilterRangeForTestFuzz = 300
35+
36+
var (
37+
filterFuzzCommand = &cli.Command{
38+
Name: "filterfuzz",
39+
Usage: "Generates queries and compares results against matches derived from receipts",
40+
ArgsUsage: "<RPC endpoint URL>",
41+
Action: filterFuzzCmd,
42+
Flags: []cli.Flag{},
43+
}
44+
)
45+
46+
// filterFuzzCmd is the main function of the filter fuzzer.
47+
func filterFuzzCmd(ctx *cli.Context) error {
48+
f := newFilterTestGen(ctx, maxFilterRangeForTestFuzz)
49+
var lastHead *types.Header
50+
headerCache := lru.NewCache[common.Hash, *types.Header](200)
51+
52+
commonAncestor := func(oldPtr, newPtr *types.Header) *types.Header {
53+
if oldPtr == nil || newPtr == nil {
54+
return nil
55+
}
56+
if newPtr.Number.Uint64() > oldPtr.Number.Uint64()+100 || oldPtr.Number.Uint64() > newPtr.Number.Uint64()+100 {
57+
return nil
58+
}
59+
for oldPtr.Hash() != newPtr.Hash() {
60+
if newPtr.Number.Uint64() >= oldPtr.Number.Uint64() {
61+
if parent, _ := headerCache.Get(newPtr.ParentHash); parent != nil {
62+
newPtr = parent
63+
} else {
64+
newPtr, _ = getHeaderByHash(f.client, newPtr.ParentHash)
65+
if newPtr == nil {
66+
return nil
67+
}
68+
headerCache.Add(newPtr.Hash(), newPtr)
69+
}
70+
}
71+
if oldPtr.Number.Uint64() > newPtr.Number.Uint64() {
72+
oldPtr, _ = headerCache.Get(oldPtr.ParentHash)
73+
if oldPtr == nil {
74+
return nil
75+
}
76+
}
77+
}
78+
return newPtr
79+
}
80+
81+
fetchHead := func() (*types.Header, bool) {
82+
currentHead, err := getLatestHeader(f.client)
83+
if err != nil {
84+
fmt.Println("Could not fetch head block", err)
85+
return nil, false
86+
}
87+
headerCache.Add(currentHead.Hash(), currentHead)
88+
if lastHead != nil && currentHead.Hash() == lastHead.Hash() {
89+
return currentHead, false
90+
}
91+
f.blockLimit = currentHead.Number.Int64()
92+
ca := commonAncestor(lastHead, currentHead)
93+
fmt.Print("*** New head ", f.blockLimit)
94+
if ca == nil {
95+
fmt.Println(" <no common ancestor>")
96+
} else {
97+
if reorged := lastHead.Number.Uint64() - ca.Number.Uint64(); reorged > 0 {
98+
fmt.Print(" reorged ", reorged)
99+
}
100+
if missed := currentHead.Number.Uint64() - ca.Number.Uint64() - 1; missed > 0 {
101+
fmt.Print(" missed ", missed)
102+
}
103+
fmt.Println()
104+
}
105+
lastHead = currentHead
106+
return currentHead, true
107+
}
108+
109+
tryExtendQuery := func(query *filterQuery) *filterQuery {
110+
for {
111+
extQuery := f.extendRange(query)
112+
if extQuery == nil {
113+
return query
114+
}
115+
extQuery.checkLastBlockHash(f.client)
116+
extQuery.run(f.client, nil)
117+
if extQuery.Err == nil && len(extQuery.results) == 0 {
118+
// query is useless now due to major reorg; abandon and continue
119+
fmt.Println("Zero length results")
120+
return nil
121+
}
122+
if extQuery.Err != nil {
123+
extQuery.printError()
124+
return nil
125+
}
126+
if len(extQuery.results) > maxFilterResultSize {
127+
return query
128+
}
129+
query = extQuery
130+
}
131+
}
132+
133+
var (
134+
mmQuery *filterQuery
135+
mmRetry, mmNextRetry int
136+
)
137+
138+
mainLoop:
139+
for {
140+
select {
141+
case <-ctx.Done():
142+
return nil
143+
default:
144+
}
145+
var query *filterQuery
146+
if mmQuery != nil {
147+
if mmRetry == 0 {
148+
query = mmQuery
149+
mmRetry = mmNextRetry
150+
mmNextRetry *= 2
151+
query.checkLastBlockHash(f.client)
152+
query.run(f.client, nil)
153+
if query.Err != nil {
154+
query.printError()
155+
continue
156+
}
157+
fmt.Println("Retrying query from:", query.FromBlock, "to:", query.ToBlock, "results:", len(query.results))
158+
} else {
159+
mmRetry--
160+
}
161+
}
162+
if query == nil {
163+
currentHead, isNewHead := fetchHead()
164+
if currentHead == nil {
165+
select {
166+
case <-ctx.Done():
167+
return nil
168+
case <-time.After(time.Second):
169+
}
170+
continue mainLoop
171+
}
172+
if isNewHead {
173+
query = f.newHeadSeedQuery(currentHead.Number.Int64())
174+
} else {
175+
query = f.newQuery()
176+
}
177+
query.checkLastBlockHash(f.client)
178+
query.run(f.client, nil)
179+
if query.Err != nil {
180+
query.printError()
181+
continue
182+
}
183+
fmt.Println("New query from:", query.FromBlock, "to:", query.ToBlock, "results:", len(query.results))
184+
if len(query.results) == 0 || len(query.results) > maxFilterResultSize {
185+
continue mainLoop
186+
}
187+
if query = tryExtendQuery(query); query == nil {
188+
continue mainLoop
189+
}
190+
}
191+
if !query.checkLastBlockHash(f.client) {
192+
fmt.Println("Reorg during search")
193+
continue mainLoop
194+
}
195+
// now we have a new query; check results
196+
results, err := query.getResultsFromReceipts(f.client)
197+
if err != nil {
198+
fmt.Println("Could not fetch results from receipts", err)
199+
continue mainLoop
200+
}
201+
if !query.checkLastBlockHash(f.client) {
202+
fmt.Println("Reorg during search")
203+
continue mainLoop
204+
}
205+
if !reflect.DeepEqual(query.results, results) {
206+
fmt.Println("Results mismatch from:", query.FromBlock, "to:", query.ToBlock, "addresses:", query.Address, "topics:", query.Topics)
207+
resShared, resGetLogs, resReceipts := compareResults(query.results, results)
208+
fmt.Println(" shared:", len(resShared))
209+
fmt.Println(" only from getLogs:", len(resGetLogs), resGetLogs)
210+
fmt.Println(" only from receipts:", len(resReceipts), resReceipts)
211+
if mmQuery != query {
212+
mmQuery = query
213+
mmRetry = 0
214+
mmNextRetry = 1
215+
}
216+
continue mainLoop
217+
}
218+
fmt.Println("Successful query from:", query.FromBlock, "to:", query.ToBlock, "results:", len(query.results))
219+
f.storeQuery(query)
220+
}
221+
}
222+
223+
func compareResults(a, b []types.Log) (shared, onlya, onlyb []types.Log) {
224+
for len(a) > 0 && len(b) > 0 {
225+
if reflect.DeepEqual(a[0], b[0]) {
226+
shared = append(shared, a[0])
227+
a = a[1:]
228+
b = b[1:]
229+
} else {
230+
for i := 1; ; i++ {
231+
if i >= len(a) { // b[0] not found in a
232+
onlyb = append(onlyb, b[0])
233+
b = b[1:]
234+
break
235+
}
236+
if i >= len(b) { // a[0] not found in b
237+
onlya = append(onlya, a[0])
238+
a = a[1:]
239+
break
240+
}
241+
if reflect.DeepEqual(b[0], a[i]) { // a[:i] not found in b
242+
onlya = append(onlya, a[:i]...)
243+
a = a[i:]
244+
break
245+
}
246+
if reflect.DeepEqual(a[0], b[i]) { // b[:i] not found in a
247+
onlyb = append(onlyb, b[:i]...)
248+
b = b[i:]
249+
break
250+
}
251+
}
252+
}
253+
}
254+
onlya = append(onlya, a...)
255+
onlyb = append(onlyb, b...)
256+
return
257+
}
258+
259+
func getLatestHeader(client *client) (*types.Header, error) {
260+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
261+
defer cancel()
262+
263+
return client.Eth.HeaderByNumber(ctx, big.NewInt(int64(rpc.LatestBlockNumber)))
264+
}
265+
266+
func getHeaderByHash(client *client, hash common.Hash) (*types.Header, error) {
267+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
268+
defer cancel()
269+
270+
return client.Eth.HeaderByHash(ctx, hash)
271+
}
272+
273+
// newHeadSeedQuery creates a query that gets all logs from the latest head.
274+
func (s *filterTestGen) newHeadSeedQuery(head int64) *filterQuery {
275+
return &filterQuery{
276+
FromBlock: head,
277+
ToBlock: head,
278+
}
279+
}
280+
281+
func (fq *filterQuery) checkLastBlockHash(client *client) bool {
282+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
283+
defer cancel()
284+
285+
header, err := client.Eth.HeaderByNumber(ctx, big.NewInt(fq.ToBlock))
286+
if err != nil {
287+
fmt.Println("Cound not fetch last block hash of query number:", fq.ToBlock, "error:", err)
288+
fq.lastBlockHash = common.Hash{}
289+
return false
290+
}
291+
hash := header.Hash()
292+
if fq.lastBlockHash == hash {
293+
return true
294+
}
295+
fq.lastBlockHash = hash
296+
return false
297+
}
298+
299+
func (fq *filterQuery) filterLog(log *types.Log) bool {
300+
if len(fq.Address) > 0 && !slices.Contains(fq.Address, log.Address) {
301+
return false
302+
}
303+
// If the to filtered topics is greater than the amount of topics in logs, skip.
304+
if len(fq.Topics) > len(log.Topics) {
305+
return false
306+
}
307+
for i, sub := range fq.Topics {
308+
if len(sub) == 0 {
309+
continue // empty rule set == wildcard
310+
}
311+
if !slices.Contains(sub, log.Topics[i]) {
312+
return false
313+
}
314+
}
315+
return true
316+
}
317+
318+
func (fq *filterQuery) getResultsFromReceipts(client *client) ([]types.Log, error) {
319+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
320+
defer cancel()
321+
322+
var results []types.Log
323+
for blockNumber := fq.FromBlock; blockNumber <= fq.ToBlock; blockNumber++ {
324+
receipts, err := client.Eth.BlockReceipts(ctx, rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(blockNumber)))
325+
if err != nil {
326+
return nil, err
327+
}
328+
for _, receipt := range receipts {
329+
for _, log := range receipt.Logs {
330+
if fq.filterLog(log) {
331+
results = append(results, *log)
332+
}
333+
}
334+
}
335+
}
336+
return results, nil
337+
}

0 commit comments

Comments
 (0)