From 146d738dd335e56ffa0042fdff0155f27687d9d9 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Thu, 8 May 2025 21:15:11 +0300 Subject: [PATCH 1/7] feat: import batches from exported postage contract events log file --- go.mod | 6 +- go.sum | 2 + pkg/node/node.go | 25 +++++++ pkg/node/snapshot.go | 167 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 pkg/node/snapshot.go diff --git a/go.mod b/go.mod index ee439eeeba7..560f3960490 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/ethersphere/bee/v2 -go 1.24 +go 1.24.0 + +toolchain go1.24.2 require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 @@ -55,6 +57,8 @@ require ( resenje.org/web v0.4.3 ) +require github.com/ethersphere/batch-archive v0.0.2 + require ( github.com/BurntSushi/toml v1.1.0 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect diff --git a/go.sum b/go.sum index dec082610c0..f088b98aa98 100644 --- a/go.sum +++ b/go.sum @@ -240,6 +240,8 @@ github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1 github.com/ethereum/go-ethereum v1.10.4/go.mod h1:nEE0TP5MtxGzOMd7egIrbPJMQBnhVU3ELNxhBglIzhg= github.com/ethereum/go-ethereum v1.14.3 h1:5zvnAqLtnCZrU9uod1JCvHWJbPMURzYFHfc2eHz4PHA= github.com/ethereum/go-ethereum v1.14.3/go.mod h1:1STrq471D0BQbCX9He0hUj4bHxX2k6mt5nOQJhDNOJ8= +github.com/ethersphere/batch-archive v0.0.2 h1:OmZw4tn9o0p9+MyBZq7KngufqqJ91OETtK4uxXWgEVg= +github.com/ethersphere/batch-archive v0.0.2/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.2.0 h1:wtIcYLgNZHY4BjYwJCnu93SvJdVAZVvBaKinspyyHvQ= github.com/ethersphere/go-price-oracle-abi v0.2.0/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= github.com/ethersphere/go-storage-incentives-abi v0.9.2 h1:6Pmxuj48LBTxayzwADNYmcbiqj6ongoRWwWV4Wp1EPo= diff --git a/pkg/node/node.go b/pkg/node/node.go index 8284feeeb70..f98eecb410f 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -802,6 +802,31 @@ func NewBee( } ) + if !batchStoreExists && (networkID == mainnetNetworkID) { + chainBackend, err := NewSnapshotBlockHeightContractFilterer(logger) + if err != nil { + logger.Debug("failed to initialize batch snapshot chain backend", "error", err) + } else { + eventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) + b.listenerCloser = eventListener + + batchSvc, err := batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) + if err != nil { + return nil, fmt.Errorf("init batch service: %w", err) + } + + if err := batchSvc.Start(ctx, postageSyncStart, initBatchState); err != nil { + return nil, err + } + + if err := eventListener.Close(); err != nil { + return nil, err + } + + postageSyncStart = chainBackend.maxBlockHeight + } + } + if batchSvc != nil && chainEnabled { logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel") diff --git a/pkg/node/snapshot.go b/pkg/node/snapshot.go new file mode 100644 index 00000000000..bb79a11e55d --- /dev/null +++ b/pkg/node/snapshot.go @@ -0,0 +1,167 @@ +package node + +import ( + "bufio" + "bytes" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" + archive "github.com/ethersphere/batch-archive" + "github.com/ethersphere/bee/v2/pkg/log" +) + +type SnapshotBlockHeightContractFilterer struct { + logger log.Logger + loadedLogs []types.Log + maxBlockHeight uint64 + isLoaded bool +} + +func NewSnapshotBlockHeightContractFilterer(logger log.Logger) (*SnapshotBlockHeightContractFilterer, error) { + f := &SnapshotBlockHeightContractFilterer{ + logger: logger, + } + + if err := f.loadAndProcessSnapshot(); err != nil { + return nil, fmt.Errorf("failed to load and process snapshot during initialization: %w", err) + } + + return f, nil +} + +func (f *SnapshotBlockHeightContractFilterer) loadAndProcessSnapshot() error { + f.logger.Info("loading batch snapshot during construction") + data := archive.GetBatchSnapshot(true) + + dataReader := bytes.NewReader(data) + gzipReader, err := gzip.NewReader(dataReader) + if err != nil { + f.logger.Error(err, "failed to create gzip reader for batch import") + return fmt.Errorf("create gzip reader: %w", err) + } + defer gzipReader.Close() + + if err := f.parseLogs(gzipReader); err != nil { + f.logger.Error(err, "failed to parse logs from snapshot") + return err + } + + f.isLoaded = true + f.logger.Info("batch snapshot loaded successfully during construction", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight) + return nil +} + +func (f *SnapshotBlockHeightContractFilterer) parseLogs(reader io.Reader) error { + var parsedLogs []types.Log + var currentMaxBlockHeight uint64 + scanner := bufio.NewScanner(reader) + + for scanner.Scan() { + line := scanner.Bytes() + if len(bytes.TrimSpace(line)) == 0 { + continue + } + + var logEntry types.Log + if err := json.Unmarshal(line, &logEntry); err != nil { + f.logger.Warning("failed to unmarshal log event, skipping line", "error", err, "line_snippet", string(line[:min(len(line), 100)])) + continue + } + + if logEntry.BlockNumber > currentMaxBlockHeight { + currentMaxBlockHeight = logEntry.BlockNumber + } + parsedLogs = append(parsedLogs, logEntry) + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("error scanning batch import data: %w", err) + } + + f.loadedLogs = parsedLogs + f.maxBlockHeight = currentMaxBlockHeight + f.isLoaded = true + return nil +} + +func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { + f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query", query) + + var filtered []types.Log + + for _, log := range f.loadedLogs { + // Filter by block range + if query.FromBlock != nil && log.BlockNumber < query.FromBlock.Uint64() { + continue + } + if query.ToBlock != nil && log.BlockNumber > query.ToBlock.Uint64() { + continue + } + + // Filter by addresses + if len(query.Addresses) > 0 { + addressMatch := false + for _, addr := range query.Addresses { + if log.Address == addr { + addressMatch = true + break + } + } + if !addressMatch { + continue + } + } + + // Filter by topics + if len(query.Topics) > 0 { + topicMatch := true + + // We have a max of 4 topics in Ethereum logs + for i := 0; i < len(query.Topics) && i < 4; i++ { + // Skip if no filter for this topic position or empty filter array + if i >= len(query.Topics) || len(query.Topics[i]) == 0 { + continue + } + + // If we're filtering for this topic position but log doesn't have enough topics + if i >= len(log.Topics) { + topicMatch = false + break + } + + // Check if this topic matches any in the filter array for this position + hasMatch := false + for _, topic := range query.Topics[i] { + if log.Topics[i] == topic { + hasMatch = true + break + } + } + + if !hasMatch { + topicMatch = false + break + } + } + + if !topicMatch { + continue + } + } + + // If we got here, log passed all filters + filtered = append(filtered, log) + } + + f.logger.Debug("filtered logs", "input_count", len(f.loadedLogs), "output_count", len(filtered)) + return filtered, nil +} + +func (f *SnapshotBlockHeightContractFilterer) BlockNumber(_ context.Context) (uint64, error) { + return f.maxBlockHeight, nil +} From 2d956edbe2d98cf04f9adf40a10480ded5ecd532 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Thu, 8 May 2025 21:35:07 +0300 Subject: [PATCH 2/7] fix: linter errors and added some more checks --- pkg/node/node.go | 9 ++++++--- pkg/node/snapshot.go | 36 ++++++++++++++++-------------------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index f98eecb410f..dadb2f9a668 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -802,7 +802,7 @@ func NewBee( } ) - if !batchStoreExists && (networkID == mainnetNetworkID) { + if !o.Resync && !batchStoreExists && (networkID == mainnetNetworkID) { chainBackend, err := NewSnapshotBlockHeightContractFilterer(logger) if err != nil { logger.Debug("failed to initialize batch snapshot chain backend", "error", err) @@ -815,8 +815,11 @@ func NewBee( return nil, fmt.Errorf("init batch service: %w", err) } - if err := batchSvc.Start(ctx, postageSyncStart, initBatchState); err != nil { - return nil, err + err = batchSvc.Start(ctx, postageSyncStart, initBatchState) + syncStatus.Store(true) + if err != nil { + syncErr.Store(err) + return nil, fmt.Errorf("unable to start batch service: %w", err) } if err := eventListener.Close(); err != nil { diff --git a/pkg/node/snapshot.go b/pkg/node/snapshot.go index bb79a11e55d..1fad22a3c7d 100644 --- a/pkg/node/snapshot.go +++ b/pkg/node/snapshot.go @@ -1,3 +1,7 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package node import ( @@ -91,19 +95,17 @@ func (f *SnapshotBlockHeightContractFilterer) parseLogs(reader io.Reader) error func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query", query) - - var filtered []types.Log - + + filtered := make([]types.Log, 0, len(f.loadedLogs)) + for _, log := range f.loadedLogs { - // Filter by block range if query.FromBlock != nil && log.BlockNumber < query.FromBlock.Uint64() { continue } if query.ToBlock != nil && log.BlockNumber > query.ToBlock.Uint64() { continue } - - // Filter by addresses + if len(query.Addresses) > 0 { addressMatch := false for _, addr := range query.Addresses { @@ -116,25 +118,20 @@ func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, qu continue } } - - // Filter by topics + if len(query.Topics) > 0 { topicMatch := true - - // We have a max of 4 topics in Ethereum logs + for i := 0; i < len(query.Topics) && i < 4; i++ { - // Skip if no filter for this topic position or empty filter array if i >= len(query.Topics) || len(query.Topics[i]) == 0 { continue } - - // If we're filtering for this topic position but log doesn't have enough topics + if i >= len(log.Topics) { topicMatch = false break } - - // Check if this topic matches any in the filter array for this position + hasMatch := false for _, topic := range query.Topics[i] { if log.Topics[i] == topic { @@ -142,22 +139,21 @@ func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, qu break } } - + if !hasMatch { topicMatch = false break } } - + if !topicMatch { continue } } - - // If we got here, log passed all filters + filtered = append(filtered, log) } - + f.logger.Debug("filtered logs", "input_count", len(f.loadedLogs), "output_count", len(filtered)) return filtered, nil } From e33366e8d626a8735bc94ff88069141a46ea2a88 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Thu, 8 May 2025 22:03:26 +0300 Subject: [PATCH 3/7] fix: remove listener closer --- pkg/node/node.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index dadb2f9a668..ab7b2283a26 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -808,7 +808,6 @@ func NewBee( logger.Debug("failed to initialize batch snapshot chain backend", "error", err) } else { eventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) - b.listenerCloser = eventListener batchSvc, err := batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) if err != nil { From 88f449871c825987321af2d954e639644ee14f5a Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Tue, 13 May 2025 09:29:35 +0300 Subject: [PATCH 4/7] feat: improve error handling and small refactorings --- pkg/node/node.go | 27 +++++++++++++-------------- pkg/node/snapshot.go | 32 +++++++++++--------------------- 2 files changed, 24 insertions(+), 35 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index ab7b2283a26..d7383842edb 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -803,7 +803,7 @@ func NewBee( ) if !o.Resync && !batchStoreExists && (networkID == mainnetNetworkID) { - chainBackend, err := NewSnapshotBlockHeightContractFilterer(logger) + chainBackend, err := NewSnapshotLogFilterer(logger) if err != nil { logger.Debug("failed to initialize batch snapshot chain backend", "error", err) } else { @@ -811,21 +811,20 @@ func NewBee( batchSvc, err := batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) if err != nil { - return nil, fmt.Errorf("init batch service: %w", err) - } - - err = batchSvc.Start(ctx, postageSyncStart, initBatchState) - syncStatus.Store(true) - if err != nil { - syncErr.Store(err) - return nil, fmt.Errorf("unable to start batch service: %w", err) + logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...") + } else { + err = batchSvc.Start(ctx, postageSyncStart, initBatchState) + syncStatus.Store(true) + if err != nil { + syncErr.Store(err) + logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...") + } else { + postageSyncStart = chainBackend.maxBlockHeight + } } - - if err := eventListener.Close(); err != nil { - return nil, err + if errClose := eventListener.Close(); errClose != nil { + logger.Error(errClose, "failed to close event listener (snapshot) failure") } - - postageSyncStart = chainBackend.maxBlockHeight } } diff --git a/pkg/node/snapshot.go b/pkg/node/snapshot.go index 1fad22a3c7d..3bd788687a9 100644 --- a/pkg/node/snapshot.go +++ b/pkg/node/snapshot.go @@ -13,21 +13,23 @@ import ( "fmt" "io" + "slices" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" archive "github.com/ethersphere/batch-archive" "github.com/ethersphere/bee/v2/pkg/log" ) -type SnapshotBlockHeightContractFilterer struct { +type SnapshotLogFilterer struct { logger log.Logger loadedLogs []types.Log maxBlockHeight uint64 isLoaded bool } -func NewSnapshotBlockHeightContractFilterer(logger log.Logger) (*SnapshotBlockHeightContractFilterer, error) { - f := &SnapshotBlockHeightContractFilterer{ +func NewSnapshotLogFilterer(logger log.Logger) (*SnapshotLogFilterer, error) { + f := &SnapshotLogFilterer{ logger: logger, } @@ -38,7 +40,7 @@ func NewSnapshotBlockHeightContractFilterer(logger log.Logger) (*SnapshotBlockHe return f, nil } -func (f *SnapshotBlockHeightContractFilterer) loadAndProcessSnapshot() error { +func (f *SnapshotLogFilterer) loadAndProcessSnapshot() error { f.logger.Info("loading batch snapshot during construction") data := archive.GetBatchSnapshot(true) @@ -60,7 +62,7 @@ func (f *SnapshotBlockHeightContractFilterer) loadAndProcessSnapshot() error { return nil } -func (f *SnapshotBlockHeightContractFilterer) parseLogs(reader io.Reader) error { +func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error { var parsedLogs []types.Log var currentMaxBlockHeight uint64 scanner := bufio.NewScanner(reader) @@ -93,7 +95,7 @@ func (f *SnapshotBlockHeightContractFilterer) parseLogs(reader io.Reader) error return nil } -func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { +func (f *SnapshotLogFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query", query) filtered := make([]types.Log, 0, len(f.loadedLogs)) @@ -107,13 +109,7 @@ func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, qu } if len(query.Addresses) > 0 { - addressMatch := false - for _, addr := range query.Addresses { - if log.Address == addr { - addressMatch = true - break - } - } + addressMatch := slices.Contains(query.Addresses, log.Address) if !addressMatch { continue } @@ -132,13 +128,7 @@ func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, qu break } - hasMatch := false - for _, topic := range query.Topics[i] { - if log.Topics[i] == topic { - hasMatch = true - break - } - } + hasMatch := slices.Contains(query.Topics[i], log.Topics[i]) if !hasMatch { topicMatch = false @@ -158,6 +148,6 @@ func (f *SnapshotBlockHeightContractFilterer) FilterLogs(ctx context.Context, qu return filtered, nil } -func (f *SnapshotBlockHeightContractFilterer) BlockNumber(_ context.Context) (uint64, error) { +func (f *SnapshotLogFilterer) BlockNumber(_ context.Context) (uint64, error) { return f.maxBlockHeight, nil } From 46f50442f5b1fb365a2200bb1093aad00995411e Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Wed, 14 May 2025 14:44:53 +0300 Subject: [PATCH 5/7] feat: improve parsing of logs and filter events --- go.mod | 3 +- go.sum | 4 +- pkg/node/node.go | 32 +++++------ pkg/node/snapshot.go | 133 ++++++++++++++++++++++++------------------- 4 files changed, 92 insertions(+), 80 deletions(-) diff --git a/go.mod b/go.mod index 9680b202236..f68c23aaeed 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/coreos/go-semver v0.3.0 github.com/ethereum/go-ethereum v1.14.3 + github.com/ethersphere/batch-archive v0.0.3 github.com/ethersphere/go-price-oracle-abi v0.6.8 github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3 github.com/ethersphere/go-sw3-abi v0.6.5 @@ -57,8 +58,6 @@ require ( resenje.org/web v0.4.3 ) -require github.com/ethersphere/batch-archive v0.0.2 - require ( github.com/BurntSushi/toml v1.1.0 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect diff --git a/go.sum b/go.sum index 67adec20688..d9a29df4f70 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1 github.com/ethereum/go-ethereum v1.10.4/go.mod h1:nEE0TP5MtxGzOMd7egIrbPJMQBnhVU3ELNxhBglIzhg= github.com/ethereum/go-ethereum v1.14.3 h1:5zvnAqLtnCZrU9uod1JCvHWJbPMURzYFHfc2eHz4PHA= github.com/ethereum/go-ethereum v1.14.3/go.mod h1:1STrq471D0BQbCX9He0hUj4bHxX2k6mt5nOQJhDNOJ8= -github.com/ethersphere/batch-archive v0.0.2 h1:OmZw4tn9o0p9+MyBZq7KngufqqJ91OETtK4uxXWgEVg= -github.com/ethersphere/batch-archive v0.0.2/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= +github.com/ethersphere/batch-archive v0.0.3 h1:rAzvixdDkxLV5A6XdbG3uxts8ciJ+1ShgZUKE2+qsqI= +github.com/ethersphere/batch-archive v0.0.3/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.8 h1:23Y0msO4ZRvB9o1NRdFDd0eewlnx37XxQm2DKbL6Qk8= github.com/ethersphere/go-price-oracle-abi v0.6.8/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3 h1:uuowc0ekcipVwYkn1Rud9LySZ094hrDq2/YfRKyjbbQ= diff --git a/pkg/node/node.go b/pkg/node/node.go index d7383842edb..67e6ec986f0 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -803,29 +803,27 @@ func NewBee( ) if !o.Resync && !batchStoreExists && (networkID == mainnetNetworkID) { - chainBackend, err := NewSnapshotLogFilterer(logger) + chainBackend := NewSnapshotLogFilterer(logger) + + snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) + + snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) if err != nil { - logger.Debug("failed to initialize batch snapshot chain backend", "error", err) + logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...") } else { - eventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) - - batchSvc, err := batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) + err = snapshotBatchSvc.Start(ctx, postageSyncStart, initBatchState) + syncStatus.Store(true) if err != nil { - logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...") + syncErr.Store(err) + logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...") } else { - err = batchSvc.Start(ctx, postageSyncStart, initBatchState) - syncStatus.Store(true) - if err != nil { - syncErr.Store(err) - logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...") - } else { - postageSyncStart = chainBackend.maxBlockHeight - } - } - if errClose := eventListener.Close(); errClose != nil { - logger.Error(errClose, "failed to close event listener (snapshot) failure") + postageSyncStart = chainBackend.maxBlockHeight } } + if errClose := snapshotEventListener.Close(); errClose != nil { + logger.Error(errClose, "failed to close event listener (snapshot) failure") + } + } if batchSvc != nil && chainEnabled { diff --git a/pkg/node/snapshot.go b/pkg/node/snapshot.go index 3bd788687a9..726a28d0f70 100644 --- a/pkg/node/snapshot.go +++ b/pkg/node/snapshot.go @@ -5,13 +5,14 @@ package node import ( - "bufio" "bytes" "compress/gzip" "context" "encoding/json" "fmt" "io" + "sort" + "sync" "slices" @@ -19,63 +20,62 @@ import ( "github.com/ethereum/go-ethereum/core/types" archive "github.com/ethersphere/batch-archive" "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/postage/listener" ) +var _ listener.BlockHeightContractFilterer = (*SnapshotLogFilterer)(nil) + type SnapshotLogFilterer struct { logger log.Logger loadedLogs []types.Log maxBlockHeight uint64 isLoaded bool + initOnce sync.Once + initErr error } -func NewSnapshotLogFilterer(logger log.Logger) (*SnapshotLogFilterer, error) { - f := &SnapshotLogFilterer{ +func NewSnapshotLogFilterer(logger log.Logger) *SnapshotLogFilterer { + return &SnapshotLogFilterer{ logger: logger, } - - if err := f.loadAndProcessSnapshot(); err != nil { - return nil, fmt.Errorf("failed to load and process snapshot during initialization: %w", err) - } - - return f, nil } -func (f *SnapshotLogFilterer) loadAndProcessSnapshot() error { - f.logger.Info("loading batch snapshot during construction") - data := archive.GetBatchSnapshot(true) - +// loadSnapshot is responsible for loading and processing the snapshot data. +// It is intended to be called exactly once by initOnce.Do. +func (f *SnapshotLogFilterer) loadSnapshot() { + f.logger.Info("loading batch snapshot") + data := archive.GetBatchSnapshot() dataReader := bytes.NewReader(data) gzipReader, err := gzip.NewReader(dataReader) if err != nil { f.logger.Error(err, "failed to create gzip reader for batch import") - return fmt.Errorf("create gzip reader: %w", err) + f.initErr = fmt.Errorf("create gzip reader: %w", err) + return } defer gzipReader.Close() if err := f.parseLogs(gzipReader); err != nil { f.logger.Error(err, "failed to parse logs from snapshot") - return err + f.initErr = err + return } f.isLoaded = true - f.logger.Info("batch snapshot loaded successfully during construction", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight) - return nil + f.logger.Info("batch snapshot loaded and sorted successfully", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight) } func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error { var parsedLogs []types.Log var currentMaxBlockHeight uint64 - scanner := bufio.NewScanner(reader) - - for scanner.Scan() { - line := scanner.Bytes() - if len(bytes.TrimSpace(line)) == 0 { - continue - } + decoder := json.NewDecoder(reader) + for { var logEntry types.Log - if err := json.Unmarshal(line, &logEntry); err != nil { - f.logger.Warning("failed to unmarshal log event, skipping line", "error", err, "line_snippet", string(line[:min(len(line), 100)])) + if err := decoder.Decode(&logEntry); err != nil { + if err == io.EOF { + break + } + f.logger.Warning("failed to decode log event, skipping", "error", err) continue } @@ -85,69 +85,84 @@ func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error { parsedLogs = append(parsedLogs, logEntry) } - if err := scanner.Err(); err != nil { - return fmt.Errorf("error scanning batch import data: %w", err) - } - f.loadedLogs = parsedLogs f.maxBlockHeight = currentMaxBlockHeight - f.isLoaded = true return nil } +// ensureLoaded calls loadSnapshot via sync.Once to ensure thread-safe, one-time initialization. +func (f *SnapshotLogFilterer) ensureLoaded() error { + f.initOnce.Do(f.loadSnapshot) + return f.initErr +} + func (f *SnapshotLogFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { - f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query", query) + if err := f.ensureLoaded(); err != nil { + return nil, fmt.Errorf("failed to ensure snapshot was loaded for FilterLogs: %w", err) + } + if !f.isLoaded { + return nil, fmt.Errorf("snapshot not loaded, cannot filter logs (initialization might have failed without an error)") + } - filtered := make([]types.Log, 0, len(f.loadedLogs)) + f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query_from_block", query.FromBlock, "query_to_block", query.ToBlock, "query_addresses_count", len(query.Addresses), "query_topics_count", len(query.Topics)) - for _, log := range f.loadedLogs { - if query.FromBlock != nil && log.BlockNumber < query.FromBlock.Uint64() { - continue - } - if query.ToBlock != nil && log.BlockNumber > query.ToBlock.Uint64() { - continue + filtered := make([]types.Log, 0) + + startIndex := 0 + if query.FromBlock != nil { + fromBlockNum := query.FromBlock.Uint64() + startIndex = sort.Search(len(f.loadedLogs), func(i int) bool { + return f.loadedLogs[i].BlockNumber >= fromBlockNum + }) + } + + scannedCount := 0 + for i := startIndex; i < len(f.loadedLogs); i++ { + logEntry := f.loadedLogs[i] + scannedCount++ + + if query.ToBlock != nil && logEntry.BlockNumber > query.ToBlock.Uint64() { + break } - if len(query.Addresses) > 0 { - addressMatch := slices.Contains(query.Addresses, log.Address) - if !addressMatch { - continue - } + if len(query.Addresses) > 0 && !slices.Contains(query.Addresses, logEntry.Address) { + continue } if len(query.Topics) > 0 { - topicMatch := true - - for i := 0; i < len(query.Topics) && i < 4; i++ { - if i >= len(query.Topics) || len(query.Topics[i]) == 0 { + match := true + for topicIndex, topicCriteria := range query.Topics { + if len(topicCriteria) == 0 { continue } - - if i >= len(log.Topics) { - topicMatch = false + if topicIndex >= len(logEntry.Topics) { + match = false break } - hasMatch := slices.Contains(query.Topics[i], log.Topics[i]) - - if !hasMatch { - topicMatch = false + if !slices.Contains(topicCriteria, logEntry.Topics[topicIndex]) { + match = false break } } - - if !topicMatch { + if !match { continue } } - filtered = append(filtered, log) + filtered = append(filtered, logEntry) } - f.logger.Debug("filtered logs", "input_count", len(f.loadedLogs), "output_count", len(filtered)) + f.logger.Debug("filtered logs complete", "input_log_count", len(f.loadedLogs), "potential_logs_in_block_range", scannedCount, "output_count", len(filtered)) return filtered, nil } func (f *SnapshotLogFilterer) BlockNumber(_ context.Context) (uint64, error) { + if err := f.ensureLoaded(); err != nil { + return 0, fmt.Errorf("failed to ensure snapshot was loaded for BlockNumber: %w", err) + } + if !f.isLoaded { + return 0, fmt.Errorf("snapshot not loaded, cannot get block number") + } return f.maxBlockHeight, nil } From ee22ee07587408f13c2c0cf3af915c742bbafccd Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Mon, 19 May 2025 20:16:35 +0300 Subject: [PATCH 6/7] fix: remove redundant code --- pkg/node/snapshot.go | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/pkg/node/snapshot.go b/pkg/node/snapshot.go index 726a28d0f70..fe7963ff6c4 100644 --- a/pkg/node/snapshot.go +++ b/pkg/node/snapshot.go @@ -29,9 +29,7 @@ type SnapshotLogFilterer struct { logger log.Logger loadedLogs []types.Log maxBlockHeight uint64 - isLoaded bool initOnce sync.Once - initErr error } func NewSnapshotLogFilterer(logger log.Logger) *SnapshotLogFilterer { @@ -42,26 +40,24 @@ func NewSnapshotLogFilterer(logger log.Logger) *SnapshotLogFilterer { // loadSnapshot is responsible for loading and processing the snapshot data. // It is intended to be called exactly once by initOnce.Do. -func (f *SnapshotLogFilterer) loadSnapshot() { +func (f *SnapshotLogFilterer) loadSnapshot() error { f.logger.Info("loading batch snapshot") data := archive.GetBatchSnapshot() dataReader := bytes.NewReader(data) gzipReader, err := gzip.NewReader(dataReader) if err != nil { f.logger.Error(err, "failed to create gzip reader for batch import") - f.initErr = fmt.Errorf("create gzip reader: %w", err) - return + return fmt.Errorf("create gzip reader: %w", err) } defer gzipReader.Close() if err := f.parseLogs(gzipReader); err != nil { f.logger.Error(err, "failed to parse logs from snapshot") - f.initErr = err - return + return err } - f.isLoaded = true - f.logger.Info("batch snapshot loaded and sorted successfully", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight) + f.logger.Info("batch snapshot loaded successfully", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight) + return nil } func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error { @@ -92,17 +88,17 @@ func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error { // ensureLoaded calls loadSnapshot via sync.Once to ensure thread-safe, one-time initialization. func (f *SnapshotLogFilterer) ensureLoaded() error { - f.initOnce.Do(f.loadSnapshot) - return f.initErr + var err error + f.initOnce.Do(func() { + err = f.loadSnapshot() + }) + return err } func (f *SnapshotLogFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { if err := f.ensureLoaded(); err != nil { return nil, fmt.Errorf("failed to ensure snapshot was loaded for FilterLogs: %w", err) } - if !f.isLoaded { - return nil, fmt.Errorf("snapshot not loaded, cannot filter logs (initialization might have failed without an error)") - } f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query_from_block", query.FromBlock, "query_to_block", query.ToBlock, "query_addresses_count", len(query.Addresses), "query_topics_count", len(query.Topics)) @@ -161,8 +157,5 @@ func (f *SnapshotLogFilterer) BlockNumber(_ context.Context) (uint64, error) { if err := f.ensureLoaded(); err != nil { return 0, fmt.Errorf("failed to ensure snapshot was loaded for BlockNumber: %w", err) } - if !f.isLoaded { - return 0, fmt.Errorf("snapshot not loaded, cannot get block number") - } return f.maxBlockHeight, nil } From 981f36a344b127766beda0d51017ce9d07727448 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Wed, 21 May 2025 09:27:30 +0300 Subject: [PATCH 7/7] feat: add new snapshot config option --- cmd/bee/cmd/cmd.go | 2 ++ cmd/bee/cmd/start.go | 1 + packaging/bee.yaml | 4 +++- packaging/homebrew-amd64/bee.yaml | 4 +++- packaging/homebrew-arm64/bee.yaml | 4 +++- packaging/scoop/bee.yaml | 4 +++- pkg/node/node.go | 3 ++- 7 files changed, 17 insertions(+), 5 deletions(-) diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 4eed7ba2616..527b3645d37 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -80,6 +80,7 @@ const ( optionNameTransactionDebugMode = "transaction-debug-mode" optionMinimumStorageRadius = "minimum-storage-radius" optionReserveCapacityDoubling = "reserve-capacity-doubling" + optionSkipPostageSnapshot = "skip-postage-snapshot" ) // nolint:gochecknoinits @@ -288,6 +289,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().Bool(optionNameTransactionDebugMode, false, "skips the gas estimate step for contract transactions") cmd.Flags().Uint(optionMinimumStorageRadius, 0, "minimum radius storage threshold") cmd.Flags().Int(optionReserveCapacityDoubling, 0, "reserve capacity doubling") + cmd.Flags().Bool(optionSkipPostageSnapshot, false, "skip postage snapshot") } func newLogger(cmd *cobra.Command, verbosity string) (log.Logger, error) { diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index 5db4bccf857..1b1baac9503 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -315,6 +315,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo ResolverConnectionCfgs: resolverCfgs, Resync: c.config.GetBool(optionNameResync), RetrievalCaching: c.config.GetBool(optionNameRetrievalCaching), + SkipPostageSnapshot: c.config.GetBool(optionSkipPostageSnapshot), StakingContractAddress: c.config.GetString(optionNameStakingAddress), StatestoreCacheCapacity: c.config.GetUint64(optionNameStateStoreCacheCapacity), StaticNodes: staticNodes, diff --git a/packaging/bee.yaml b/packaging/bee.yaml index 9c3bd436f29..9fedf77586c 100644 --- a/packaging/bee.yaml +++ b/packaging/bee.yaml @@ -78,6 +78,8 @@ password-file: "/var/lib/bee/password" # resolver-options: [] ## forces the node to resync postage contract data # resync: false +## skip postage snapshot +# skip-postage-snapshot: false ## staking contract address # staking-address: "" ## lru memory caching capacity in number of statestore entries @@ -115,4 +117,4 @@ password-file: "/var/lib/bee/password" ## send a welcome message string during handshakes # welcome-message: "" ## withdrawal target addresses -# withdrawal-addresses-whitelist: [] \ No newline at end of file +# withdrawal-addresses-whitelist: [] diff --git a/packaging/homebrew-amd64/bee.yaml b/packaging/homebrew-amd64/bee.yaml index 0ebaf0a72f3..193619a6652 100644 --- a/packaging/homebrew-amd64/bee.yaml +++ b/packaging/homebrew-amd64/bee.yaml @@ -78,6 +78,8 @@ password-file: "/usr/local/var/lib/swarm-bee/password" # resolver-options: [] ## forces the node to resync postage contract data # resync: false +## skip postage snapshot +# skip-postage-snapshot: false ## staking contract address # staking-address: "" ## lru memory caching capacity in number of statestore entries @@ -115,4 +117,4 @@ password-file: "/usr/local/var/lib/swarm-bee/password" ## send a welcome message string during handshakes # welcome-message: "" ## withdrawal target addresses -# withdrawal-addresses-whitelist: [] \ No newline at end of file +# withdrawal-addresses-whitelist: [] diff --git a/packaging/homebrew-arm64/bee.yaml b/packaging/homebrew-arm64/bee.yaml index 2a66f4ebd71..75ec057aa94 100644 --- a/packaging/homebrew-arm64/bee.yaml +++ b/packaging/homebrew-arm64/bee.yaml @@ -78,6 +78,8 @@ password-file: "/opt/homebrew/var/lib/swarm-bee/password" # resolver-options: [] ## forces the node to resync postage contract data # resync: false +## skip postage snapshot +# skip-postage-snapshot: false ## staking contract address # staking-address: "" ## lru memory caching capacity in number of statestore entries @@ -115,4 +117,4 @@ password-file: "/opt/homebrew/var/lib/swarm-bee/password" ## send a welcome message string during handshakes # welcome-message: "" ## withdrawal target addresses -# withdrawal-addresses-whitelist: [] \ No newline at end of file +# withdrawal-addresses-whitelist: [] diff --git a/packaging/scoop/bee.yaml b/packaging/scoop/bee.yaml index c6d16a3676e..270aee567cb 100644 --- a/packaging/scoop/bee.yaml +++ b/packaging/scoop/bee.yaml @@ -78,6 +78,8 @@ password-file: "./password" # resolver-options: [] ## forces the node to resync postage contract data # resync: false +## skip postage snapshot +# skip-postage-snapshot: false ## staking contract address # staking-address: "" ## lru memory caching capacity in number of statestore entries @@ -115,4 +117,4 @@ password-file: "./password" ## send a welcome message string during handshakes # welcome-message: "" ## withdrawal target addresses -# withdrawal-addresses-whitelist: [] \ No newline at end of file +# withdrawal-addresses-whitelist: [] diff --git a/pkg/node/node.go b/pkg/node/node.go index 67e6ec986f0..9893bfb361e 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -160,6 +160,7 @@ type Options struct { ResolverConnectionCfgs []multiresolver.ConnectionConfig Resync bool RetrievalCaching bool + SkipPostageSnapshot bool StakingContractAddress string StatestoreCacheCapacity uint64 StaticNodes []swarm.Address @@ -802,7 +803,7 @@ func NewBee( } ) - if !o.Resync && !batchStoreExists && (networkID == mainnetNetworkID) { + if !o.SkipPostageSnapshot && !batchStoreExists && (networkID == mainnetNetworkID) { chainBackend := NewSnapshotLogFilterer(logger) snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)