
Commit 7d5a249

magik6k authored and rvagg committed
feat: CC Sector scheduler (#582)
* sector sched UI
* confirm btn
* integrate into supraseal
* fix table
* make gen
* don't start batch seal with too-few sectors
* rename scheduler upsert rpc to edit
1 parent 0e39fd0 commit 7d5a249

File tree

6 files changed: +409 -8 lines changed
Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
+CREATE TABLE sectors_cc_scheduler (
+    sp_id BIGINT NOT NULL,
+    to_seal BIGINT NOT NULL,
+    weight BIGINT NOT NULL DEFAULT 1000,
+    duration_days BIGINT NOT NULL DEFAULT 182,
+    enabled BOOLEAN NOT NULL DEFAULT TRUE,
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (sp_id)
+);

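The duration_days default of 182 is converted to epochs and checked against the v12 miner policy bounds in claimsFromCCScheduler (below). A minimal standalone sketch of that conversion, not part of this commit, using the same go-state-types constants the diff imports:

package main

import (
	"fmt"

	"github.com/filecoin-project/go-state-types/builtin"
	miner12 "github.com/filecoin-project/go-state-types/builtin/v12/miner"
)

func main() {
	durationDays := int64(182) // table default above
	userDuration := durationDays * builtin.EpochsInDay

	fmt.Println("epochs:", userDuration) // 182 * 2880 = 524160
	// The same bounds the scheduler enforces before inserting a sector.
	fmt.Println("in bounds:", userDuration >= miner12.MinSectorExpiration &&
		userDuration <= miner12.MaxSectorExpirationExtension)
}
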
tasks/sealsupra/task_supraseal.go

Lines changed: 134 additions & 8 deletions
@@ -16,9 +16,12 @@ import (
 	"golang.org/x/xerrors"

 	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/go-bitfield"
 	"github.com/filecoin-project/go-commp-utils/zerocomm"
 	commcid "github.com/filecoin-project/go-fil-commcid"
 	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/go-state-types/builtin"
+	miner12 "github.com/filecoin-project/go-state-types/builtin/v12/miner"
 	"github.com/filecoin-project/go-state-types/crypto"

 	"github.com/filecoin-project/curio/harmony/harmonydb"
@@ -44,6 +47,7 @@ var log = logging.Logger("batchseal")
 type SupraSealNodeAPI interface {
 	ChainHead(context.Context) (*types.TipSet, error)
 	StateGetRandomnessFromTickets(context.Context, crypto.DomainSeparationTag, abi.ChainEpoch, []byte, types.TipSetKey) (abi.Randomness, error)
+	StateMinerAllocated(context.Context, address.Address, types.TipSetKey) (*bitfield.BitField, error)
 }

 type SupraSeal struct {
@@ -530,6 +534,12 @@ func (s *SupraSeal) TypeDetails() harmonytask.TaskTypeDetails {
 func (s *SupraSeal) Adder(taskFunc harmonytask.AddTaskFunc) {
 }

+type sectorClaim struct {
+	SpID         int64  `db:"sp_id"`
+	SectorNumber int64  `db:"sector_number"`
+	TaskIDSDR    *int64 `db:"task_id_sdr"`
+}
+
 func (s *SupraSeal) schedule(taskFunc harmonytask.AddTaskFunc) error {
 	if s.slots.Available() == 0 {
 		return nil
@@ -542,12 +552,8 @@ func (s *SupraSeal) schedule(taskFunc harmonytask.AddTaskFunc) error {

 	taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
 		// claim [sectors] pipeline entries
-		var sectors []struct {
-			SpID         int64  `db:"sp_id"`
-			SectorNumber int64  `db:"sector_number"`
-			TaskIDSDR    *int64 `db:"task_id_sdr"`
-		}

+		var sectors []sectorClaim
 		err := tx.Select(&sectors, `SELECT sp_id, sector_number, task_id_sdr FROM sectors_sdr_pipeline
 			LEFT JOIN harmony_task ht on sectors_sdr_pipeline.task_id_sdr = ht.id
 			WHERE after_sdr = FALSE AND (task_id_sdr IS NULL OR (ht.owner_id IS NULL AND ht.name = 'SDR')) LIMIT $1`, s.sectors)
@@ -558,9 +564,18 @@ func (s *SupraSeal) schedule(taskFunc harmonytask.AddTaskFunc) error {
 		log.Infow("got sectors, maybe schedule", "sectors", len(sectors), "s.sectors", s.sectors)

 		if len(sectors) != s.sectors {
-			// not enough sectors to fill a batch
-			log.Infow("not enough sectors to fill a batch", "sectors", len(sectors))
-			return false, nil
+			// not enough sectors to fill a batch, use CC scheduler
+			log.Infow("not enough sectors to fill a batch, using CC scheduler", "sectors", len(sectors))
+			addSectors, err := s.claimsFromCCScheduler(tx, int64(s.sectors-len(sectors)))
+			if err != nil {
+				return false, xerrors.Errorf("getting CC scheduler claims: %w", err)
+			}
+			sectors = append(sectors, addSectors...)
+			log.Infow("got CC scheduler claims", "sectors", len(sectors))
+		}
+
+		if len(sectors) != s.sectors {
+			return false, xerrors.Errorf("not enough sectors to fill a batch %d != %d", len(sectors), s.sectors)
 		}

 		// assign to pipeline entries, set task_id_sdr, task_id_tree_r, task_id_tree_c
@@ -585,6 +600,117 @@ func (s *SupraSeal) schedule(taskFunc harmonytask.AddTaskFunc) error {
 	return nil
 }

+func (s *SupraSeal) claimsFromCCScheduler(tx *harmonydb.Tx, toSeal int64) ([]sectorClaim, error) {
+	var enabledSchedules []struct {
+		SpID         int64 `db:"sp_id"`
+		ToSeal       int64 `db:"to_seal"`
+		Weight       int64 `db:"weight"`
+		DurationDays int64 `db:"duration_days"`
+	}
+
+	err := tx.Select(&enabledSchedules, `SELECT sp_id, to_seal, weight, duration_days FROM sectors_cc_scheduler WHERE enabled = TRUE AND weight > 0 ORDER BY weight DESC`)
+	if err != nil {
+		return nil, xerrors.Errorf("getting enabled schedules: %w", err)
+	}
+
+	if len(enabledSchedules) == 0 {
+		return nil, nil
+	}
+
+	var totalWeight, totalToSeal int64
+	for _, schedule := range enabledSchedules {
+		totalWeight += schedule.Weight
+		totalToSeal += schedule.ToSeal
+	}
+
+	if totalToSeal < toSeal {
+		log.Debugw("not enough sectors to fill a batch from CC scheduler", "totalToSeal", totalToSeal, "toSeal", toSeal)
+		return nil, nil
+	}
+
+	// Calculate proportional allocation based on weights
+	var outClaims []sectorClaim
+	remainingToSeal := toSeal
+
+	for i, schedule := range enabledSchedules {
+		if remainingToSeal <= 0 {
+			break
+		}
+
+		// Calculate how many sectors this SP should get based on weight
+		var sectorsForSP int64
+		if i == len(enabledSchedules)-1 {
+			// Last SP gets the remaining sectors
+			sectorsForSP = remainingToSeal
+		} else {
+			// Proportional allocation based on weight
+			sectorsForSP = (toSeal * schedule.Weight) / totalWeight
+			if sectorsForSP > schedule.ToSeal {
+				sectorsForSP = schedule.ToSeal
+			}
+			if sectorsForSP > remainingToSeal {
+				sectorsForSP = remainingToSeal
+			}
+		}
+
+		if sectorsForSP == 0 {
+			continue
+		}
+
+		// Allocate sector numbers for this SP
+		maddr, err := address.NewIDAddress(uint64(schedule.SpID))
+		if err != nil {
+			return nil, xerrors.Errorf("getting miner address for %d: %w", schedule.SpID, err)
+		}
+
+		sectorNumbers, err := seal.AllocateSectorNumbers(context.Background(), s.api, tx, maddr, int(sectorsForSP))
+		if err != nil {
+			return nil, xerrors.Errorf("allocating sector numbers for %d: %w", schedule.SpID, err)
+		}
+
+		// Create sector claims for allocated sectors
+		for _, sectorNum := range sectorNumbers {
+			outClaims = append(outClaims, sectorClaim{
+				SpID:         schedule.SpID,
+				SectorNumber: int64(sectorNum),
+				TaskIDSDR:    nil, // New sector, no existing task
+			})

+			userDuration := int64(schedule.DurationDays) * builtin.EpochsInDay
+
+			if miner12.MaxSectorExpirationExtension < userDuration {
+				return nil, xerrors.Errorf("duration exceeds max allowed: %d > %d", userDuration, miner12.MaxSectorExpirationExtension)
+			}
+			if miner12.MinSectorExpiration > userDuration {
+				return nil, xerrors.Errorf("duration is too short: %d < %d", userDuration, miner12.MinSectorExpiration)
+			}
+
+			// Insert new sector into sectors_sdr_pipeline
+			_, err := tx.Exec(`INSERT INTO sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof, user_sector_duration_epochs)
+				VALUES ($1, $2, $3, $4)`,
+				schedule.SpID, sectorNum, s.spt, userDuration)
+			if err != nil {
+				return nil, xerrors.Errorf("inserting new sector %d for SP %d: %w", sectorNum, schedule.SpID, err)
+			}
+		}
+
+		// Update the to_seal count for this SP
+		_, err = tx.Exec(`UPDATE sectors_cc_scheduler SET to_seal = to_seal - $1 WHERE sp_id = $2`, sectorsForSP, schedule.SpID)
+		if err != nil {
+			return nil, xerrors.Errorf("updating to_seal for SP %d: %w", schedule.SpID, err)
+		}
+
+		remainingToSeal -= sectorsForSP
+		log.Debugw("allocated sectors from CC scheduler", "sp_id", schedule.SpID, "count", sectorsForSP, "remaining", remainingToSeal, "totalWeight", totalWeight, "totalToSeal", totalToSeal)
+	}
+
+	if len(outClaims) != int(toSeal) {
+		return nil, xerrors.Errorf("failed to allocate expected number of sectors: got %d, wanted %d", len(outClaims), toSeal)
+	}
+
+	return outClaims, nil
+}
+
 func (s *SupraSeal) taskToSectors(id harmonytask.TaskID) ([]ffi.SectorRef, error) {
 	var sectors []ffi.SectorRef

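The allocation in claimsFromCCScheduler is plain integer arithmetic: each schedule gets toSeal * weight / totalWeight sectors, clamped to its own to_seal and to the remaining shortfall, with the last (lowest-weight) schedule absorbing rounding leftovers. A self-contained sketch of just that split, with hypothetical SP IDs and weights:

package main

import "fmt"

type schedule struct {
	spID, toSeal, weight int64
}

// split mirrors the allocation loop above; schedules are assumed to be
// sorted by weight descending, as in the SQL query.
func split(toSeal int64, schedules []schedule) map[int64]int64 {
	var totalWeight int64
	for _, s := range schedules {
		totalWeight += s.weight
	}

	out := map[int64]int64{}
	remaining := toSeal
	for i, s := range schedules {
		if remaining <= 0 {
			break
		}
		var n int64
		if i == len(schedules)-1 {
			n = remaining // last schedule takes whatever is left, as in the committed loop
		} else {
			n = (toSeal * s.weight) / totalWeight
			if n > s.toSeal {
				n = s.toSeal
			}
			if n > remaining {
				n = remaining
			}
		}
		if n == 0 {
			continue
		}
		out[s.spID] = n
		remaining -= n
	}
	return out
}

func main() {
	// The batch is short 6 sectors; two SPs with a 2:1 weight ratio.
	fmt.Println(split(6, []schedule{
		{spID: 1000, toSeal: 100, weight: 2000},
		{spID: 1001, toSeal: 100, weight: 1000},
	}))
	// Prints map[1000:4 1001:2]: 6*2000/3000 = 4 for SP 1000, remainder for SP 1001.
}
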
web/api/webrpc/sector.go

Lines changed: 89 additions & 0 deletions
@@ -820,6 +820,95 @@ func (a *WebRPC) SectorRestart(ctx context.Context, spid, id int64) error {
 	return nil
 }

+type SectorCCScheduler struct {
+	SpID         int64
+	ToSeal       int64
+	Weight       int64
+	DurationDays int64
+	Enabled      bool
+
+	// computed
+	SPAddress     string
+	SectorSize    int64
+	RequestedSize string
+}
+
+func (a *WebRPC) SectorCCScheduler(ctx context.Context) ([]SectorCCScheduler, error) {
+	var rows []struct {
+		SpID         int64 `db:"sp_id"`
+		ToSeal       int64 `db:"to_seal"`
+		Weight       int64 `db:"weight"`
+		DurationDays int64 `db:"duration_days"`
+		Enabled      bool  `db:"enabled"`
+	}
+
+	err := a.deps.DB.Select(ctx, &rows, `SELECT sp_id, to_seal, weight, duration_days, enabled FROM sectors_cc_scheduler ORDER BY sp_id`)
+	if err != nil {
+		return nil, xerrors.Errorf("failed to list cc scheduler entries: %w", err)
+	}
+
+	out := make([]SectorCCScheduler, 0, len(rows))
+	for _, r := range rows {
+		addr := must.One(address.NewIDAddress(uint64(r.SpID)))
+		mi, err := a.deps.Chain.StateMinerInfo(ctx, addr, types.EmptyTSK)
+		if err != nil {
+			return nil, xerrors.Errorf("failed to get miner info for %s: %w", addr, err)
+		}
+		out = append(out, SectorCCScheduler{
+			SpID:          r.SpID,
+			ToSeal:        r.ToSeal,
+			Weight:        r.Weight,
+			DurationDays:  r.DurationDays,
+			Enabled:       r.Enabled,
+			SPAddress:     addr.String(),
+			SectorSize:    int64(mi.SectorSize),
+			RequestedSize: types.SizeStr(big.Mul(big.NewInt(r.ToSeal), big.NewInt(int64(mi.SectorSize)))),
+		})
+	}
+	return out, nil
+}
+
+func (a *WebRPC) SectorCCSchedulerEdit(ctx context.Context, sp string, toSeal int64, weight int64, durationDays int64, enabled bool) error {
+	spaddr, err := address.NewFromString(sp)
+	if err != nil {
+		return xerrors.Errorf("invalid sp address: %w", err)
+	}
+	spid, err := address.IDFromAddress(spaddr)
+	if err != nil {
+		return xerrors.Errorf("id from sp address: %w", err)
+	}
+
+	if toSeal < 0 {
+		return xerrors.Errorf("toSeal cannot be negative")
+	}
+	if weight <= 0 {
+		return xerrors.Errorf("weight must be positive")
+	}
+
+	_, err = a.deps.DB.Exec(ctx, `INSERT INTO sectors_cc_scheduler (sp_id, to_seal, weight, duration_days, enabled) VALUES ($1, $2, $3, $4, $5)
+		ON CONFLICT (sp_id) DO UPDATE SET to_seal = EXCLUDED.to_seal, weight = EXCLUDED.weight, duration_days = EXCLUDED.duration_days, enabled = EXCLUDED.enabled`, spid, toSeal, weight, durationDays, enabled)
+	if err != nil {
+		return xerrors.Errorf("failed to upsert cc scheduler: %w", err)
+	}
+	return nil
+}
+
+func (a *WebRPC) SectorCCSchedulerDelete(ctx context.Context, sp string) error {
+	spaddr, err := address.NewFromString(sp)
+	if err != nil {
+		return xerrors.Errorf("invalid sp address: %w", err)
+	}
+	spid, err := address.IDFromAddress(spaddr)
+	if err != nil {
+		return xerrors.Errorf("id from sp address: %w", err)
+	}
+	_, err = a.deps.DB.Exec(ctx, `DELETE FROM sectors_cc_scheduler WHERE sp_id = $1`, spid)
+	if err != nil {
+		return xerrors.Errorf("failed to delete cc scheduler entry: %w", err)
+	}
+	return nil
+}
+
 func derefOrZero[T any](a *T) T {
 	if a == nil {
 		return *new(T)

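The computed RequestedSize above is to_seal multiplied by the SP's on-chain sector size, pretty-printed. A minimal sketch, assuming a hypothetical schedule of 100 sectors at 32 GiB and the same lotus / go-state-types helpers this file imports:

package main

import (
	"fmt"

	"github.com/filecoin-project/go-state-types/big"
	"github.com/filecoin-project/lotus/chain/types"
)

func main() {
	toSeal := int64(100)          // hypothetical to_seal value
	sectorSize := int64(32 << 30) // 32 GiB

	// 100 * 32 GiB = 3200 GiB, rendered as "3.125 TiB".
	fmt.Println(types.SizeStr(big.Mul(big.NewInt(toSeal), big.NewInt(sectorSize))))
}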