Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
module github.com/ethersphere/bee/v2

go 1.24
go 1.24.0

toolchain go1.24.2

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/armon/go-radix v1.0.0
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/coreos/go-semver v0.3.0
github.com/ethereum/go-ethereum v1.14.3
github.com/ethersphere/batch-archive v0.0.3
github.com/ethersphere/go-price-oracle-abi v0.6.8
github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3
github.com/ethersphere/go-sw3-abi v0.6.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1
github.com/ethereum/go-ethereum v1.10.4/go.mod h1:nEE0TP5MtxGzOMd7egIrbPJMQBnhVU3ELNxhBglIzhg=
github.com/ethereum/go-ethereum v1.14.3 h1:5zvnAqLtnCZrU9uod1JCvHWJbPMURzYFHfc2eHz4PHA=
github.com/ethereum/go-ethereum v1.14.3/go.mod h1:1STrq471D0BQbCX9He0hUj4bHxX2k6mt5nOQJhDNOJ8=
github.com/ethersphere/batch-archive v0.0.3 h1:rAzvixdDkxLV5A6XdbG3uxts8ciJ+1ShgZUKE2+qsqI=
github.com/ethersphere/batch-archive v0.0.3/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
github.com/ethersphere/go-price-oracle-abi v0.6.8 h1:23Y0msO4ZRvB9o1NRdFDd0eewlnx37XxQm2DKbL6Qk8=
github.com/ethersphere/go-price-oracle-abi v0.6.8/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3 h1:uuowc0ekcipVwYkn1Rud9LySZ094hrDq2/YfRKyjbbQ=
Expand Down
24 changes: 24 additions & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,30 @@ func NewBee(
}
)

if !o.Resync && !batchStoreExists && (networkID == mainnetNetworkID) {
chainBackend := NewSnapshotLogFilterer(logger)

snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)

snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
if err != nil {
logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...")
} else {
err = snapshotBatchSvc.Start(ctx, postageSyncStart, initBatchState)
syncStatus.Store(true)
if err != nil {
syncErr.Store(err)
logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...")
} else {
postageSyncStart = chainBackend.maxBlockHeight
}
}
if errClose := snapshotEventListener.Close(); errClose != nil {
logger.Error(errClose, "failed to close event listener (snapshot) failure")
}

}

if batchSvc != nil && chainEnabled {
logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")

Expand Down
168 changes: 168 additions & 0 deletions pkg/node/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2025 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package node

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"sort"
"sync"

"slices"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
archive "github.com/ethersphere/batch-archive"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage/listener"
)

// Compile-time assertion that *SnapshotLogFilterer implements the
// listener.BlockHeightContractFilterer interface consumed by the postage
// event listener.
var _ listener.BlockHeightContractFilterer = (*SnapshotLogFilterer)(nil)

// SnapshotLogFilterer serves postage contract event logs from the embedded,
// gzip-compressed batch snapshot instead of a live chain backend. The
// snapshot is decompressed and decoded lazily, exactly once, on first use
// (see ensureLoaded).
type SnapshotLogFilterer struct {
	logger log.Logger
	// loadedLogs holds the decoded snapshot logs. FilterLogs binary-searches
	// this slice by BlockNumber via sort.Search, which requires ascending
	// block order. NOTE(review): parseLogs does not sort; this relies on the
	// snapshot already being block-ordered — confirm.
	loadedLogs []types.Log
	// maxBlockHeight is the highest BlockNumber seen while parsing the
	// snapshot; reported by BlockNumber.
	maxBlockHeight uint64
	// isLoaded is set only after a fully successful load.
	isLoaded bool
	// initOnce guards the one-time lazy load performed by loadSnapshot.
	initOnce sync.Once
	// initErr records any load failure; returned by ensureLoaded on every call.
	initErr error
}

// NewSnapshotLogFilterer returns a filterer backed by the embedded batch
// snapshot. The snapshot itself is not read until the first call to
// FilterLogs or BlockNumber.
func NewSnapshotLogFilterer(logger log.Logger) *SnapshotLogFilterer {
	filterer := new(SnapshotLogFilterer)
	filterer.logger = logger
	return filterer
}

// loadSnapshot decompresses the embedded batch snapshot and hands the stream
// to parseLogs. It is intended to be called exactly once, via initOnce.Do;
// on failure the error is recorded in f.initErr and f.isLoaded stays false.
func (f *SnapshotLogFilterer) loadSnapshot() {
	f.logger.Info("loading batch snapshot")

	gz, err := gzip.NewReader(bytes.NewReader(archive.GetBatchSnapshot()))
	if err != nil {
		f.logger.Error(err, "failed to create gzip reader for batch import")
		f.initErr = fmt.Errorf("create gzip reader: %w", err)
		return
	}
	defer gz.Close()

	if perr := f.parseLogs(gz); perr != nil {
		f.logger.Error(perr, "failed to parse logs from snapshot")
		f.initErr = perr
		return
	}

	f.isLoaded = true
	f.logger.Info("batch snapshot loaded and sorted successfully", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight)
}

