Skip to content

Commit 67f74c0

Browse files
authored
test: postage snapshot (#5124)
1 parent 0ca57d8 commit 67f74c0

File tree

4 files changed

+180
-6
lines changed

4 files changed

+180
-6
lines changed

pkg/node/node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ func NewBee(
804804
)
805805

806806
if !o.SkipPostageSnapshot && !batchStoreExists && (networkID == mainnetNetworkID) {
807-
chainBackend := NewSnapshotLogFilterer(logger)
807+
chainBackend := NewSnapshotLogFilterer(logger, archiveSnapshotGetter{})
808808

809809
snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)
810810

pkg/node/snapshot.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,36 @@ import (
2525

2626
var _ listener.BlockHeightContractFilterer = (*SnapshotLogFilterer)(nil)
2727

28+
type SnapshotGetter interface {
29+
GetBatchSnapshot() []byte
30+
}
31+
32+
type archiveSnapshotGetter struct{}
33+
34+
func (a archiveSnapshotGetter) GetBatchSnapshot() []byte {
35+
return archive.GetBatchSnapshot()
36+
}
37+
2838
type SnapshotLogFilterer struct {
2939
logger log.Logger
3040
loadedLogs []types.Log
3141
maxBlockHeight uint64
3242
initOnce sync.Once
43+
getter SnapshotGetter
3344
}
3445

35-
func NewSnapshotLogFilterer(logger log.Logger) *SnapshotLogFilterer {
46+
func NewSnapshotLogFilterer(logger log.Logger, getter SnapshotGetter) *SnapshotLogFilterer {
3647
return &SnapshotLogFilterer{
3748
logger: logger,
49+
getter: getter,
3850
}
3951
}
4052

4153
// loadSnapshot is responsible for loading and processing the snapshot data.
4254
// It is intended to be called exactly once by initOnce.Do.
4355
func (f *SnapshotLogFilterer) loadSnapshot() error {
4456
f.logger.Info("loading batch snapshot")
45-
data := archive.GetBatchSnapshot()
57+
data := f.getter.GetBatchSnapshot()
4658
dataReader := bytes.NewReader(data)
4759
gzipReader, err := gzip.NewReader(dataReader)
4860
if err != nil {
@@ -71,8 +83,13 @@ func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error {
7183
if err == io.EOF {
7284
break
7385
}
74-
f.logger.Warning("failed to decode log event, skipping", "error", err)
75-
continue
86+
return fmt.Errorf("%w: failed to decode log event at position %d: %w", listener.ErrParseSnapshot, len(parsedLogs), err)
87+
}
88+
89+
// Validate sorting order (required for binary search in FilterLogs)
90+
if logEntry.BlockNumber < currentMaxBlockHeight {
91+
return fmt.Errorf("%w: snapshot data is not sorted by block number at index %d (block %d < %d)",
92+
listener.ErrParseSnapshot, len(parsedLogs), logEntry.BlockNumber, currentMaxBlockHeight)
7693
}
7794

7895
if logEntry.BlockNumber > currentMaxBlockHeight {

pkg/node/snapshot_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright 2025 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package node_test
6+
7+
import (
8+
"bytes"
9+
"compress/gzip"
10+
"context"
11+
"encoding/json"
12+
"math/big"
13+
"testing"
14+
15+
"github.com/ethereum/go-ethereum"
16+
"github.com/ethereum/go-ethereum/common"
17+
"github.com/ethereum/go-ethereum/core/types"
18+
"github.com/ethersphere/bee/v2/pkg/log"
19+
"github.com/ethersphere/bee/v2/pkg/node"
20+
"github.com/ethersphere/bee/v2/pkg/postage/listener"
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
)
24+
25+
type mockSnapshotGetter struct {
26+
data []byte
27+
}
28+
29+
func newMockSnapshotGetter(data []byte) mockSnapshotGetter {
30+
return mockSnapshotGetter{data}
31+
}
32+
func (m mockSnapshotGetter) GetBatchSnapshot() []byte {
33+
return m.data
34+
}
35+
36+
func makeSnapshotData(logs []types.Log) []byte {
37+
var buf bytes.Buffer
38+
gz := gzip.NewWriter(&buf)
39+
enc := json.NewEncoder(gz)
40+
for _, l := range logs {
41+
_ = enc.Encode(l)
42+
}
43+
gz.Close()
44+
return buf.Bytes()
45+
}
46+
47+
func TestNewSnapshotLogFilterer(t *testing.T) {
48+
t.Parallel()
49+
t.Run("invalid gzip", func(t *testing.T) {
50+
t.Parallel()
51+
getter := newMockSnapshotGetter([]byte("not-gzip"))
52+
filterer := node.NewSnapshotLogFilterer(log.Noop, getter)
53+
_, err := filterer.BlockNumber(context.Background())
54+
assert.Error(t, err)
55+
})
56+
57+
t.Run("invalid log entry", func(t *testing.T) {
58+
t.Parallel()
59+
var buf bytes.Buffer
60+
gz := gzip.NewWriter(&buf)
61+
_, err := gz.Write([]byte("not-a-log-entry"))
62+
require.NoError(t, err)
63+
gz.Close()
64+
getter := newMockSnapshotGetter(buf.Bytes())
65+
filterer := node.NewSnapshotLogFilterer(log.Noop, getter)
66+
_, err = filterer.BlockNumber(context.Background())
67+
assert.ErrorIs(t, err, listener.ErrParseSnapshot)
68+
})
69+
70+
t.Run("non-sorted", func(t *testing.T) {
71+
t.Parallel()
72+
logs := []types.Log{
73+
{BlockNumber: 1, Topics: []common.Hash{}},
74+
{BlockNumber: 3, Topics: []common.Hash{}},
75+
{BlockNumber: 2, Topics: []common.Hash{}},
76+
}
77+
getter := newMockSnapshotGetter(makeSnapshotData(logs))
78+
filterer := node.NewSnapshotLogFilterer(log.Noop, getter)
79+
80+
_, err := filterer.BlockNumber(context.Background())
81+
assert.ErrorIs(t, err, listener.ErrParseSnapshot)
82+
})
83+
84+
t.Run("get block number", func(t *testing.T) {
85+
t.Parallel()
86+
logs := []types.Log{
87+
{BlockNumber: 1, Topics: []common.Hash{}},
88+
{BlockNumber: 2, Topics: []common.Hash{}},
89+
{BlockNumber: 2, Topics: []common.Hash{}},
90+
{BlockNumber: 3, Topics: []common.Hash{}},
91+
}
92+
getter := newMockSnapshotGetter(makeSnapshotData(logs))
93+
filterer := node.NewSnapshotLogFilterer(log.Noop, getter)
94+
95+
blockNumber, err := filterer.BlockNumber(context.Background())
96+
assert.NoError(t, err)
97+
assert.Equal(t, uint64(3), blockNumber)
98+
})
99+
t.Run("filter", func(t *testing.T) {
100+
t.Parallel()
101+
logs := []types.Log{
102+
{BlockNumber: 1, Address: common.HexToAddress("0x1"), TxHash: common.HexToHash("0x1"), Topics: []common.Hash{common.HexToHash("0xa1")}},
103+
{BlockNumber: 2, Address: common.HexToAddress("0x2"), TxHash: common.HexToHash("0x2"), Topics: []common.Hash{common.HexToHash("0xa1")}},
104+
{BlockNumber: 3, Address: common.HexToAddress("0x3"), TxHash: common.HexToHash("0x3"), Topics: []common.Hash{common.HexToHash("0xa3")}},
105+
{BlockNumber: 4, Address: common.HexToAddress("0x4"), TxHash: common.HexToHash("0x4"), Topics: []common.Hash{common.HexToHash("0xa4")}},
106+
{BlockNumber: 5, Address: common.HexToAddress("0x4"), TxHash: common.HexToHash("0x4"), Topics: []common.Hash{common.HexToHash("0xa4"), common.HexToHash("0xa5")}},
107+
}
108+
getter := newMockSnapshotGetter(makeSnapshotData(logs))
109+
filterer := node.NewSnapshotLogFilterer(log.Noop, getter)
110+
111+
res, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
112+
FromBlock: big.NewInt(2),
113+
ToBlock: big.NewInt(3),
114+
})
115+
require.NoError(t, err)
116+
require.Len(t, res, 2)
117+
assert.Equal(t, uint64(2), res[0].BlockNumber)
118+
assert.Equal(t, uint64(3), res[1].BlockNumber)
119+
120+
res, err = filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
121+
Addresses: []common.Address{common.HexToAddress("0x3"), common.HexToAddress("0x4")},
122+
})
123+
require.NoError(t, err)
124+
require.Len(t, res, 3)
125+
assert.Equal(t, 0, res[0].Address.Cmp(common.HexToAddress("0x3")))
126+
assert.Equal(t, 0, res[1].Address.Cmp(common.HexToAddress("0x4")))
127+
assert.Equal(t, 0, res[2].Address.Cmp(common.HexToAddress("0x4")))
128+
129+
res, err = filterer.FilterLogs(context.Background(), ethereum.FilterQuery{})
130+
require.NoError(t, err)
131+
require.Len(t, res, 5)
132+
133+
res, err = filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
134+
Topics: [][]common.Hash{},
135+
})
136+
require.NoError(t, err)
137+
require.Len(t, res, 5)
138+
139+
res, err = filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
140+
Topics: [][]common.Hash{
141+
{common.HexToHash("0xa1"), common.HexToHash("0xa4"), common.HexToHash("0xa5")},
142+
},
143+
})
144+
require.NoError(t, err)
145+
require.Len(t, res, 4)
146+
assert.Equal(t, 0, res[0].Topics[0].Cmp(common.HexToHash("0xa1")))
147+
assert.Equal(t, 0, res[1].Topics[0].Cmp(common.HexToHash("0xa1")))
148+
assert.Equal(t, 0, res[2].Topics[0].Cmp(common.HexToHash("0xa4")))
149+
assert.Equal(t, 0, res[3].Topics[0].Cmp(common.HexToHash("0xa4")))
150+
})
151+
}

pkg/postage/listener/listener.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ var (
4141
var (
4242
ErrPostageSyncingStalled = errors.New("postage syncing stalled")
4343
ErrPostagePaused = errors.New("postage contract is paused")
44+
ErrParseSnapshot = errors.New("failed to parse snapshot data")
4445
)
4546

4647
type BlockHeightContractFilterer interface {
@@ -293,7 +294,9 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even
293294
if errors.Is(err, context.Canceled) {
294295
return nil
295296
}
296-
297+
if errors.Is(err, ErrParseSnapshot) {
298+
return err
299+
}
297300
l.metrics.BackendErrors.Inc()
298301
l.logger.Warning("could not get block number", "error", err)
299302
lastConfirmedBlock = 0
@@ -328,6 +331,9 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even
328331

329332
events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to))))
330333
if err != nil {
334+
if errors.Is(err, ErrParseSnapshot) {
335+
return err
336+
}
331337
l.metrics.BackendErrors.Inc()
332338
l.logger.Warning("could not get blockchain log", "error", err)
333339
lastConfirmedBlock = 0

0 commit comments

Comments
 (0)