Skip to content

Commit ea715ad

Browse files
authored
fixes (#481)
Signed-off-by: turuslan <[email protected]>
1 parent c990990 commit ea715ad

File tree

7 files changed

+52
-20
lines changed

7 files changed

+52
-20
lines changed

core/miner/storage_fsm/impl/precommit_batcher_impl.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,18 @@ namespace fc::mining {
2929
: max_delay_(max_time),
3030
api_(std::move(api)),
3131
miner_address_(miner_address),
32+
scheduler_(scheduler),
3233
closest_cutoff_(max_time),
3334
fee_config_(std::move(fee_config)),
3435
address_selector_(address_selector) {
3536
cutoff_start_ = std::chrono::system_clock::now();
3637
logger_ = common::createLogger("batcher");
3738
logger_->info("Batcher has been started");
38-
handle_ = scheduler->scheduleWithHandle(
39+
reschedule(max_delay_);
40+
}
41+
42+
void PreCommitBatcherImpl::reschedule(std::chrono::milliseconds time) {
43+
handle_ = scheduler_->scheduleWithHandle(
3944
[&]() {
4045
std::unique_lock<std::mutex> locker(mutex_);
4146
const auto maybe_result = sendBatch();
@@ -45,9 +50,11 @@ namespace fc::mining {
4550
callbacks_.clear();
4651
cutoff_start_ = std::chrono::system_clock::now();
4752
closest_cutoff_ = max_delay_;
48-
handle_.reschedule(max_delay_);
53+
54+
// reschedule during scheduler callback, will not throw
55+
handle_.reschedule(max_delay_).value();
4956
},
50-
max_delay_);
57+
time);
5158
}
5259

5360
outcome::result<CID> PreCommitBatcherImpl::sendBatch() {
@@ -103,7 +110,7 @@ namespace fc::mining {
103110
callbacks_.clear();
104111
cutoff_start_ = std::chrono::system_clock::now();
105112
closest_cutoff_ = max_delay_;
106-
handle_.reschedule(max_delay_);
113+
reschedule(max_delay_);
107114
}
108115

109116
void PreCommitBatcherImpl::setPreCommitCutoff(const ChainEpoch &current_epoch,
@@ -131,7 +138,7 @@ namespace fc::mining {
131138
std::chrono::system_clock::now() - cutoff_start_)
132139
> temp_cutoff)) {
133140
cutoff_start_ = std::chrono::system_clock::now();
134-
handle_.reschedule(temp_cutoff);
141+
reschedule(temp_cutoff);
135142
closest_cutoff_ = temp_cutoff;
136143
}
137144
}

core/miner/storage_fsm/impl/precommit_batcher_impl.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ namespace fc::mining {
5757
std::chrono::milliseconds max_delay_;
5858
std::shared_ptr<FullNodeApi> api_;
5959
Address miner_address_;
60+
std::shared_ptr<Scheduler> scheduler_;
6061
Scheduler::Handle handle_;
6162
std::chrono::milliseconds closest_cutoff_;
6263
std::chrono::system_clock::time_point cutoff_start_;
@@ -71,6 +72,8 @@ namespace fc::mining {
7172
const SectorInfo &sector_info);
7273

7374
outcome::result<CID> sendBatch();
75+
76+
void reschedule(std::chrono::milliseconds time);
7477
};
7578

7679
} // namespace fc::mining

core/sector_storage/stores/impl/local_store.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,8 @@ namespace fc::sector_storage::stores {
542542
}
543543
}
544544

545-
handler_.reschedule(heartbeat_interval_);
545+
// reschedule during scheduler callback, will not throw
546+
handler_.reschedule(heartbeat_interval_).value();
546547
}
547548

548549
outcome::result<FsStat> LocalStoreImpl::Path::getStat(

core/storage/ipfs/graphsync/impl/network/peer_context.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,10 +419,11 @@ namespace fc::storage::ipfs::graphsync {
419419
closeStream(std::move(stream), RS_TIMEOUT);
420420
}
421421

422+
// reschedule during scheduler callback, will not throw
422423
if (!streams_.empty() && max_expire_time > now) {
423-
timer_.reschedule(max_expire_time - now);
424+
timer_.reschedule(max_expire_time - now).value();
424425
} else {
425-
timer_.reschedule(kPeerCloseDelayMsec);
426+
timer_.reschedule(kPeerCloseDelayMsec).value();
426427
}
427428
}
428429

