Skip to content

Commit 1dd0cbc

Browse files
authored
feat: import batches from exported postage contract events log file (#5094)
1 parent 4c81fd5 commit 1dd0cbc

File tree

10 files changed

+207
-5
lines changed

10 files changed

+207
-5
lines changed

cmd/bee/cmd/cmd.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ const (
8080
optionNameTransactionDebugMode = "transaction-debug-mode"
8181
optionMinimumStorageRadius = "minimum-storage-radius"
8282
optionReserveCapacityDoubling = "reserve-capacity-doubling"
83+
optionSkipPostageSnapshot = "skip-postage-snapshot"
8384
)
8485

8586
// nolint:gochecknoinits
@@ -288,6 +289,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
288289
cmd.Flags().Bool(optionNameTransactionDebugMode, false, "skips the gas estimate step for contract transactions")
289290
cmd.Flags().Uint(optionMinimumStorageRadius, 0, "minimum radius storage threshold")
290291
cmd.Flags().Int(optionReserveCapacityDoubling, 0, "reserve capacity doubling")
292+
cmd.Flags().Bool(optionSkipPostageSnapshot, false, "skip postage snapshot")
291293
}
292294

293295
func newLogger(cmd *cobra.Command, verbosity string) (log.Logger, error) {

cmd/bee/cmd/start.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo
315315
ResolverConnectionCfgs: resolverCfgs,
316316
Resync: c.config.GetBool(optionNameResync),
317317
RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching),
318+
SkipPostageSnapshot: c.config.GetBool(optionSkipPostageSnapshot),
318319
StakingContractAddress: c.config.GetString(optionNameStakingAddress),
319320
StatestoreCacheCapacity: c.config.GetUint64(optionNameStateStoreCacheCapacity),
320321
StaticNodes: staticNodes,

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
module github.com/ethersphere/bee/v2
22

3-
go 1.24
3+
go 1.24.0
4+
5+
toolchain go1.24.2
46

