Skip to content

Commit 83fa0c5

Browse files
authored
account for inter-piece padding in ingest (#305)
1 parent 4440241 commit 83fa0c5

File tree

2 files changed

+37
-11
lines changed

2 files changed

+37
-11
lines changed

market/deal_ingest_seal.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/filecoin-project/lotus/api"
2626
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
27+
"github.com/filecoin-project/lotus/chain/proofs"
2728
"github.com/filecoin-project/lotus/chain/types"
2829
lpiece "github.com/filecoin-project/lotus/storage/pipeline/piece"
2930
)
@@ -55,7 +56,7 @@ type PieceIngesterApi interface {
5556

5657
type openSector struct {
5758
number abi.SectorNumber
58-
currentSize abi.PaddedPieceSize
59+
offset abi.UnpaddedPieceSize
5960
earliestStartEpoch abi.ChainEpoch
6061
index uint64
6162
openedAt *time.Time
@@ -143,7 +144,7 @@ func (p *PieceIngester) Seal() error {
143144
// 1. If sector is full
144145
// 2. We have been waiting for MaxWaitDuration
145146
// 3. StartEpoch is less than 8 hours // todo: make this config?
146-
if sector.currentSize == abi.PaddedPieceSize(p.sectorSize) {
147+
if sector.offset.Padded() == abi.PaddedPieceSize(p.sectorSize) {
147148
log.Debugf("start sealing sector %d of miner %d: %s", sector.number, p.miner.String(), "sector full")
148149
return true
149150
}
@@ -341,7 +342,12 @@ func (p *PieceIngester) allocateToExisting(ctx context.Context, piece lpiece.Pie
341342

342343
for _, sec := range openSectors {
343344
sec := sec
344-
if sec.currentSize+psize <= abi.PaddedPieceSize(p.sectorSize) {
345+
offset := sec.offset
346+
// Account for inter-piece padding
347+
_, padLength := proofs.GetRequiredPadding(offset.Padded(), psize)
348+
if offset.Padded()+padLength+psize < abi.PaddedPieceSize(p.sectorSize) {
349+
offset += padLength.Unpadded()
350+
345351
if vd.isVerified {
346352
sectorLifeTime := sec.latestEndEpoch - sec.earliestStartEpoch
347353
// Allocation's TMin must fit in sector and TMax should be at least sector lifetime or more
@@ -352,7 +358,7 @@ func (p *PieceIngester) allocateToExisting(ctx context.Context, piece lpiece.Pie
352358
}
353359

354360
ret.Sector = sec.number
355-
ret.Offset = sec.currentSize
361+
ret.Offset = offset.Padded()
356362

357363
// Insert market deal to DB for the sector
358364
if piece.DealProposal != nil {
@@ -522,23 +528,31 @@ func (p *PieceIngester) getOpenSectors(tx *harmonydb.Tx) ([]*openSector, error)
522528
pi := pi
523529
sector, ok := sectorMap[pi.Sector]
524530
if !ok {
531+
// Consider padding
532+
var offset abi.UnpaddedPieceSize
533+
_, padLength := proofs.GetRequiredPadding(offset.Padded(), pi.Size)
534+
offset += padLength.Unpadded()
535+
offset += pi.Size.Unpadded()
525536
sectorMap[pi.Sector] = &openSector{
526537
number: pi.Sector,
527-
currentSize: pi.Size,
528538
earliestStartEpoch: getStartEpoch(pi.StartEpoch, 0),
529539
index: pi.Index,
530540
openedAt: pi.CreatedAt,
531541
latestEndEpoch: getEndEpoch(pi.EndEpoch, 0),
542+
offset: offset,
532543
}
533544
continue
534545
}
535-
sector.currentSize += pi.Size
536546
sector.earliestStartEpoch = getStartEpoch(pi.StartEpoch, sector.earliestStartEpoch)
537547
sector.latestEndEpoch = getEndEpoch(pi.EndEpoch, sector.earliestStartEpoch)
538548
if sector.index < pi.Index {
539549
sector.index = pi.Index
540550
}
541551
sector.openedAt = getOpenedAt(pi, sector.openedAt)
552+
// Consider padding
553+
_, padLength := proofs.GetRequiredPadding(sector.offset.Padded(), pi.Size)
554+
sector.offset += padLength.Unpadded()
555+
sector.offset += pi.Size.Unpadded()
542556
}
543557

544558
var os []*openSector

market/deal_ingest_snap.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
2626
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
2727
verifregtypes "github.com/filecoin-project/lotus/chain/actors/builtin/verifreg"
28+
"github.com/filecoin-project/lotus/chain/proofs"
2829
"github.com/filecoin-project/lotus/chain/types"
2930
lpiece "github.com/filecoin-project/lotus/storage/pipeline/piece"
3031
)
@@ -110,7 +111,7 @@ func (p *PieceIngesterSnap) Seal() error {
110111
// 1. If sector is full
111112
// 2. We have been waiting for MaxWaitDuration
112113
// 3. StartEpoch is less than 8 hours // todo: make this config?
113-
if sector.currentSize == abi.PaddedPieceSize(p.sectorSize) {
114+
if sector.offset.Padded() == abi.PaddedPieceSize(p.sectorSize) {
114115
log.Debugf("start sealing sector %d of miner %d: %s", sector.number, p.miner.String(), "sector full")
115116
return true
116117
}
@@ -493,7 +494,10 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
493494

494495
for _, sec := range openSectors {
495496
sec := sec
496-
if sec.currentSize+psize <= abi.PaddedPieceSize(p.sectorSize) {
497+
offset := sec.offset
498+
// Account for inter-piece padding
499+
_, padLength := proofs.GetRequiredPadding(offset.Padded(), psize)
500+
if offset.Padded()+padLength+psize < abi.PaddedPieceSize(p.sectorSize) {
497501
if vd.isVerified {
498502
si, err := p.api.StateSectorGetInfo(ctx, p.miner, sec.number, types.EmptyTSK)
499503
if err != nil {
@@ -515,7 +519,7 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
515519
}
516520

517521
ret.Sector = sec.number
518-
ret.Offset = sec.currentSize
522+
ret.Offset = offset.Padded()
519523

520524
// Insert DDO deal to DB for the sector
521525
cn, err := tx.Exec(`SELECT insert_snap_ddo_piece($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
@@ -659,23 +663,31 @@ func (p *PieceIngesterSnap) getOpenSectors(tx *harmonydb.Tx) ([]*openSector, err
659663
pi := pi
660664
sector, ok := sectorMap[pi.Sector]
661665
if !ok {
666+
// Consider padding
667+
var offset abi.UnpaddedPieceSize
668+
_, padLength := proofs.GetRequiredPadding(offset.Padded(), pi.Size)
669+
offset += padLength.Unpadded()
670+
offset += pi.Size.Unpadded()
662671
sectorMap[pi.Sector] = &openSector{
663672
number: pi.Sector,
664-
currentSize: pi.Size,
665673
earliestStartEpoch: getStartEpoch(pi.StartEpoch, 0),
666674
index: pi.Index,
667675
openedAt: pi.CreatedAt,
668676
latestEndEpoch: getEndEpoch(pi.EndEpoch, 0),
677+
offset: offset,
669678
}
670679
continue
671680
}
672-
sector.currentSize += pi.Size
673681
sector.earliestStartEpoch = getStartEpoch(pi.StartEpoch, sector.earliestStartEpoch)
674682
sector.latestEndEpoch = getEndEpoch(pi.EndEpoch, sector.earliestStartEpoch)
675683
if sector.index < pi.Index {
676684
sector.index = pi.Index
677685
}
678686
sector.openedAt = getOpenedAt(pi, sector.openedAt)
687+
// Consider padding
688+
_, padLength := proofs.GetRequiredPadding(sector.offset.Padded(), pi.Size)
689+
sector.offset += padLength.Unpadded()
690+
sector.offset += pi.Size.Unpadded()
679691
}
680692

681693
var os []*openSector

0 commit comments

Comments
 (0)