Skip to content

Commit 28ff21c

Browse files
authored
feat(store)!: metrics for Store (#129)
Includes: * headHeight gauge * flushTime histogram * readTime histogram * writeQueueBlocked counter. Breaking because it removes the existing `head` metrics, which are now moved into the Store itself. Also fixes two minor bugs.
1 parent a8ce731 commit 28ff21c

File tree

4 files changed

+192
-58
lines changed

4 files changed

+192
-58
lines changed

metrics.go

Lines changed: 0 additions & 39 deletions
This file was deleted.

store/metrics.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package store
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
"time"
7+
8+
"go.opentelemetry.io/otel"
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/metric"
11+
)
12+
13+
// meter is the package-level OpenTelemetry meter shared by all
// Store metric instruments.
var meter = otel.Meter("header/store")

// metrics bundles the Store's observability instruments.
// A nil *metrics is valid: every recording method becomes a no-op
// (see observe), which is how metrics collection is disabled.
type metrics struct {
	// headHeight caches the latest subjective head height so it can be
	// reported asynchronously by the observable-gauge callback.
	headHeight     atomic.Int64
	headHeightInst metric.Int64ObservableGauge
	headHeightReg  metric.Registration

	flushTimeInst metric.Float64Histogram
	readTimeInst  metric.Float64Histogram

	writesQueueBlockedInst metric.Int64Counter
}
25+
26+
func newMetrics() (m *metrics, err error) {
27+
m = new(metrics)
28+
m.headHeightInst, err = meter.Int64ObservableGauge(
29+
"hdr_store_head_height_gauge",
30+
metric.WithDescription("current header store head height(subjective height)"),
31+
)
32+
if err != nil {
33+
return nil, err
34+
}
35+
m.headHeightReg, err = meter.RegisterCallback(m.observeHeight, m.headHeightInst)
36+
if err != nil {
37+
return nil, err
38+
}
39+
m.flushTimeInst, err = meter.Float64Histogram(
40+
"hdr_store_flush_time_hist",
41+
metric.WithDescription("header store flush time in seconds"),
42+
)
43+
if err != nil {
44+
return nil, err
45+
}
46+
m.readTimeInst, err = meter.Float64Histogram(
47+
"hdr_store_read_time_hist",
48+
metric.WithDescription("header store single header read time from datastore in seconds and ignoring cache"),
49+
)
50+
if err != nil {
51+
return nil, err
52+
}
53+
m.writesQueueBlockedInst, err = meter.Int64Counter(
54+
"hdr_store_writes_blocked_counter",
55+
metric.WithDescription("header store writes blocked counter"),
56+
)
57+
if err != nil {
58+
return nil, err
59+
}
60+
return m, nil
61+
}
62+
63+
func (m *metrics) newHead(height uint64) {
64+
m.observe(context.Background(), func(ctx context.Context) {
65+
m.headHeight.Store(int64(height))
66+
})
67+
}
68+
69+
// observeHeight is the RegisterCallback callback that reports the most
// recently stored head height through the observable gauge.
func (m *metrics) observeHeight(_ context.Context, obs metric.Observer) error {
	obs.ObserveInt64(m.headHeightInst, m.headHeight.Load())
	return nil
}
73+
74+
func (m *metrics) flush(ctx context.Context, duration time.Duration, amount int, failed bool) {
75+
m.observe(ctx, func(ctx context.Context) {
76+
m.flushTimeInst.Record(ctx,
77+
duration.Seconds(),
78+
metric.WithAttributes(
79+
attribute.Int("amount", amount/100), // divide by 100 to reduce cardinality
80+
attribute.Bool("failed", failed),
81+
),
82+
)
83+
})
84+
}
85+
86+
func (m *metrics) readSingle(ctx context.Context, duration time.Duration, failed bool) {
87+
m.observe(ctx, func(ctx context.Context) {
88+
m.readTimeInst.Record(ctx,
89+
duration.Seconds(),
90+
metric.WithAttributes(attribute.Bool("failed", failed)),
91+
)
92+
})
93+
}
94+
95+
// writesQueueBlocked counts occasions on which the writes queue was
// full, forcing an Append caller to block until space freed up.
func (m *metrics) writesQueueBlocked(ctx context.Context) {
	m.observe(ctx, func(ctx context.Context) {
		m.writesQueueBlockedInst.Add(ctx, 1)
	})
}
100+
101+
func (m *metrics) observe(ctx context.Context, f func(context.Context)) {
102+
if m == nil {
103+
return
104+
}
105+
106+
if ctx.Err() != nil {
107+
ctx = context.Background()
108+
}
109+
110+
f(ctx)
111+
}
112+
113+
// Close unregisters the head-height gauge callback from the meter.
// It is safe to call on a nil receiver (metrics disabled), in which
// case it returns nil.
func (m *metrics) Close() error {
	if m == nil {
		return nil
	}

	return m.headHeightReg.Unregister()
}

