@@ -15,10 +15,9 @@ import (
1515 "github.com/filecoin-project/go-f3/internal/clock"
1616 "github.com/filecoin-project/go-f3/internal/psutil"
1717 "github.com/filecoin-project/go-f3/manifest"
18- "go.opentelemetry.io/otel/metric"
19-
2018 pubsub "github.com/libp2p/go-libp2p-pubsub"
21- peer "github.com/libp2p/go-libp2p/core/peer"
19+ "github.com/libp2p/go-libp2p/core/peer"
20+ "go.opentelemetry.io/otel/metric"
2221 "go.uber.org/multierr"
2322 "golang.org/x/sync/errgroup"
2423)
@@ -109,7 +108,6 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
109108 }
110109
111110 finalityCertificates , unsubCerts := h .certStore .Subscribe ()
112-
113111 h .errgrp .Go (func () (_err error ) {
114112 defer func () {
115113 unsubCerts ()
@@ -164,6 +162,45 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
164162 }
165163 return nil
166164 })
165+
166+ // Asynchronously checkpoint the decided tipset keys by explicitly making a
167+ // separate subscription to the cert store. This may cause a sync in a case where
168+ // the finalized tipset is not already stored by the chain store, which is a
169+ // blocking operation. Hence, the asynchronous checkpointing.
170+ //
171+ // Note, there is no guarantee that every finalized tipset will be checkpointed.
172+ // Because:
173+ // 1. the subscription only returns the latest certificate, i.e. may
174+ // miss intermediate certificates, and
175+ // 2. errors that may occur during checkpointing are silently logged
176+ // to allow checkpointing of future finality certificates.
177+ //
178+ // Triggering the checkpointing here means that certstore remains the sole source
179+ // of truth in terms of tipsets that have been finalised.
180+ finalize , unsubFinalize := h .certStore .Subscribe ()
181+ h .errgrp .Go (func () error {
182+ defer unsubFinalize ()
183+ for h .runningCtx .Err () == nil {
184+ select {
185+ case <- h .runningCtx .Done ():
186+ return nil
187+ case cert , ok := <- finalize :
188+ if ! ok {
189+ // This should never happen according to certstore subscribe semantic. If it
190+ // does, error loudly since the chances are the cause is a programmer error.
191+ return errors .New ("cert store subscription to finalize tipsets was closed unexpectedly" )
192+ }
193+ key := cert .ECChain .Head ().Key
194+ if err := h .ec .Finalize (h .runningCtx , key ); err != nil {
195+ // There is not much we can do here other than logging. The next instance start
196+ // will effectively retry checkpointing the latest finalized tipset. This error
197+ // will not impact the selection of next instance chain.
198+ log .Error (fmt .Errorf ("error while finalizing decision at EC: %w" , err ))
199+ }
200+ }
201+ }
202+ return nil
203+ })
167204 return nil
168205}
169206
@@ -408,7 +445,7 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e
408445 return res [1 :], nil
409446}
410447
411- func (h * gpbftRunner ) Stop (_ctx context.Context ) error {
448+ func (h * gpbftRunner ) Stop (context.Context ) error {
412449 h .ctxCancel ()
413450 return multierr .Combine (
414451 h .errgrp .Wait (),
@@ -604,7 +641,7 @@ func (h *gpbftHost) SetAlarm(at time.Time) {
604641 // we cannot reuse the timer because we don't know if it was read or not
605642 h .alertTimer .Stop ()
606643 if at .IsZero () {
607- // It "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
644+ // If "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
608645 // replace it for the reason stated above.
609646 h .alertTimer = h .clock .Timer (0 )
610647 if ! h .alertTimer .Stop () {
0 commit comments