Skip to content

Commit 967d682

Browse files
authored
f3: Update participate loop to always participate (#302)
* f3: Update participate loop to always participate * f3: Correctly renew leases early * f3: Don't bother checking the manifest
1 parent 83fa0c5 commit 967d682

File tree

2 files changed

+35
-86
lines changed

2 files changed

+35
-86
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ require (
5454
github.com/ipfs/go-ipld-cbor v0.2.0
5555
github.com/ipfs/go-log/v2 v2.5.1
5656
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
57-
github.com/jpillora/backoff v1.0.0
5857
github.com/kelseyhightower/envconfig v1.4.0
5958
github.com/libp2p/go-buffer-pool v0.1.0
6059
github.com/manifoldco/promptui v0.9.0
@@ -206,6 +205,7 @@ require (
206205
github.com/jessevdk/go-flags v1.4.0 // indirect
207206
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
208207
github.com/josharian/intern v1.0.0 // indirect
208+
github.com/jpillora/backoff v1.0.0 // indirect
209209
github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4 // indirect
210210
github.com/klauspost/compress v1.17.9 // indirect
211211
github.com/klauspost/cpuid/v2 v2.2.8 // indirect

tasks/f3/f3_task.go

Lines changed: 34 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@ import (
66
"time"
77

88
logging "github.com/ipfs/go-log/v2"
9-
"github.com/jpillora/backoff"
109
"golang.org/x/xerrors"
1110

1211
"github.com/filecoin-project/go-address"
13-
"github.com/filecoin-project/go-f3/gpbft"
1412
"github.com/filecoin-project/go-f3/manifest"
1513

1614
"github.com/filecoin-project/curio/deps"
@@ -44,7 +42,6 @@ var log = logging.Logger("cf3")
4442
type F3ParticipationAPI interface {
4543
F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
4644
F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
47-
F3GetProgress(ctx context.Context) (gpbft.Instant, error)
4845
F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
4946
}
5047

@@ -94,31 +91,26 @@ func (f *F3Task) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done boo
9491
return false, xerrors.Errorf("failed to get participation ticket: %w", err)
9592
}
9693

97-
lease, participating, err := f.tryF3Participate(ctx, stillOwned, ticket)
98-
if err != nil {
99-
return false, xerrors.Errorf("failed to participate in F3: %w", err)
100-
}
101-
if !participating {
102-
return false, xerrors.Errorf("failed to participate in F3: not participating")
103-
}
104-
94+
// Store the ticket in the database
10595
_, err = f.db.Exec(ctx, "UPDATE f3_tasks SET previous_ticket = $1 WHERE task_id = $2", ticket, taskID)
10696
if err != nil {
10797
return false, xerrors.Errorf("failed to update previous ticket: %w", err)
10898
}
10999

110-
err = f.awaitLeaseExpiry(ctx, stillOwned, lease)
100+
// Start participation loop
101+
err = f.participateLoop(ctx, stillOwned, ticket)
111102
if err != nil {
112-
return false, xerrors.Errorf("failed to await lease expiry: %w", err)
103+
return false, xerrors.Errorf("failed during participation loop: %w", err)
113104
}
105+
// When participateLoop returns, we go back to get a new ticket
114106
}
115107

116108
return false, xerrors.Errorf("f3 task is background task")
117109
}
118110

119111
func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned func() bool, participant address.Address, previousTicket []byte) (api.F3ParticipationTicket, error) {
120112
for stillOwned() {
121-
switch ticket, err := f.api.F3GetOrRenewParticipationTicket(ctx, participant, previousTicket, ParticipationLeaseTerm); {
113+
switch ticket, err := f.api.F3GetOrRenewParticipationTicket(ctx, participant, previousTicket, f.leaseTerm); {
122114
case ctx.Err() != nil:
123115
return api.F3ParticipationTicket{}, ctx.Err()
124116
case errors.Is(err, api.ErrF3Disabled):
@@ -136,99 +128,56 @@ func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned fun
136128
return api.F3ParticipationTicket{}, ctx.Err()
137129
}
138130

139-
func (f *F3Task) tryF3Participate(ctx context.Context, stillOwned func() bool, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
131+
func (f *F3Task) participateLoop(ctx context.Context, stillOwned func() bool, ticket api.F3ParticipationTicket) error {
132+
renewLeaseWithin := f.leaseTerm / 2
133+
var (
134+
haveLease bool
135+
)
140136
for stillOwned() {
141-
switch lease, err := f.api.F3Participate(ctx, ticket); {
137+
lease, err := f.api.F3Participate(ctx, ticket)
138+
switch {
142139
case ctx.Err() != nil:
143-
return api.F3ParticipationLease{}, false, ctx.Err()
140+
return ctx.Err()
144141
case errors.Is(err, api.ErrF3Disabled):
145142
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
146-
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
143+
return xerrors.Errorf("attempting F3 participation with ticket: %w", err)
147144
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
148145
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "err", err)
149-
return api.F3ParticipationLease{}, false, nil
146+
return nil // Return to get a new ticket
150147
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
151148
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "err", err)
152-
return api.F3ParticipationLease{}, false, nil
149+
return nil // Return to get a new ticket
153150
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
154151
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "err", err)
155152
time.Sleep(1 * time.Second)
156-
return api.F3ParticipationLease{}, false, nil
153+
return nil // Return to get a new ticket
157154
case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
158-
log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "err", err)
155+
log.Warnw("Node is not the issuer of F3 participation ticket. Miner may be load-balancing or node has changed. Retrying F3 participation.", "err", err)
159156
time.Sleep(1 * time.Second)
160157
continue
161158
case errors.Is(err, api.ErrF3NotReady):
162-
log.Warnw("F3 is not ready. Retrying F3 participation after backoff.", "err", err)
159+
log.Warnw("F3 is not ready. Retrying F3 participation.", "err", err)
163160
time.Sleep(30 * time.Second)
164161
continue
165162
case err != nil:
166163
log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "err", err)
167-
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
164+
return xerrors.Errorf("attempting F3 participation with ticket: %w", err)
165+
case lease.ValidityTerm <= renewLeaseWithin:
166+
return nil // Return to get a new ticket
168167
default:
169-
log.Infow("Successfully acquired F3 participation lease.",
170-
"issuer", lease.Issuer,
171-
"not-before", lease.FromInstance,
172-
"not-after", lease.ToInstance(),
173-
)
174-
return lease, true, nil
175-
}
176-
}
177-
return api.F3ParticipationLease{}, false, ctx.Err()
178-
}
179-
180-
func (f *F3Task) awaitLeaseExpiry(ctx context.Context, stillOwned func() bool, lease api.F3ParticipationLease) error {
181-
backoff := &backoff.Backoff{
182-
Min: 1 * time.Second,
183-
Max: 1 * time.Minute,
184-
Factor: 1.5,
185-
}
186-
187-
renewLeaseWithin := f.leaseTerm / 2
188-
for stillOwned() {
189-
manifest, err := f.api.F3GetManifest(ctx)
190-
switch {
191-
case errors.Is(err, api.ErrF3Disabled):
192-
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
193-
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
194-
case err != nil:
195-
if backoff.Attempt() > ParticipationCheckProgressMaxAttempts {
196-
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", backoff.Attempt(), "err", err)
197-
return nil
198-
}
199-
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", backoff.Attempt(), "backoff", backoff.Duration(), "err", err)
200-
time.Sleep(backoff.Duration())
201-
continue
202-
case manifest == nil || manifest.NetworkName != lease.Network:
203-
// If we got an unexpected manifest, or no manifest, go back to the
204-
// beginning and try to get another ticket. Switching from having a manifest
205-
// to having no manifest can theoretically happen if the lotus node reboots
206-
// and has no static manifest.
207-
return nil
208-
}
209-
switch progress, err := f.api.F3GetProgress(ctx); {
210-
case errors.Is(err, api.ErrF3Disabled):
211-
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
212-
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
213-
case err != nil:
214-
if backoff.Attempt() > ParticipationCheckProgressMaxAttempts {
215-
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", backoff.Attempt(), "err", err)
216-
return nil
168+
// Successfully participated
169+
if !haveLease {
170+
log.Infow("Successfully acquired F3 participation lease.",
171+
"issuer", lease.Issuer,
172+
"not-before", lease.FromInstance,
173+
"not-after", lease.ToInstance(),
174+
)
175+
haveLease = true
217176
}
218-
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", backoff.Attempt(), "backoff", backoff.Duration(), "err", err)
219-
time.Sleep(backoff.Duration())
220-
case progress.ID+renewLeaseWithin >= lease.ToInstance():
221-
log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance())
222-
return nil
223-
default:
224-
remainingInstanceLease := lease.ToInstance() - progress.ID
225-
waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment
226-
if waitTime == 0 {
227-
waitTime = 100 * time.Millisecond
228-
}
229-
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime)
230-
time.Sleep(waitTime)
231177
}
178+
179+
log.Debugf("F3 participation lease is valid for further %d instances.", lease.ValidityTerm)
180+
time.Sleep(time.Second * 5)
232181
}
233182
return ctx.Err()
234183
}

0 commit comments

Comments
 (0)