store/options.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ type Parameters struct {
1818
// IndexCacheSize defines the maximum amount of entries in the Height to Hash index cache.
1919
IndexCacheSize int
2020

21-
// WriteBatchSize defines the size of the batched header write.
21+
// WriteBatchSize defines the size of the batched header flush.
2222
// Headers are written in batches not to thrash the underlying Datastore with writes.
2323
WriteBatchSize int
2424

2525
// storePrefix defines the prefix used to wrap the store
2626
// OPTIONAL
2727
storePrefix datastore.Key
28+
29+
// metrics is a flag that enables metrics collection
30+
metrics bool
2831
}
2932

3033
// DefaultParameters returns the default params to configure the store.
@@ -51,6 +54,13 @@ func (p *Parameters) Validate() error {
5154
return nil
5255
}
5356

57+
// WithMetrics is a functional option that enables metrics collection
// on the Store.
func WithMetrics() Option {
	return func(p *Parameters) {
		p.metrics = true
	}
}
63+
5464
// WithStoreCacheSize is a functional option that configures the
5565
// `StoreCacheSize` parameter.
5666
func WithStoreCacheSize(size int) Option {

store/store.go

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"sync/atomic"
8+
"time"
89

910
lru "github.com/hashicorp/golang-lru"
1011
"github.com/ipfs/go-datastore"
@@ -31,6 +32,8 @@ type Store[H header.Header[H]] struct {
3132
ds datastore.Batching
3233
// adaptive replacement cache of headers
3334
cache *lru.ARCCache
35+
// metrics collection instance
36+
metrics *metrics
3437

3538
// header heights management
3639
//
@@ -102,15 +105,24 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
102105
return nil, fmt.Errorf("failed to create height indexer: %w", err)
103106
}
104107

108+
var metrics *metrics
109+
if params.metrics {
110+
metrics, err = newMetrics()
111+
if err != nil {
112+
return nil, err
113+
}
114+
}
115+
105116
return &Store[H]{
106-
Params: params,
107117
ds: wrappedStore,
118+
cache: cache,
119+
metrics: metrics,
120+
heightIndex: index,
108121
heightSub: newHeightSub[H](),
109122
writes: make(chan []H, 16),
110123
writesDn: make(chan struct{}),
111-
cache: cache,
112-
heightIndex: index,
113124
pending: newBatch[H](params.WriteBatchSize),
125+
Params: params,
114126
}, nil
115127
}
116128

@@ -141,17 +153,22 @@ func (s *Store[H]) Stop(ctx context.Context) error {
141153
default:
142154
}
143155
// signal to prevent further writes to Store
144-
s.writes <- nil
145156
select {
146-
case <-s.writesDn: // wait till it is done writing
157+
case s.writes <- nil:
158+
case <-ctx.Done():
159+
return ctx.Err()
160+
}
161+
// wait till it is done writing
162+
select {
163+
case <-s.writesDn:
147164
case <-ctx.Done():
148165
return ctx.Err()
149166
}
150167

