Skip to content

Commit aec1835

Browse files
authored
Update worker (#572)
* Add Update methods to workers Signed-off-by: ortyomka <[email protected]> * Add tests and format code Signed-off-by: ortyomka <[email protected]>
1 parent 6bb7b1b commit aec1835

File tree

17 files changed

+548
-14
lines changed

17 files changed

+548
-14
lines changed

core/api/full_node/node_api.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ namespace fc::api {
5858
using primitives::block::BlockTemplate;
5959
using primitives::block::BlockWithCids;
6060
using primitives::piece::PaddedPieceSize;
61+
using primitives::sector::ExtendedSectorInfo;
6162
using primitives::sector::RegisteredPoStProof;
6263
using primitives::sector::RegisteredSealProof;
63-
using primitives::sector::ExtendedSectorInfo;
6464
using primitives::tipset::HeadChange;
6565
using primitives::tipset::TipsetCPtr;
6666
using primitives::tipset::TipsetKey;

core/api/storage_miner/return_api.hpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,26 @@ namespace fc::api {
6868
const boost::optional<CallError> &call_error) {
6969
return scheduler->returnResult(call_id, {{}, call_error});
7070
};
71+
72+
api->ReturnReplicaUpdate =
73+
[=](const CallId &call_id,
74+
SectorCids cids,
75+
const boost::optional<CallError> &call_error) {
76+
return scheduler->returnResult(call_id, {cids, call_error});
77+
};
78+
79+
api->ReturnProveReplicaUpdate1 = [=](const CallId &call_id,
80+
Update1Output update_1_output,
81+
const boost::optional<CallError>
82+
&call_error) {
83+
return scheduler->returnResult(call_id, {update_1_output, call_error});
84+
};
85+
86+
api->ReturnProveReplicaUpdate2 =
87+
[=](const CallId &call_id,
88+
Proof proof,
89+
const boost::optional<CallError> &call_error) {
90+
return scheduler->returnResult(call_id, {proof, call_error});
91+
};
7192
}
7293
} // namespace fc::api

core/api/storage_miner/storage_api.hpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ namespace fc::api {
5454
using sector_storage::Proof;
5555
using sector_storage::Scheduler;
5656
using sector_storage::SectorCids;
57+
using sector_storage::Update1Output;
5758
using sector_storage::stores::FsStat;
5859
using sector_storage::stores::HealthReport;
5960
using sector_storage::stores::SectorIndex;
@@ -238,6 +239,24 @@ namespace fc::api {
238239
void,
239240
const CallId &,
240241
const boost::optional<CallError> &)
242+
API_METHOD(ReturnReplicaUpdate,
243+
jwt::kAdminPermission,
244+
void,
245+
const CallId &,
246+
SectorCids,
247+
const boost::optional<CallError> &)
248+
API_METHOD(ReturnProveReplicaUpdate1,
249+
jwt::kAdminPermission,
250+
void,
251+
const CallId &,
252+
Update1Output,
253+
const boost::optional<CallError> &)
254+
API_METHOD(ReturnProveReplicaUpdate2,
255+
jwt::kAdminPermission,
256+
void,
257+
const CallId &,
258+
Proof,
259+
const boost::optional<CallError> &)
241260

242261
API_METHOD(WorkerConnect, jwt::kAdminPermission, void, const std::string &);
243262
};

core/api/worker_api.hpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,27 @@ namespace fc::api {
5353
const SectorRef &,
5454
const std::vector<Range> &)
5555

56+
API_METHOD(ReplicaUpdate,
57+
kAdminPermission,
58+
CallId,
59+
const SectorRef &,
60+
const std::vector<PieceInfo> &)
61+
API_METHOD(ProveReplicaUpdate1,
62+
kAdminPermission,
63+
CallId,
64+
const SectorRef &,
65+
const CID &,
66+
const CID &,
67+
const CID &)
68+
API_METHOD(ProveReplicaUpdate2,
69+
kAdminPermission,
70+
CallId,
71+
const SectorRef &,
72+
const CID &,
73+
const CID &,
74+
const CID &,
75+
const Update1Output &)
76+
5677
API_METHOD(Info, kAdminPermission, primitives::WorkerInfo)
5778

