Skip to content

Commit 48c19ef

Browse files
authored
feat: optimize postage batch snapshot (#5343)
1 parent 07cc20d commit 48c19ef

File tree

4 files changed

+130
-12
lines changed

4 files changed

+130
-12
lines changed

pkg/node/snapshot.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ func NewSnapshotLogFilterer(logger log.Logger, getter SnapshotGetter) *SnapshotL
5050
}
5151
}
5252

53+
func (f *SnapshotLogFilterer) GetBatchSnapshot() []byte {
54+
return f.getter.GetBatchSnapshot()
55+
}
56+
5357
// loadSnapshot is responsible for loading and processing the snapshot data.
5458
// It is intended to be called exactly once by initOnce.Do.
5559
func (f *SnapshotLogFilterer) loadSnapshot() error {

pkg/node/snapshot_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"github.com/ethersphere/bee/v2/pkg/postage/listener"
2121
"github.com/stretchr/testify/assert"
2222
"github.com/stretchr/testify/require"
23+
24+
archive "github.com/ethersphere/batch-archive"
2325
)
2426

2527
type mockSnapshotGetter struct {
@@ -33,6 +35,12 @@ func (m mockSnapshotGetter) GetBatchSnapshot() []byte {
3335
return m.data
3436
}
3537

38+
type realSnapshotGetter struct{}
39+
40+
func (r realSnapshotGetter) GetBatchSnapshot() []byte {
41+
return archive.GetBatchSnapshot()
42+
}
43+
3644
func makeSnapshotData(logs []types.Log) []byte {
3745
var buf bytes.Buffer
3846
gz := gzip.NewWriter(&buf)
@@ -149,3 +157,77 @@ func TestNewSnapshotLogFilterer(t *testing.T) {
149157
assert.Equal(t, 0, res[3].Topics[0].Cmp(common.HexToHash("0xa4")))
150158
})
151159
}
160+
161+
func TestSnapshotLogFilterer_RealSnapshot(t *testing.T) {
162+
t.Parallel()
163+
164+
getter := realSnapshotGetter{}
165+
filterer := node.NewSnapshotLogFilterer(log.Noop, getter)
166+
167+
t.Run("block number", func(t *testing.T) {
168+
blockNumber, err := filterer.BlockNumber(context.Background())
169+
assert.NoError(t, err)
170+
assert.Greater(t, blockNumber, uint64(0))
171+
})
172+
173+
t.Run("filter range", func(t *testing.T) {
174+
// arbitrary range that should exist in the snapshot
175+
from := big.NewInt(20000000)
176+
to := big.NewInt(20001000)
177+
res, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
178+
FromBlock: from,
179+
ToBlock: to,
180+
})
181+
require.NoError(t, err)
182+
for _, l := range res {
183+
assert.GreaterOrEqual(t, l.BlockNumber, from.Uint64())
184+
assert.LessOrEqual(t, l.BlockNumber, to.Uint64())
185+
}
186+
})
187+
188+
t.Run("filter address mismatch", func(t *testing.T) {
189+
// random address that should not match the postage stamp contract
190+
addr := common.HexToAddress("0x1234567890123456789012345678901234567890")
191+
res, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
192+
Addresses: []common.Address{addr},
193+
})
194+
require.NoError(t, err)
195+
assert.Empty(t, res)
196+
})
197+
}
198+
199+
func BenchmarkNewSnapshotLogFilterer_Load(b *testing.B) {
200+
getter := realSnapshotGetter{}
201+
b.ResetTimer()
202+
for i := 0; i < b.N; i++ {
203+
filterer := node.NewSnapshotLogFilterer(log.Noop, getter)
204+
_, err := filterer.BlockNumber(context.Background())
205+
if err != nil {
206+
b.Fatal(err)
207+
}
208+
}
209+
}
210+
211+
func BenchmarkSnapshotLogFilterer(b *testing.B) {
212+
getter := realSnapshotGetter{}
213+
filterer := node.NewSnapshotLogFilterer(log.Noop, getter)
214+
// ensure loaded
215+
if _, err := filterer.BlockNumber(context.Background()); err != nil {
216+
b.Fatal(err)
217+
}
218+
219+
b.Run("FilterLogs", func(b *testing.B) {
220+
b.ResetTimer()
221+
for i := 0; i < b.N; i++ {
222+
from := big.NewInt(20000000)
223+
to := big.NewInt(20001000)
224+
_, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
225+
FromBlock: from,
226+
ToBlock: to,
227+
})
228+
if err != nil {
229+
b.Fatal(err)
230+
}
231+
}
232+
})
233+
}

pkg/postage/batchservice/batchservice.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type batchService struct {
4040

4141
checksum hash.Hash // checksum hasher
4242
resync bool
43+
44+
pendingChainState *postage.ChainState
4345
}
4446

4547
type Interface interface {
@@ -95,15 +97,22 @@ func New(
9597
}
9698
}
9799

98-
return &batchService{stateStore, storer, logger.WithName(loggerName).Register(), listener, owner, batchListener, sum, resync}, nil
100+
return &batchService{stateStore: stateStore, storer: storer, logger: logger.WithName(loggerName).Register(), listener: listener, owner: owner, batchListener: batchListener, checksum: sum, resync: resync}, nil
101+
}
102+
103+
func (svc *batchService) getChainState() *postage.ChainState {
104+
if svc.pendingChainState != nil {
105+
return svc.pendingChainState
106+
}
107+
return svc.storer.GetChainState()
99108
}
100109

