Skip to content

Commit d7ad50b

Browse files
committed
Store parsigdb metrics
1 parent 6426bd7 commit d7ad50b

15 files changed

+92
-15
lines changed

app/app.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,27 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
567567
return err
568568
}
569569

570-
parSigDB := parsigdb.NewMemDB(lock.Threshold, deadlinerFunc("parsigdb"))
570+
var slotDuration uint64
571+
var genesisTime time.Time
572+
if conf.TestnetConfig.IsNonZero() {
573+
slotDuration = 12
574+
genesisTime = time.Unix(conf.TestnetConfig.GenesisTimestamp, 0)
575+
} else {
576+
network, err := eth2util.ForkVersionToNetwork(lock.ForkVersion)
577+
if err != nil {
578+
network = "mainnet"
579+
}
580+
slotDuration, err = eth2util.NetworkToSlotDuration(network)
581+
if err != nil {
582+
return err
583+
}
584+
genesisTime, err = eth2util.NetworkToGenesisTime(network)
585+
if err != nil {
586+
return err
587+
}
588+
589+
}
590+
parSigDB := parsigdb.NewMemDB(ctx, lock.Threshold, deadlinerFunc("parsigdb"), parsigdb.NewMemDBMetadata(slotDuration, genesisTime))
571591

572592
var parSigEx core.ParSigEx
573593
if conf.TestConfig.ParSigExFunc != nil {

core/dutydb/memory.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ func NewMemDB(deadliner core.Deadliner) *MemDB {
3434
}
3535

3636
// MemDB is an in-memory dutyDB implementation.
37-
// It is a placeholder for the badgerDB implementation.
3837
type MemDB struct {
3938
mu sync.Mutex
4039

core/parsigdb/memory.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,41 @@ import (
66
"bytes"
77
"context"
88
"encoding/json"
9+
"strconv"
910
"sync"
11+
"time"
1012

1113
"github.com/obolnetwork/charon/app/errors"
1214
"github.com/obolnetwork/charon/app/log"
1315
"github.com/obolnetwork/charon/app/z"
1416
"github.com/obolnetwork/charon/core"
1517
)
1618

19+
// NewMemDBMetadata returns a new in-memory partial signature database instance.
20+
func NewMemDBMetadata(slotDuration uint64, genesisTime time.Time) MemDBMetadata {
21+
return MemDBMetadata{
22+
slotDuration: slotDuration,
23+
genesisTime: genesisTime,
24+
}
25+
}
26+
27+
type MemDBMetadata struct {
28+
slotDuration uint64
29+
genesisTime time.Time
30+
}
31+
1732
// NewMemDB returns a new in-memory partial signature database instance.
18-
func NewMemDB(threshold int, deadliner core.Deadliner) *MemDB {
33+
func NewMemDB(ctx context.Context, threshold int, deadliner core.Deadliner, metadata MemDBMetadata) *MemDB {
1934
return &MemDB{
2035
entries: make(map[key][]core.ParSignedData),
2136
keysByDuty: make(map[core.Duty][]key),
2237
threshold: threshold,
2338
deadliner: deadliner,
39+
metadata: metadata,
2440
}
2541
}
2642

27-
// MemDB is a placeholder in-memory partial signature database.
28-
// It will be replaced with a BadgerDB implementation.
43+
// MemDB is an in-memory partial signature database.
2944
type MemDB struct {
3045
mu sync.Mutex
3146
internalSubs []func(context.Context, core.Duty, core.ParSignedDataSet) error
@@ -35,6 +50,8 @@ type MemDB struct {
3550
keysByDuty map[core.Duty][]key
3651
threshold int
3752
deadliner core.Deadliner
53+
54+
metadata MemDBMetadata
3855
}
3956

4057
// SubscribeInternal registers a callback when an internal
@@ -146,6 +163,18 @@ func (db *MemDB) store(k key, value core.ParSignedData) ([]core.ParSignedData, b
146163
db.mu.Lock()
147164
defer db.mu.Unlock()
148165

166+
now := time.Now().UnixMilli()
167+
168+
slotStart := (uint64(db.metadata.genesisTime.Unix()) + k.Duty.Slot*db.metadata.slotDuration) * 1000 // in ms
169+
timeSinceSlotStart := float64(now-int64(slotStart)) / 1000 // in seconds
170+
switch k.Duty.Type {
171+
case core.DutyAttester:
172+
timeSinceSlotStart -= 4.0
173+
case core.DutyAggregator, core.DutySyncContribution:
174+
timeSinceSlotStart -= 8.0
175+
}
176+
parsigStored.WithLabelValues(k.Duty.Type.String(), strconv.FormatInt(int64(value.ShareIdx), 10)).Observe(timeSinceSlotStart)
177+
149178
for _, s := range db.entries[k] {
150179
if s.ShareIdx == value.ShareIdx {
151180
equal, err := parSignedDataEqual(s, value)

core/parsigdb/memory_internal_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package parsigdb
55
import (
66
"context"
77
"testing"
8+
"time"
89

910
eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
1011
"github.com/attestantio/go-eth2-client/spec/altair"
@@ -13,6 +14,7 @@ import (
1314

1415
"github.com/obolnetwork/charon/cluster"
1516
"github.com/obolnetwork/charon/core"
17+
"github.com/obolnetwork/charon/eth2util"
1618
"github.com/obolnetwork/charon/testutil"
1719
)
1820

@@ -115,7 +117,7 @@ func TestMemDBThreshold(t *testing.T) {
115117
)
116118

117119
deadliner := newTestDeadliner()
118-
db := NewMemDB(th, deadliner)
120+
db := NewMemDB(t.Context(), th, deadliner, NewMemDBMetadata(eth2util.Mainnet.SlotDuration, time.Unix(eth2util.Mainnet.GenesisTimestamp, 0)))
119121

120122
ctx := t.Context()
121123

core/parsigdb/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,11 @@ var exitCounter = promauto.NewCounterVec(prometheus.CounterOpts{
1414
Name: "exit_total",
1515
Help: "Total number of partially signed voluntary exits per public key",
1616
}, []string{"pubkey"}) // Ok to use pubkey (high cardinality) here since these are very rare
17+
18+
var parsigStored = promauto.NewHistogramVec(prometheus.HistogramOpts{
19+
Namespace: "core",
20+
Subsystem: "parsigdb",
21+
Name: "store",
22+
Help: "Latency of partial signatures received since earliest expected time, per duty, per peer index",
23+
Buckets: []float64{.001, 0.01, 0.05, .1, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3, 5},
24+
}, []string{"duty", "peer_idx"})

dkg/dkg.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func Run(ctx context.Context, conf Config) (err error) {
242242
return errors.Wrap(err, "get peer IDs")
243243
}
244244

245-
ex := newExchanger(p2pNode, nodeIdx.PeerIdx, peerIDs, []sigType{
245+
ex := newExchanger(ctx, p2pNode, nodeIdx.PeerIdx, peerIDs, []sigType{
246246
sigLock,
247247
sigDepositData,
248248
sigValidatorRegistration,

dkg/exchanger.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type exchanger struct {
5656
sigDatasChan chan map[core.PubKey][]core.ParSignedData
5757
}
5858

59-
func newExchanger(p2pNode host.Host, peerIdx int, peers []peer.ID, sigTypes []sigType, timeout time.Duration) *exchanger {
59+
func newExchanger(ctx context.Context, p2pNode host.Host, peerIdx int, peers []peer.ID, sigTypes []sigType, timeout time.Duration) *exchanger {
6060
// Partial signature roots not known yet, so skip verification in parsigex, rather verify before we aggregate.
6161
noopVerifier := func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error {
6262
return nil
@@ -82,7 +82,7 @@ func newExchanger(p2pNode host.Host, peerIdx int, peers []peer.ID, sigTypes []si
8282

8383
ex := &exchanger{
8484
// threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV
85-
sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}),
85+
sigdb: parsigdb.NewMemDB(ctx, len(peers), noopDeadliner{}, parsigdb.NewMemDBMetadata(0, time.Now())), // metadata timestamps are used for metrics, irrelevant for DKG
8686
sigex: parsigex.NewParSigEx(p2pNode, p2p.Send, peerIdx, peers, noopVerifier, dutyGaterFunc, p2p.WithSendTimeout(timeout), p2p.WithReceiveTimeout(timeout)),
8787
sigTypes: st,
8888
sigData: dataByPubkey{

dkg/exchanger_internal_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func TestExchanger(t *testing.T) {
9494
}
9595

9696
for i := range nodes {
97-
ex := newExchanger(hosts[i], i, peers, expectedSigTypes, 8*time.Second)
97+
ex := newExchanger(t.Context(), hosts[i], i, peers, expectedSigTypes, 8*time.Second)
9898
exchangers = append(exchangers, ex)
9999
}
100100

dkg/protocol_addoperators.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (p *addOperatorsProtocol) PostInit(ctx context.Context, pctx *ProtocolConte
8585

8686
p.allENRs = append(p.allENRs, p.newENRs...)
8787

88-
pctx.SigExchanger = newExchanger(pctx.ThisNode, pctx.ThisNodeIdx.PeerIdx, pctx.PeerIDs, []sigType{sigLock}, pctx.Config.Timeout)
88+
pctx.SigExchanger = newExchanger(ctx, pctx.ThisNode, pctx.ThisNodeIdx.PeerIdx, pctx.PeerIDs, []sigType{sigLock}, pctx.Config.Timeout)
8989
pctx.Caster = bcast.New(pctx.ThisNode, pctx.PeerIDs, pctx.ENRPrivateKey)
9090
pctx.NodeSigCaster = newNodeSigBcast(pctx.Peers, pctx.ThisNodeIdx, pctx.Caster)
9191

dkg/protocol_removeoperators.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (p *removeOperatorsProtocol) PostInit(ctx context.Context, pctx *ProtocolCo
151151
PeerIdx: nodeIdx,
152152
ShareIdx: nodeIdx + 1,
153153
}
154-
pctx.SigExchanger = newExchanger(pctx.ThisNode, nodeIdx, newPeerIDs, []sigType{sigLock}, pctx.Config.Timeout)
154+
pctx.SigExchanger = newExchanger(ctx, pctx.ThisNode, nodeIdx, newPeerIDs, []sigType{sigLock}, pctx.Config.Timeout)
155155
}
156156

157157
reshareConfig := pedersen.NewReshareConfig(len(pctx.Lock.Validators), p.newThreshold, nil, oldPeerIDs)

0 commit comments

Comments
 (0)