// parseLogs decodes a stream of JSON-encoded types.Log entries from reader,
// records the highest block number seen, and stores the result in
// f.loadedLogs sorted ascending by block number.
//
// Sorting is required: FilterLogs locates the first candidate block with
// sort.Search, which silently returns wrong results on unsorted input. A
// stable sort preserves the original intra-block log order.
//
// Decoding stops at io.EOF. Any other decode error aborts the scan instead
// of skipping the entry: a json.Decoder does not advance past malformed
// input, so logging and continuing would spin forever on the same error.
// Logs decoded before the error are kept (best effort, as before).
func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error {
	var (
		parsedLogs      []types.Log
		currentMaxBlock uint64
	)

	decoder := json.NewDecoder(reader)
	for {
		var logEntry types.Log
		if err := decoder.Decode(&logEntry); err != nil {
			if err == io.EOF {
				break
			}
			// Decoder is stuck on the malformed token; stop rather than loop.
			f.logger.Warning("failed to decode log event, stopping snapshot parse", "error", err)
			break
		}

		if logEntry.BlockNumber > currentMaxBlock {
			currentMaxBlock = logEntry.BlockNumber
		}
		parsedLogs = append(parsedLogs, logEntry)
	}

	// Establish the ascending block order that FilterLogs' binary search
	// depends on; the snapshot's on-disk order is not guaranteed.
	sort.SliceStable(parsedLogs, func(i, j int) bool {
		return parsedLogs[i].BlockNumber < parsedLogs[j].BlockNumber
	})

	f.loadedLogs = parsedLogs
	f.maxBlockHeight = currentMaxBlock
	return nil
}

// ensureLoaded triggers the one-time snapshot load (thread-safe via
// sync.Once) and reports any initialization error it recorded.
func (f *SnapshotLogFilterer) ensureLoaded() error {
	f.initOnce.Do(func() {
		f.loadSnapshot()
	})
	return f.initErr
}

// FilterLogs returns the pre-loaded snapshot logs matching query, applying
// the block range, address list, and positional topic criteria the same way
// a chain backend would. The snapshot is loaded lazily on first call.
func (f *SnapshotLogFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
	if err := f.ensureLoaded(); err != nil {
		return nil, fmt.Errorf("failed to ensure snapshot was loaded for FilterLogs: %w", err)
	}
	if !f.isLoaded {
		return nil, fmt.Errorf("snapshot not loaded, cannot filter logs (initialization might have failed without an error)")
	}

	f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query_from_block", query.FromBlock, "query_to_block", query.ToBlock, "query_addresses_count", len(query.Addresses), "query_topics_count", len(query.Topics))

	matched := make([]types.Log, 0)

	// Logs are kept ordered by block number, so binary-search to the first
	// candidate block instead of scanning from the start.
	first := 0
	if query.FromBlock != nil {
		from := query.FromBlock.Uint64()
		first = sort.Search(len(f.loadedLogs), func(i int) bool {
			return f.loadedLogs[i].BlockNumber >= from
		})
	}

	scanned := 0
scan:
	for idx := first; idx < len(f.loadedLogs); idx++ {
		candidate := f.loadedLogs[idx]
		scanned++

		// Past the upper bound; ordering guarantees nothing further matches.
		if query.ToBlock != nil && candidate.BlockNumber > query.ToBlock.Uint64() {
			break
		}

		if len(query.Addresses) > 0 && !slices.Contains(query.Addresses, candidate.Address) {
			continue
		}

		// Positional topic match: an empty criteria slice is a wildcard at
		// that position; otherwise the log's topic there must be one of the
		// listed values.
		for pos, wanted := range query.Topics {
			if len(wanted) == 0 {
				continue
			}
			if pos >= len(candidate.Topics) || !slices.Contains(wanted, candidate.Topics[pos]) {
				continue scan
			}
		}

		matched = append(matched, candidate)
	}

	f.logger.Debug("filtered logs complete", "input_log_count", len(f.loadedLogs), "potential_logs_in_block_range", scanned, "output_count", len(matched))
	return matched, nil
}

// BlockNumber reports the highest block number observed in the snapshot,
// loading the snapshot lazily on first use. The context is unused because
// no network call is made.
func (f *SnapshotLogFilterer) BlockNumber(_ context.Context) (uint64, error) {
	switch err := f.ensureLoaded(); {
	case err != nil:
		return 0, fmt.Errorf("failed to ensure snapshot was loaded for BlockNumber: %w", err)
	case !f.isLoaded:
		return 0, fmt.Errorf("snapshot not loaded, cannot get block number")
	}
	return f.maxBlockHeight, nil
}
Loading