Skip to content
Merged
5 changes: 2 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ const (
)

const (
multiPartFormData = "multipart/form-data"
contentTypeTar = "application/x-tar"
boolHeaderSetValue = "true"
multiPartFormData = "multipart/form-data"
contentTypeTar = "application/x-tar"
)

var (
Expand Down
1 change: 0 additions & 1 deletion pkg/api/chequebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

const (
errChequebookBalance = "cannot get chequebook balance"
errChequebookNoAmount = "did not specify amount"
errChequebookNoWithdraw = "cannot withdraw"
errChequebookNoDeposit = "cannot deposit"
errChequebookInsufficientFunds = "insufficient funds"
Expand Down
5 changes: 2 additions & 3 deletions pkg/api/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
)

const (
writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close
targetMaxLength = 3 // max target length in bytes, in order to prevent grieving by excess computation
writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close
)

func (s *Service) pssPostHandler(w http.ResponseWriter, r *http.Request) {
Expand All @@ -42,7 +41,7 @@ func (s *Service) pssPostHandler(w http.ResponseWriter, r *http.Request) {
topic := pss.NewTopic(paths.Topic)

var targets pss.Targets
for _, v := range strings.Split(paths.Targets, ",") {
for v := range strings.SplitSeq(paths.Targets, ",") {
target := struct {
Val []byte `map:"target" validate:"required,max=3"`
}{}
Expand Down
90 changes: 90 additions & 0 deletions pkg/storer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ type metrics struct {
ExpiryRunsCount prometheus.Counter

ReserveMissingBatch prometheus.Gauge

// ReserveSample metrics
ReserveSampleDuration *prometheus.HistogramVec
ReserveSampleChunksIterated prometheus.Counter
ReserveSampleChunksLoaded prometheus.Counter
ReserveSampleChunksLoadFailed prometheus.Counter
ReserveSampleTaddrDuration *prometheus.HistogramVec
ReserveSampleStampValidations prometheus.Counter
ReserveSampleStampValidDuration *prometheus.HistogramVec
ReserveSampleSize prometheus.Gauge
ReserveSampleWorkers prometheus.Gauge
}

// newMetrics is a convenient constructor for creating new metrics.
Expand Down Expand Up @@ -163,6 +174,85 @@ func newMetrics() metrics {
Help: "Number of times the expiry worker was fired.",
},
),
// ReserveSample metrics
ReserveSampleDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_duration",
Help: "Duration of ReserveSample operations.",
Buckets: prometheus.DefBuckets,
},
[]string{"status"},
),
ReserveSampleChunksIterated: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_chunks_iterated",
Help: "Total number of chunks iterated during ReserveSample.",
},
),
ReserveSampleChunksLoaded: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_chunks_loaded",
Help: "Total number of chunks successfully loaded during ReserveSample.",
},
),
ReserveSampleChunksLoadFailed: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_chunks_load_failed",
Help: "Total number of chunks that failed to load during ReserveSample.",
},
),
ReserveSampleTaddrDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_taddr_duration",
Help: "Duration of transformed address calculations during ReserveSample.",
Buckets: prometheus.DefBuckets,
},
[]string{"chunk_type"},
),
ReserveSampleStampValidations: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_stamp_validations",
Help: "Total number of stamp validations during ReserveSample.",
},
),
ReserveSampleStampValidDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_stamp_valid_duration",
Help: "Duration of stamp validations during ReserveSample.",
Buckets: prometheus.DefBuckets,
},
[]string{"status"},
),
ReserveSampleSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_size",
Help: "Number of items in the final ReserveSample.",
},
),
ReserveSampleWorkers: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_sample_workers",
Help: "Number of parallel workers used in ReserveSample.",
},
),
}
}

