Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/ethersphere/bee/v2

go 1.24
go 1.24.0

toolchain go1.24.2

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
Expand Down Expand Up @@ -55,6 +57,8 @@ require (
resenje.org/web v0.4.3
)

require github.com/ethersphere/batch-archive v0.0.2

require (
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1
github.com/ethereum/go-ethereum v1.10.4/go.mod h1:nEE0TP5MtxGzOMd7egIrbPJMQBnhVU3ELNxhBglIzhg=
github.com/ethereum/go-ethereum v1.14.3 h1:5zvnAqLtnCZrU9uod1JCvHWJbPMURzYFHfc2eHz4PHA=
github.com/ethereum/go-ethereum v1.14.3/go.mod h1:1STrq471D0BQbCX9He0hUj4bHxX2k6mt5nOQJhDNOJ8=
github.com/ethersphere/batch-archive v0.0.2 h1:OmZw4tn9o0p9+MyBZq7KngufqqJ91OETtK4uxXWgEVg=
github.com/ethersphere/batch-archive v0.0.2/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
github.com/ethersphere/go-price-oracle-abi v0.6.8 h1:23Y0msO4ZRvB9o1NRdFDd0eewlnx37XxQm2DKbL6Qk8=
github.com/ethersphere/go-price-oracle-abi v0.6.8/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc3 h1:uuowc0ekcipVwYkn1Rud9LySZ094hrDq2/YfRKyjbbQ=
Expand Down
26 changes: 26 additions & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,32 @@ func NewBee(
}
)

if !o.Resync && !batchStoreExists && (networkID == mainnetNetworkID) {
chainBackend, err := NewSnapshotLogFilterer(logger)
if err != nil {
logger.Debug("failed to initialize batch snapshot chain backend", "error", err)
} else {
eventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)

batchSvc, err := batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
if err != nil {
logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...")
} else {
err = batchSvc.Start(ctx, postageSyncStart, initBatchState)
syncStatus.Store(true)
if err != nil {
syncErr.Store(err)
logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...")
} else {
postageSyncStart = chainBackend.maxBlockHeight
}
}
if errClose := eventListener.Close(); errClose != nil {
logger.Error(errClose, "failed to close event listener (snapshot) failure")
}
}
}

if batchSvc != nil && chainEnabled {
logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")

Expand Down
153 changes: 153 additions & 0 deletions pkg/node/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2025 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package node

import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"

"slices"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
archive "github.com/ethersphere/batch-archive"
"github.com/ethersphere/bee/v2/pkg/log"
)

// SnapshotLogFilterer serves postage batch log events from an embedded,
// pre-generated snapshot instead of a live chain backend. It implements
// the subset of the chain-backend interface the postage listener uses
// (FilterLogs and BlockNumber), answering queries purely from memory.
type SnapshotLogFilterer struct {
	logger         log.Logger
	loadedLogs     []types.Log // all log events decoded from the snapshot
	maxBlockHeight uint64      // highest BlockNumber seen among loadedLogs
	isLoaded       bool        // true once the snapshot has been parsed successfully
}

// NewSnapshotLogFilterer constructs a SnapshotLogFilterer and eagerly
// loads the embedded batch snapshot into memory. It returns an error if
// the snapshot cannot be decompressed or parsed.
func NewSnapshotLogFilterer(logger log.Logger) (*SnapshotLogFilterer, error) {
	filterer := &SnapshotLogFilterer{logger: logger}

	err := filterer.loadAndProcessSnapshot()
	if err != nil {
		return nil, fmt.Errorf("failed to load and process snapshot during initialization: %w", err)
	}

	return filterer, nil
}

// loadAndProcessSnapshot decompresses the embedded batch snapshot and
// parses its log events into memory, recording the highest block height
// observed. On success it marks the filterer as loaded.
func (f *SnapshotLogFilterer) loadAndProcessSnapshot() error {
	f.logger.Info("loading batch snapshot during construction")

	// The snapshot ships gzip-compressed inside the archive package.
	gzipReader, err := gzip.NewReader(bytes.NewReader(archive.GetBatchSnapshot(true)))
	if err != nil {
		f.logger.Error(err, "failed to create gzip reader for batch import")
		return fmt.Errorf("create gzip reader: %w", err)
	}
	defer gzipReader.Close()

	if err := f.parseLogs(gzipReader); err != nil {
		f.logger.Error(err, "failed to parse logs from snapshot")
		return err
	}

	f.isLoaded = true
	f.logger.Info("batch snapshot loaded successfully during construction", "log_count", len(f.loadedLogs), "max_block_height", f.maxBlockHeight)
	return nil
}

