Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func getConfigByNetworkID(networkID uint64, defaultBlockTimeInSeconds uint64) *n
config.chainID = chaincfg.Mainnet.ChainID
case 5: // Staging.
config.chainID = chaincfg.Testnet.ChainID
config.blockTime = 12 * time.Second
case chaincfg.Testnet.NetworkID:
config.bootNodes = []string{"/dnsaddr/testnet.ethswarm.org"}
config.blockTime = 12 * time.Second
Expand Down
13 changes: 5 additions & 8 deletions pkg/storageincentives/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase ui

phaseEvents.On(commit, func(ctx context.Context) {
phaseEvents.Cancel(claim)

round, _ := a.state.currentRoundAndPhase()
err := a.handleCommit(ctx, round)
logErr(commit, round, err)
Expand All @@ -154,7 +153,6 @@ func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase ui
phaseEvents.On(claim, func(ctx context.Context) {
phaseEvents.Cancel(reveal)
phaseEvents.Publish(sample)

round, _ := a.state.currentRoundAndPhase()
logErr(claim, round, a.handleClaim(ctx, round))
})
Expand Down Expand Up @@ -215,7 +213,7 @@ func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase ui
prevPhase = currentPhase
a.metrics.CurrentPhase.Set(float64(currentPhase))

a.logger.Info("entered new phase", "phase", currentPhase.String(), "round", round, "block", block)
a.logger.Info("entered new phase", "phase", currentPhase.String(), "round", round, "block", block, "blockInRound", p)

a.state.SetCurrentEvent(currentPhase, round)
a.state.SetFullySynced(a.fullSyncedFunc())
Expand Down Expand Up @@ -245,7 +243,7 @@ func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase ui
phaseCheckInterval := blockTime
// optimization, we do not need to check the phase change at every new block
if blocksPerPhase > 10 {
phaseCheckInterval = blockTime * 5
phaseCheckInterval = blockTime * 3
}

for {
Expand Down Expand Up @@ -276,8 +274,7 @@ func (a *Agent) handleCommit(ctx context.Context, round uint64) error {
return nil
}

err := a.commit(ctx, sample, round)
if err != nil {
if err := a.commit(ctx, sample, round); err != nil {
return err
}

Expand Down Expand Up @@ -342,8 +339,7 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error {

// In case when there are too many expired batches, Claim trx could runs out of gas.
// To prevent this, node should first expire batches before Claiming a reward.
err = a.batchExpirer.ExpireBatches(ctx)
if err != nil {
if err = a.batchExpirer.ExpireBatches(ctx); err != nil {
a.logger.Info("expire batches failed", "err", err)
// Even when error happens, proceed with claim handler
// because this should not prevent node from claiming a reward
Expand Down Expand Up @@ -407,6 +403,7 @@ func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) {
a.logger.Info("not playing in this round")
return false, nil
}

a.state.SetLastSelectedRound(round + 1)
a.metrics.NeighborhoodSelected.Inc()
a.logger.Info("neighbourhood chosen", "round", round)
Expand Down
43 changes: 23 additions & 20 deletions pkg/storer/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func (db *DB) ReserveSample(
consensusTime uint64,
minBatchBalance *big.Int,
) (Sample, error) {

g, ctx := errgroup.WithContext(ctx)

allStats := &SampleStats{}
Expand All @@ -85,9 +84,14 @@ func (db *DB) ReserveSample(

allStats.BatchesBelowValueDuration = time.Since(t)

chunkC := make(chan *reserve.ChunkBinItem)
// Use buffered channels to reduce blocking between phases
workers := max(4, runtime.NumCPU())
chunkChanBuffer := min(1000, workers*10)
sampleChanBuffer := min(500, workers*5)

chunkC := make(chan *reserve.ChunkBinItem, chunkChanBuffer)

// Phase 1: Iterate chunk addresses
// Phase 1: Iterate chunk addresses with pre-filtering
g.Go(func() error {
start := time.Now()
stats := SampleStats{}
Expand All @@ -101,6 +105,20 @@ func (db *DB) ReserveSample(
if swarm.Proximity(ch.Address.Bytes(), anchor) < committedDepth {
return false, nil
}

// Pre-filter by batch balance to avoid loading chunks unnecessarily
if _, found := excludedBatchIDs[string(ch.BatchID)]; found {
stats.BelowBalanceIgnored++
return false, nil
}

// Pre-filter by chunk type to avoid loading invalid chunks
if ch.ChunkType != swarm.ChunkTypeSingleOwner &&
ch.ChunkType != swarm.ChunkTypeContentAddressed {
stats.RogueChunk++
return false, nil
}

select {
case chunkC <- ch:
stats.TotalIterated++
Expand All @@ -113,16 +131,15 @@ func (db *DB) ReserveSample(
})

// Phase 2: Get the chunk data and calculate transformed hash
sampleItemChan := make(chan SampleItem)
sampleItemChan := make(chan SampleItem, sampleChanBuffer)

prefixHasherFactory := func() hash.Hash {
return swarm.NewPrefixHasher(anchor)
}

workers := max(4, runtime.NumCPU())
db.logger.Debug("reserve sampler workers", "count", workers)

for i := 0; i < workers; i++ {
for range workers {
g.Go(func() error {
wstat := SampleStats{}
hasher := bmt.NewHasher(prefixHasherFactory)
Expand All @@ -131,20 +148,6 @@ func (db *DB) ReserveSample(
}()

for chItem := range chunkC {
// exclude chunks who's batches balance are below minimum
if _, found := excludedBatchIDs[string(chItem.BatchID)]; found {
wstat.BelowBalanceIgnored++

continue
}

// Skip chunks if they are not SOC or CAC
if chItem.ChunkType != swarm.ChunkTypeSingleOwner &&
chItem.ChunkType != swarm.ChunkTypeContentAddressed {
wstat.RogueChunk++
continue
}

chunkLoadStart := time.Now()

chunk, err := db.ChunkStore().Get(ctx, chItem.Address)
Expand Down
Loading