151168
// cleanup caches
152169
s.cache.Purge()
153170
s.heightIndex.cache.Purge()
154-
return nil
171+
return s.metrics.Close()
155172
}
156173

157174
func (s *Store[H]) Height() uint64 {
@@ -172,7 +189,7 @@ func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, erro
172189
case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
173190
return zero, header.ErrNoHead
174191
case err == nil:
175-
s.heightSub.SetHeight(uint64(head.Height()))
192+
s.heightSub.SetHeight(head.Height())
176193
log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
177194
return head, nil
178195
}
@@ -188,12 +205,8 @@ func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
188205
return h, nil
189206
}
190207

191-
b, err := s.ds.Get(ctx, datastore.NewKey(hash.String()))
208+
b, err := s.get(ctx, hash)
192209
if err != nil {
193-
if errors.Is(err, datastore.ErrNotFound) {
194-
return zero, header.ErrNotFound
195-
}
196-
197210
return zero, err
198211
}
199212

@@ -356,15 +369,27 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
356369
verified, head = append(verified, h), h
357370
}
358371

372+
onWrite := func() {
373+
newHead := verified[len(verified)-1]
374+
s.writeHead.Store(&newHead)
375+
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
376+
s.metrics.newHead(newHead.Height())
377+
}
378+
359379
// queue headers to be written on disk
360380
select {
361381
case s.writes <- verified:
362-
ln := len(verified)
363-
s.writeHead.Store(&verified[ln-1])
364-
wh := *s.writeHead.Load()
365-
log.Infow("new head", "height", wh.Height(), "hash", wh.Hash())
366382
// we return an error here after writing,
367383
// as there might be an invalid header in between of a given range
384+
onWrite()
385+
return err
386+
default:
387+
s.metrics.writesQueueBlocked(ctx)
388+
}
389+
// if the writes queue is full, we block until it is not
390+
select {
391+
case s.writes <- verified:
392+
onWrite()
368393
return err
369394
case <-s.writesDn:
370395
return errStoppedStore
@@ -393,13 +418,17 @@ func (s *Store[H]) flushLoop() {
393418
continue
394419
}
395420

396-
err := s.flush(ctx, s.pending.GetAll()...)
421+
startTime := time.Now()
422+
toFlush := s.pending.GetAll()
423+
err := s.flush(ctx, toFlush...)
397424
if err != nil {
425+
from, to := toFlush[0].Height(), toFlush[len(toFlush)-1].Height()
398426
// TODO(@Wondertan): Should this be a fatal error case with os.Exit?
399-
from, to := uint64(headers[0].Height()), uint64(headers[len(headers)-1].Height())
400427
log.Errorw("writing header batch", "from", from, "to", to)
428+
s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)
401429
continue
402430
}
431+
s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
403432
// reset pending
404433
s.pending.Reset()
405434

@@ -472,3 +501,18 @@ func (s *Store[H]) readHead(ctx context.Context) (H, error) {
472501

473502
return s.Get(ctx, head)
474503
}
504+
505+
// get reads the raw header bytes for the given hash directly from the
// datastore, bypassing the cache, and records the read duration metric
// on both the success and failure paths.
func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
	startTime := time.Now()
	data, err := s.ds.Get(ctx, datastore.NewKey(hash.String()))
	if err != nil {
		s.metrics.readSingle(ctx, time.Since(startTime), true)
		// normalize the datastore's not-found error to the header
		// package's sentinel so callers can rely on header.ErrNotFound
		if errors.Is(err, datastore.ErrNotFound) {
			return nil, header.ErrNotFound
		}
		return nil, err
	}

	s.metrics.readSingle(ctx, time.Since(startTime), false)
	return data, nil
}

0 commit comments

Comments
 (0)