diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 4eed7ba2616..527b3645d37 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -80,6 +80,7 @@ const ( optionNameTransactionDebugMode = "transaction-debug-mode" optionMinimumStorageRadius = "minimum-storage-radius" optionReserveCapacityDoubling = "reserve-capacity-doubling" + optionSkipPostageSnapshot = "skip-postage-snapshot" ) // nolint:gochecknoinits @@ -288,6 +289,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().Bool(optionNameTransactionDebugMode, false, "skips the gas estimate step for contract transactions") cmd.Flags().Uint(optionMinimumStorageRadius, 0, "minimum radius storage threshold") cmd.Flags().Int(optionReserveCapacityDoubling, 0, "reserve capacity doubling") + cmd.Flags().Bool(optionSkipPostageSnapshot, false, "skip postage snapshot") } func newLogger(cmd *cobra.Command, verbosity string) (log.Logger, error) { diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index 5db4bccf857..1b1baac9503 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -315,6 +315,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo ResolverConnectionCfgs: resolverCfgs, Resync: c.config.GetBool(optionNameResync), RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching), + SkipPostageSnapshot: c.config.GetBool(optionSkipPostageSnapshot), StakingContractAddress: c.config.GetString(optionNameStakingAddress), StatestoreCacheCapacity: c.config.GetUint64(optionNameStateStoreCacheCapacity), StaticNodes: staticNodes, diff --git a/go.mod b/go.mod index 7fe9c9d5d1d..f68c23aaeed 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/ethersphere/bee/v2 -go 1.24 +go 1.24.0 + +toolchain go1.24.2 require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 @@ -8,6 +10,7 @@ require ( github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/coreos/go-semver v0.3.0 github.com/ethereum/go-ethereum v1.14.3 + github.com/ethersphere/batch-archive v0.0.3 github.com/ethersphere/go-price-oracle-abi v0.6.8 github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3 github.com/ethersphere/go-sw3-abi v0.6.5 diff --git a/go.sum b/go.sum index 8fd9582750b..d9a29df4f70 100644 --- a/go.sum +++ b/go.sum @@ -240,6 +240,8 @@ github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1 github.com/ethereum/go-ethereum v1.10.4/go.mod h1:nEE0TP5MtxGzOMd7egIrbPJMQBnhVU3ELNxhBglIzhg= github.com/ethereum/go-ethereum v1.14.3 h1:5zvnAqLtnCZrU9uod1JCvHWJbPMURzYFHfc2eHz4PHA= github.com/ethereum/go-ethereum v1.14.3/go.mod h1:1STrq471D0BQbCX9He0hUj4bHxX2k6mt5nOQJhDNOJ8= +github.com/ethersphere/batch-archive v0.0.3 h1:rAzvixdDkxLV5A6XdbG3uxts8ciJ+1ShgZUKE2+qsqI= +github.com/ethersphere/batch-archive v0.0.3/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.8 h1:23Y0msO4ZRvB9o1NRdFDd0eewlnx37XxQm2DKbL6Qk8= github.com/ethersphere/go-price-oracle-abi v0.6.8/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3 h1:uuowc0ekcipVwYkn1Rud9LySZ094hrDq2/YfRKyjbbQ= diff --git a/packaging/bee.yaml b/packaging/bee.yaml index 9c3bd436f29..9fedf77586c 100644 --- a/packaging/bee.yaml +++ b/packaging/bee.yaml @@ -78,6 +78,8 @@ password-file: "/var/lib/bee/password" # resolver-options: [] ## forces the node to resync postage contract data # resync: false +## skip postage snapshot +# skip-postage-snapshot: false ## staking contract address # staking-address: "" ## lru memory caching capacity in number of statestore entries @@ -115,4 +117,4 @@ password-file: "/var/lib/bee/password" ## send a welcome message string during handshakes # welcome-message: "" ## withdrawal target addresses -# withdrawal-addresses-whitelist: [] \ No newline at end of file +# withdrawal-addresses-whitelist: [] diff --git a/packaging/homebrew-amd64/bee.yaml b/packaging/homebrew-amd64/bee.yaml index 0ebaf0a72f3..193619a6652 100644 --- a/packaging/homebrew-amd64/bee.yaml +++ b/packaging/homebrew-amd64/bee.yaml @@ -78,6 +78,8 @@ password-file: "/usr/local/var/lib/swarm-bee/password" # resolver-options: [] ## forces the node to resync postage contract data # resync: false +## skip postage snapshot +# skip-postage-snapshot: false ## staking contract address # staking-address: "" ## lru memory caching capacity in number of statestore entries @@ -115,4 +117,4 @@ password-file: "/usr/local/var/lib/swarm-bee/password" ## send a welcome message string during handshakes # welcome-message: "" ## withdrawal target addresses -# withdrawal-addresses-whitelist: [] \ No newline at end of file +# withdrawal-addresses-whitelist: [] diff --git a/packaging/homebrew-arm64/bee.yaml b/packaging/homebrew-arm64/bee.yaml index 2a66f4ebd71..75ec057aa94 100644 --- a/packaging/homebrew-arm64/bee.yaml +++ b/packaging/homebrew-arm64/bee.yaml @@ -78,6 +78,8 @@ password-file: "/opt/homebrew/var/lib/swarm-bee/password" # resolver-options: [] ## forces the node to resync postage contract data # resync: false +## skip postage snapshot +# skip-postage-snapshot: false ## staking contract address # staking-address: "" ## lru memory caching capacity in number of statestore entries @@ -115,4 +117,4 @@ password-file: "/opt/homebrew/var/lib/swarm-bee/password" ## send a welcome message string during handshakes # welcome-message: "" ## withdrawal target addresses -# withdrawal-addresses-whitelist: [] \ No newline at end of file +# withdrawal-addresses-whitelist: [] diff --git a/packaging/scoop/bee.yaml b/packaging/scoop/bee.yaml index c6d16a3676e..270aee567cb 100644 --- a/packaging/scoop/bee.yaml +++ b/packaging/scoop/bee.yaml @@ -78,6 +78,8 @@ password-file: "./password" # resolver-options: [] ## forces the node to resync postage contract data # resync: false +## skip postage snapshot +# skip-postage-snapshot: false ## staking contract address # staking-address: "" ## lru memory caching capacity in number of statestore entries @@ -115,4 +117,4 @@ password-file: "./password" ## send a welcome message string during handshakes # welcome-message: "" ## withdrawal target addresses -# withdrawal-addresses-whitelist: [] \ No newline at end of file +# withdrawal-addresses-whitelist: [] diff --git a/pkg/node/node.go b/pkg/node/node.go index 8284feeeb70..9893bfb361e 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -160,6 +160,7 @@ type Options struct { ResolverConnectionCfgs []multiresolver.ConnectionConfig Resync bool RetrievalCaching bool + SkipPostageSnapshot bool StakingContractAddress string StatestoreCacheCapacity uint64 StaticNodes []swarm.Address @@ -802,6 +803,30 @@ func NewBee( } ) + if !o.SkipPostageSnapshot && !batchStoreExists && (networkID == mainnetNetworkID) { + chainBackend := NewSnapshotLogFilterer(logger) + + snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) + + snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) + if err != nil { + logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...") + } else { + err = snapshotBatchSvc.Start(ctx, postageSyncStart, initBatchState) + syncStatus.Store(true) + if err != nil { + syncErr.Store(err) + logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...") + } else { + postageSyncStart = chainBackend.maxBlockHeight + } + } + if errClose := snapshotEventListener.Close(); errClose != nil { + logger.Error(errClose, "failed to close event listener (snapshot) failure") + } + + } + if batchSvc != nil && chainEnabled { logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel") diff --git a/pkg/node/snapshot.go b/pkg/node/snapshot.go new file mode 100644 index 00000000000..fe7963ff6c4 --- /dev/null +++ b/pkg/node/snapshot.go @@ -0,0 +1,161 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package node + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io" + "sort" + "sync" + + "slices" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" + archive "github.com/ethersphere/batch-archive" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/postage/listener" +) + +var _ listener.BlockHeightContractFilterer = (*SnapshotLogFilterer)(nil) + +type SnapshotLogFilterer struct { + logger log.Logger + loadedLogs []types.Log + maxBlockHeight uint64 + initOnce sync.Once +} + +func NewSnapshotLogFilterer(logger log.Logger) *SnapshotLogFilterer { + return &SnapshotLogFilterer{ + logger: logger, + } +} + +// loadSnapshot is responsible for loading and processing the snapshot data. +// It is intended to be called exactly once by initOnce.Do. +func (f *SnapshotLogFilterer) loadSnapshot() error { + f.logger.Info("loading batch snapshot") + data := archive.GetBatchSnapshot() + dataReader := bytes.NewReader(data) + gzipReader, err := gzip.NewReader(dataReader) + if err != nil { + f.logger.Error(err, "failed to create gzip reader for batch import") + return fmt.Errorf("create gzip reader: %w", err) + } + defer gzipReader.Close() + + if err := f.parseLogs(gzipReader); err != nil { + f.logger.Error(err, "failed to parse logs from snapshot") + return err + } + + f.logger.Info("batch snapshot loaded successfully", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight) + return nil +} + +func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error { + var parsedLogs []types.Log + var currentMaxBlockHeight uint64 + + decoder := json.NewDecoder(reader) + for { + var logEntry types.Log + if err := decoder.Decode(&logEntry); err != nil { + if err == io.EOF { + break + } + f.logger.Warning("failed to decode log event, skipping", "error", err) + continue + } + + if logEntry.BlockNumber > currentMaxBlockHeight { + currentMaxBlockHeight = logEntry.BlockNumber + } + parsedLogs = append(parsedLogs, logEntry) + } + + f.loadedLogs = parsedLogs + f.maxBlockHeight = currentMaxBlockHeight + return nil +} + +// ensureLoaded calls loadSnapshot via sync.Once to ensure thread-safe, one-time initialization. +func (f *SnapshotLogFilterer) ensureLoaded() error { + var err error + f.initOnce.Do(func() { + err = f.loadSnapshot() + }) + return err +} + +func (f *SnapshotLogFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { + if err := f.ensureLoaded(); err != nil { + return nil, fmt.Errorf("failed to ensure snapshot was loaded for FilterLogs: %w", err) + } + + f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query_from_block", query.FromBlock, "query_to_block", query.ToBlock, "query_addresses_count", len(query.Addresses), "query_topics_count", len(query.Topics)) + + filtered := make([]types.Log, 0) + + startIndex := 0 + if query.FromBlock != nil { + fromBlockNum := query.FromBlock.Uint64() + startIndex = sort.Search(len(f.loadedLogs), func(i int) bool { + return f.loadedLogs[i].BlockNumber >= fromBlockNum + }) + } + + scannedCount := 0 + for i := startIndex; i < len(f.loadedLogs); i++ { + logEntry := f.loadedLogs[i] + scannedCount++ + + if query.ToBlock != nil && logEntry.BlockNumber > query.ToBlock.Uint64() { + break + } + + if len(query.Addresses) > 0 && !slices.Contains(query.Addresses, logEntry.Address) { + continue + } + + if len(query.Topics) > 0 { + match := true + for topicIndex, topicCriteria := range query.Topics { + if len(topicCriteria) == 0 { + continue + } + if topicIndex >= len(logEntry.Topics) { + match = false + break + } + + if !slices.Contains(topicCriteria, logEntry.Topics[topicIndex]) { + match = false + break + } + } + if !match { + continue + } + } + + filtered = append(filtered, logEntry) + } + + f.logger.Debug("filtered logs complete", "input_log_count", len(f.loadedLogs), "potential_logs_in_block_range", scannedCount, "output_count", len(filtered)) + return filtered, nil +} + +func (f *SnapshotLogFilterer) BlockNumber(_ context.Context) (uint64, error) { + if err := f.ensureLoaded(); err != nil { + return 0, fmt.Errorf("failed to ensure snapshot was loaded for BlockNumber: %w", err) + } + return f.maxBlockHeight, nil +}