core/storage/mpool/mpool.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ namespace fc::storage::mpool {
460460
mpool->ipld = env_context.ipld;
461461
mpool->head_sub =
462462
chain_store->subscribeHeadChanges([=](const auto &changes) {
463+
std::unique_lock lock{mpool->mutex};
463464
for (const auto &change : changes) {
464465
auto res{mpool->onHeadChange(change)};
465466
if (!res) {
@@ -474,6 +475,7 @@ namespace fc::storage::mpool {
474475
}
475476

476477
std::vector<SignedMessage> MessagePool::pending() const {
478+
std::unique_lock lock{mutex};
477479
std::vector<SignedMessage> messages;
478480
for (auto &[addr, pending] : by_from) {
479481
for (auto &[nonce, message] : pending) {
@@ -506,6 +508,7 @@ namespace fc::storage::mpool {
506508

507509
outcome::result<std::vector<SignedMessage>> MessagePool::select(
508510
TipsetCPtr ts, double ticket_quality) const {
511+
std::unique_lock lock{mutex};
509512
OUTCOME_TRY(base_fee, ts->nextBaseFee(env_context.ipld));
510513
vm::runtime::Pricelist pricelist{ts->epoch()};
511514
OUTCOME_TRY(cached, env_context.interpreter_cache->get(ts->key));
@@ -577,6 +580,7 @@ namespace fc::storage::mpool {
577580
}
578581

579582
outcome::result<Nonce> MessagePool::nonce(const Address &from) const {
583+
std::unique_lock lock{mutex};
580584
assert(from.isKeyType());
581585
OUTCOME_TRY(interpeted, env_context.interpreter_cache->get(head->key));
582586
OUTCOME_TRY(actor,
@@ -593,6 +597,7 @@ namespace fc::storage::mpool {
593597

594598
outcome::result<void> MessagePool::estimate(
595599
UnsignedMessage &message, const TokenAmount &max_fee) const {
600+
std::unique_lock lock{mutex};
596601
assert(message.from.isKeyType());
597602
if (message.gas_limit == 0) {
598603
auto msg{message};
@@ -715,6 +720,11 @@ namespace fc::storage::mpool {
715720
}
716721

717722
outcome::result<void> MessagePool::add(const SignedMessage &message) {
723+
std::unique_lock lock{mutex};
724+
return addLocked(message);
725+
}
726+
727+
outcome::result<void> MessagePool::addLocked(const SignedMessage &message) {
718728
if (message.signature.isBls()) {
719729
bls_cache.insert(message.getCid(), message.signature);
720730
}
@@ -745,10 +755,10 @@ namespace fc::storage::mpool {
745755
} else {
746756
if (bls) {
747757
if (auto sig{bls_cache.get(cid)}) {
748-
OUTCOME_TRY(add({*msg, *sig}));
758+
OUTCOME_TRY(addLocked({*msg, *sig}));
749759
}
750760
} else {
751-
OUTCOME_TRY(add(*smsg));
761+
OUTCOME_TRY(addLocked(*smsg));
752762
}
753763
}
754764
return outcome::success();

core/storage/mpool/mpool.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ namespace fc::storage::mpool {
5555
int64_t max_blocks) const;
5656
outcome::result<TokenAmount> estimateGasPremium(int64_t max_blocks) const;
5757
outcome::result<void> add(const SignedMessage &message);
58+
outcome::result<void> addLocked(const SignedMessage &message);
5859
void remove(const Address &from, Nonce nonce);
5960
outcome::result<void> onHeadChange(const HeadChange &change);
6061
connection_t subscribe(const std::function<Subscriber> &subscriber) {
@@ -73,6 +74,7 @@ namespace fc::storage::mpool {
7374
boost::signals2::signal<Subscriber> signal;
7475
mutable std::default_random_engine generator;
7576
mutable std::normal_distribution<> distribution;
77+
mutable std::mutex mutex;
7678
};
7779

7880
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

core/vm/actor/cgo/actors.cpp

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@
1414
#include "vm/runtime/env.hpp"
1515
#include "vm/toolchain/toolchain.hpp"
1616

17-
#define RUNTIME_METHOD(name) \
18-
void rt_##name(const std::shared_ptr<Runtime> &, \
19-
CborDecodeStream &, \
20-
CborEncodeStream &); \
21-
CBOR_METHOD(name) { \
22-
rt_##name(runtimes.at(arg.get<size_t>()), arg, ret); \
23-
} \
24-
void rt_##name(const std::shared_ptr<Runtime> &rt, \
25-
CborDecodeStream &arg, \
17+
#define RUNTIME_METHOD(name) \
18+
void rt_##name(const std::shared_ptr<Runtime> &, \
19+
CborDecodeStream &, \
20+
CborEncodeStream &); \
21+
CBOR_METHOD(name) { \
22+
std::unique_lock runtimes_lock{runtimes_mutex}; \
23+
auto &rt{runtimes.at(arg.get<size_t>())}; \
24+
runtimes_lock.unlock(); \
25+
rt_##name(rt, arg, ret); \
26+
} \
27+
void rt_##name(const std::shared_ptr<Runtime> &rt, \
28+
CborDecodeStream &arg, \
2629
CborEncodeStream &ret)
2730

2831
namespace fc::vm::actor::cgo {
@@ -55,6 +58,7 @@ namespace fc::vm::actor::cgo {
5558
constexpr auto kFatal{VMExitCode::kFatal};
5659
constexpr auto kOk{VMExitCode::kOk};
5760

61+
static std::mutex runtimes_mutex;
5862
static std::map<size_t, std::shared_ptr<Runtime>> runtimes;
5963
static size_t next_runtime{0};
6064

@@ -64,17 +68,21 @@ namespace fc::vm::actor::cgo {
6468
outcome::result<Buffer> invoke(const CID &code,
6569
const std::shared_ptr<Runtime> &runtime) {
6670
CborEncodeStream arg;
71+
std::unique_lock runtimes_lock{runtimes_mutex};
6772
auto id{next_runtime++}; // TODO: mod
73+
runtimes.emplace(id, runtime);
74+
runtimes_lock.unlock();
6875
const auto &message{runtime->getMessage().get()};
6976
auto version{runtime->getNetworkVersion()};
7077
const auto &base_fee{runtime->execution()->env->tipset->getParentBaseFee()};
7178
arg << id << version << base_fee << message.from << message.to
7279
<< runtime->getCurrentEpoch() << message.value << code << message.method
7380
<< message.params;
74-
runtimes.emplace(id, runtime);
7581
const auto _ret{cgoCall<cgoActorsInvoke>(arg)};
7682
CborDecodeStream ret{_ret};
83+
runtimes_lock.lock();
7784
runtimes.erase(id);
85+
runtimes_lock.unlock();
7886
auto exit{ret.get<VMExitCode>()};
7987
if (exit != kOk) {
8088
return exit;

0 commit comments

Comments
 (0)