@@ -14,6 +14,7 @@ import (
14
14
"github.com/ipfs/go-cid"
15
15
logging "github.com/ipfs/go-log/v2"
16
16
"github.com/puzpuzpuz/xsync/v2"
17
+ "github.com/samber/lo"
17
18
"golang.org/x/xerrors"
18
19
19
20
"github.com/filecoin-project/curio/harmony/harmonytask"
@@ -43,19 +44,19 @@ type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cach
43
44
}
44
45
*/
45
46
type SealCalls struct {
46
- sectors * storageProvider
47
+ Sectors * storageProvider
47
48
48
49
/*// externCalls cointain overrides for calling alternative sealing logic
49
50
externCalls ExternalSealer*/
50
51
}
51
52
52
53
func NewSealCalls (st * paths.Remote , ls * paths.Local , si paths.SectorIndex ) * SealCalls {
53
54
return & SealCalls {
54
- sectors : & storageProvider {
55
+ Sectors : & storageProvider {
55
56
storage : st ,
56
57
localStore : ls ,
57
58
sindex : si ,
58
- storageReservations : xsync .NewIntegerMapOf [harmonytask.TaskID , * StorageReservation ](),
59
+ storageReservations : xsync .NewIntegerMapOf [harmonytask.TaskID , [] * StorageReservation ](),
59
60
},
60
61
}
61
62
}
@@ -64,7 +65,7 @@ type storageProvider struct {
64
65
storage * paths.Remote
65
66
localStore * paths.Local
66
67
sindex paths.SectorIndex
67
- storageReservations * xsync.MapOf [harmonytask.TaskID , * StorageReservation ]
68
+ storageReservations * xsync.MapOf [harmonytask.TaskID , [] * StorageReservation ]
68
69
}
69
70
70
71
func (l * storageProvider ) AcquireSector (ctx context.Context , taskID * harmonytask.TaskID , sector storiface.SectorRef , existing , allocate storiface.SectorFileType , sealing storiface.PathType ) (fspaths , ids storiface.SectorPaths , release func (dontDeclare ... storiface.SectorFileType ), err error ) {
@@ -74,7 +75,12 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
74
75
var ok bool
75
76
var resv * StorageReservation
76
77
if taskID != nil {
77
- resv , ok = l .storageReservations .Load (* taskID )
78
+ resvs , rok := l .storageReservations .Load (* taskID )
79
+ if rok {
80
+ resv , ok = lo .Find (resvs , func (res * StorageReservation ) bool {
81
+ return res .SectorRef .ID () == sector .ID
82
+ })
83
+ }
78
84
}
79
85
if ok && resv != nil {
80
86
if resv .Alloc != allocate || resv .Existing != existing {
@@ -144,7 +150,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
144
150
}
145
151
146
152
func (sb * SealCalls ) GenerateSDR (ctx context.Context , taskID harmonytask.TaskID , into storiface.SectorFileType , sector storiface.SectorRef , ticket abi.SealRandomness , commDcid cid.Cid ) error {
147
- paths , pathIDs , releaseSector , err := sb .sectors .AcquireSector (ctx , & taskID , sector , storiface .FTNone , into , storiface .PathSealing )
153
+ paths , pathIDs , releaseSector , err := sb .Sectors .AcquireSector (ctx , & taskID , sector , storiface .FTNone , into , storiface .PathSealing )
148
154
if err != nil {
149
155
return xerrors .Errorf ("acquiring sector paths: %w" , err )
150
156
}
@@ -223,7 +229,7 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID
223
229
224
230
log .Debugw ("ensureOneCopy" , "sector" , sid , "type" , fileType , "keep" , keepIn )
225
231
226
- if err := sb .sectors .storage .Remove (ctx , sid , fileType , true , keepIn ); err != nil {
232
+ if err := sb .Sectors .storage .Remove (ctx , sid , fileType , true , keepIn ); err != nil {
227
233
return err
228
234
}
229
235
}
@@ -237,7 +243,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto
237
243
return cid .Undef , cid .Undef , xerrors .Errorf ("make phase1 output: %w" , err )
238
244
}
239
245
240
- fspaths , pathIDs , releaseSector , err := sb .sectors .AcquireSector (ctx , task , sector , storiface .FTCache , storiface .FTSealed , storiface .PathSealing )
246
+ fspaths , pathIDs , releaseSector , err := sb .Sectors .AcquireSector (ctx , task , sector , storiface .FTCache , storiface .FTSealed , storiface .PathSealing )
241
247
if err != nil {
242
248
return cid .Undef , cid .Undef , xerrors .Errorf ("acquiring sector paths: %w" , err )
243
249
}
@@ -352,7 +358,7 @@ func (sb *SealCalls) GenerateSynthPoRep() {
352
358
}
353
359
354
360
func (sb * SealCalls ) PoRepSnark (ctx context.Context , sn storiface.SectorRef , sealed , unsealed cid.Cid , ticket abi.SealRandomness , seed abi.InteractiveSealRandomness ) ([]byte , error ) {
355
- vproof , err := sb .sectors .storage .GeneratePoRepVanillaProof (ctx , sn , sealed , unsealed , ticket , seed )
361
+ vproof , err := sb .Sectors .storage .GeneratePoRepVanillaProof (ctx , sn , sealed , unsealed , ticket , seed )
356
362
if err != nil {
357
363
return nil , xerrors .Errorf ("failed to generate vanilla proof: %w" , err )
358
364
}
@@ -498,7 +504,7 @@ func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof)
498
504
}
499
505
500
506
func (sb * SealCalls ) LocalStorage (ctx context.Context ) ([]storiface.StoragePath , error ) {
501
- return sb .sectors .localStore .Local (ctx )
507
+ return sb .Sectors .localStore .Local (ctx )
502
508
}
503
509
504
510
func changePathType (path string , newType storiface.SectorFileType ) (string , error ) {
@@ -549,7 +555,7 @@ func (sb *SealCalls) GenerateUnsealedSector(ctx context.Context, sector storifac
549
555
550
556
defer func () {
551
557
// We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
552
- if err := sb .sectors .sindex .StorageDeclareSector (ctx , storiface .ID (pathIDs .Unsealed ), sector .ID , storiface .FTUnsealed , true ); err != nil {
558
+ if err := sb .Sectors .sindex .StorageDeclareSector (ctx , storiface .ID (pathIDs .Unsealed ), sector .ID , storiface .FTUnsealed , true ); err != nil {
553
559
log .Errorf ("declare unsealed sector error: %s" , err )
554
560
}
555
561
}()
@@ -635,7 +641,7 @@ func TruncateAndMoveUnsealed(tempUnsealed, unsealed string, ssize abi.SectorSize
635
641
}
636
642
637
643
func (sb * SealCalls ) FinalizeSector (ctx context.Context , sector storiface.SectorRef , keepUnsealed bool ) error {
638
- sectorPaths , pathIDs , releaseSector , err := sb .sectors .AcquireSector (ctx , nil , sector , storiface .FTCache , storiface .FTNone , storiface .PathSealing )
644
+ sectorPaths , pathIDs , releaseSector , err := sb .Sectors .AcquireSector (ctx , nil , sector , storiface .FTCache , storiface .FTNone , storiface .PathSealing )
639
645
if err != nil {
640
646
return xerrors .Errorf ("acquiring sector paths: %w" , err )
641
647
}
@@ -694,11 +700,16 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
694
700
695
701
var opts []storiface.AcquireOption
696
702
if taskID != nil {
697
- resv , ok := sb .sectors .storageReservations .Load (* taskID )
703
+ resvs , ok := sb .Sectors .storageReservations .Load (* taskID )
698
704
// if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
699
705
// will only be missing when the node is restarting, which means that the missing reservations will get recreated
700
706
// anyways, and before we start claiming other tasks.
701
707
if ok {
708
+ if len (resvs ) != 1 {
709
+ return xerrors .Errorf ("task %d has %d reservations, expected 1" , taskID , len (resvs ))
710
+ }
711
+ resv := resvs [0 ]
712
+
702
713
defer resv .Release ()
703
714
704
715
if resv .Alloc != storiface .FTNone {
@@ -712,13 +723,13 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
712
723
}
713
724
}
714
725
715
- err := sb .sectors .storage .MoveStorage (ctx , sector , toMove , opts ... )
726
+ err := sb .Sectors .storage .MoveStorage (ctx , sector , toMove , opts ... )
716
727
if err != nil {
717
728
return xerrors .Errorf ("moving storage: %w" , err )
718
729
}
719
730
720
731
for _ , fileType := range toMove .AllSet () {
721
- if err := sb .sectors .storage .RemoveCopies (ctx , sector .ID , fileType ); err != nil {
732
+ if err := sb .Sectors .storage .RemoveCopies (ctx , sector .ID , fileType ); err != nil {
722
733
return xerrors .Errorf ("rm copies (t:%s, s:%v): %w" , fileType , sector , err )
723
734
}
724
735
}
@@ -727,7 +738,7 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
727
738
}
728
739
729
740
func (sb * SealCalls ) sectorStorageType (ctx context.Context , sector storiface.SectorRef , ft storiface.SectorFileType ) (sectorFound bool , ptype storiface.PathType , err error ) {
730
- stores , err := sb .sectors .sindex .StorageFindSector (ctx , sector .ID , ft , 0 , false )
741
+ stores , err := sb .Sectors .sindex .StorageFindSector (ctx , sector .ID , ft , 0 , false )
731
742
if err != nil {
732
743
return false , "" , xerrors .Errorf ("finding sector: %w" , err )
733
744
}
@@ -746,7 +757,7 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec
746
757
747
758
// PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks
748
759
func (sb * SealCalls ) PreFetch (ctx context.Context , sector storiface.SectorRef , task * harmonytask.TaskID ) (fsPath , pathID storiface.SectorPaths , releaseSector func (... storiface.SectorFileType ), err error ) {
749
- fsPath , pathID , releaseSector , err = sb .sectors .AcquireSector (ctx , task , sector , storiface .FTCache , storiface .FTNone , storiface .PathSealing )
760
+ fsPath , pathID , releaseSector , err = sb .Sectors .AcquireSector (ctx , task , sector , storiface .FTCache , storiface .FTNone , storiface .PathSealing )
750
761
if err != nil {
751
762
return storiface.SectorPaths {}, storiface.SectorPaths {}, nil , xerrors .Errorf ("acquiring sector paths: %w" , err )
752
763
}
@@ -782,7 +793,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unse
782
793
}
783
794
784
795
func (sb * SealCalls ) SyntheticProofs (ctx context.Context , task * harmonytask.TaskID , sector storiface.SectorRef , sealed cid.Cid , unsealed cid.Cid , randomness abi.SealRandomness , pieces []abi.PieceInfo , keepUnsealed bool ) error {
785
- fspaths , pathIDs , releaseSector , err := sb .sectors .AcquireSector (ctx , task , sector , storiface .FTCache | storiface .FTSealed , storiface .FTNone , storiface .PathSealing )
796
+ fspaths , pathIDs , releaseSector , err := sb .Sectors .AcquireSector (ctx , task , sector , storiface .FTCache | storiface .FTSealed , storiface .FTNone , storiface .PathSealing )
786
797
if err != nil {
787
798
return xerrors .Errorf ("acquiring sector paths: %w" , err )
788
799
}
0 commit comments