Skip to content

Commit fe1a65e

Browse files
Feature/republish (#490)
mpool republish pending messages Signed-off-by: Alexey-N-Chernyshov <[email protected]>
1 parent 79c36ad commit fe1a65e

File tree

17 files changed

+485
-142
lines changed

17 files changed

+485
-142
lines changed

core/api/full_node/make.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ namespace fc::api {
333333
auto tag,
334334
auto epoch,
335335
auto &entropy) -> outcome::result<Randomness> {
336-
std::shared_lock ts_lock{*env_context.ts_branches_mutex};
336+
std::unique_lock ts_lock{*env_context.ts_branches_mutex};
337337
OUTCOME_TRY(ts_branch, TsBranch::make(ts_load, tipset_key, ts_main));
338338
return env_context.randomness->getRandomnessFromBeacon(
339339
ts_branch, tag, epoch, entropy);
@@ -343,7 +343,7 @@ namespace fc::api {
343343
auto tag,
344344
auto epoch,
345345
auto &entropy) -> outcome::result<Randomness> {
346-
std::shared_lock ts_lock{*env_context.ts_branches_mutex};
346+
std::unique_lock ts_lock{*env_context.ts_branches_mutex};
347347
OUTCOME_TRY(ts_branch, TsBranch::make(ts_load, tipset_key, ts_main));
348348
return env_context.randomness->getRandomnessFromTickets(
349349
ts_branch, tag, epoch, entropy);
@@ -353,7 +353,7 @@ namespace fc::api {
353353
};
354354
api->ChainGetTipSetByHeight =
355355
[=](auto height, auto tipset_key) -> outcome::result<TipsetCPtr> {
356-
std::shared_lock ts_lock{*env_context.ts_branches_mutex};
356+
std::unique_lock ts_lock{*env_context.ts_branches_mutex};
357357
if (tipset_key.cids().empty()) {
358358
tipset_key = chain_store->heaviestTipset()->key;
359359
}
@@ -589,7 +589,7 @@ namespace fc::api {
589589
OUTCOME_CB(auto context, tipsetContext(tipset_key, true));
590590
MiningBaseInfo info;
591591

592-
std::shared_lock ts_lock{*env_context.ts_branches_mutex};
592+
std::unique_lock ts_lock{*env_context.ts_branches_mutex};
593593
OUTCOME_CB(auto ts_branch,
594594
TsBranch::make(ts_load, tipset_key, ts_main));
595595
OUTCOME_CB(auto it, find(ts_branch, context.tipset->height()));
@@ -667,7 +667,8 @@ namespace fc::api {
667667
OUTCOME_TRY(signed_message,
668668
vm::message::MessageSignerImpl{key_store}.sign(message.from,
669669
message));
670-
OUTCOME_TRY(mpool->add(signed_message));
670+
OUTCOME_TRY(mpool->addLocal(signed_message));
671+
mpool->publish(signed_message);
671672
spdlog::info("MpoolPushMessage {}", signed_message.getCid());
672673
return std::move(signed_message);
673674
};
@@ -699,7 +700,7 @@ namespace fc::api {
699700
auto &tipset_key) -> outcome::result<InvocResult> {
700701
OUTCOME_TRY(context, tipsetContext(tipset_key));
701702

702-
std::shared_lock ts_lock{*env_context.ts_branches_mutex};
703+
std::unique_lock ts_lock{*env_context.ts_branches_mutex};
703704
OUTCOME_TRY(ts_branch, TsBranch::make(ts_load, tipset_key, ts_main));
704705
ts_lock.unlock();
705706

core/codec/cbor/fwd.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010

1111
namespace fc::codec::cbor {
1212
template <typename T>
13+
// NOLINTNEXTLINE(readability-redundant-declaration)
1314
outcome::result<Bytes> encode(const T &arg);
15+
1416
template <typename T>
17+
// NOLINTNEXTLINE(readability-redundant-declaration)
1518
outcome::result<T> decode(BytesIn input);
1619
} // namespace fc::codec::cbor

core/common/outcome_fmt.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ struct fmt::formatter<std::error_code, char, void> {
1919

2020
template <typename ParseContext>
2121
constexpr auto parse(ParseContext &ctx) {
22-
auto it{ctx.begin()};
22+
const auto *it{ctx.begin()};
2323
if (it != ctx.end() && *it == '#') {
2424
alt = true;
2525
++it;

core/const.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ namespace fc {
1212
using primitives::ChainEpoch;
1313
using primitives::EpochDuration;
1414
using primitives::StoragePower;
15+
using primitives::TokenAmount;
1516

1617
constexpr int64_t kSecondsInHour{60 * 60};
1718
constexpr int64_t kSecondsInDay{24 * kSecondsInHour};
@@ -38,7 +39,7 @@ namespace fc {
3839
constexpr uint64_t kFilecoinPrecision{1000000000000000000};
3940
constexpr auto kGasLimitOverestimation{1.25};
4041
constexpr auto kMessageConfidence{5};
41-
constexpr auto kMinimumBaseFee{100};
42+
const TokenAmount kMinimumBaseFee{100};
4243
constexpr auto kPackingEfficiencyDenom{5};
4344
constexpr auto kPackingEfficiencyNum{4};
4445

core/node/main/builder.cpp

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,35 @@ namespace fc::node {
261261
timerLoop(scheduler, tick, cb);
262262
},
263263
tick);
264-
};
264+
}
265+
266+
/**
267+
* Creates and initializes message pool and sets timer for republishing.
268+
* @param node_objects
269+
* @return
270+
*/
271+
void createMessagePool(const Config &config, NodeObjects &o) {
272+
o.mpool = storage::mpool::MessagePool::create(o.env_context,
273+
o.ts_main,
274+
config.mpool_bls_cache_size,
275+
o.chain_store,
276+
o.pubsub_gate);
277+
// Republish pending messages
278+
// Delay from lotus
279+
// https://github.com/filecoin-project/lotus/blob/d9100981ada8b3186d906a4f4140b83a819d2299/chain/messagepool/messagepool.go#L58
280+
const auto republishTimeout{std::chrono::seconds(10 * kEpochDurationSeconds
281+
+ kPropagationDelaySecs)};
282+
timerLoop(o.scheduler, republishTimeout, [mpool{o.mpool}] {
283+
const auto res = mpool->republishPendingMessages();
284+
if (!res) {
285+
log()->error("Mpool republish error: {:#}", res.error());
286+
}
287+
});
288+
// batch message publishing with delay kRepublishBatchDelay
289+
timerLoop(o.scheduler,
290+
storage::mpool::kRepublishBatchDelay,
291+
[mpool{o.mpool}] { mpool->publishFromQueue(); });
292+
}
265293

266294
/**
267295
* Creates and intialises Storage Market Client
@@ -306,6 +334,7 @@ namespace fc::node {
306334
return outcome::success();
307335
}
308336

337+
// NOLINTNEXTLINE(readability-function-cognitive-complexity)
309338
outcome::result<NodeObjects> createNodeObjects(Config &config) {
310339
NodeObjects o;
311340

@@ -524,8 +553,7 @@ namespace fc::node {
524553

525554
log()->debug("Creating API...");
526555

527-
o.mpool = storage::mpool::MessagePool::create(
528-
o.env_context, o.ts_main, config.mpool_bls_cache_size, o.chain_store);
556+
createMessagePool(config, o);
529557

530558
auto msg_waiter = storage::blockchain::MsgWaiter::create(
531559
o.ts_load, o.ipld, o.io_context, o.chain_store);

core/node/main/main.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -277,14 +277,6 @@ namespace fc {
277277
res.error());
278278
}
279279
})};
280-
o.api->MpoolPushMessage = [&, impl{o.api->MpoolPushMessage}](auto &arg1,
281-
auto &arg2) {
282-
auto res{impl(arg1, arg2)};
283-
if (res) {
284-
o.pubsub_gate->publish(res.value());
285-
}
286-
return res;
287-
};
288280

289281
Metrics metrics{o, start_time};
290282

core/storage/buffer_map.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
namespace fc::storage {
2323

24-
// TODO: BytesIn as key or remove all of them
24+
// TODO(ortyomka): change `K` to `cow_t<K>` in interfaces
2525
using BufferMap = face::GenericMap<Bytes, Bytes>;
2626

2727
using BufferBatch = face::WriteBatch<Bytes, Bytes>;

core/storage/mpool/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ add_library(mpool
66
)
77
target_link_libraries(mpool
88
message
9+
sync
910
)

0 commit comments

Comments
 (0)