Skip to content

Commit f49c874

Browse files
authored
Simplify CertStore subscriber (#646)
I have yet to write any code that cares about intermediate certificates and they can easily be queried with `GetRange`. This change: 1. Makes the certificate subscription system always return the latest certificate, dropping intermediates. 2. Drops the dependency on go-broadcast. 3. Simplifies the code that relies on the certificate subscription logic.
1 parent 92a5658 commit f49c874

File tree

5 files changed

+55
-51
lines changed

5 files changed

+55
-51
lines changed

certstore/certstore.go

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/filecoin-project/go-f3/certs"
1313
"github.com/filecoin-project/go-f3/gpbft"
1414

15-
"github.com/Kubuxu/go-broadcast"
1615
"github.com/ipfs/go-datastore"
1716
"github.com/ipfs/go-datastore/namespace"
1817
"github.com/ipfs/go-datastore/query"
@@ -30,11 +29,12 @@ var (
3029

3130
// Store is responsible for storing and relaying information about new finality certificates
3231
type Store struct {
33-
writeLk sync.Mutex
32+
mu sync.RWMutex
3433
ds datastore.Datastore
35-
busCerts broadcast.Channel[*certs.FinalityCertificate]
3634
firstInstance uint64
3735
powerTableFrequency uint64
36+
subscribers map[chan *certs.FinalityCertificate]struct{}
37+
latestCertificate *certs.FinalityCertificate
3838

3939
latestPowerTable gpbft.PowerEntries
4040
}
@@ -44,6 +44,7 @@ func open(ctx context.Context, ds datastore.Datastore) (*Store, error) {
4444
cs := &Store{
4545
ds: namespace.Wrap(ds, datastore.NewKey("/certstore")),
4646
powerTableFrequency: defaultPowerTableFrequency,
47+
subscribers: make(map[chan *certs.FinalityCertificate]struct{}),
4748
}
4849
err := maybeContinueDelete(ctx, ds)
4950
if err != nil {
@@ -57,14 +58,13 @@ func open(ctx context.Context, ds datastore.Datastore) (*Store, error) {
5758
return nil, fmt.Errorf("determining latest cert: %w", err)
5859
}
5960

60-
latestCert, err := cs.Get(ctx, latestInstance)
61+
cs.latestCertificate, err = cs.Get(ctx, latestInstance)
6162
if err != nil {
6263
return nil, fmt.Errorf("loading latest cert: %w", err)
6364
}
6465

65-
metrics.latestInstance.Record(ctx, int64(latestCert.GPBFTInstance))
66-
metrics.latestFinalizedEpoch.Record(ctx, latestCert.ECChain.Head().Epoch)
67-
cs.busCerts.Publish(latestCert)
66+
metrics.latestInstance.Record(ctx, int64(cs.latestCertificate.GPBFTInstance))
67+
metrics.latestFinalizedEpoch.Record(ctx, cs.latestCertificate.ECChain.Head().Epoch)
6868

6969
return cs, nil
7070
}
@@ -109,7 +109,7 @@ func OpenOrCreateStore(ctx context.Context, ds datastore.Datastore, firstInstanc
109109
return nil, fmt.Errorf("failed to read initial instance number: %w", err)
110110
}
111111
cs.firstInstance = firstInstance
112-
if latest := cs.Latest(); latest != nil {
112+
if latest := cs.latestCertificate; latest != nil {
113113
cs.latestPowerTable, err = cs.GetPowerTable(ctx, latest.GPBFTInstance+1)
114114
if err != nil {
115115
return nil, fmt.Errorf("failed to load latest power table: %w", err)
@@ -162,7 +162,7 @@ func OpenStore(ctx context.Context, ds datastore.Datastore) (*Store, error) {
162162
return nil, fmt.Errorf("getting first instance: %w", err)
163163
}
164164
latestPowerTable := cs.firstInstance
165-
if latest := cs.Latest(); latest != nil {
165+
if latest := cs.latestCertificate; latest != nil {
166166
latestPowerTable = latest.GPBFTInstance + 1
167167
}
168168
cs.latestPowerTable, err = cs.GetPowerTable(ctx, latestPowerTable)
@@ -195,7 +195,9 @@ func (cs *Store) writeInstanceNumber(ctx context.Context, key datastore.Key, val
195195

196196
// Latest returns the newest available certificate
197197
func (cs *Store) Latest() *certs.FinalityCertificate {
198-
return cs.busCerts.Last()
198+
cs.mu.RLock()
199+
defer cs.mu.RUnlock()
200+
return cs.latestCertificate
199201
}
200202

201203
// Get returns the FinalityCertificate at the specified instance, or an error derived from
@@ -349,11 +351,11 @@ func (cs *Store) Put(ctx context.Context, cert *certs.FinalityCertificate) error
349351
}
350352

351353
// Take a lock to ensure ordering.
352-
cs.writeLk.Lock()
353-
defer cs.writeLk.Unlock()
354+
cs.mu.Lock()
355+
defer cs.mu.Unlock()
354356

355357
nextCert := cs.firstInstance
356-
if latestCert := cs.Latest(); latestCert != nil {
358+
if latestCert := cs.latestCertificate; latestCert != nil {
357359
nextCert = latestCert.GPBFTInstance + 1
358360
}
359361
if cert.GPBFTInstance > nextCert {
@@ -412,21 +414,47 @@ func (cs *Store) Put(ctx context.Context, cert *certs.FinalityCertificate) error
412414
}
413415

414416
cs.latestPowerTable = newPowerTable
417+
cs.latestCertificate = cert
418+
for ch := range cs.subscribers {
419+
// Always drain first.
420+
select {
421+
case <-ch:
422+
default:
423+
}
424+
// Then write the latest certificate.
425+
ch <- cs.latestCertificate
426+
}
427+
415428
metrics.latestInstance.Record(ctx, int64(cert.GPBFTInstance))
416429
metrics.tipsetsPerInstance.Record(ctx, int64(len(cert.ECChain.Suffix())))
417430
metrics.latestFinalizedEpoch.Record(ctx, cert.ECChain.Head().Epoch)
418-
cs.busCerts.Publish(cert)
419431

420432
return nil
421433
}
422434

423-
// SubscribeForNewCerts is used to subscribe to the broadcast channel.
424-
// If the passed channel is full at any point, it will be dropped from subscription and closed.
425-
// To stop subscribing, either the closer function can be used or the channel can be abandoned.
426-
// Passing a channel multiple times to the Subscribe function will result in a panic.
427-
// The channel will receive new certificates sequentially.
428-
func (cs *Store) SubscribeForNewCerts(ch chan<- *certs.FinalityCertificate) (last *certs.FinalityCertificate, closer func()) {
429-
return cs.busCerts.Subscribe(ch)
435+
// Subscribe subscribes to new certificate notifications. When read, it will always return the
436+
// latest not-yet-seen certificate (including the latest certificate when Subscribe is first
437+
// called, if we have any) but it will drop intermediate certificates. If you need all the
438+
// certificates, you should keep track of the last certificate you received and call GetRange to get
439+
// the ones between.
440+
//
441+
// The caller must call the closer to unsubscribe and release resources.
442+
func (cs *Store) Subscribe() (out <-chan *certs.FinalityCertificate, closer func()) {
443+
cs.mu.Lock()
444+
defer cs.mu.Unlock()
445+
ch := make(chan *certs.FinalityCertificate, 1)
446+
if cs.latestCertificate != nil {
447+
ch <- cs.latestCertificate
448+
}
449+
cs.subscribers[ch] = struct{}{}
450+
return ch, func() {
451+
cs.mu.Lock()
452+
defer cs.mu.Unlock()
453+
if _, ok := cs.subscribers[ch]; ok {
454+
delete(cs.subscribers, ch)
455+
close(ch)
456+
}
457+
}
430458
}
431459

432460
var tombstoneKey = datastore.NewKey("/tombstone")

certstore/certstore_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func TestDeleteAll(t *testing.T) {
222222
verifyEmpty()
223223
}
224224

225-
func TestSubscribeForNewCerts(t *testing.T) {
225+
func TestSubscribe(t *testing.T) {
226226
t.Parallel()
227227

228228
ctx := context.Background()
@@ -233,8 +233,7 @@ func TestSubscribeForNewCerts(t *testing.T) {
233233
cs, err := CreateStore(ctx, ds, 1, pt)
234234
require.NoError(t, err)
235235

236-
ch := make(chan *certs.FinalityCertificate, 1)
237-
_, closer := cs.SubscribeForNewCerts(ch)
236+
ch, closer := cs.Subscribe()
238237
defer closer()
239238

240239
cert := makeCert(1, supp)

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module github.com/filecoin-project/go-f3
33
go 1.22
44

55
require (
6-
github.com/Kubuxu/go-broadcast v0.0.0-20240621161059-1a8c90734cd6
76
github.com/drand/kyber v1.3.1
87
github.com/drand/kyber-bls12381 v0.3.1
98
github.com/filecoin-project/go-bitfield v0.2.4

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1
88
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
99
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
1010
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
11-
github.com/Kubuxu/go-broadcast v0.0.0-20240621161059-1a8c90734cd6 h1:yh2/1fz3ajTaeKskSWxtSBNScdRZfQ/A5nyd9+64T6M=
12-
github.com/Kubuxu/go-broadcast v0.0.0-20240621161059-1a8c90734cd6/go.mod h1:5LOj/fF3Oc/cvJqzDiyfx4XwtBPRWUYEz+V+b13sH5U=
1311
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
1412
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
1513
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=

host.go

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -108,30 +108,19 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
108108
return fmt.Errorf("starting a participant: %w", err)
109109
}
110110

111-
// Subscribe to new certificates. We don't bother canceling the subscription as that'll
112-
// happen automatically when the channel fills.
113-
finalityCertificates := make(chan *certs.FinalityCertificate, 4)
114-
_, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)
111+
finalityCertificates, unsubCerts := h.certStore.Subscribe()
115112

116113
h.errgrp.Go(func() (_err error) {
117114
defer func() {
115+
unsubCerts()
118116
if _err != nil && h.runningCtx.Err() == nil {
119117
log.Errorf("exited GPBFT runner early: %+v", _err)
120118
}
121119
}()
122120
for h.runningCtx.Err() == nil {
123121
// prioritise finality certificates and alarm delivery
124122
select {
125-
case c, ok := <-finalityCertificates:
126-
// We only care about the latest certificate, skip passed old ones.
127-
if len(finalityCertificates) > 0 {
128-
continue
129-
}
130-
131-
if !ok {
132-
finalityCertificates = make(chan *certs.FinalityCertificate, 4)
133-
c, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)
134-
}
123+
case c := <-finalityCertificates:
135124
if err := h.receiveCertificate(c); err != nil {
136125
return err
137126
}
@@ -146,16 +135,7 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
146135

147136
// Handle messages, finality certificates, and alarms
148137
select {
149-
case c, ok := <-finalityCertificates:
150-
// We only care about the latest certificate, skip passed old ones.
151-
if len(finalityCertificates) > 0 {
152-
continue
153-
}
154-
155-
if !ok {
156-
finalityCertificates = make(chan *certs.FinalityCertificate, 4)
157-
c, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)
158-
}
138+
case c := <-finalityCertificates:
159139
if err := h.receiveCertificate(c); err != nil {
160140
return err
161141
}

0 commit comments

Comments
 (0)