Skip to content

Commit a4b857e

Browse files
authored
Implement dynamic fusing manifest (#884)
* chore: Remove unused comments from fusing_provider_test.go Signed-off-by: Jakub Sztandera <[email protected]> * Implement dynamic fusing manifest Signed-off-by: Jakub Sztandera <[email protected]> * Move the init code into goroutine Signed-off-by: Jakub Sztandera <[email protected]> * fix typo, move to always defer stop Signed-off-by: Jakub Sztandera <[email protected]> * refactor startTimeOfPriority Signed-off-by: Jakub Sztandera <[email protected]> * Remove error log Signed-off-by: Jakub Sztandera <[email protected]> * Fix init behaviour Signed-off-by: Jakub Sztandera <[email protected]> * Switch to primary secondary Signed-off-by: Jakub Sztandera <[email protected]> --------- Signed-off-by: Jakub Sztandera <[email protected]>
1 parent 221e40b commit a4b857e

File tree

2 files changed

+173
-49
lines changed

2 files changed

+173
-49
lines changed

manifest/fusing_provider.go

Lines changed: 107 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package manifest
22

33
import (
44
"context"
5-
"fmt"
65
"time"
76

87
"github.com/filecoin-project/go-f3/ec"
@@ -18,13 +17,13 @@ type HeadGetter interface {
1817

1918
var _ ManifestProvider = (*FusingManifestProvider)(nil)
2019

21-
// FusingManifestProvider is a ManifestProvider that starts by providing dynamic manifest updates
22-
// then switches to a static manifest when we get within finality of said manifest's bootstrap
20+
// FusingManifestProvider is a ManifestProvider that starts by providing secondary manifest updates
21+
// then switches to a primary manifest when we get within finality of said manifest's bootstrap
2322
// epoch.
2423
type FusingManifestProvider struct {
25-
ec HeadGetter
26-
dynamic ManifestProvider
27-
static *Manifest
24+
ec HeadGetter
25+
secondary ManifestProvider
26+
primary ManifestProvider
2827

2928
manifestCh chan *Manifest
3029

@@ -34,19 +33,17 @@ type FusingManifestProvider struct {
3433
clock clock.Clock
3534
}
3635

37-
func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic ManifestProvider, static *Manifest) (*FusingManifestProvider, error) {
38-
if err := static.Validate(); err != nil {
39-
return nil, err
40-
}
41-
36+
// NewFusingManifestProvider creates a provider that will lock into the primary manifest once the head reaches BootstrapEpoch-Finality of the primary manifest.
37+
// The primary ManifestProvider needs to provide at least one manifest (or nil), as a sign of life, to enable forwarding of secondary manifests.
38+
func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, secondary ManifestProvider, primary ManifestProvider) (*FusingManifestProvider, error) {
4239
clk := clock.GetClock(ctx)
4340
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
4441
errgrp, ctx := errgroup.WithContext(ctx)
4542

4643
return &FusingManifestProvider{
4744
ec: ec,
48-
dynamic: dynamic,
49-
static: static,
45+
secondary: secondary,
46+
primary: primary,
5047
errgrp: errgrp,
5148
cancel: cancel,
5249
runningCtx: ctx,
@@ -60,41 +57,112 @@ func (m *FusingManifestProvider) ManifestUpdates() <-chan *Manifest {
6057
}
6158

6259
func (m *FusingManifestProvider) Start(ctx context.Context) error {
63-
head, err := m.ec.GetHead(ctx)
64-
if err != nil {
65-
return fmt.Errorf("failed to determine current head epoch")
60+
if err := m.primary.Start(ctx); err != nil {
61+
return err
6662
}
6763

68-
switchEpoch := m.static.BootstrapEpoch - m.static.EC.Finality
69-
headEpoch := head.Epoch()
70-
71-
if headEpoch >= switchEpoch {
72-
m.manifestCh <- m.static
73-
return nil
64+
if err := m.secondary.Start(ctx); err != nil {
65+
return err
7466
}
7567

76-
epochDelay := switchEpoch - headEpoch
77-
start := head.Timestamp().Add(time.Duration(epochDelay) * m.static.EC.Period)
68+
m.errgrp.Go(func() error {
69+
defer m.primary.Stop(context.Background())
70+
defer m.secondary.Stop(context.Background())
71+
72+
startTimeOfPriority := func(head ec.TipSet, mani *Manifest) time.Time {
73+
headEpoch := head.Epoch()
74+
switchEpoch := mani.BootstrapEpoch - mani.EC.Finality
75+
epochDelay := switchEpoch - headEpoch
76+
start := head.Timestamp().Add(time.Duration(epochDelay) * mani.EC.Period)
77+
return start
78+
}
7879

79-
if err := m.dynamic.Start(ctx); err != nil {
80-
return err
81-
}
80+
var primaryManifest *Manifest
81+
// create a stopped timer
82+
timer := m.clock.Timer(time.Hour)
83+
timer.Stop()
8284

83-
log.Infof("starting the fusing manifest provider, will switch to the static manifest at %s", start)
85+
first := true
86+
for m.runningCtx.Err() == nil {
87+
if !first {
88+
m.clock.Sleep(5 * time.Second)
89+
first = false
90+
}
91+
92+
select {
93+
case primaryManifest = <-m.primary.ManifestUpdates():
94+
case <-m.runningCtx.Done():
95+
// we were stopped, clean exit
96+
return nil
97+
}
98+
99+
head, err := m.ec.GetHead(m.runningCtx)
100+
if err != nil {
101+
log.Errorf("failed to determine current head epoch: %w", err)
102+
continue
103+
}
104+
headEpoch := head.Epoch()
105+
// exit early if primaryManifest is relevant right now
106+
if primaryManifest != nil && headEpoch >= primaryManifest.BootstrapEpoch-primaryManifest.EC.Finality {
107+
m.manifestCh <- primaryManifest
108+
return nil
109+
}
110+
111+
if primaryManifest == nil {
112+
// init with stopped timer
113+
break
114+
}
115+
startTime := startTimeOfPriority(head, primaryManifest)
116+
log.Infof("starting the fusing manifest provider, will switch to the primary manifest at %s",
117+
startTime)
118+
if err != nil {
119+
log.Errorf("trying to compute start time: %w", err)
120+
continue
121+
}
122+
timer.Reset(m.clock.Until(startTime))
123+
break
124+
}
125+
126+
if m.runningCtx.Err() != nil {
127+
return nil
128+
}
84129

85-
m.errgrp.Go(func() error {
86-
dynamicUpdates := m.dynamic.ManifestUpdates()
87-
timer := m.clock.Timer(m.clock.Until(start))
88130
defer timer.Stop()
89131

90132
for m.runningCtx.Err() == nil {
91133
select {
134+
case primaryManifest = <-m.primary.ManifestUpdates():
135+
if primaryManifest == nil {
136+
timer.Stop()
137+
continue
138+
}
139+
head, err := m.ec.GetHead(m.runningCtx)
140+
if err != nil {
141+
log.Errorf("getting head in fusing manifest: %+v", err)
142+
}
143+
startTime := startTimeOfPriority(head, primaryManifest)
144+
if err != nil {
145+
log.Errorf("trying to compute start time: %+v", err)
146+
// set timer in one epoch, shouldn't happen but be defensive
147+
timer.Reset(primaryManifest.EC.Period)
148+
continue
149+
}
150+
151+
log.Infof("got new primaryManifest, will switch to the primary manifest at %s",
152+
startTime)
153+
timer.Reset(m.clock.Until(startTime))
92154
case <-timer.C:
155+
if primaryManifest == nil {
156+
// just a consistency check, timer might have fired before it was stopped
157+
continue
158+
}
159+
93160
// Make sure we're actually at the target epoch. This shouldn't be
94161
// an issue unless our clocks are really funky, the network is
95162
// behind, or we're in a lotus integration test
96163
// (https://github.com/filecoin-project/lotus/issues/12557).
97164
head, err := m.ec.GetHead(m.runningCtx)
165+
switchEpoch := primaryManifest.BootstrapEpoch - primaryManifest.EC.Finality
98166
switch {
99167
case err != nil:
100168
log.Errorw("failed to get head in fusing manifest provider", "error", err)
@@ -103,32 +171,26 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error {
103171
log.Infow("delaying fusing manifest switch-over because head is behind the target epoch",
104172
"head", head.Epoch(),
105173
"target epoch", switchEpoch,
106-
"bootstrap epoch", m.static.BootstrapEpoch,
174+
"bootstrap epoch", primaryManifest.BootstrapEpoch,
107175
)
108-
timer.Reset(m.static.EC.Period)
176+
timer.Reset(primaryManifest.EC.Period)
109177
continue
110178
}
111179

112180
log.Infow(
113-
"fusing to the static manifest, stopping the dynamic manifest provider",
114-
"network", m.static.NetworkName,
115-
"bootstrap epoch", m.static.BootstrapEpoch,
181+
"fusing to the primary manifest, stopping the secondary manifest provider",
182+
"network", primaryManifest.NetworkName,
183+
"bootstrap epoch", primaryManifest.BootstrapEpoch,
116184
"current epoch", head.Epoch(),
117185
)
118-
m.updateManifest(m.static)
119-
// Log any errors and move on. We don't bubble it because we don't
120-
// want to stop everything if shutting down the dynamic manifest
121-
// provider fails when switching over to a static manifest.
122-
if err := m.dynamic.Stop(context.Background()); err != nil {
123-
log.Errorw("failure when stopping dynamic manifest provider", "error", err)
124-
}
186+
m.updateManifest(primaryManifest)
125187
return nil
126-
case update := <-dynamicUpdates:
188+
case update := <-m.secondary.ManifestUpdates():
127189
m.updateManifest(update)
128190
case <-m.runningCtx.Done():
129191
}
130192
}
131-
return m.dynamic.Stop(context.Background())
193+
return nil
132194
})
133195

134196
return nil

manifest/fusing_provider_test.go

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
"github.com/stretchr/testify/require"
1212
)
1313

14-
// Static manifest provider that doesn't allow any changes
15-
// in runtime to the initial manifest set in the provider
1614
type testManifestProvider chan *manifest.Manifest
1715

1816
func (testManifestProvider) Start(context.Context) error { return nil }
@@ -34,8 +32,12 @@ func TestFusingManifestProvider(t *testing.T) {
3432

3533
fakeEc := consensus.NewFakeEC(ctx)
3634
manifestCh := make(chan *manifest.Manifest, 10)
35+
priorityManifestCh := make(chan *manifest.Manifest, 1)
36+
priorityManifestProvider := testManifestProvider(priorityManifestCh)
37+
priorityManifestCh <- initialManifest
38+
3739
prov, err := manifest.NewFusingManifestProvider(ctx,
38-
fakeEc, (testManifestProvider)(manifestCh), initialManifest)
40+
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
3941
require.NoError(t, err)
4042

4143
require.NoError(t, prov.Start(ctx))
@@ -95,10 +97,70 @@ func TestFusingManifestProviderStop(t *testing.T) {
9597

9698
fakeEc := consensus.NewFakeEC(ctx)
9799
manifestCh := make(chan *manifest.Manifest, 1)
100+
priorityManifestCh := make(chan *manifest.Manifest, 1)
101+
priorityManifestProvider := testManifestProvider(priorityManifestCh)
102+
priorityManifestCh <- initialManifest
103+
98104
prov, err := manifest.NewFusingManifestProvider(ctx,
99-
fakeEc, (testManifestProvider)(manifestCh), initialManifest)
105+
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
100106
require.NoError(t, err)
101107

102108
require.NoError(t, prov.Start(ctx))
103109
require.NoError(t, prov.Stop(ctx))
104110
}
111+
112+
func TestFusingManifestProviderSwitchToPriority(t *testing.T) {
113+
ctx, cancel := context.WithCancel(context.Background())
114+
ctx, clk := clock.WithMockClock(ctx)
115+
t.Cleanup(cancel)
116+
117+
initialManifest := manifest.LocalDevnetManifest()
118+
initialManifest.BootstrapEpoch = 2000
119+
initialManifest.EC.Finality = 900
120+
121+
fakeEc := consensus.NewFakeEC(ctx)
122+
manifestCh := make(chan *manifest.Manifest, 10)
123+
124+
priorityManifestCh := make(chan *manifest.Manifest, 1)
125+
priorityManifestProvider := testManifestProvider(priorityManifestCh)
126+
priorityManifestCh <- nil
127+
128+
prov, err := manifest.NewFusingManifestProvider(ctx,
129+
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
130+
require.NoError(t, err)
131+
132+
require.NoError(t, prov.Start(ctx))
133+
priorityManifestCh <- initialManifest
134+
135+
// Create and push a dynamic manifest with bootstrap epoch < 1100
136+
dynamicManifest := *initialManifest
137+
dynamicManifest.BootstrapEpoch = 1000
138+
select {
139+
case manifestCh <- &dynamicManifest:
140+
default:
141+
t.Fatal("failed to enqueue dynamic manifest")
142+
}
143+
144+
select {
145+
case m := <-prov.ManifestUpdates():
146+
require.True(t, m.Equal(&dynamicManifest), "expected dynamic manifest")
147+
case <-time.After(time.Second):
148+
t.Fatal("expected a manifest update")
149+
}
150+
151+
// Add time to reach the priority manifest switch epoch
152+
clk.Add(initialManifest.EC.Period * 1200)
153+
for i := 0; i < 10; i++ {
154+
// fixes weird quirk with fake time
155+
// where the initial manifest doesn't get processed before the initial clk.Add
156+
// and the timer doesn't fire until another clk.Add
157+
clk.Add(1)
158+
}
159+
t.Logf("clck now: %s", clk.Now())
160+
select {
161+
case m := <-prov.ManifestUpdates():
162+
require.True(t, m.Equal(initialManifest), "expected to receive the priority manifest")
163+
case <-time.After(time.Second):
164+
t.Fatal("expected a manifest update")
165+
}
166+
}

0 commit comments

Comments
 (0)