Skip to content

Commit cf0699a

Browse files
sector_storage SnapDeals methods (#582)
* sector_storage SnapDeals methods Signed-off-by: Alexey Chernyshov <[email protected]>
1 parent d61f480 commit cf0699a

File tree

10 files changed

+545
-437
lines changed

10 files changed

+545
-437
lines changed

core/api/storage_miner/storage_api.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,9 @@ namespace fc::api {
309309
f(a.ReturnUnsealPiece);
310310
f(a.ReturnReadPiece);
311311
f(a.ReturnFetch);
312+
f(a.ReturnReplicaUpdate);
313+
f(a.ReturnProveReplicaUpdate1);
314+
f(a.ReturnProveReplicaUpdate2);
312315
f(a.WorkerConnect);
313316
}
314317
} // namespace fc::api

core/api/worker_api.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ namespace fc::api {
134134
f(a.AddPiece);
135135
f(a.Fetch);
136136
f(a.FinalizeSector);
137+
f(a.ReplicaUpdate);
138+
f(a.ProveReplicaUpdate1);
139+
f(a.ProveReplicaUpdate2);
137140
f(a.Info);
138141
f(a.MoveStorage);
139142
f(a.Paths);

core/markets/retrieval/provider/impl/retrieval_provider_impl.cpp

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include "common/libp2p/peer/peer_info_helper.hpp"
1111
#include "markets/storage/types.hpp"
12+
#include "sector_storage/scheduler.hpp"
1213
#include "storage/piece/impl/piece_storage_error.hpp"
1314

1415
namespace fc::markets::retrieval::provider {
@@ -410,15 +411,29 @@ namespace fc::markets::retrieval::provider {
410411

411412
CID comm_d = sector_info->comm_d.get_value_or(CID());
412413

413-
OUTCOME_TRY(
414-
sealer_->readPieceSync(PieceData(output_path, O_WRONLY | O_CREAT),
415-
sector,
416-
UnpaddedByteIndex(offset),
417-
size,
418-
sector_info->ticket,
419-
comm_d));
414+
std::promise<outcome::result<bool>> wait;
420415

421-
return outcome::success();
416+
sealer_->readPiece(
417+
PieceData(output_path, O_WRONLY | O_CREAT),
418+
sector,
419+
UnpaddedByteIndex(offset),
420+
size,
421+
sector_info->ticket,
422+
comm_d,
423+
[&wait](outcome::result<bool> res) { wait.set_value(res); },
424+
sector_storage::kDefaultTaskPriority);
425+
426+
auto res = wait.get_future().get();
427+
428+
if (res.has_error()) {
429+
return res.error();
430+
}
431+
432+
if (res.value()) {
433+
return outcome::success();
434+
}
435+
436+
return ERROR_TEXT("cannot read piece");
422437
}
423438

424439
} // namespace fc::markets::retrieval::provider

core/sector_storage/impl/manager_impl.cpp

Lines changed: 123 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ namespace fc::sector_storage {
4242
}
4343

4444
WorkerAction schedNothing() {
45-
return WorkerAction();
45+
return {};
4646
}
4747

4848
void addCachePathsForSectorSize(
@@ -465,7 +465,8 @@ namespace fc::sector_storage {
465465
const UnpaddedPieceSize &size,
466466
const SealRandomness &randomness,
467467
const CID &cid,
468-
const std::function<void(outcome::result<bool>)> &cb) {
468+
const std::function<void(outcome::result<bool>)> &cb,
469+
uint64_t priority) {
469470
OUTCOME_CB(auto lock,
470471
index_->storageLock(
471472
sector.id,
@@ -518,7 +519,7 @@ namespace fc::sector_storage {
518519
callbackWrapper([&wait](outcome::result<void> res) -> void {
519520
wait.set_value(res);
520521
}),
521-
kDefaultTaskPriority,
522+
priority,
522523
boost::none));
523524

524525
OUTCOME_CB1(wait.get_future().get());
@@ -545,40 +546,10 @@ namespace fc::sector_storage {
545546
return worker->readPiece(std::move(*output), sector, offset, size);
546547
},
547548
callbackWrapper(cb),
548-
kDefaultTaskPriority,
549+
priority,
549550
boost::none));
550551
}
551552

552-
outcome::result<bool> ManagerImpl::readPieceSync(
553-
PieceData output,
554-
const SectorRef &sector,
555-
UnpaddedByteIndex offset,
556-
const UnpaddedPieceSize &size,
557-
const SealRandomness &randomness,
558-
const CID &cid) {
559-
std::promise<outcome::result<bool>> wait;
560-
561-
readPiece(std::move(output),
562-
sector,
563-
offset,
564-
size,
565-
randomness,
566-
cid,
567-
[&wait](outcome::result<bool> res) { wait.set_value(res); });
568-
569-
auto res = wait.get_future().get();
570-
571-
if (res.has_error()) {
572-
return res.error();
573-
}
574-
575-
if (res.value()) {
576-
return outcome::success();
577-
}
578-
579-
return ManagerErrors::kCannotReadData;
580-
}
581-
582553
void ManagerImpl::sealPreCommit1(
583554
const SectorRef &sector,
584555
const SealRandomness &ticket,
@@ -621,25 +592,6 @@ namespace fc::sector_storage {
621592
work_id));
622593
}
623594

