Skip to content

Commit 6f2fb80

Browse files
authored
paych maker (#496)
Signed-off-by: turuslan <[email protected]>
1 parent 6023790 commit 6f2fb80

File tree

11 files changed

+389
-153
lines changed

11 files changed

+389
-153
lines changed

core/api/impl/paych_get.hpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#pragma once
7+
8+
#include "api/full_node/node_api.hpp"
9+
#include "payment_channel_manager/impl/maker.hpp"
10+
11+
namespace fc::api {
12+
inline void implPaychGet(
13+
const std::shared_ptr<FullNodeApi> &api,
14+
const std::shared_ptr<paych_maker::PaychMaker> &maker) {
15+
api->PaychGet = [=](auto &&cb,
16+
const Address &from,
17+
const Address &to,
18+
const TokenAmount &amount) {
19+
maker->make({from, to}, amount, std::move(cb));
20+
};
21+
}
22+
} // namespace fc::api

core/node/main/builder.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <libp2p/protocol/kademlia/impl/validator_default.hpp>
2323

2424
#include "api/full_node/make.hpp"
25+
#include "api/impl/paych_get.hpp"
2526
#include "api/setup_common.hpp"
2627
#include "blockchain/block_validator/impl/block_validator_impl.hpp"
2728
#include "blockchain/impl/weight_calculator_impl.hpp"
@@ -631,6 +632,11 @@ namespace fc::node {
631632
o.market_discovery,
632633
o.retrieval_market_client,
633634
o.wallet_default_address);
635+
api::implPaychGet(
636+
o.api,
637+
std::make_shared<paych_maker::PaychMaker>(
638+
o.api,
639+
std::make_shared<storage::MapPrefix>("paych_maker/", o.kv_store)));
634640

635641
api::fillAuthApi(o.api, api_secret, log());
636642

core/payment_channel_manager/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#
55

66
add_library(payment_channel_manager
7+
impl/maker.cpp
78
impl/payment_channel_manager_impl.cpp
89
impl/payment_channel_manager_error.cpp
910
)
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include "payment_channel_manager/impl/maker.hpp"
7+
8+
#include "api/full_node/node_api.hpp"
9+
#include "common/outcome_fmt.hpp"
10+
#include "vm/actor/builtin/v0/init/init_actor.hpp"
11+
#include "vm/actor/builtin/v0/payment_channel/payment_channel_actor.hpp"
12+
#include "vm/toolchain/toolchain.hpp"
13+
14+
namespace fc::paych_maker {
15+
using vm::actor::builtin::v0::init::Exec;
16+
using vm::message::UnsignedMessage;
17+
18+
inline auto &log() {
19+
static auto log{common::createLogger("PaychMaker")};
20+
return log;
21+
}
22+
23+
inline UnsignedMessage msgCreate(const PaychMaker::It &it,
24+
NetworkVersion network) {
25+
using vm::actor::builtin::v0::payment_channel::Construct;
26+
const auto &[from_to, queue]{*it};
27+
return {
28+
vm::actor::kInitAddress,
29+
from_to.first,
30+
{},
31+
*queue.row.waiting_amount,
32+
{},
33+
{},
34+
Exec::Number,
35+
codec::cbor::encode(
36+
Exec::Params{
37+
vm::toolchain::Toolchain::createAddressMatcher(network)
38+
->getPaymentChannelCodeId(),
39+
codec::cbor::encode(Construct::Params{
40+
from_to.first,
41+
from_to.second,
42+
})
43+
.value(),
44+
})
45+
.value(),
46+
};
47+
}
48+
49+
inline UnsignedMessage msgAdd(const PaychMaker::It &it) {
50+
const auto &[from_to, queue]{*it};
51+
return {
52+
*queue.row.actor,
53+
from_to.first,
54+
{},
55+
*queue.row.waiting_amount - queue.row.unused_amount,
56+
{},
57+
{},
58+
vm::actor::kSendMethodNumber,
59+
{},
60+
};
61+
}
62+
63+
inline void shift(Queue &queue) {
64+
assert(!queue.row.waiting_cid);
65+
assert(queue.waiting_cb.empty());
66+
std::swap(queue.waiting_cb, queue.pending_cb);
67+
queue.row.waiting_amount = std::move(queue.pending_amount);
68+
queue.pending_amount = {};
69+
}
70+
71+
Bytes Row::key(const FromTo &from_to) {
72+
Bytes key;
73+
append(key, encode(from_to.first));
74+
append(key, encode(from_to.second));
75+
return key;
76+
}
77+
78+
void Queue::save() {
79+
key.setCbor(row);
80+
}
81+
82+
const CID &UnusedCid::get() {
83+
if (!cid) {
84+
cid = key.getCbor<CID>();
85+
}
86+
return *cid;
87+
}
88+
89+
void UnusedCid::set(const CID &new_cid) {
90+
cid = new_cid;
91+
key.setCbor(new_cid);
92+
}
93+
94+
PaychMaker::PaychMaker(const ApiPtr &api, const MapPtr &kv)
95+
: api{api}, kv{kv}, unused_cid{{"unused_cid", kv}} {}
96+
97+
void PaychMaker::make(const FromTo &from_to,
98+
const TokenAmount &amount,
99+
Cb cb) {
100+
std::unique_lock lock{mutex};
101+
auto it{map.find(from_to)};
102+
const auto empty{it == map.end()};
103+
if (empty) {
104+
it = map.emplace(from_to, Queue{{Row::key(from_to), kv}}).first;
105+
}
106+
auto &queue{it->second};
107+
queue.pending_amount += amount;
108+
queue.pending_cb.emplace_back(std::move(cb));
109+
if (empty) {
110+
if (queue.key.has()) {
111+
queue.key.getCbor(queue.row);
112+
if (const auto &cid{queue.row.waiting_cid}) {
113+
lock.unlock();
114+
api->StateWaitMsg([=](auto &&_wait) { onWait(it, std::move(_wait)); },
115+
*cid,
116+
kMessageConfidence,
117+
api::kLookbackNoLimit,
118+
true);
119+
return;
120+
}
121+
} else {
122+
lock.unlock();
123+
api->StateNetworkVersion(
124+
[=](auto &&_network) { onNetwork(it, _network); }, {});
125+
return;
126+
}
127+
}
128+
lock.unlock();
129+
next(it);
130+
}
131+
132+
void PaychMaker::onNetwork(It it, outcome::result<NetworkVersion> _network) {
133+
std::unique_lock lock{mutex};
134+
auto &queue{it->second};
135+
assert(!queue.row.actor);
136+
assert(!queue.row.waiting_cid);
137+
if (!_network) {
138+
log()->error("StateNetworkVersion {:#}", _network.error());
139+
onError(it, _network.error());
140+
return;
141+
}
142+
const auto &network{_network.value()};
143+
shift(queue);
144+
const auto msg{msgCreate(it, network)};
145+
lock.unlock();
146+
api->MpoolPushMessage([=](auto &&_smsg) { onPush(it, std::move(_smsg)); },
147+
msg,
148+
api::kPushNoSpec);
149+
}
150+
151+
void PaychMaker::onPush(It it, outcome::result<SignedMessage> _smsg) {
152+
std::unique_lock lock{mutex};
153+
auto &queue{it->second};
154+
assert(!queue.row.waiting_cid);
155+
if (!_smsg) {
156+
log()->error("MpoolPushMessage {:#}", _smsg.error());
157+
queue.row.waiting_amount.reset();
158+
queue.save();
159+
onError(it, _smsg.error());
160+
return;
161+
}
162+
const auto &smsg{_smsg.value()};
163+
const auto cid{smsg.getCid()};
164+
queue.row.waiting_cid = cid;
165+
queue.save();
166+
lock.unlock();
167+
api->StateWaitMsg([=](auto &&_wait) { onWait(it, std::move(_wait)); },
168+
cid,
169+
kMessageConfidence,
170+
api::kLookbackNoLimit,
171+
true);
172+
}
173+
174+
void PaychMaker::onWait(It it, outcome::result<MsgWait> _wait) {
175+
std::unique_lock lock{mutex};
176+
auto &queue{it->second};
177+
const auto cid{*queue.row.waiting_cid};
178+
if (!_wait) {
179+
log()->error("StateWaitMsg {} {:#}", cid, _wait.error());
180+
onError(it, _wait.error());
181+
return;
182+
}
183+
const auto &wait{_wait.value()};
184+
if (wait.receipt.exit_code != vm::VMExitCode::kOk) {
185+
queue.row.waiting_amount.reset();
186+
queue.row.waiting_cid.reset();
187+
queue.save();
188+
onError(it, wait.receipt.exit_code);
189+
return;
190+
}
191+
if (!queue.row.actor) {
192+
auto _result{
193+
codec::cbor::decode<Exec::Result>(wait.receipt.return_value)};
194+
if (!_result) {
195+
log()->error("onWait result decode {}",
196+
common::hex_lower(wait.receipt.return_value));
197+
onError(it, _result.error());
198+
return;
199+
}
200+
const auto &result{_result.value()};
201+
queue.row.actor = result.robust_address;
202+
}
203+
queue.row.total_amount += *queue.row.waiting_amount;
204+
if (queue.waiting_cb.empty()) {
205+
queue.row.unused_amount += *queue.row.waiting_amount;
206+
log()->info("unused + {} = {}",
207+
*queue.row.waiting_amount,
208+
queue.row.unused_amount);
209+
} else if (auto reused{std::min<TokenAmount>(queue.row.unused_amount,
210+
*queue.row.waiting_amount)}) {
211+
queue.row.unused_amount -= reused;
212+
log()->info("unused - {} = {}", reused, queue.row.unused_amount);
213+
}
214+
queue.row.waiting_amount.reset();
215+
queue.row.waiting_cid.reset();
216+
queue.save();
217+
unused_cid.set(cid);
218+
AddChannelInfo result{*queue.row.actor, cid};
219+
for (const auto &cb : queue.waiting_cb) {
220+
cb(result);
221+
}
222+
queue.waiting_cb.clear();
223+
lock.unlock();
224+
next(it);
225+
}
226+
227+
void PaychMaker::next(It it) {
228+
std::unique_lock lock{mutex};
229+
auto &queue{it->second};
230+
if (!queue.row.actor || queue.row.waiting_cid || queue.row.waiting_amount
231+
|| queue.pending_cb.empty()) {
232+
return;
233+
}
234+
if (queue.row.unused_amount >= queue.pending_amount) {
235+
queue.row.unused_amount -= queue.pending_amount;
236+
queue.save();
237+
log()->info(
238+
"unused - {} = {}", queue.pending_amount, queue.row.unused_amount);
239+
queue.pending_amount = {};
240+
AddChannelInfo result{*queue.row.actor, unused_cid.get()};
241+
for (const auto &cb : queue.pending_cb) {
242+
cb(result);
243+
}
244+
queue.pending_cb.clear();
245+
return;
246+
}
247+
shift(queue);
248+
queue.save();
249+
const auto msg{msgAdd(it)};
250+
lock.unlock();
251+
api->MpoolPushMessage([=](auto &&_smsg) { onPush(it, std::move(_smsg)); },
252+
msg,
253+
api::kPushNoSpec);
254+
}
255+
256+
void PaychMaker::onError(It it, std::error_code ec) {
257+
auto &queue{it->second};
258+
for (const auto &cb : queue.waiting_cb) {
259+
cb(ec);
260+
}
261+
queue.waiting_cb.clear();
262+
queue.pending_amount = {};
263+
for (const auto &cb : queue.pending_cb) {
264+
cb(ec);
265+
}
266+
queue.pending_cb.clear();
267+
map.erase(it);
268+
}
269+
} // namespace fc::paych_maker

0 commit comments

Comments
 (0)