57
require (
68
contrib.go.opencensus.io/exporter/prometheus v0.4.2
79
github.com/armon/go-radix v1.0.0
810
github.com/btcsuite/btcd/btcec/v2 v2.3.2
911
github.com/coreos/go-semver v0.3.0
1012
github.com/ethereum/go-ethereum v1.14.3
13+
github.com/ethersphere/batch-archive v0.0.3
1114
github.com/ethersphere/go-price-oracle-abi v0.6.8
1215
github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3
1316
github.com/ethersphere/go-sw3-abi v0.6.5

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1
240240
github.com/ethereum/go-ethereum v1.10.4/go.mod h1:nEE0TP5MtxGzOMd7egIrbPJMQBnhVU3ELNxhBglIzhg=
241241
github.com/ethereum/go-ethereum v1.14.3 h1:5zvnAqLtnCZrU9uod1JCvHWJbPMURzYFHfc2eHz4PHA=
242242
github.com/ethereum/go-ethereum v1.14.3/go.mod h1:1STrq471D0BQbCX9He0hUj4bHxX2k6mt5nOQJhDNOJ8=
243+
github.com/ethersphere/batch-archive v0.0.3 h1:rAzvixdDkxLV5A6XdbG3uxts8ciJ+1ShgZUKE2+qsqI=
244+
github.com/ethersphere/batch-archive v0.0.3/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
243245
github.com/ethersphere/go-price-oracle-abi v0.6.8 h1:23Y0msO4ZRvB9o1NRdFDd0eewlnx37XxQm2DKbL6Qk8=
244246
github.com/ethersphere/go-price-oracle-abi v0.6.8/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
245247
github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3 h1:uuowc0ekcipVwYkn1Rud9LySZ094hrDq2/YfRKyjbbQ=

packaging/bee.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ password-file: "/var/lib/bee/password"
7878
# resolver-options: []
7979
## forces the node to resync postage contract data
8080
# resync: false
81+
## skip postage snapshot
82+
# skip-postage-snapshot: false
8183
## staking contract address
8284
# staking-address: ""
8385
## lru memory caching capacity in number of statestore entries
@@ -115,4 +117,4 @@ password-file: "/var/lib/bee/password"
115117
## send a welcome message string during handshakes
116118
# welcome-message: ""
117119
## withdrawal target addresses
118-
# withdrawal-addresses-whitelist: []
120+
# withdrawal-addresses-whitelist: []

packaging/homebrew-amd64/bee.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ password-file: "/usr/local/var/lib/swarm-bee/password"
7878
# resolver-options: []
7979
## forces the node to resync postage contract data
8080
# resync: false
81+
## skip postage snapshot
82+
# skip-postage-snapshot: false
8183
## staking contract address
8284
# staking-address: ""
8385
## lru memory caching capacity in number of statestore entries
@@ -115,4 +117,4 @@ password-file: "/usr/local/var/lib/swarm-bee/password"
115117
## send a welcome message string during handshakes
116118
# welcome-message: ""
117119
## withdrawal target addresses
118-
# withdrawal-addresses-whitelist: []
120+
# withdrawal-addresses-whitelist: []

packaging/homebrew-arm64/bee.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ password-file: "/opt/homebrew/var/lib/swarm-bee/password"
7878
# resolver-options: []
7979
## forces the node to resync postage contract data
8080
# resync: false
81+
## skip postage snapshot
82+
# skip-postage-snapshot: false
8183
## staking contract address
8284
# staking-address: ""
8385
## lru memory caching capacity in number of statestore entries
@@ -115,4 +117,4 @@ password-file: "/opt/homebrew/var/lib/swarm-bee/password"
115117
## send a welcome message string during handshakes
116118
# welcome-message: ""
117119
## withdrawal target addresses
118-
# withdrawal-addresses-whitelist: []
120+
# withdrawal-addresses-whitelist: []

packaging/scoop/bee.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ password-file: "./password"
7878
# resolver-options: []
7979
## forces the node to resync postage contract data
8080
# resync: false
81+
## skip postage snapshot
82+
# skip-postage-snapshot: false
8183
## staking contract address
8284
# staking-address: ""
8385
## lru memory caching capacity in number of statestore entries
@@ -115,4 +117,4 @@ password-file: "./password"
115117
## send a welcome message string during handshakes
116118
# welcome-message: ""
117119
## withdrawal target addresses
118-
# withdrawal-addresses-whitelist: []
120+
# withdrawal-addresses-whitelist: []

pkg/node/node.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ type Options struct {
160160
ResolverConnectionCfgs []multiresolver.ConnectionConfig
161161
Resync bool
162162
RetrievalCaching bool
163+
SkipPostageSnapshot bool
163164
StakingContractAddress string
164165
StatestoreCacheCapacity uint64
165166
StaticNodes []swarm.Address
@@ -802,6 +803,30 @@ func NewBee(
802803
}
803804
)
804805

806+
if !o.SkipPostageSnapshot && !batchStoreExists && (networkID == mainnetNetworkID) {
807+
chainBackend := NewSnapshotLogFilterer(logger)
808+
809+
snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)
810+
811+
snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
812+
if err != nil {
813+
logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...")
814+
} else {
815+
err = snapshotBatchSvc.Start(ctx, postageSyncStart, initBatchState)
816+
syncStatus.Store(true)
817+
if err != nil {
818+
syncErr.Store(err)
819+
logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...")
820+
} else {
821+
postageSyncStart = chainBackend.maxBlockHeight
822+
}
823+
}
824+
if errClose := snapshotEventListener.Close(); errClose != nil {
825+
logger.Error(errClose, "failed to close event listener (snapshot) failure")
826+
}
827+
828+
}
829+
805830
if batchSvc != nil && chainEnabled {
806831
logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")
807832

pkg/node/snapshot.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright 2025 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package node
6+
7+
import (
8+
"bytes"
9+
"compress/gzip"
10+
"context"
11+
"encoding/json"
12+
"fmt"
13+
"io"
14+
"sort"
15+
"sync"
16+
17+
"slices"
18+
19+
"github.com/ethereum/go-ethereum"
20+
"github.com/ethereum/go-ethereum/core/types"
21+
archive "github.com/ethersphere/batch-archive"
22+
"github.com/ethersphere/bee/v2/pkg/log"
23+
"github.com/ethersphere/bee/v2/pkg/postage/listener"
24+
)
25+
26+
var _ listener.BlockHeightContractFilterer = (*SnapshotLogFilterer)(nil)
27+
28+
type SnapshotLogFilterer struct {
29+
logger log.Logger
30+
loadedLogs []types.Log
31+
maxBlockHeight uint64
32+
initOnce sync.Once
33+
}
34+
35+
func NewSnapshotLogFilterer(logger log.Logger) *SnapshotLogFilterer {
36+
return &SnapshotLogFilterer{
37+
logger: logger,
38+
}
39+
}
40+
41+
// loadSnapshot is responsible for loading and processing the snapshot data.
42+
// It is intended to be called exactly once by initOnce.Do.
43+
func (f *SnapshotLogFilterer) loadSnapshot() error {
44+
f.logger.Info("loading batch snapshot")
45+
data := archive.GetBatchSnapshot()
46+
dataReader := bytes.NewReader(data)
47+
gzipReader, err := gzip.NewReader(dataReader)
48+
if err != nil {
49+
f.logger.Error(err, "failed to create gzip reader for batch import")
50+
return fmt.Errorf("create gzip reader: %w", err)
51+
}
52+
defer gzipReader.Close()
53+
54+
if err := f.parseLogs(gzipReader); err != nil {
55+
f.logger.Error(err, "failed to parse logs from snapshot")
56+
return err
57+
}
58+
59+
f.logger.Info("batch snapshot loaded successfully", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight)
60+
return nil
61+
}
62+
63+
func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error {
64+
var parsedLogs []types.Log
65+
var currentMaxBlockHeight uint64
66+
67+
decoder := json.NewDecoder(reader)
68+
for {
69+
var logEntry types.Log
70+
if err := decoder.Decode(&logEntry); err != nil {
71+
if err == io.EOF {
72+
break
73+
}
74+
f.logger.Warning("failed to decode log event, skipping", "error", err)
75+
continue
76+
}
77+
78+
if logEntry.BlockNumber > currentMaxBlockHeight {
79+
currentMaxBlockHeight = logEntry.BlockNumber
80+
}
81+
parsedLogs = append(parsedLogs, logEntry)
82+
}
83+
84+
f.loadedLogs = parsedLogs
85+
f.maxBlockHeight = currentMaxBlockHeight
86+
return nil
87+
}
88+
89+
// ensureLoaded calls loadSnapshot via sync.Once to ensure thread-safe, one-time initialization.
90+
func (f *SnapshotLogFilterer) ensureLoaded() error {
91+
var err error
92+
f.initOnce.Do(func() {
93+
err = f.loadSnapshot()
94+
})
95+
return err
96+
}
97+
98+
func (f *SnapshotLogFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
99+
if err := f.ensureLoaded(); err != nil {
100+
return nil, fmt.Errorf("failed to ensure snapshot was loaded for FilterLogs: %w", err)
101+
}
102+
103+
f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query_from_block", query.FromBlock, "query_to_block", query.ToBlock, "query_addresses_count", len(query.Addresses), "query_topics_count", len(query.Topics))
104+
105+
filtered := make([]types.Log, 0)
106+
107+
startIndex := 0
108+
if query.FromBlock != nil {
109+
fromBlockNum := query.FromBlock.Uint64()
110+
startIndex = sort.Search(len(f.loadedLogs), func(i int) bool {
111+
return f.loadedLogs[i].BlockNumber >= fromBlockNum
112+
})
113+
}
114+
115+
scannedCount := 0
116+
for i := startIndex; i < len(f.loadedLogs); i++ {
117+
logEntry := f.loadedLogs[i]
118+
scannedCount++
119+
120+
if query.ToBlock != nil && logEntry.BlockNumber > query.ToBlock.Uint64() {
121+
break
122+
}
123+
124+
if len(query.Addresses) > 0 && !slices.Contains(query.Addresses, logEntry.Address) {
125+
continue
126+
}
127+
128+
if len(query.Topics) > 0 {
129+
match := true
130+
for topicIndex, topicCriteria := range query.Topics {
131+
if len(topicCriteria) == 0 {
132+
continue
133+
}
134+
if topicIndex >= len(logEntry.Topics) {
135+
match = false
136+
break
137+
}
138+
139+
if !slices.Contains(topicCriteria, logEntry.Topics[topicIndex]) {
140+
match = false
141+
break
142+
}
143+
}
144+
if !match {
145+
continue
146+
}
147+
}
148+
149+
filtered = append(filtered, logEntry)
150+
}
151+
152+
f.logger.Debug("filtered logs complete", "input_log_count", len(f.loadedLogs), "potential_logs_in_block_range", scannedCount, "output_count", len(filtered))
153+
return filtered, nil
154+
}
155+
156+
func (f *SnapshotLogFilterer) BlockNumber(_ context.Context) (uint64, error) {
157+
if err := f.ensureLoaded(); err != nil {
158+
return 0, fmt.Errorf("failed to ensure snapshot was loaded for BlockNumber: %w", err)
159+
}
160+
return f.maxBlockHeight, nil
161+
}

0 commit comments

Comments
 (0)