Expand Down
33 changes: 27 additions & 6 deletions pkg/storer/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ func (db *DB) ReserveSample(

t := time.Now()

// Record metrics for ReserveSample
defer func() {
duration := time.Since(t)
if err := g.Wait(); err != nil {
db.metrics.ReserveSampleDuration.WithLabelValues("failure").Observe(duration.Seconds())
} else {
db.metrics.ReserveSampleDuration.WithLabelValues("success").Observe(duration.Seconds())
}
}()

excludedBatchIDs, err := db.batchesBelowValue(minBatchBalance)
if err != nil {
db.logger.Error(err, "get batches below value")
Expand All @@ -85,6 +95,7 @@ func (db *DB) ReserveSample(
allStats.BatchesBelowValueDuration = time.Since(t)

workers := max(4, runtime.NumCPU())
db.metrics.ReserveSampleWorkers.Set(float64(workers))
chunkC := make(chan *reserve.ChunkBinItem, 3*workers)

// Phase 1: Iterate chunk addresses
Expand All @@ -104,6 +115,7 @@ func (db *DB) ReserveSample(
select {
case chunkC <- ch:
stats.TotalIterated++
db.metrics.ReserveSampleChunksIterated.Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be more performant if stats.TotalIterated were only sent once at the end.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better approach would be to remove TotalIterated since it uses a mutex. Using Inc() on a counter with atomic operations is much faster than a mutex. The overhead of writing to an atomic variable is negligible compared to the total amount of current work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TotalIterated is incremented without a mutex; the mutex is used only at the end of the goroutine, when the value is added to the common aggregate. I'm not sure which one is really faster (atomic operations still have hardware-level contention, and there are many increments), but only one of them should be kept, since they measure the same thing.

return false, nil
case <-ctx.Done():
return false, ctx.Err()
Expand Down Expand Up @@ -133,7 +145,6 @@ func (db *DB) ReserveSample(
// exclude chunks who's batches balance are below minimum
if _, found := excludedBatchIDs[string(chItem.BatchID)]; found {
wstat.BelowBalanceIgnored++

continue
}

Expand All @@ -149,18 +160,22 @@ func (db *DB) ReserveSample(
chunk, err := db.ChunkStore().Get(ctx, chItem.Address)
if err != nil {
wstat.ChunkLoadFailed++
db.metrics.ReserveSampleChunksLoadFailed.Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as at TotalIterated

db.logger.Debug("failed loading chunk", "chunk_address", chItem.Address, "error", err)
continue
}

wstat.ChunkLoadDuration += time.Since(chunkLoadStart)
db.metrics.ReserveSampleChunksLoaded.Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is TotalIterated - (BelowBalanceIgnored + RogueChunk + ChunkLoadFailed). There are metrics for all of them.


taddrStart := time.Now()
taddr, err := transformedAddress(hasher, chunk, chItem.ChunkType)
if err != nil {
return err
}
wstat.TaddrDuration += time.Since(taddrStart)
taddrDuration := time.Since(taddrStart)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You created a new variable for it, but it isn't used anywhere other than in the line below.

wstat.TaddrDuration += taddrDuration
db.metrics.ReserveSampleTaddrDuration.WithLabelValues(chItem.ChunkType.String()).Observe(taddrDuration.Seconds())
Copy link
Member

@nugaon nugaon Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seconds resolution is too low; milliseconds or nanoseconds would be better.


select {
case sampleItemChan <- SampleItem{
Expand Down Expand Up @@ -224,8 +239,6 @@ func (db *DB) ReserveSample(
}

if le(item.TransformedAddress, currentMaxAddr) || len(sampleItems) < SampleSize {
start := time.Now()

stamp, err := chunkstamp.LoadWithBatchID(db.storage.IndexStore(), "reserve", item.ChunkAddress, item.Stamp.BatchID())
if err != nil {
stats.StampLoadFailed++
Expand All @@ -241,13 +254,18 @@ func (db *DB) ReserveSample(
continue
}

stampValidStart := time.Now()
if _, err := db.validStamp(ch); err != nil {
stats.InvalidStamp++
db.metrics.ReserveSampleStampValidDuration.WithLabelValues("invalid").Observe(time.Since(stampValidStart).Seconds())
db.logger.Debug("invalid stamp for chunk", "chunk_address", ch.Address(), "error", err)
continue
}

stats.ValidStampDuration += time.Since(start)
stampValidDuration := time.Since(stampValidStart)
stats.ValidStampDuration += stampValidDuration
db.metrics.ReserveSampleStampValidations.Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is stats.SampleInserts

db.metrics.ReserveSampleStampValidDuration.WithLabelValues("valid").Observe(stampValidDuration.Seconds())
Copy link
Member

@nugaon nugaon Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seconds resolution is too low.


item.Stamp = postage.NewStamp(stamp.BatchID(), stamp.Index(), stamp.Timestamp(), stamp.Sig())

Expand All @@ -267,6 +285,9 @@ func (db *DB) ReserveSample(

db.logger.Info("reserve sampler finished", "duration", time.Since(t), "storage_radius", committedDepth, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats))

// Record final sample size
db.metrics.ReserveSampleSize.Set(float64(len(sampleItems)))

return Sample{Stats: *allStats, Items: sampleItems}, nil
}

Expand Down Expand Up @@ -382,7 +403,7 @@ func RandSample(t *testing.T, anchor []byte) Sample {
t.Helper()

chunks := make([]swarm.Chunk, SampleSize)
for i := 0; i < SampleSize; i++ {
for i := range SampleSize {
ch := chunk.GenerateTestRandomChunk()
if i%3 == 0 {
ch = chunk.GenerateTestRandomSoChunk(t, ch)
Expand Down
Loading