624-
outcome::result<PreCommit1Output> ManagerImpl::sealPreCommit1Sync(
625-
const SectorRef &sector,
626-
const SealRandomness &ticket,
627-
const std::vector<PieceInfo> &pieces,
628-
uint64_t priority) {
629-
std::promise<outcome::result<PreCommit1Output>> wait;
630-
631-
sealPreCommit1(
632-
sector,
633-
ticket,
634-
pieces,
635-
[&wait](outcome::result<PreCommit1Output> res) -> void {
636-
wait.set_value(std::move(res));
637-
},
638-
priority);
639-
640-
return wait.get_future().get();
641-
}
642-
643595
void ManagerImpl::sealPreCommit2(
644596
const SectorRef &sector,
645597
const PreCommit1Output &pre_commit_1_output,
@@ -679,23 +631,6 @@ namespace fc::sector_storage {
679631
work_id))
680632
}
681633

682-
outcome::result<SectorCids> ManagerImpl::sealPreCommit2Sync(
683-
const SectorRef &sector,
684-
const PreCommit1Output &pre_commit_1_output,
685-
uint64_t priority) {
686-
std::promise<outcome::result<SectorCids>> wait;
687-
688-
sealPreCommit2(
689-
sector,
690-
pre_commit_1_output,
691-
[&wait](outcome::result<SectorCids> res) -> void {
692-
wait.set_value(std::move(res));
693-
},
694-
priority);
695-
696-
return wait.get_future().get();
697-
}
698-
699634
void ManagerImpl::sealCommit1(
700635
const SectorRef &sector,
701636
const SealRandomness &ticket,
@@ -738,27 +673,6 @@ namespace fc::sector_storage {
738673
work_id));
739674
}
740675

741-
outcome::result<Commit1Output> ManagerImpl::sealCommit1Sync(
742-
const SectorRef &sector,
743-
const SealRandomness &ticket,
744-
const InteractiveRandomness &seed,
745-
const std::vector<PieceInfo> &pieces,
746-
const SectorCids &cids,
747-
uint64_t priority) {
748-
std::promise<outcome::result<Commit1Output>> wait;
749-
sealCommit1(
750-
sector,
751-
ticket,
752-
seed,
753-
pieces,
754-
cids,
755-
[&wait](const outcome::result<Commit1Output> &res) -> void {
756-
wait.set_value(res);
757-
},
758-
priority);
759-
return wait.get_future().get();
760-
}
761-
762676
void ManagerImpl::sealCommit2(
763677
const SectorRef &sector,
764678
const Commit1Output &commit_1_output,
@@ -768,7 +682,7 @@ namespace fc::sector_storage {
768682
getWorkId(primitives::kTTCommit2,
769683
std::make_tuple(sector, commit_1_output)));
770684

771-
std::unique_ptr<TaskSelector> selector = std::make_unique<TaskSelector>();
685+
auto selector = std::make_unique<TaskSelector>();
772686

773687
OUTCOME_CB1(scheduler_->schedule(
774688
sector,
@@ -784,23 +698,6 @@ namespace fc::sector_storage {
784698
work_id));
785699
}
786700

787-
outcome::result<Proof> ManagerImpl::sealCommit2Sync(
788-
const SectorRef &sector,
789-
const Commit1Output &commit_1_output,
790-
uint64_t priority) {
791-
std::promise<outcome::result<Proof>> wait;
792-
793-
sealCommit2(
794-
sector,
795-
commit_1_output,
796-
[&wait](const outcome::result<Proof> &res) -> void {
797-
wait.set_value(res);
798-
},
799-
priority);
800-
801-
return wait.get_future().get();
802-
}
803-
804701
void ManagerImpl::finalizeSector(
805702
const SectorRef &sector,
806703
const gsl::span<const Range> &keep_unsealed,
@@ -899,19 +796,128 @@ namespace fc::sector_storage {
899796
boost::none));
900797
}
901798

