Skip to content

Commit 4403ba0

Browse files
authored
AddPiece (#559)
* AddPiece Signed-off-by: elestrias <[email protected]> * AddPiece http connection Signed-off-by: elestrias <[email protected]> * integration to NullPieceData Signed-off-by: elestrias <[email protected]>
1 parent fb80af1 commit 4403ba0

File tree

9 files changed

+208
-12
lines changed

9 files changed

+208
-12
lines changed

core/api/rpc/json.hpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ namespace fc::api {
6060
using primitives::block::ElectionProof;
6161
using primitives::block::Ticket;
6262
using primitives::cid::getCidOfCbor;
63+
using primitives::piece::ReaderType;
6364
using primitives::piece::UnpaddedPieceSize;
6465
using primitives::sector::PoStProof;
6566
using primitives::sector::RegisteredPoStProof;
@@ -1363,8 +1364,8 @@ namespace fc::api {
13631364
v = std::move(_v);
13641365
}
13651366

1366-
// can be generic
1367-
ENCODE(gsl::span<const PieceInfo>) {
1367+
template <typename T>
1368+
ENCODE(gsl::span<T>) {
13681369
Value j{rapidjson::kArrayType};
13691370
j.Reserve(v.size(), allocator);
13701371
for (const auto &elem : v) {
@@ -2120,6 +2121,20 @@ namespace fc::api {
21202121
}
21212122
}
21222123

2124+
ENCODE(MetaPieceData) {
2125+
Value j{rapidjson::kObjectType};
2126+
Set(j, "Type", v.type.toString());
2127+
Set(j, "Info", v.info);
2128+
return j;
2129+
}
2130+
2131+
DECODE(MetaPieceData) {
2132+
std::string type;
2133+
Get(j, "Type", type);
2134+
v.type = ReaderType::fromString(type);
2135+
Get(j, "Info", v.info);
2136+
}
2137+
21232138
template <typename T,
21242139
typename = std::enable_if_t<!std::is_same_v<T, uint8_t>>>
21252140
ENCODE(std::vector<T>) {

core/api/rpc/wsc.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,15 @@ namespace fc::api::rpc {
3434
OUTCOME_TRY(
3535
port,
3636
address.getFirstValueForProtocol(libp2p::multi::Protocol::Code::TCP));
37+
return connect(ip, port, target, token);
38+
}
39+
40+
outcome::result<void> Client::connect(const std::string &host,
41+
const std::string &port,
42+
const std::string &target,
43+
const std::string &token) {
3744
boost::system::error_code ec;
38-
socket.next_layer().connect({boost::asio::ip::make_address(ip),
45+
socket.next_layer().connect({boost::asio::ip::make_address(host),
3946
boost::lexical_cast<uint16_t>(port)},
4047
ec);
4148
if (ec) {
@@ -48,7 +55,7 @@ namespace fc::api::rpc {
4855
"Bearer " + token);
4956
}));
5057
}
51-
socket.handshake(ip, target, ec);
58+
socket.handshake(host, target, ec);
5259
if (ec) {
5360
return ec;
5461
}

core/api/rpc/wsc.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ namespace fc::api::rpc {
4141
outcome::result<void> connect(const Multiaddress &address,
4242
const std::string &target,
4343
const std::string &token);
44+
45+
outcome::result<void> connect(const std::string &host,
46+
const std::string &port,
47+
const std::string &target,
48+
const std::string &token);
49+
4450
void call(Request &&req, ResultCb &&cb);
4551
void _chan(uint64_t id, ChanCb &&cb);
4652
void _error(const std::error_code &error);

core/api/setup_common.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace fc::api {
2828

2929
return Bytes(token.begin(), token.end());
3030
};
31+
3132
api->AuthVerify =
3233
[verifier{std::move(verifier)},
3334
logger](auto token) -> outcome::result<std::vector<Permission>> {

core/api/worker_api.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
namespace fc::api {
1515
using primitives::jwt::kAdminPermission;
16+
using primitives::piece::MetaPieceData;
1617
using primitives::piece::PieceInfo;
1718
using primitives::piece::UnpaddedByteIndex;
1819
using primitives::piece::UnpaddedPieceSize;
@@ -30,7 +31,13 @@ namespace fc::api {
3031
using sector_storage::SectorFileType;
3132

3233
struct WorkerApi {
33-
// TODO(ortyomka): [FIL-344] add AddPiece function
34+
API_METHOD(AddPiece,
35+
kAdminPermission,
36+
CallId,
37+
SectorRef,
38+
std::vector<UnpaddedPieceSize>,
39+
UnpaddedPieceSize,
40+
MetaPieceData)
3441

3542
API_METHOD(Fetch,
3643
kAdminPermission,
@@ -102,6 +109,7 @@ namespace fc::api {
102109

103110
template <typename A, typename F>
104111
void visit(const WorkerApi &, A &&a, const F &f) {
112+
f(a.AddPiece);
105113
f(a.Fetch);
106114
f(a.FinalizeSector);
107115
f(a.Info);

core/primitives/piece/impl/piece_data.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,27 @@ namespace fc::primitives::piece {
6161
other.is_null_data_ = false;
6262
return *this;
6363
}
64+
65+
int PieceData::release(){
66+
assert(!is_null_data_);
67+
const int temp = fd_;
68+
fd_ = kUnopenedFileDescriptor;
69+
return temp;
70+
}
71+
72+
const std::array<std::string, 3> ReaderType::types{"undefined", "null", "push"};
73+
74+
const std::string ReaderType::toString() const {
75+
return types.at(reader_type);
76+
}
77+
78+
ReaderType ReaderType::fromString(const std::string &type){
79+
for(size_t i = 0; i < types.size(); i++){
80+
if(type == types.at(i)){
81+
return ReaderType(Type{i});
82+
}
83+
}
84+
return ReaderType(Type::kUndefined);
85+
}
86+
6487
} // namespace fc::primitives::piece

core/primitives/piece/piece_data.hpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
#pragma once
77

88
#include <fcntl.h>
9+
#include <array>
910
#include <string>
11+
#include <boost/optional.hpp>
1012

1113
namespace fc::primitives::piece {
1214

@@ -29,6 +31,8 @@ namespace fc::primitives::piece {
2931

3032
bool isOpened() const;
3133

34+
[[nodiscard]] int release();
35+
3236
bool isNullData() const;
3337

3438
static PieceData makeNull();
@@ -40,4 +44,30 @@ namespace fc::primitives::piece {
4044
bool is_null_data_;
4145
};
4246

47+
class ReaderType {
48+
public:
49+
enum Type : uint64_t {
50+
kUndefined = 0,
51+
kNullReader = 1,
52+
kPushStreamReader = 2
53+
} reader_type;
54+
55+
explicit ReaderType(Type type) : reader_type(type){};
56+
57+
static const std::array<std::string, 3> types;
58+
59+
const std::string toString() const;
60+
61+
static ReaderType fromString(const std::string &type);
62+
};
63+
64+
class MetaPieceData {
65+
public:
66+
MetaPieceData(std::string uuid, ReaderType::Type type)
67+
: info(std::move(uuid)), type(ReaderType(type)){};
68+
MetaPieceData():type(ReaderType::fromString("undefined")){};
69+
std::string info;
70+
ReaderType type;
71+
};
72+
4373
} // namespace fc::primitives::piece

core/sector_storage/impl/remote_worker.cpp

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,44 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6+
#include <boost/beast/http.hpp>
7+
#include <boost/uuid/uuid_generators.hpp>
8+
#include <boost/uuid/uuid_io.hpp>
9+
#include "primitives/jwt/jwt.hpp"
10+
11+
#include "common/uri_parser/uri_parser.hpp"
12+
#include "drand/impl/http.cpp"
13+
#include "primitives/piece/piece_data.hpp"
614
#include "sector_storage/impl/remote_worker.hpp"
715

816
namespace fc::sector_storage {
17+
using drand::http::ClientSession;
18+
using primitives::piece::MetaPieceData;
19+
using primitives::piece::ReaderType;
20+
namespace beast = boost::beast;
21+
namespace net = boost::asio;
22+
using tcp = net::ip::tcp;
23+
namespace uuids = boost::uuids;
24+
namespace http = boost::beast::http;
25+
using common::HttpUri;
26+
using primitives::jwt::kAdminPermission;
927

1028
outcome::result<std::shared_ptr<RemoteWorker>>
1129
RemoteWorker::connectRemoteWorker(io_context &context,
1230
const std::shared_ptr<CommonApi> &api,
1331
const Multiaddress &address) {
14-
auto token = "stub"; // get token from common api
15-
32+
OUTCOME_TRY(token, api->AuthNew({kAdminPermission}));
1633
struct make_unique_enabler : public RemoteWorker {
17-
make_unique_enabler(io_context &context) : RemoteWorker{context} {};
34+
explicit make_unique_enabler(io_context &context)
35+
: RemoteWorker{context} {};
1836
};
1937

2038
std::unique_ptr<RemoteWorker> r_worker =
2139
std::make_unique<make_unique_enabler>(context);
2240

2341
r_worker->wsc_.setup(r_worker->api_);
2442

25-
OUTCOME_TRY(r_worker->wsc_.connect(address, "/rpc/v0", token));
43+
OUTCOME_TRY(r_worker->wsc_.connect(address, "/rpc/v0", std::string(token.begin(), token.end())));
2644

2745
return std::move(r_worker);
2846
}
@@ -41,15 +59,100 @@ namespace fc::sector_storage {
4159
return api_.Paths();
4260
}
4361

44-
RemoteWorker::RemoteWorker(io_context &context) : wsc_(context) {}
62+
RemoteWorker::RemoteWorker(io_context &context)
63+
: wsc_(context), io_(context) {}
64+
65+
struct PieceDataSender {
66+
explicit PieceDataSender(io_context &io)
67+
: resolver_{io}, stream_{net::make_strand(io)} {}
68+
PieceDataSender(const PieceDataSender &) = delete;
69+
PieceDataSender(PieceDataSender &&) = delete;
70+
~PieceDataSender() {
71+
boost::system::error_code ec;
72+
stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
73+
}
74+
PieceDataSender &operator=(const PieceDataSender &) = delete;
75+
PieceDataSender &operator=(PieceDataSender &&) = delete;
76+
77+
static void send(
78+
int fd,
79+
io_context &io,
80+
const std::string &host,
81+
const std::string &port,
82+
const std::string &target,
83+
const std::uint64_t piece_size,
84+
const std::function<void(outcome::result<std::string>)> &cb) {
85+
auto sender{std::make_shared<PieceDataSender>(io)};
86+
boost::system::error_code error;
87+
boost::beast::file_posix fp;
88+
fp.native_handle(fd);
89+
90+
sender->file_req_.body().reset(std::move(fp), error);
91+
sender->file_req_.method(http::verb::post);
92+
sender->file_req_.target(target);
93+
sender->file_req_.set(http::field::host, host);
94+
sender->file_req_.set(http::field::content_length, piece_size);
95+
sender->resolver_.async_resolve(
96+
host, port, [sender, MOVE(cb)](auto &&ec, auto &&iterator) {
97+
EC_CB();
98+
sender->stream_.async_connect(
99+
iterator, [sender, MOVE(cb)](auto &&ec, auto &&) {
100+
EC_CB();
101+
http::async_write(
102+
sender->stream_,
103+
sender->file_req_,
104+
[sender, MOVE(cb)](auto &&ec, auto &&) {
105+
EC_CB();
106+
http::async_read(
107+
sender->stream_,
108+
sender->buffer_,
109+
sender->res_,
110+
[sender, MOVE(cb)](auto &&ec, auto &&) {
111+
EC_CB();
112+
cb(std::move(sender->res_.body()));
113+
});
114+
});
115+
});
116+
});
117+
}
118+
119+
private:
120+
http::request<http::file_body> file_req_;
121+
tcp::resolver resolver_;
122+
beast::tcp_stream stream_;
123+
beast::flat_buffer buffer_;
124+
http::response<http::string_body> res_;
125+
};
45126

46127
outcome::result<CallId> RemoteWorker::addPiece(
47128
const SectorRef &sector,
48129
gsl::span<const UnpaddedPieceSize> piece_sizes,
49130
const UnpaddedPieceSize &new_piece_size,
50131
PieceData piece_data) {
51-
return WorkerErrors::kUnsupportedCall; // TODO(ortyomka): [FIL-344] add
52-
// functionality
132+
MetaPieceData meta_data =
133+
piece_data.isNullData()
134+
? MetaPieceData(std::to_string(new_piece_size),
135+
ReaderType::Type::kNullReader)
136+
: MetaPieceData(uuids::to_string(uuids::random_generator()()),
137+
ReaderType::Type::kPushStreamReader);
138+
if (!piece_data.isNullData()) {
139+
PieceDataSender::send(
140+
piece_data.release(),
141+
io_,
142+
host_,
143+
port_,
144+
"/rpc/streams/v0/push/" + meta_data.info,
145+
new_piece_size,
146+
[](const outcome::result<std::string> &res) {
147+
if (res.has_error()) {
148+
spdlog::error("Transfer of pieces was finished with error: {}", res.value());
149+
} else {
150+
spdlog::info("Transfer of pieces was finished with response {}",
151+
res.value());
152+
}
153+
});
154+
}
155+
return api_.AddPiece(sector, std::vector<UnpaddedPieceSize>(piece_sizes.begin(), piece_sizes.end()), new_piece_size, meta_data);
53156
}
54157

55158
outcome::result<CallId> RemoteWorker::sealPreCommit1(

core/sector_storage/impl/remote_worker.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,5 +86,8 @@ namespace fc::sector_storage {
8686

8787
WorkerApi api_;
8888
Client wsc_;
89+
io_context & io_;
90+
std::string host_;
91+
std::string port_;
8992
};
9093
} // namespace fc::sector_storage

0 commit comments

Comments
 (0)