Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 42 additions & 30 deletions pkg/storageincentives/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/transaction"
"resenje.org/singleflight"
)

const loggerName = "storageincentives"
Expand Down Expand Up @@ -70,6 +71,7 @@ type Agent struct {
chainStateGetter postage.ChainStateGetter
commitLock sync.Mutex
health Health
sampleFlight singleflight.Group[string, sampleResult]
}

func New(overlay swarm.Address,
Expand Down Expand Up @@ -445,35 +447,55 @@ func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) {
return true, nil
}

func (a *Agent) makeSample(ctx context.Context, committedDepth uint8) (SampleData, error) {
salt, err := a.contract.ReserveSalt(ctx)
if err != nil {
return SampleData{}, err
}
type sampleResult struct {
Items []storer.SampleItem
Hash swarm.Address
}

timeLimiter, err := a.getPreviousRoundTime(ctx)
if err != nil {
return SampleData{}, err
}
// reserveSampleAndHash runs getPreviousRoundTime, ReserveSample, and sampleHash
// as a singleflight keyed by anchor and depth to deduplicate concurrent calls.
func (a *Agent) reserveSampleAndHash(ctx context.Context, anchor []byte, depth uint8) (sampleResult, error) {
key := fmt.Sprintf("%x_%d", anchor, depth)

res, _, err := a.sampleFlight.Do(ctx, key, func(ctx context.Context) (sampleResult, error) {
timeLimiter, err := a.getPreviousRoundTime(ctx)
if err != nil {
return sampleResult{}, err
}

rSample, err := a.store.ReserveSample(ctx, anchor, depth, uint64(timeLimiter), a.minBatchBalance())
if err != nil {
return sampleResult{}, err
}

hash, err := sampleHash(rSample.Items)
if err != nil {
return sampleResult{}, err
}

return sampleResult{Items: rSample.Items, Hash: hash}, nil
})

return res, err
}

rSample, err := a.store.ReserveSample(ctx, salt, committedDepth, uint64(timeLimiter), a.minBatchBalance())
func (a *Agent) makeSample(ctx context.Context, committedDepth uint8) (SampleData, error) {
salt, err := a.contract.ReserveSalt(ctx)
if err != nil {
return SampleData{}, err
}

sampleHash, err := sampleHash(rSample.Items)
res, err := a.reserveSampleAndHash(ctx, salt, committedDepth)
if err != nil {
return SampleData{}, err
}

sample := SampleData{
return SampleData{
Anchor1: salt,
ReserveSampleItems: rSample.Items,
ReserveSampleHash: sampleHash,
ReserveSampleItems: res.Items,
ReserveSampleHash: res.Hash,
StorageRadius: committedDepth,
}

return sample, nil
}, nil
}

func (a *Agent) minBatchBalance() *big.Int {
Expand Down Expand Up @@ -571,28 +593,18 @@ func (a *Agent) SampleWithProofs(
) (SampleWithProofs, error) {
sampleStartTime := time.Now()

timeLimiter, err := a.getPreviousRoundTime(ctx)
if err != nil {
return SampleWithProofs{}, err
}

rSample, err := a.store.ReserveSample(ctx, anchor1, storageRadius, uint64(timeLimiter), a.minBatchBalance())
res, err := a.reserveSampleAndHash(ctx, anchor1, storageRadius)
if err != nil {
return SampleWithProofs{}, err
}

hash, err := sampleHash(rSample.Items)
if err != nil {
return SampleWithProofs{}, fmt.Errorf("sample hash: %w", err)
}

proofs, err := makeInclusionProofs(rSample.Items, anchor1, anchor2)
proofs, err := makeInclusionProofs(res.Items, anchor1, anchor2)
if err != nil {
return SampleWithProofs{}, fmt.Errorf("make proofs: %w", err)
}

return SampleWithProofs{
Hash: hash,
Hash: res.Hash,
Proofs: proofs,
Duration: time.Since(sampleStartTime),
}, nil
Expand Down
Loading