902-
outcome::result<void> ManagerImpl::finalizeSectorSync(
799+
void ManagerImpl::replicaUpdate(
903800
const SectorRef &sector,
904-
const gsl::span<const Range> &keep_unsealed,
801+
const std::vector<PieceInfo> &pieces,
802+
const std::function<void(outcome::result<ReplicaUpdateOut>)> &cb,
905803
uint64_t priority) {
906-
std::promise<outcome::result<void>> waiter;
804+
logger_->debug("sector_storage::Manager is doing replica update");
805+
OUTCOME_CB(WorkId work_id,
806+
getWorkId(primitives::kTTReplicaUpdate,
807+
std::make_tuple(sector, pieces)));
808+
809+
OUTCOME_CB(auto lock,
810+
index_->storageLock(
811+
sector.id,
812+
SectorFileType::FTUnsealed | SectorFileType::FTSealed
813+
| SectorFileType::FTCache,
814+
SectorFileType::FTUpdate | SectorFileType::FTUpdateCache));
907815

908-
finalizeSector(
816+
auto selector = std::make_unique<AllocateSelector>(
817+
index_,
818+
SectorFileType::FTUpdate | SectorFileType::FTUpdateCache,
819+
PathType::kSealing);
820+
821+
OUTCOME_CB1(scheduler_->schedule(
909822
sector,
910-
keep_unsealed,
911-
[&waiter](outcome::result<void> res) -> void { waiter.set_value(res); },
912-
priority);
823+
primitives::kTTReplicaUpdate,
824+
std::move(selector),
825+
schedFetch(sector,
826+
SectorFileType::FTUnsealed | SectorFileType::FTSealed
827+
| SectorFileType::FTCache,
828+
PathType::kSealing,
829+
AcquireMode::kCopy),
830+
[sector, pieces, lock = std::move(lock)](
831+
const std::shared_ptr<Worker> &worker) -> outcome::result<CallId> {
832+
return worker->replicaUpdate(sector, pieces);
833+
},
834+
callbackWrapper(cb),
835+
priority,
836+
work_id));
837+
}
838+
839+
void ManagerImpl::proveReplicaUpdate1(
840+
const SectorRef &sector,
841+
const CID &sector_key,
842+
const CID &new_sealed,
843+
const CID &new_unsealed,
844+
const std::function<void(outcome::result<ReplicaVanillaProofs>)> &cb,
845+
uint64_t priority) {
846+
OUTCOME_CB(WorkId work_id,
847+
getWorkId(primitives::kTTProveReplicaUpdate1,
848+
std::make_tuple(
849+
sector, sector_key, new_sealed, new_unsealed)));
850+
851+
OUTCOME_CB(
852+
auto lock,
853+
index_->storageLock(sector.id,
854+
SectorFileType::FTSealed | SectorFileType::FTUpdate
855+
| SectorFileType::FTCache
856+
| SectorFileType::FTUpdateCache,
857+
SectorFileType::FTNone));
858+
859+
// NOTE: We set allowFetch to false in so that we always execute on a worker
860+
// with direct access to the data. We want to do that because this step is
861+
// generally very cheap / fast, and transferring data is not worth the
862+
// effort
863+
auto selector = std::make_unique<ExistingSelector>(
864+
index_,
865+
sector.id,
866+
SectorFileType::FTUpdate | SectorFileType::FTUpdateCache
867+
| SectorFileType::FTSealed | SectorFileType::FTCache,
868+
false);
869+
870+
OUTCOME_CB1(scheduler_->schedule(
871+
sector,
872+
primitives::kTTProveReplicaUpdate1,
873+
std::move(selector),
874+
schedFetch(sector,
875+
SectorFileType::FTSealed | SectorFileType::FTCache
876+
| SectorFileType::FTUpdate
877+
| SectorFileType::FTUpdateCache,
878+
PathType::kSealing,
879+
AcquireMode::kCopy),
880+
[sector, sector_key, new_sealed, new_unsealed, lock = std::move(lock)](
881+
const std::shared_ptr<Worker> &worker) -> outcome::result<CallId> {
882+
return worker->proveReplicaUpdate1(
883+
sector, sector_key, new_sealed, new_unsealed);
884+
},
885+
callbackWrapper(cb),
886+
priority,
887+
work_id));
888+
}
913889

914-
return waiter.get_future().get();
890+
void ManagerImpl::proveReplicaUpdate2(
891+
const SectorRef &sector,
892+
const CID &sector_key,
893+
const CID &new_sealed,
894+
const CID &new_unsealed,
895+
const Update1Output &update_1_output,
896+
const std::function<void(outcome::result<ReplicaUpdateProof>)> &cb,
897+
uint64_t priority) {
898+
OUTCOME_CB(WorkId work_id,
899+
getWorkId(primitives::kTTProveReplicaUpdate2,
900+
std::make_tuple(sector,
901+
sector_key,
902+
new_sealed,
903+
new_unsealed,
904+
update_1_output)));
905+
906+
auto selector = std::make_unique<TaskSelector>();
907+
908+
OUTCOME_CB1(scheduler_->schedule(
909+
sector,
910+
primitives::kTTProveReplicaUpdate2,
911+
std::move(selector),
912+
schedNothing(),
913+
[sector, sector_key, new_sealed, new_unsealed, update_1_output](
914+
const std::shared_ptr<Worker> &worker) -> outcome::result<CallId> {
915+
return worker->proveReplicaUpdate2(
916+
sector, sector_key, new_sealed, new_unsealed, update_1_output);
917+
},
918+
callbackWrapper(cb),
919+
priority,
920+
work_id));
915921
}
916922

917923
void ManagerImpl::addPiece(

0 commit comments

Comments
 (0)