// parseLogs reads newline-delimited JSON log events from reader, storing
// them in f.loadedLogs and tracking the highest block number seen. Lines
// that fail to decode are logged and skipped rather than aborting the
// whole import; a scanner-level error (I/O or oversized line) aborts.
func (f *SnapshotLogFilterer) parseLogs(reader io.Reader) error {
	var (
		parsedLogs            []types.Log
		currentMaxBlockHeight uint64
	)

	scanner := bufio.NewScanner(reader)
	// A JSON-encoded log event (indexed topics plus hex data payload) can
	// exceed bufio.Scanner's default 64 KiB token limit, which would abort
	// the scan with bufio.ErrTooLong; allow lines of up to 1 MiB.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		line := scanner.Bytes()
		if len(bytes.TrimSpace(line)) == 0 {
			continue // skip blank lines
		}

		var logEntry types.Log
		if err := json.Unmarshal(line, &logEntry); err != nil {
			f.logger.Warning("failed to unmarshal log event, skipping line", "error", err, "line_snippet", string(line[:min(len(line), 100)]))
			continue
		}

		if logEntry.BlockNumber > currentMaxBlockHeight {
			currentMaxBlockHeight = logEntry.BlockNumber
		}
		parsedLogs = append(parsedLogs, logEntry)
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error scanning batch import data: %w", err)
	}

	f.loadedLogs = parsedLogs
	f.maxBlockHeight = currentMaxBlockHeight
	// Also set by loadAndProcessSnapshot; kept so a direct call leaves the
	// filterer in a consistent state.
	f.isLoaded = true
	return nil
}

// FilterLogs filters the pre-loaded snapshot logs against query, applying
// the block range, address set, and positional topic filters in the same
// way eth_getLogs does (query.Topics[i] is a set of acceptable values for
// topic position i, with an empty set acting as a wildcard). The context
// is unused since no network call is made; the error is always nil.
func (f *SnapshotLogFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
	f.logger.Debug("filtering pre-loaded logs", "total_logs", len(f.loadedLogs), "query", query)

	filtered := make([]types.Log, 0, len(f.loadedLogs))

	// NOTE: the loop variable was previously named "log", shadowing the
	// imported log package; renamed to entry.
	for _, entry := range f.loadedLogs {
		if query.FromBlock != nil && entry.BlockNumber < query.FromBlock.Uint64() {
			continue
		}
		if query.ToBlock != nil && entry.BlockNumber > query.ToBlock.Uint64() {
			continue
		}

		if len(query.Addresses) > 0 && !slices.Contains(query.Addresses, entry.Address) {
			continue
		}

		if len(query.Topics) > 0 {
			topicMatch := true

			// An EVM log carries at most four topics, hence the i < 4 bound.
			for i := 0; i < len(query.Topics) && i < 4; i++ {
				if len(query.Topics[i]) == 0 {
					continue // empty set is a wildcard for this position
				}

				// A log with fewer topics than the filter requires, or a
				// topic outside the acceptable set, fails the match.
				if i >= len(entry.Topics) || !slices.Contains(query.Topics[i], entry.Topics[i]) {
					topicMatch = false
					break
				}
			}

			if !topicMatch {
				continue
			}
		}

		filtered = append(filtered, entry)
	}

	f.logger.Debug("filtered logs", "input_count", len(f.loadedLogs), "output_count", len(filtered))
	return filtered, nil
}

// BlockNumber returns the highest block height observed in the snapshot,
// standing in for the chain head. The context is unused (no network call)
// and the error is always nil.
func (f *SnapshotLogFilterer) BlockNumber(_ context.Context) (uint64, error) {
	return f.maxBlockHeight, nil
}
Loading