Skip to content

Commit 99d31f1

Browse files
authored
Make Snap sector picker deadline-aware (#173)
* snap: Avoid picking sectors in immutable deadlines * snap: Use row iterator for candidates * snap: Don't submit in immutable deadlines * snap: Clone rows when not needed
1 parent ffd07e1 commit 99d31f1

File tree

9 files changed

+219
-25
lines changed

9 files changed

+219
-25
lines changed

harmony/harmonydb/sql/20240425-sector_meta.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ CREATE TABLE sectors_meta (
2323
-- is_cc BOOLEAN NOT NULL DEFAULT (complex condition),
2424
-- expiration_epoch BIGINT, (null = not crawled)
2525

26+
-- Added in 20240826-sector-partition.sql
27+
-- deadline BIGINT, (null = not crawled)
28+
-- partition BIGINT, (null = not crawled)
29+
2630
PRIMARY KEY (sp_id, sector_num)
2731
);
2832

harmony/harmonydb/sql/20240611-snap-pipeline.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ CREATE TABLE sectors_snap_pipeline (
4747
-- failed_reason varchar(20) not null default '',
4848
-- failed_reason_msg text not null default '',
4949

50+
-- added in 20240826-sector-partition.sql
51+
-- when not null delays scheduling of the submit task
52+
-- submit_after TIMESTAMP WITH TIME ZONE,
53+
5054
FOREIGN KEY (sp_id, sector_number) REFERENCES sectors_meta (sp_id, sector_num),
5155
PRIMARY KEY (sp_id, sector_number)
5256
);
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ALTER TABLE sectors_meta ADD COLUMN deadline BIGINT;
2+
ALTER TABLE sectors_meta ADD COLUMN partition BIGINT;
3+
4+
-- index on deadline/partition/spid/sectornum
5+
CREATE INDEX sectors_meta_deadline_partition_spid_sectornum_index ON sectors_meta(deadline, partition, sp_id, sector_num);
6+
7+
-- schedule delay in case the sector got into an immutable deadline
8+
ALTER TABLE sectors_snap_pipeline ADD COLUMN submit_after TIMESTAMP WITH TIME ZONE;
9+
10+
-- force sector metadata refresh
11+
DELETE FROM harmony_task_singletons WHERE task_name = 'SectorMetadata' and task_id IS NULL;

lib/curiochain/epoch.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package curiochain
2+
3+
import (
4+
"time"
5+
6+
"github.com/filecoin-project/go-state-types/abi"
7+
8+
"github.com/filecoin-project/lotus/build/buildconstants"
9+
"github.com/filecoin-project/lotus/chain/types"
10+
)
11+
12+
func EpochTime(curr *types.TipSet, e abi.ChainEpoch) time.Time {
13+
diff := int64(buildconstants.BlockDelaySecs) * int64(curr.Height()-e)
14+
curTs := curr.MinTimestamp() // unix seconds
15+
16+
return time.Unix(int64(curTs)+diff, 0)
17+
}

market/deal_ingest_seal.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/filecoin-project/go-padreader"
1717
"github.com/filecoin-project/go-state-types/abi"
1818
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
19+
"github.com/filecoin-project/go-state-types/dline"
1920
"github.com/filecoin-project/go-state-types/network"
2021

2122
"github.com/filecoin-project/curio/harmony/harmonydb"
@@ -46,6 +47,10 @@ type PieceIngesterApi interface {
4647
StateGetAllocationIdForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (verifregtypes.AllocationId, error)
4748
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
4849
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
50+
51+
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorLocation, error)
52+
StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tsk types.TipSetKey) ([]api.Partition, error)
53+
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
4954
}
5055