5879
API_METHOD(MoveStorage,

core/const.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@ namespace fc {
216216
kUpgradeNorwegianHeight = -14;
217217
kUpgradeTurboHeight = -15;
218218
kUpgradeHyperdriveHeight = -16;
219-
kUpgradeChocolateHeight = INT64_MAX; // -17 in lotus
220-
kUpgradeOhSnapHeight = INT64_MAX; // -18 in lotus
219+
kUpgradeChocolateHeight = INT64_MAX; // -17 in lotus
220+
kUpgradeOhSnapHeight = INT64_MAX; // -18 in lotus
221221

222222
kBreezeGasTampingDuration = 0;
223223

core/primitives/sector_file/sector_file.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace fc::primitives::sector_file {
3737
return static_cast<SectorFileType>(int64_t(lhs) | rhs);
3838
}
3939

40-
constexpr size_t kSectorFileTypeBits{3};
40+
constexpr size_t kSectorFileTypeBits{5};
4141

4242
const std::vector<SectorFileType> kSectorFileTypes = {
4343
SectorFileType::FTUnsealed,
@@ -56,7 +56,6 @@ namespace fc::primitives::sector_file {
5656
{SectorFileType::FTCache, 141},
5757
{SectorFileType::FTUpdate, kOverheadDenominator},
5858
{SectorFileType::FTUpdateCache, kOverheadDenominator * 2},
59-
6059
};
6160

6261
const std::unordered_map<SectorFileType, uint64_t> kOverheadFinalized{

core/sector_storage/impl/local_worker.cpp

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,131 @@ namespace fc::sector_storage {
526526
});
527527
}
528528

529+
outcome::result<CallId> LocalWorker::replicaUpdate(
530+
const SectorRef &sector, const std::vector<PieceInfo> &pieces) {
531+
return asyncCall(
532+
sector,
533+
return_->ReturnReplicaUpdate,
534+
[=](Self self) -> outcome::result<SectorCids> {
535+
OUTCOME_TRY(
536+
response,
537+
self->acquireSector(
538+
sector,
539+
SectorFileType::FTUnsealed | SectorFileType::FTSealed
540+
| SectorFileType::FTCache,
541+
SectorFileType::FTUpdate | SectorFileType::FTUpdateCache,
542+
PathType::kSealing));
543+
auto _ = gsl::finally([&]() { response.release_function(); });
544+
545+
const auto &paths{response.paths};
546+
547+
OUTCOME_TRY(update_proof_type,
548+
getRegisteredUpdateProof(sector.proof_type));
549+
550+
boost::system::error_code ec;
551+
auto size = boost::filesystem::file_size(paths.sealed, ec);
552+
if (ec.failed()) {
553+
logger_->error("Cannot get file size: {}", ec.message());
554+
return ERROR_TEXT("Cannot get file size");
555+
}
556+
557+
int fd = open(paths.update.c_str(), O_RDWR | O_CREAT, 0644);
558+
if (fd == -1) {
559+
return ERROR_TEXT("Cannot create update file");
560+
}
561+
close(fd);
562+
563+
boost::filesystem::resize_file(paths.update, size, ec);
564+
if (ec.failed()) {
565+
logger_->error("Cannot resize update file: {}", ec.message());
566+
return ERROR_TEXT("Cannot resize update file");
567+
}
568+
569+
if (!boost::filesystem::create_directory(paths.update_cache)) {
570+
if (boost::filesystem::exists(paths.update_cache)) {
571+
boost::filesystem::remove_all(paths.update_cache, ec);
572+
if (ec.failed()) {
573+
logger_->error("Cannot create cache update dir: {}",
574+
ec.message());
575+
return ERROR_TEXT("Cannot create cache update dir");
576+
}
577+
if (!boost::filesystem::create_directory(response.paths.cache)) {
578+
logger_->error("Cannot create cache update dir: {}",
579+
ec.message());
580+
return ERROR_TEXT("Cannot create cache update dir");
581+
}
582+
} else {
583+
logger_->error("Cannot create cache update dir: {}",
584+
ec.message());
585+
return ERROR_TEXT("Cannot create cache update dir");
586+
}
587+
}
588+
589+
return self->proofs_->updateSeal(update_proof_type,
590+
paths.update,
591+
paths.update_cache,
592+
paths.sealed,
593+
paths.cache,
594+
paths.unsealed,
595+
gsl::span<const PieceInfo>(pieces));
596+
});
597+
}
598+
599+
outcome::result<CallId> LocalWorker::proveReplicaUpdate1(
600+
const SectorRef &sector,
601+
const CID &sector_key,
602+
const CID &new_sealed,
603+
const CID &new_unsealed) {
604+
return asyncCall(
605+
sector,
606+
return_->ReturnProveReplicaUpdate1,
607+
[=](Self self) -> outcome::result<Update1Output> {
608+
OUTCOME_TRY(response,
609+
self->acquireSector(sector,
610+
SectorFileType::FTSealed
611+
| SectorFileType::FTCache
612+
| SectorFileType::FTUpdate
613+
| SectorFileType::FTUpdateCache,
614+
SectorFileType::FTNone,
615+
PathType::kSealing));
616+
auto _ = gsl::finally([&]() { response.release_function(); });
617+
618+
const auto &paths{response.paths};
619+
620+
OUTCOME_TRY(update_proof_type,
621+
getRegisteredUpdateProof(sector.proof_type));
622+
623+
return self->proofs_->updateProve1(update_proof_type,
624+
sector_key,
625+
new_sealed,
626+
new_unsealed,
627+
paths.update,
628+
paths.update_cache,
629+
paths.sealed,
630+
paths.cache);
631+
});
632+
}
633+
634+
outcome::result<CallId> LocalWorker::proveReplicaUpdate2(
635+
const SectorRef &sector,
636+
const CID &sector_key,
637+
const CID &new_sealed,
638+
const CID &new_unsealed,
639+
const Update1Output &update_1_output) {
640+
return asyncCall(sector,
641+
return_->ReturnProveReplicaUpdate2,
642+
[=](Self self) -> outcome::result<Proof> {
643+
OUTCOME_TRY(update_proof_type,
644+
getRegisteredUpdateProof(sector.proof_type));
645+
646+
return self->proofs_->updateProve2(update_proof_type,
647+
sector_key,
648+
new_sealed,
649+
new_unsealed,
650+
update_1_output);
651+
});
652+
}
653+
529654
outcome::result<CallId> LocalWorker::moveStorage(const SectorRef &sector,
530655
SectorFileType types) {
531656
return asyncCall(sector,

core/sector_storage/impl/local_worker.hpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,22 @@ namespace fc::sector_storage {
5757
const SectorRef &sector,
5858
const gsl::span<const Range> &keep_unsealed) override;
5959

60+
outcome::result<CallId> replicaUpdate(
61+
const SectorRef &sector, const std::vector<PieceInfo> &pieces) override;
62+
63+
outcome::result<CallId> proveReplicaUpdate1(
64+
const SectorRef &sector,
65+
const CID &sector_key,
66+
const CID &new_sealed,
67+
const CID &new_unsealed) override;
68+
69+
outcome::result<CallId> proveReplicaUpdate2(
70+
const SectorRef &sector,
71+
const CID &sector_key,
72+
const CID &new_sealed,
73+
const CID &new_unsealed,
74+
const Update1Output &update_1_output) override;
75+
6076
outcome::result<CallId> moveStorage(const SectorRef &sector,
6177
SectorFileType types) override;
6278

core/sector_storage/impl/manager_impl.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,9 @@ namespace fc::sector_storage {
231231
};
232232

233233
outcome::result<PubToPrivateResponse> publicSectorToPrivate(
234-
ActorId miner, gsl::span<const ExtendedSectorInfo> sector_info, bool winning);
234+
ActorId miner,
235+
gsl::span<const ExtendedSectorInfo> sector_info,
236+
bool winning);
235237

236238
std::shared_ptr<stores::SectorIndex> index_;
237239

core/sector_storage/impl/remote_worker.cpp

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ namespace fc::sector_storage {
4040

4141
r_worker->wsc_.setup(r_worker->api_);
4242

43-
OUTCOME_TRY(r_worker->wsc_.connect(address, "/rpc/v0", std::string(token.begin(), token.end())));
43+
OUTCOME_TRY(r_worker->wsc_.connect(
44+
address, "/rpc/v0", std::string(token.begin(), token.end())));
4445

4546
return std::move(r_worker);
4647
}
@@ -145,14 +146,19 @@ namespace fc::sector_storage {
145146
new_piece_size,
146147
[](const outcome::result<std::string> &res) {
147148
if (res.has_error()) {
148-
spdlog::error("Transfer of pieces was finished with error: {}", res.value());
149+
spdlog::error("Transfer of pieces was finished with error: {}",
150+
res.value());
149151
} else {
150152
spdlog::info("Transfer of pieces was finished with response {}",
151153
res.value());
152154
}
153155
});
154156
}
155-
return api_.AddPiece(sector, std::vector<UnpaddedPieceSize>(piece_sizes.begin(), piece_sizes.end()), new_piece_size, meta_data);
157+
return api_.AddPiece(
158+
sector,
159+
std::vector<UnpaddedPieceSize>(piece_sizes.begin(), piece_sizes.end()),
160+
new_piece_size,
161+
meta_data);
156162
}
157163

158164
outcome::result<CallId> RemoteWorker::sealPreCommit1(
@@ -187,6 +193,30 @@ namespace fc::sector_storage {
187193
sector, std::vector<Range>(keep_unsealed.begin(), keep_unsealed.end()));
188194
}
189195

196+
outcome::result<CallId> RemoteWorker::replicaUpdate(
197+
const SectorRef &sector, const std::vector<PieceInfo> &pieces) {
198+
return api_.ReplicaUpdate(sector, pieces);
199+
}
200+
201+
outcome::result<CallId> RemoteWorker::proveReplicaUpdate1(
202+
const SectorRef &sector,
203+
const CID &sector_key,
204+
const CID &new_sealed,
205+
const CID &new_unsealed) {
206+
return api_.ProveReplicaUpdate1(
207+
sector, sector_key, new_sealed, new_unsealed);
208+
}
209+
210+
outcome::result<CallId> RemoteWorker::proveReplicaUpdate2(
211+
const SectorRef &sector,
212+
const CID &sector_key,
213+
const CID &new_sealed,
214+
const CID &new_unsealed,
215+
const Update1Output &update_1_output) {
216+
return api_.ProveReplicaUpdate2(
217+
sector, sector_key, new_sealed, new_unsealed, update_1_output);
218+
}
219+
190220
outcome::result<CallId> RemoteWorker::moveStorage(const SectorRef &sector,
191221
SectorFileType types) {
192222
return api_.MoveStorage(sector, types);

0 commit comments

Comments
 (0)