Skip to content

Commit 146d738

Browse files
committed
feat: import batches from exported postage contract events log file
1 parent 7e10b69 commit 146d738

File tree

4 files changed

+199
-1
lines changed

4 files changed

+199
-1
lines changed

go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
module github.com/ethersphere/bee/v2
22

3-
go 1.24
3+
go 1.24.0
4+
5+
toolchain go1.24.2
46

57
require (
68
contrib.go.opencensus.io/exporter/prometheus v0.4.2
@@ -55,6 +57,8 @@ require (
5557
resenje.org/web v0.4.3
5658
)
5759

60+
require github.com/ethersphere/batch-archive v0.0.2
61+
5862
require (
5963
github.com/BurntSushi/toml v1.1.0 // indirect
6064
github.com/Microsoft/go-winio v0.6.1 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1
240240
github.com/ethereum/go-ethereum v1.10.4/go.mod h1:nEE0TP5MtxGzOMd7egIrbPJMQBnhVU3ELNxhBglIzhg=
241241
github.com/ethereum/go-ethereum v1.14.3 h1:5zvnAqLtnCZrU9uod1JCvHWJbPMURzYFHfc2eHz4PHA=
242242
github.com/ethereum/go-ethereum v1.14.3/go.mod h1:1STrq471D0BQbCX9He0hUj4bHxX2k6mt5nOQJhDNOJ8=
243+
github.com/ethersphere/batch-archive v0.0.2 h1:OmZw4tn9o0p9+MyBZq7KngufqqJ91OETtK4uxXWgEVg=
244+
github.com/ethersphere/batch-archive v0.0.2/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
243245
github.com/ethersphere/go-price-oracle-abi v0.2.0 h1:wtIcYLgNZHY4BjYwJCnu93SvJdVAZVvBaKinspyyHvQ=
244246
github.com/ethersphere/go-price-oracle-abi v0.2.0/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
245247
github.com/ethersphere/go-storage-incentives-abi v0.9.2 h1:6Pmxuj48LBTxayzwADNYmcbiqj6ongoRWwWV4Wp1EPo=

pkg/node/node.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,31 @@ func NewBee(
802802
}
803803
)
804804

805+
if !batchStoreExists && (networkID == mainnetNetworkID) {
806+
chainBackend, err := NewSnapshotBlockHeightContractFilterer(logger)
807+
if err != nil {
808+
logger.Debug("failed to initialize batch snapshot chain backend", "error", err)
809+
} else {
810+
eventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)
811+
b.listenerCloser = eventListener
812+
813+
batchSvc, err := batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
814+
if err != nil {
815+
return nil, fmt.Errorf("init batch service: %w", err)
816+
}
817+
818+
if err := batchSvc.Start(ctx, postageSyncStart, initBatchState); err != nil {
819+
return nil, err
820+
}
821+
822+
if err := eventListener.Close(); err != nil {
823+
return nil, err
824+
}
825+
826+
postageSyncStart = chainBackend.maxBlockHeight
827+
}
828+
}
829+
805830
if batchSvc != nil && chainEnabled {
806831
logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")
807832

pkg/node/snapshot.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package node
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"compress/gzip"
7+
"context"
8+
"encoding/json"
9+
"fmt"
10+
"io"
11+
12+
"github.com/ethereum/go-ethereum"
13+
"github.com/ethereum/go-ethereum/core/types"
14+
archive "github.com/ethersphere/batch-archive"
15+
"github.com/ethersphere/bee/v2/pkg/log"
16+
)
17+
18+
type SnapshotBlockHeightContractFilterer struct {
19+
logger log.Logger
20+
loadedLogs []types.Log
21+
maxBlockHeight uint64
22+
isLoaded bool
23+
}
24+
25+
func NewSnapshotBlockHeightContractFilterer(logger log.Logger) (*SnapshotBlockHeightContractFilterer, error) {
26+
f := &SnapshotBlockHeightContractFilterer{
27+
logger: logger,
28+
}
29+
30+
if err := f.loadAndProcessSnapshot(); err != nil {
31+
return nil, fmt.Errorf("failed to load and process snapshot during initialization: %w", err)
32+
}
33+
34+
return f, nil
35+
}
36+
37+
func (f *SnapshotBlockHeightContractFilterer) loadAndProcessSnapshot() error {
38+
f.logger.Info("loading batch snapshot during construction")
39+
data := archive.GetBatchSnapshot(true)
40+
41+
dataReader := bytes.NewReader(data)
42+
gzipReader, err := gzip.NewReader(dataReader)
43+
if err != nil {
44+
f.logger.Error(err, "failed to create gzip reader for batch import")
45+
return fmt.Errorf("create gzip reader: %w", err)
46+
}
47+
defer gzipReader.Close()
48+
49+
if err := f.parseLogs(gzipReader); err != nil {
50+
f.logger.Error(err, "failed to parse logs from snapshot")
51+
return err
52+
}
53+
54+
f.isLoaded = true
55+
f.logger.Info("batch snapshot loaded successfully during construction", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight)
56+
return nil
57+
}
58+
59+
func (f *SnapshotBlockHeightContractFilterer) parseLogs(reader io.Reader) error {
60+
var parsedLogs []types.Log
61+
var currentMaxBlockHeight uint64
62+
scanner := bufio.NewScanner(reader)
63+
64+
for scanner.Scan() {
65+
line := scanner.Bytes()
66+
if len(bytes.TrimSpace(line)) == 0 {
67+
continue
68+
}
69+
70+
var logEntry types.Log
71+
if err := json.Unmarshal(line, &logEntry); err != nil {
72+
f.logger.Warning("failed to unmarshal log event, skipping line", "error", err, "line_snippet", string(line[:min(len(line), 100)]))
73+
continue
74+
}
75+
76+
if logEntry.BlockNumber > currentMaxBlockHeight {
77+
currentMaxBlockHeight = logEntry.BlockNumber
78+
}
79+
parsedLogs = append(parsedLogs, logEntry)
80+
}
81+
82+
if err := scanner.Err(); err != nil {
83+
return fmt.Errorf("error scanning batch import data: %w", err)
84+
}
85+
86+
f.loadedLogs = parsedLogs
87+
f.maxBlockHeight = currentMaxBlockHeight
88+
f.isLoaded = true
89+
return nil
90+
}
91+
92+
func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
93+
f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query", query)
94+
95+
var filtered []types.Log
96+
97+
for _, log := range f.loadedLogs {
98+
// Filter by block range
99+
if query.FromBlock != nil && log.BlockNumber < query.FromBlock.Uint64() {
100+
continue
101+
}
102+
if query.ToBlock != nil && log.BlockNumber > query.ToBlock.Uint64() {
103+
continue
104+
}
105+
106+
// Filter by addresses
107+
if len(query.Addresses) > 0 {
108+
addressMatch := false
109+
for _, addr := range query.Addresses {
110+
if log.Address == addr {
111+
addressMatch = true
112+
break
113+
}
114+
}
115+
if !addressMatch {
116+
continue
117+
}
118+
}
119+
120+
// Filter by topics
121+
if len(query.Topics) > 0 {
122+
topicMatch := true
123+
124+
// We have a max of 4 topics in Ethereum logs
125+
for i := 0; i < len(query.Topics) && i < 4; i++ {
126+
// Skip if no filter for this topic position or empty filter array
127+
if i >= len(query.Topics) || len(query.Topics[i]) == 0 {
128+
continue
129+
}
130+
131+
// If we're filtering for this topic position but log doesn't have enough topics
132+
if i >= len(log.Topics) {
133+
topicMatch = false
134+
break
135+
}
136+
137+
// Check if this topic matches any in the filter array for this position
138+
hasMatch := false
139+
for _, topic := range query.Topics[i] {
140+
if log.Topics[i] == topic {
141+
hasMatch = true
142+
break
143+
}
144+
}
145+
146+
if !hasMatch {
147+
topicMatch = false
148+
break
149+
}
150+
}
151+
152+
if !topicMatch {
153+
continue
154+
}
155+
}
156+
157+
// If we got here, log passed all filters
158+
filtered = append(filtered, log)
159+
}
160+
161+
f.logger.Debug("filtered logs", "input_count", len(f.loadedLogs), "output_count", len(filtered))
162+
return filtered, nil
163+
}
164+
165+
func (f *SnapshotBlockHeightContractFilterer) BlockNumber(_ context.Context) (uint64, error) {
166+
return f.maxBlockHeight, nil
167+
}

0 commit comments

Comments
 (0)