5156
type openSector struct {

market/deal_ingest_snap.go

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ import (
3131

3232
const IdealEndEpochBuffer = 2 * builtin.EpochsInDay
3333

34+
// assuming that snap takes up to 20min to get to submitting the message we want to avoid sectors from deadlines which will
35+
// become immutable in the next 20min (40 epochs)
36+
// NOTE: Don't set this value to more than one deadline (60 epochs)
37+
var SnapImmutableDeadlineEpochsBuffer = abi.ChainEpoch(40)
38+
3439
type PieceIngesterSnap struct {
3540
ctx context.Context
3641
db *harmonydb.DB
@@ -276,6 +281,25 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
276281
}
277282
}
278283

284+
// non-mutable deadline is the current deadline and the next one. Doesn't matter if the current one was proven or not.
285+
286+
curDeadline, err := p.api.StateMinerProvingDeadline(ctx, p.miner, types.EmptyTSK)
287+
if err != nil {
288+
return api.SectorOffset{}, xerrors.Errorf("getting proving deadline: %w", err)
289+
}
290+
291+
dlIdxImmutableCur := curDeadline.Index
292+
dlIdxImmutableNext := (curDeadline.Index + 1) % curDeadline.WPoStPeriodDeadlines
293+
294+
// The deadline which might become mutable soon
295+
dlIdxImmutableNextNext := dlIdxImmutableNext
296+
epochsInDeadline := curDeadline.CurrentEpoch - curDeadline.Open // how many into the deadline we are
297+
epochsPerDeadline := curDeadline.WPoStProvingPeriod / abi.ChainEpoch(curDeadline.WPoStPeriodDeadlines)
298+
299+
if epochsInDeadline >= epochsPerDeadline-SnapImmutableDeadlineEpochsBuffer {
300+
dlIdxImmutableNextNext = (dlIdxImmutableNext + 1) % curDeadline.WPoStPeriodDeadlines
301+
}
302+
279303
// Allocation to open sector failed, create a new sector and add the piece to it
280304

281305
// TX
@@ -291,15 +315,15 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
291315

292316
var num *int64
293317
_, err = p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
294-
var candidates []struct {
318+
type CandidateSector struct {
295319
Sector int64 `db:"sector_num"`
296320
Expiration int64 `db:"expiration_epoch"`
297321
}
298322

299323
// maxExpiration = maybe(max sector expiration_epoch)
300324
// minExpiration = piece.DealSchedule.EndEpoch
301325
// ideal expiration = minExpiration + 2 days
302-
err = tx.Select(&candidates, `
326+
rows, err := tx.Query(`
303327
SELECT sm.sector_num, sm.expiration_epoch
304328
FROM sectors_meta sm
305329
LEFT JOIN sectors_snap_pipeline ssp on sm.sp_id = ssp.sp_id and sm.sector_num = ssp.sector_number
@@ -309,28 +333,83 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
309333
AND sm.expiration_epoch IS NOT NULL
310334
AND sm.expiration_epoch > $1
311335
AND ($2 = 0 OR sm.expiration_epoch < $2)
336+
AND deadline IS NOT NULL AND deadline NOT IN ($5, $6, $7)
312337
ORDER BY ABS(sm.expiration_epoch - ($1 + $3))
313-
LIMIT 10
314-
`, int64(piece.DealSchedule.EndEpoch), maxExpiration, IdealEndEpochBuffer, p.mid)
338+
`, int64(piece.DealSchedule.EndEpoch), maxExpiration, IdealEndEpochBuffer, p.mid, dlIdxImmutableCur, dlIdxImmutableNext, dlIdxImmutableNextNext)
315339
if err != nil {
316340
return false, xerrors.Errorf("allocating sector numbers: %w", err)
317341
}
342+
defer rows.Close()
318343

319-
if len(candidates) == 0 {
344+
deadlineCache := map[uint64][]api.Partition{}
345+
var tried int
346+
var bestCandidate *CandidateSector
347+
348+
for rows.Next() {
349+
var candidate CandidateSector
350+
err := rows.Scan(&candidate.Sector, &candidate.Expiration)
351+
if err != nil {
352+
return false, xerrors.Errorf("scanning row: %w", err)
353+
}
354+
tried++
355+
356+
sloc, err := p.api.StateSectorPartition(ctx, p.miner, abi.SectorNumber(candidate.Sector), types.EmptyTSK)
357+
if err != nil {
358+
return false, xerrors.Errorf("getting sector locations: %w", err)
359+
}
360+
361+
if _, ok := deadlineCache[sloc.Deadline]; !ok {
362+
dls, err := p.api.StateMinerPartitions(ctx, p.miner, sloc.Deadline, types.EmptyTSK)
363+
if err != nil {
364+
return false, xerrors.Errorf("getting partitions: %w", err)
365+
}
366+
367+
deadlineCache[sloc.Deadline] = dls
368+
}
369+
370+
dl := deadlineCache[sloc.Deadline]
371+
if len(dl) <= int(sloc.Partition) {
372+
return false, xerrors.Errorf("partition %d not found in deadline %d", sloc.Partition, sloc.Deadline)
373+
}
374+
part := dl[sloc.Partition]
375+
376+
active, err := part.ActiveSectors.IsSet(uint64(candidate.Sector))
377+
if err != nil {
378+
return false, xerrors.Errorf("checking active sectors: %w", err)
379+
}
380+
if !active {
381+
live, err1 := part.LiveSectors.IsSet(uint64(candidate.Sector))
382+
faulty, err2 := part.FaultySectors.IsSet(uint64(candidate.Sector))
383+
recovering, err3 := part.RecoveringSectors.IsSet(uint64(candidate.Sector))
384+
if err1 != nil || err2 != nil || err3 != nil {
385+
return false, xerrors.Errorf("checking sector status: %w, %w, %w", err1, err2, err3)
386+
}
387+
388+
log.Debugw("sector not active, skipping", "sector", candidate.Sector, "live", live, "faulty", faulty, "recovering", recovering)
389+
continue
390+
}
391+
392+
bestCandidate = &candidate
393+
break
394+
}
395+
396+
if err := rows.Err(); err != nil {
397+
return false, xerrors.Errorf("iterating rows: %w", err)
398+
}
399+
400+
rows.Close()
401+
402+
if bestCandidate == nil {
320403
minEpoch := piece.DealSchedule.EndEpoch
321404
maxEpoch := abi.ChainEpoch(maxExpiration)
322405

323406
minEpochDays := (minEpoch - head.Height()) / builtin.EpochsInDay
324407
maxEpochDays := (maxEpoch - head.Height()) / builtin.EpochsInDay
325408

326-
return false, xerrors.Errorf("no suitable sectors found, minEpoch: %d, maxEpoch: %d, minExpirationDays: %d, maxExpirationDays: %d", minEpoch, maxEpoch, minEpochDays, maxEpochDays)
409+
return false, xerrors.Errorf("no suitable sectors found, minEpoch: %d, maxEpoch: %d, minExpirationDays: %d, maxExpirationDays: %d (avoiding deadlines %d,%d,%d)", minEpoch, maxEpoch, minEpochDays, maxEpochDays, dlIdxImmutableCur, dlIdxImmutableNext, dlIdxImmutableNextNext)
327410
}
328411

329-
// todo - nice to have:
330-
// * check sector liveness
331-
// * check deadline mutable
332-
333-
candidate := candidates[0] // this one works best
412+
candidate := *bestCandidate
334413

335414
si, err := p.api.StateSectorGetInfo(ctx, p.miner, abi.SectorNumber(candidate.Sector), types.EmptyTSK)
336415
if err != nil {
@@ -356,6 +435,7 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
356435
"dealStartEpoch", piece.DealSchedule.StartEpoch,
357436
"dealEndEpoch", piece.DealSchedule.EndEpoch,
358437
"maxExpiration", maxExpiration,
438+
"avoidingDeadlines", []int{int(dlIdxImmutableCur), int(dlIdxImmutableNext), int(dlIdxImmutableNextNext)},
359439
)
360440

361441
_, err = tx.Exec(`SELECT insert_snap_ddo_piece($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,

market/lmrpc/lmrpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ func maybeApplyBackpressure(tx *harmonydb.Tx, cfg config.CurioIngestConfig, ssiz
581581
}
582582

583583
if cfg.MaxQueueDealSector != 0 && waitDealSectors+sectors > cfg.MaxQueueDealSector {
584-
log.Infow("backpressure", "reason", "too many wait deal sectors", "wait_deal_sectors", waitDealSectors, "max", cfg.MaxQueueDealSector)
584+
log.Infow("backpressure", "reason", "too many wait deal sectors", "wait_deal_sectors", waitDealSectors, "parking-sectors", sectors, "parking-pieces", len(pieceSizes), "max", cfg.MaxQueueDealSector)
585585
return true, nil
586586
}
587587

tasks/metadata/task_sector_expirations.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const SectorMetadataRefreshInterval = 191 * time.Minute
2424

2525
type SectorMetadataNodeAPI interface {
2626
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
27+
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorLocation, error)
2728
}
2829

2930
type SectorMetadata struct {
@@ -49,22 +50,26 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
4950
SectorNum uint64 `db:"sector_num"`
5051

5152
Expiration *uint64 `db:"expiration_epoch"`
53+
54+
Partition *uint64 `db:"partition"`
55+
Deadline *uint64 `db:"deadline"`
5256
}
5357

54-
if err := s.db.Select(ctx, &sectors, "select sp_id, sector_num, expiration_epoch from sectors_meta ORDER BY sp_id, sector_num"); err != nil {
58+
if err := s.db.Select(ctx, &sectors, "select sp_id, sector_num, expiration_epoch, partition, deadline from sectors_meta ORDER BY sp_id, sector_num"); err != nil {
5559
return false, xerrors.Errorf("get sector list: %w", err)
5660
}
5761

5862
astor := adt.WrapStore(ctx, cbor.NewCborStore(s.bstore))
5963
minerStates := map[abi.ActorID]miner.State{}
6064

6165
for _, sector := range sectors {
66+
maddr, err := address.NewIDAddress(sector.SpID)
67+
if err != nil {
68+
return false, xerrors.Errorf("creating miner address: %w", err)
69+
}
70+
6271
mstate, ok := minerStates[abi.ActorID(sector.SpID)]
6372
if !ok {
64-
maddr, err := address.NewIDAddress(sector.SpID)
65-
if err != nil {
66-
return false, xerrors.Errorf("creating miner address: %w", err)
67-
}
6873

6974
act, err := s.api.StateGetActor(ctx, maddr, types.EmptyTSK)
7075
if err != nil {
@@ -93,6 +98,20 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
9398
return false, xerrors.Errorf("updating sector expiration: %w", err)
9499
}
95100
}
101+
102+
if sector.Partition == nil || sector.Deadline == nil {
103+
loc, err := s.api.StateSectorPartition(ctx, maddr, abi.SectorNumber(sector.SectorNum), types.EmptyTSK)
104+
if err != nil {
105+
return false, xerrors.Errorf("getting sector partition: %w", err)
106+
}
107+
108+
if loc != nil {
109+
_, err := s.db.Exec(ctx, "update sectors_meta set partition = $1, deadline = $2 where sp_id = $3 and sector_num = $4", loc.Partition, loc.Deadline, sector.SpID, sector.SectorNum)
110+
if err != nil {
111+
return false, xerrors.Errorf("updating sector partition: %w", err)
112+
}
113+
}
114+
}
96115
}
97116

98117
return true, nil

0 commit comments

Comments
 (0)