diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go
index dab68c447fb..fc47aa1454e 100644
--- a/cmd/bee/cmd/start.go
+++ b/cmd/bee/cmd/start.go
@@ -466,6 +466,7 @@ func getConfigByNetworkID(networkID uint64, defaultBlockTimeInSeconds uint64) *n
         config.chainID = chaincfg.Mainnet.ChainID
     case 5: // Staging.
         config.chainID = chaincfg.Testnet.ChainID
+        config.blockTime = 12 * time.Second
     case chaincfg.Testnet.NetworkID:
         config.bootNodes = []string{"/dnsaddr/testnet.ethswarm.org"}
         config.blockTime = 12 * time.Second
diff --git a/pkg/storageincentives/agent.go b/pkg/storageincentives/agent.go
index a87540d390d..feccbb89e65 100644
--- a/pkg/storageincentives/agent.go
+++ b/pkg/storageincentives/agent.go
@@ -139,7 +139,6 @@ func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase ui
     phaseEvents.On(commit, func(ctx context.Context) {
         phaseEvents.Cancel(claim)
 
-        round, _ := a.state.currentRoundAndPhase()
         err := a.handleCommit(ctx, round)
         logErr(commit, round, err)
     })
@@ -154,7 +153,6 @@ func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase ui
     phaseEvents.On(claim, func(ctx context.Context) {
         phaseEvents.Cancel(reveal)
         phaseEvents.Publish(sample)
-        round, _ := a.state.currentRoundAndPhase()
         logErr(claim, round, a.handleClaim(ctx, round))
     })
 
@@ -215,7 +213,7 @@ func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase ui
             prevPhase = currentPhase
             a.metrics.CurrentPhase.Set(float64(currentPhase))
 
-            a.logger.Info("entered new phase", "phase", currentPhase.String(), "round", round, "block", block)
+            a.logger.Info("entered new phase", "phase", currentPhase.String(), "round", round, "block", block, "blockInRound", p)
 
             a.state.SetCurrentEvent(currentPhase, round)
             a.state.SetFullySynced(a.fullSyncedFunc())
@@ -245,7 +243,7 @@ func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase ui
     phaseCheckInterval := blockTime
     // optimization, we do not need to check the phase change at every new block
     if blocksPerPhase > 10 {
-        phaseCheckInterval = blockTime * 5
+        phaseCheckInterval = blockTime * 3
     }
 
     for {
@@ -276,8 +274,7 @@ func (a *Agent) handleCommit(ctx context.Context, round uint64) error {
         return nil
     }
 
-    err := a.commit(ctx, sample, round)
-    if err != nil {
+    if err := a.commit(ctx, sample, round); err != nil {
         return err
     }
 
@@ -342,8 +339,7 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error {
 
     // In case when there are too many expired batches, Claim trx could runs out of gas.
     // To prevent this, node should first expire batches before Claiming a reward.
-    err = a.batchExpirer.ExpireBatches(ctx)
-    if err != nil {
+    if err = a.batchExpirer.ExpireBatches(ctx); err != nil {
         a.logger.Info("expire batches failed", "err", err)
         // Even when error happens, proceed with claim handler
         // because this should not prevent node from claiming a reward
@@ -407,6 +403,7 @@ func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) {
         a.logger.Info("not playing in this round")
         return false, nil
     }
+    a.state.SetLastSelectedRound(round + 1)
 
     a.metrics.NeighborhoodSelected.Inc()
     a.logger.Info("neighbourhood chosen", "round", round)
diff --git a/pkg/storer/sample.go b/pkg/storer/sample.go
index e02027ce00b..bcc096b0efe 100644
--- a/pkg/storer/sample.go
+++ b/pkg/storer/sample.go
@@ -65,7 +65,6 @@ func (db *DB) ReserveSample(
     consensusTime uint64,
     minBatchBalance *big.Int,
 ) (Sample, error) {
-
     g, ctx := errgroup.WithContext(ctx)
     allStats := &SampleStats{}
 
@@ -85,9 +84,14 @@ func (db *DB) ReserveSample(
 
     allStats.BatchesBelowValueDuration = time.Since(t)
 
-    chunkC := make(chan *reserve.ChunkBinItem)
+    // Use buffered channels to reduce blocking between phases
+    workers := max(4, runtime.NumCPU())
+    chunkChanBuffer := min(1000, workers*10)
+    sampleChanBuffer := min(500, workers*5)
+
+    chunkC := make(chan *reserve.ChunkBinItem, chunkChanBuffer)
 
-    // Phase 1: Iterate chunk addresses
+    // Phase 1: Iterate chunk addresses with pre-filtering
     g.Go(func() error {
         start := time.Now()
         stats := SampleStats{}
@@ -101,6 +105,20 @@ func (db *DB) ReserveSample(
             if swarm.Proximity(ch.Address.Bytes(), anchor) < committedDepth {
                 return false, nil
             }
+
+            // Pre-filter by batch balance to avoid loading chunks unnecessarily
+            if _, found := excludedBatchIDs[string(ch.BatchID)]; found {
+                stats.BelowBalanceIgnored++
+                return false, nil
+            }
+
+            // Pre-filter by chunk type to avoid loading invalid chunks
+            if ch.ChunkType != swarm.ChunkTypeSingleOwner &&
+                ch.ChunkType != swarm.ChunkTypeContentAddressed {
+                stats.RogueChunk++
+                return false, nil
+            }
+
             select {
             case chunkC <- ch:
                 stats.TotalIterated++
@@ -113,16 +131,15 @@ func (db *DB) ReserveSample(
     })
 
     // Phase 2: Get the chunk data and calculate transformed hash
-    sampleItemChan := make(chan SampleItem)
+    sampleItemChan := make(chan SampleItem, sampleChanBuffer)
 
     prefixHasherFactory := func() hash.Hash {
         return swarm.NewPrefixHasher(anchor)
     }
 
-    workers := max(4, runtime.NumCPU())
     db.logger.Debug("reserve sampler workers", "count", workers)
 
-    for i := 0; i < workers; i++ {
+    for range workers {
         g.Go(func() error {
             wstat := SampleStats{}
             hasher := bmt.NewHasher(prefixHasherFactory)
@@ -131,20 +148,6 @@ func (db *DB) ReserveSample(
             }()
 
             for chItem := range chunkC {
-                // exclude chunks who's batches balance are below minimum
-                if _, found := excludedBatchIDs[string(chItem.BatchID)]; found {
-                    wstat.BelowBalanceIgnored++
-
-                    continue
-                }
-
-                // Skip chunks if they are not SOC or CAC
-                if chItem.ChunkType != swarm.ChunkTypeSingleOwner &&
-                    chItem.ChunkType != swarm.ChunkTypeContentAddressed {
-                    wstat.RogueChunk++
-                    continue
-                }
-
                 chunkLoadStart := time.Now()
 
                 chunk, err := db.ChunkStore().Get(ctx, chItem.Address)
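The sample.go hunks above move the batch-balance and chunk-type checks from the worker loop into the Phase 1 iteration callback and feed the hashing workers through buffered channels sized from the worker count. For reference, below is a minimal, self-contained sketch of that producer/worker shape. It is not code from this patch; item, keep, process, and run are hypothetical placeholders standing in for the chunk iteration, the cheap pre-filters, and the chunk load plus hashing work. It does reuse the same max/min buffer sizing and the Go 1.22+ "for range n" loop form the diff switches to.

// Minimal sketch (not bee code) of a filtered producer feeding a buffered
// channel drained by a fixed worker pool.
package main

import (
	"context"
	"fmt"
	"runtime"

	"golang.org/x/sync/errgroup"
)

type item struct{ id int }

// keep stands in for the cheap pre-filters (batch balance, chunk type) that
// the patch moves into the Phase 1 iteration callback.
func keep(it item) bool { return it.id%2 == 0 }

// process stands in for the expensive per-item work (chunk load, hashing).
func process(ctx context.Context, it item) error {
	_ = ctx
	_ = it
	return nil
}

func run(ctx context.Context, items []item) error {
	g, ctx := errgroup.WithContext(ctx)

	workers := max(4, runtime.NumCPU())
	// Buffered channel sized from the worker count, as in the patch, so the
	// producer rarely blocks while workers are busy.
	itemC := make(chan item, min(1000, workers*10))

	// Producer: filter before sending, so excluded items never occupy buffer
	// slots or worker time.
	g.Go(func() error {
		defer close(itemC)
		for _, it := range items {
			if !keep(it) {
				continue
			}
			select {
			case itemC <- it:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Worker pool, using the Go 1.22+ integer range loop.
	for range workers {
		g.Go(func() error {
			for it := range itemC {
				if err := process(ctx, it); err != nil {
					return err
				}
			}
			return nil
		})
	}

	return g.Wait()
}

func main() {
	items := make([]item, 100)
	for i := range items {
		items[i] = item{id: i}
	}
	fmt.Println(run(context.Background(), items))
}

Buffering and pre-filtering work together here: the iterator keeps producing while workers sit on slow loads, and items that would be skipped anyway never cross the channel.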