101110
// Create will create a new batch with the given ID, owner value and depth and
102111
// stores it in the BatchedStore.
103112
func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash common.Hash) error {
104-
// don't add batches which have value which equals total cumulative
113+
// dont add batches which have value which equals total cumulative
105114
// payout or that are going to expire already within the next couple of blocks
106-
val := big.NewInt(0).Add(svc.storer.GetChainState().TotalAmount, svc.storer.GetChainState().CurrentPrice)
115+
val := big.NewInt(0).Add(svc.getChainState().TotalAmount, svc.getChainState().CurrentPrice)
107116
if normalisedBalance.Cmp(val) <= 0 {
108117
// don't do anything
109118
return fmt.Errorf("batch service: batch %x: %w", id, ErrZeroValueBatch)
@@ -112,7 +121,7 @@ func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance
112121
ID: id,
113122
Owner: owner,
114123
Value: normalisedBalance,
115-
Start: svc.storer.GetChainState().Block,
124+
Start: svc.getChainState().Block,
116125
Depth: depth,
117126
BucketDepth: bucketDepth,
118127
Immutable: immutable,
@@ -196,10 +205,13 @@ func (svc *batchService) UpdateDepth(id []byte, depth uint8, normalisedBalance *
196205
// UpdatePrice implements the EventUpdater interface. It sets the current
197206
// price from the chain in the service chain state.
198207
func (svc *batchService) UpdatePrice(price *big.Int, txHash common.Hash) error {
199-
cs := svc.storer.GetChainState()
208+
cs := svc.getChainState()
200209
cs.CurrentPrice = price
201-
if err := svc.storer.PutChainState(cs); err != nil {
202-
return fmt.Errorf("put chain state: %w", err)
210+
211+
if svc.pendingChainState == nil {
212+
if err := svc.storer.PutChainState(cs); err != nil {
213+
return fmt.Errorf("put chain state: %w", err)
214+
}
203215
}
204216

205217
sum, err := svc.updateChecksum(txHash)
@@ -212,7 +224,7 @@ func (svc *batchService) UpdatePrice(price *big.Int, txHash common.Hash) error {
212224
}
213225

214226
func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error {
215-
cs := svc.storer.GetChainState()
227+
cs := svc.getChainState()
216228
if blockNumber == cs.Block {
217229
return nil
218230
}
@@ -223,17 +235,27 @@ func (svc *batchService) UpdateBlockNumber(blockNumber uint64) error {
223235

224236
cs.TotalAmount.Add(cs.TotalAmount, diff.Mul(diff, cs.CurrentPrice))
225237
cs.Block = blockNumber
226-
if err := svc.storer.PutChainState(cs); err != nil {
227-
return fmt.Errorf("put chain state: %w", err)
238+
239+
if svc.pendingChainState == nil {
240+
if err := svc.storer.PutChainState(cs); err != nil {
241+
return fmt.Errorf("put chain state: %w", err)
242+
}
228243
}
229244

230245
svc.logger.Debug("block height updated", "new_block", blockNumber)
231246
return nil
232247
}
233248
func (svc *batchService) TransactionStart() error {
249+
svc.pendingChainState = svc.storer.GetChainState()
234250
return svc.stateStore.Put(dirtyDBKey, true)
235251
}
236252
func (svc *batchService) TransactionEnd() error {
253+
if svc.pendingChainState != nil {
254+
if err := svc.storer.PutChainState(svc.pendingChainState); err != nil {
255+
return fmt.Errorf("put chain state: %w", err)
256+
}
257+
svc.pendingChainState = nil
258+
}
237259
return svc.stateStore.Delete(dirtyDBKey)
238260
}
239261

pkg/postage/listener/listener.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const loggerName = "listener"
2929

3030
const (
3131
blockPage = 5000 // how many blocks to sync every time we page
32+
blockPageSnapshot = 50000 // how many blocks to sync every time from snapshot
3233
tailSize = 4 // how many blocks to tail from the tip of the chain
3334
defaultBatchFactor = uint64(5) // minimal number of blocks to sync at once
3435
)
@@ -241,6 +242,15 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even
241242

242243
l.logger.Debug("batch factor", "value", batchFactor)
243244

245+
// Type assertion to detect if backend is SnapshotLogFilterer
246+
pageSize := uint64(blockPage)
247+
if _, isSnapshot := l.ev.(interface{ GetBatchSnapshot() []byte }); isSnapshot {
248+
pageSize = blockPageSnapshot
249+
l.logger.Debug("using snapshot page size", "page_size", pageSize)
250+
} else {
251+
l.logger.Debug("using standard page size", "page_size", pageSize)
252+
}
253+
244254
synced := make(chan error)
245255
closeOnce := new(sync.Once)
246256
paged := true
@@ -321,9 +331,9 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even
321331
}
322332

323333
// do some paging (sub-optimal)
324-
if to-from >= blockPage {
334+
if to-from >= pageSize {
325335
paged = true
326-
to = from + blockPage - 1
336+
to = from + pageSize - 1
327337
} else {
328338
closeOnce.Do(func() { synced <- nil })
329339
}

0 commit comments

Comments
 (0)