Skip to content

Commit f73cb46

Browse files
authored
Feature/market data store (#476)
* SectorBlocks migration to Datastore Signed-off-by: elestrias <[email protected]> * Code upgrade, new test for duplicates Signed-off-by: elestrias <[email protected]> * sectorblocks pr fixes, test fixes * Fixes Signed-off-by: elestrias <[email protected]> * logger update Signed-off-by: elestrias <[email protected]>
1 parent 2f2cd4b commit f73cb46

File tree

7 files changed

+92
-15
lines changed

7 files changed

+92
-15
lines changed

core/api/storage_miner/storage_api.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ namespace fc::api {
5858
PaddedPieceSize offset;
5959
PaddedPieceSize length;
6060
};
61+
CBOR_TUPLE(PieceLocation, sector_number, offset, length)
6162

6263
// TODO(ortyomka): [FIL-421] implement it
6364
struct ApiSectorInfo {

core/miner/main/main.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,8 @@ namespace fc {
452452
prefixed("stored_ask/"), napi, *config.actor)};
453453
auto piece_storage{std::make_shared<storage::piece::PieceStorageImpl>(
454454
prefixed("storage_provider/"))};
455-
auto sector_blocks{std::make_shared<sectorblocks::SectorBlocksImpl>(miner)};
455+
auto sector_blocks{std::make_shared<sectorblocks::SectorBlocksImpl>(
456+
miner, prefixed("sealedblocks/"))};
456457
auto chain_events{std::make_shared<ChainEventsImpl>(
457458
napi, ChainEventsImpl::IsDealPrecommited{})};
458459
OUTCOME_TRY(chain_events->init());

core/sectorblocks/blocks.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ namespace fc::sectorblocks {
3434

3535
enum class SectorBlocksError {
3636
kNotFoundDeal = 1,
37+
kDealAlreadyExist = 2,
3738
};
3839

3940
} // namespace fc::sectorblocks

core/sectorblocks/impl/blocks_impl.cpp

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,18 @@
44
*/
55

66
#include "sectorblocks/impl/blocks_impl.hpp"
7+
#include "adt/uvarint_key.hpp"
8+
#include "codec/cbor/cbor_codec.hpp"
79

810
namespace fc::sectorblocks {
9-
SectorBlocksImpl::SectorBlocksImpl(std::shared_ptr<Miner> miner)
10-
: miner_{miner} {}
11+
using adt::UvarintKeyer;
12+
using codec::cbor::decode;
13+
using codec::cbor::encode;
14+
15+
SectorBlocksImpl::SectorBlocksImpl(
16+
std::shared_ptr<Miner> miner,
17+
std::shared_ptr<DataStore> datastore)
18+
: miner_(std::move(miner)), storage_(std::move(datastore)) {}
1119

1220
outcome::result<PieceAttributes> SectorBlocksImpl::addPiece(
1321
UnpaddedPieceSize size,
@@ -28,21 +36,42 @@ namespace fc::sectorblocks {
2836
PaddedPieceSize offset,
2937
UnpaddedPieceSize size) {
3038
std::lock_guard lock(mutex_);
31-
storage_[deal_id].push_back(PieceLocation{
39+
40+
const Buffer key = Buffer(UvarintKeyer::encode(deal_id));
41+
std::vector<PieceLocation> new_data;
42+
const auto new_piece = PieceLocation{
3243
.sector_number = sector,
3344
.offset = offset,
3445
.length = size.padded(),
35-
});
36-
return outcome::success();
46+
};
47+
48+
if (storage_->contains(key)) {
49+
OUTCOME_TRY(stored_data, storage_->get(key));
50+
OUTCOME_TRY(decoded_out, decode<std::vector<PieceLocation>>(stored_data));
51+
if (find(decoded_out.begin(), decoded_out.end(), new_piece)
52+
== decoded_out.end()) {
53+
decoded_out.push_back(new_piece);
54+
new_data = std::move(decoded_out);
55+
} else {
56+
return SectorBlocksError::kDealAlreadyExist;
57+
}
58+
} else {
59+
new_data = {new_piece};
60+
}
61+
62+
OUTCOME_TRY(encoded_in, encode(new_data));
63+
return storage_->put(key, encoded_in);
3764
}
3865

3966
outcome::result<std::vector<PieceLocation>> SectorBlocksImpl::getRefs(
4067
DealId deal_id) const {
41-
auto refs = storage_.find(deal_id);
42-
if (refs == storage_.end()) {
43-
return SectorBlocksError::kNotFoundDeal;
68+
const Buffer key = Buffer(UvarintKeyer::encode(deal_id));
69+
if (storage_->contains(key)) {
70+
OUTCOME_TRY(stored_data, storage_->get(key));
71+
OUTCOME_TRY(decoded_out, decode<std::vector<PieceLocation>>(stored_data));
72+
return std::move(decoded_out);
4473
}
45-
return refs->second;
74+
return SectorBlocksError::kNotFoundDeal;
4675
}
4776

4877
std::shared_ptr<Miner> SectorBlocksImpl::getMiner() const {
@@ -55,7 +84,10 @@ OUTCOME_CPP_DEFINE_CATEGORY(fc::sectorblocks, SectorBlocksError, e) {
5584
switch (e) {
5685
case (SectorBlocksError::kNotFoundDeal):
5786
return "SectorBlocks: not found";
87+
case(SectorBlocksError::kDealAlreadyExist):
88+
return "SectorBlocks: piece already exist in provided deal";
5889
default:
5990
return "SectorBlocks: unknown error";
91+
6092
}
6193
}

core/sectorblocks/impl/blocks_impl.hpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,17 @@
66
#pragma once
77

88
#include "sectorblocks/blocks.hpp"
9+
#include "storage/buffer_map.hpp"
910

1011
namespace fc::sectorblocks {
1112
using primitives::SectorNumber;
1213
using primitives::piece::PaddedPieceSize;
14+
using DataStore = storage::PersistentBufferMap;
1315

1416
class SectorBlocksImpl : public SectorBlocks {
1517
public:
16-
SectorBlocksImpl(std::shared_ptr<Miner> miner);
18+
SectorBlocksImpl(std::shared_ptr<Miner> miner,
19+
std::shared_ptr<DataStore> datastore);
1720

1821
outcome::result<PieceAttributes> addPiece(
1922
UnpaddedPieceSize size,
@@ -31,11 +34,12 @@ namespace fc::sectorblocks {
3134
PaddedPieceSize offset,
3235
UnpaddedPieceSize size);
3336

37+
// TODO(@Elestrias):[FIL-423] Make deletion of expired deal associations;
38+
3439
std::shared_ptr<Miner> miner_;
3540

3641
std::mutex mutex_;
37-
std::map<DealId, std::vector<PieceLocation>>
38-
storage_; // TODO(ortyomka): [FIL-353] change to DataStore
42+
std::shared_ptr<DataStore> storage_;
3943
};
4044

4145
} // namespace fc::sectorblocks

test/core/sectorblocks/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ addtest(sectorblocks_test
99
target_link_libraries(sectorblocks_test
1010
base_fs_test
1111
sectorblocks
12+
in_memory_storage
1213
)

test/core/sectorblocks/sectorblocks_test.cpp

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,27 @@
88
#include <gmock/gmock.h>
99
#include <gtest/gtest.h>
1010

11+
#include "storage/in_memory/in_memory_storage.hpp"
1112
#include "testutil/mocks/miner/miner_mock.hpp"
1213
#include "testutil/outcome.hpp"
1314

1415
namespace fc::sectorblocks {
1516
using api::PieceLocation;
1617
using miner::MinerMock;
18+
using storage::InMemoryStorage;
1719
using testing::_;
1820

1921
class SectorBlocksTest : public ::testing::Test {
2022
protected:
2123
void SetUp() override {
2224
miner_ = std::make_unique<MinerMock>();
23-
24-
sector_blocks_ = std::make_unique<SectorBlocksImpl>(miner_);
25+
datastore_ = std::make_shared<InMemoryStorage>();
26+
sector_blocks_ = std::make_unique<SectorBlocksImpl>(miner_, datastore_);
2527
}
2628

2729
std::shared_ptr<MinerMock> miner_;
2830
std::shared_ptr<SectorBlocks> sector_blocks_;
31+
std::shared_ptr<InMemoryStorage> datastore_;
2932
};
3033

3134
/**
@@ -90,4 +93,38 @@ namespace fc::sectorblocks {
9093
ASSERT_THAT(refs, testing::ElementsAre(result_ref));
9194
}
9295

96+
/**
97+
* @given sectorblocks, deal, size, and path
98+
* @when try to add two duplicate pieces to the same deal_id
99+
* @then EXPECT_OUTCOME_ERROR
100+
*/
101+
TEST_F(SectorBlocksTest, DuplicatePiece) {
102+
const DealInfo deal{
103+
.publish_cid = boost::none,
104+
.deal_id = 1,
105+
.deal_schedule =
106+
{
107+
.start_epoch = 10,
108+
.end_epoch = 11,
109+
},
110+
.is_keep_unsealed = false,
111+
};
112+
const UnpaddedPieceSize size(127);
113+
const std::string path = "/some/temp/path";
114+
115+
const PieceAttributes piece{
116+
.sector = 1,
117+
.offset = PaddedPieceSize(0),
118+
.size = UnpaddedPieceSize(127),
119+
};
120+
121+
EXPECT_CALL(*miner_, doAddPieceToAnySector(size, _, deal))
122+
.WillOnce(testing::Return(outcome::success(piece)))
123+
.WillOnce(testing::Return(outcome::success(piece)));
124+
125+
EXPECT_OUTCOME_EQ(sector_blocks_->addPiece(size, path, deal), piece);
126+
EXPECT_OUTCOME_ERROR(SectorBlocksError::kDealAlreadyExist,
127+
sector_blocks_->addPiece(size, path, deal));
128+
}
129+
93130
} // namespace fc::sectorblocks

0 commit comments

Comments
 (0)