Skip to content

Commit 383ef5a

Browse files
authored
Merge pull request #905 from evoskuil/master
Make address queries cancellable.
2 parents d22a8dd + 90ab65b commit 383ef5a

File tree

4 files changed

+176
-63
lines changed

4 files changed

+176
-63
lines changed

include/bitcoin/node/chasers/chaser.hpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,6 @@ class BCN_API chaser
163163
#define SUBSCRIBE_EVENTS(method, ...) \
164164
subscribe_events(BIND(method, __VA_ARGS__))
165165

166-
#define PARALLEL(method, ...) \
167-
boost::asio::post(threadpool_.service(), BIND(method, __VA_ARGS__));
168-
169166
} // namespace node
170167
} // namespace libbitcoin
171168

include/bitcoin/node/chasers/chaser_validate.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ class BCN_API chaser_validate
4242
void stop() NOEXCEPT override;
4343

4444
protected:
45+
/// Post a method in base or derived class in parallel (use PARALLEL).
46+
template <class Derived, typename Method, typename... Args>
47+
inline auto parallel(Method&& method, Args&&... args) NOEXCEPT
48+
{
49+
return boost::asio::post(threadpool_.service(),
50+
BIND_THIS(method, args));
51+
}
52+
4553
typedef network::race_unity<const code&, const database::tx_link&> race;
4654

4755
virtual bool handle_event(const code& ec, chase event_,

include/bitcoin/node/protocols/protocol_explore.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@
2222
#include <atomic>
2323
#include <memory>
2424
#include <optional>
25+
#include <set>
2526
#include <bitcoin/node/define.hpp>
2627
#include <bitcoin/node/protocols/protocol_html.hpp>
2728

2829
namespace libbitcoin {
2930
namespace node {
3031

32+
// TODO: establish a place for endpoint types.
33+
using point_set = std::set<system::chain::point>;
34+
using outpoint_set = std::set<system::chain::outpoint>;
35+
3136
class BCN_API protocol_explore
3237
: public node::protocol_html,
3338
protected network::tracker<protocol_explore>
@@ -146,6 +151,25 @@ class BCN_API protocol_explore
146151
const system::hash_cptr& hash) NOEXCEPT;
147152

148153
private:
154+
using balance_handler = std::function<void(code, uint8_t, uint64_t)>;
155+
using address_handler = std::function<void(code, uint8_t, outpoint_set&&)>;
156+
157+
void do_get_address(uint8_t media, const system::hash_cptr& hash,
158+
const address_handler& handler) NOEXCEPT;
159+
void do_get_address_confirmed(uint8_t media, const system::hash_cptr& hash,
160+
const address_handler& handler) NOEXCEPT;
161+
void do_get_address_unconfirmed(uint8_t media,
162+
const system::hash_cptr& hash,
163+
const address_handler& handler) NOEXCEPT;
164+
165+
void complete_get_address(const code& ec, uint8_t media,
166+
const outpoint_set& set) NOEXCEPT;
167+
168+
void do_get_address_balance(uint8_t media, const system::hash_cptr& hash,
169+
const balance_handler& handler) NOEXCEPT;
170+
void complete_get_address_balance(const code& ec, uint8_t media,
171+
const uint64_t balance) NOEXCEPT;
172+
149173
void inject(boost::json::value& out, std::optional<uint32_t> height,
150174
const database::header_link& link) const NOEXCEPT;
151175

src/protocols/protocol_explore.cpp

Lines changed: 144 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include <algorithm>
2222
#include <optional>
2323
#include <ranges>
24-
#include <set>
2524
#include <utility>
2625
#include <bitcoin/node/define.hpp>
2726
#include <bitcoin/node/parse/parse.hpp>
@@ -41,9 +40,6 @@ using namespace network::messages::peer;
4140
using namespace std::placeholders;
4241
using namespace boost::json;
4342

44-
using point_set = std::set<chain::point>;
45-
using outpoint_set = std::set<chain::outpoint>;
46-
4743
DEFINE_JSON_TO_TAG(point_set)
4844
{
4945
point_set out{};
@@ -555,8 +551,9 @@ bool protocol_explore::handle_get_tx(const code& ec, interface::tx, uint8_t,
555551
return true;
556552
}
557553

558-
bool protocol_explore::handle_get_tx_header(const code& ec, interface::tx_header,
559-
uint8_t, uint8_t media, const hash_cptr& hash) NOEXCEPT
554+
bool protocol_explore::handle_get_tx_header(const code& ec,
555+
interface::tx_header, uint8_t, uint8_t media,
556+
const hash_cptr& hash) NOEXCEPT
560557
{
561558
if (stopped(ec))
562559
return false;
@@ -941,119 +938,178 @@ bool protocol_explore::handle_get_output_spenders(const code& ec,
941938
return true;
942939
}
943940

941+
// handle_get_address
942+
// ----------------------------------------------------------------------------
943+
944944
bool protocol_explore::handle_get_address(const code& ec, interface::address,
945945
uint8_t, uint8_t media, const hash_cptr& hash) NOEXCEPT
946946
{
947947
if (stopped(ec))
948948
return false;
949949

950-
const auto& query = archive();
951-
if (!query.address_enabled())
950+
if (!archive().address_enabled())
952951
{
953952
send_not_implemented();
954953
return true;
955954
}
956955

957-
// TODO: post queries to thread (both stopping() and this are stranded).
956+
address_handler complete = BIND(complete_get_address, _1, _2, _3);
957+
PARALLEL(do_get_address, media, hash, std::move(complete));
958+
return true;
959+
}
958960

961+
// private
962+
void protocol_explore::do_get_address(uint8_t media, const hash_cptr& hash,
963+
const address_handler& handler) NOEXCEPT
964+
{
965+
// Not stranded, query is threadsafe.
966+
const auto& query = archive();
967+
968+
// TODO: push into database as single call, generalize outpoint_set.
969+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
970+
// TODO: change query to return code to differentiate cancel vs. integrity.
959971
database::output_links outputs{};
960972
if (!query.to_address_outputs(stopping_, outputs, *hash))
961973
{
962-
send_internal_server_error(database::error::integrity);
963-
return true;
974+
handler(network::error::operation_canceled, {}, {});
975+
return;
964976
}
965977

966-
if (outputs.empty())
978+
outpoint_set set{};
979+
for (const auto& output: outputs)
967980
{
968-
send_not_found();
969-
return true;
981+
if (stopping_)
982+
{
983+
handler(network::error::operation_canceled, {}, {});
984+
return;
985+
}
986+
987+
set.insert(query.get_spent(output));
970988
}
989+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
971990

972-
outpoint_set out{};
973-
for (const auto& output: outputs)
974-
out.insert(query.get_spent(output));
991+
handler(network::error::success, media, std::move(set));
992+
}
993+
994+
// This is shared by the tree get_address.. methods.
995+
void protocol_explore::complete_get_address(const code& ec, uint8_t media,
996+
const outpoint_set& set) NOEXCEPT
997+
{
998+
BC_ASSERT(stranded());
975999

976-
const auto size = out.size() * chain::outpoint::serialized_size();
1000+
if (stopped())
1001+
return;
1002+
1003+
if (ec)
1004+
{
1005+
send_internal_server_error(ec);
1006+
return;
1007+
}
1008+
1009+
if (set.empty())
1010+
{
1011+
send_not_found();
1012+
return;
1013+
}
1014+
1015+
const auto size = set.size() * chain::outpoint::serialized_size();
9771016
switch (media)
9781017
{
9791018
case data:
980-
send_chunk(to_bin_array(out, size));
981-
return true;
1019+
send_chunk(to_bin_array(set, size));
1020+
return;
9821021
case text:
983-
send_text(to_hex_array(out, size));
984-
return true;
1022+
send_text(to_hex_array(set, size));
1023+
return;
9851024
case json:
986-
send_json(value_from(out), two * size);
987-
return true;
1025+
send_json(value_from(set), two * size);
1026+
return;
9881027
}
9891028

9901029
send_not_found();
991-
return true;
9921030
}
9931031

1032+
// handle_get_address_confirmed
1033+
// ----------------------------------------------------------------------------
1034+
9941035
bool protocol_explore::handle_get_address_confirmed(const code& ec,
9951036
interface::address_confirmed, uint8_t, uint8_t media,
9961037
const hash_cptr& hash) NOEXCEPT
9971038
{
9981039
if (stopped(ec))
9991040
return false;
10001041

1001-
const auto& query = archive();
1002-
if (!query.address_enabled())
1042+
if (!archive().address_enabled())
10031043
{
10041044
send_not_implemented();
10051045
return true;
10061046
}
10071047

1008-
// TODO: post queries to thread (both stopping() and this are stranded).
1048+
address_handler complete = BIND(complete_get_address, _1, _2, _3);
1049+
PARALLEL(do_get_address_confirmed, media, hash, std::move(complete));
1050+
return true;
1051+
}
1052+
1053+
// private
1054+
void protocol_explore::do_get_address_confirmed(uint8_t media,
1055+
const hash_cptr& hash, const address_handler& handler) NOEXCEPT
1056+
{
1057+
const auto& query = archive();
10091058

1059+
// TODO: push into database as single call, generalize outpoint_set.
1060+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1061+
// TODO: change query to return code to differentiate cancel vs. integrity.
10101062
database::output_links outputs{};
10111063
if (!query.to_confirmed_unspent_outputs(stopping_, outputs, *hash))
10121064
{
1013-
send_internal_server_error(database::error::integrity);
1014-
return true;
1065+
handler(network::error::operation_canceled, {}, {});
1066+
return;
10151067
}
10161068

1017-
if (outputs.empty())
1069+
outpoint_set set{};
1070+
for (const auto& output : outputs)
10181071
{
1019-
send_not_found();
1020-
return true;
1021-
}
1022-
1023-
outpoint_set out{};
1024-
for (const auto& output: outputs)
1025-
out.insert(query.get_spent(output));
1072+
if (stopping_)
1073+
{
1074+
handler(network::error::operation_canceled, {}, {});
1075+
return;
1076+
}
10261077

1027-
const auto size = out.size() * chain::outpoint::serialized_size();
1028-
switch (media)
1029-
{
1030-
case data:
1031-
send_chunk(to_bin_array(out, size));
1032-
return true;
1033-
case text:
1034-
send_text(to_hex_array(out, size));
1035-
return true;
1036-
case json:
1037-
send_json(value_from(out), two * size);
1038-
return true;
1078+
set.insert(query.get_spent(output));
10391079
}
1080+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
10401081

1041-
send_not_found();
1042-
return true;
1082+
handler(network::error::success, media, std::move(set));
10431083
}
10441084

1085+
// handle_get_address_unconfirmed
1086+
// ----------------------------------------------------------------------------
1087+
10451088
bool protocol_explore::handle_get_address_unconfirmed(const code& ec,
1046-
interface::address_unconfirmed, uint8_t, uint8_t, const hash_cptr&) NOEXCEPT
1089+
interface::address_unconfirmed, uint8_t, uint8_t media,
1090+
const hash_cptr& hash) NOEXCEPT
10471091
{
10481092
if (stopped(ec))
10491093
return false;
10501094

10511095
// TODO: there are currently no unconfirmed txs.
1052-
10531096
send_not_implemented();
10541097
return true;
1098+
1099+
address_handler complete = BIND(complete_get_address, _1, _2, _3);
1100+
PARALLEL(do_get_address_unconfirmed, media, hash, std::move(complete));
1101+
return true;
1102+
}
1103+
1104+
void protocol_explore::do_get_address_unconfirmed(uint8_t media,
1105+
const system::hash_cptr&, const address_handler& handler) NOEXCEPT
1106+
{
1107+
handler(network::error::success, media, {});
10551108
}
10561109

1110+
// handle_get_address_balance
1111+
// ----------------------------------------------------------------------------
1112+
10571113
bool protocol_explore::handle_get_address_balance(const code& ec,
10581114
interface::address_balance, uint8_t, uint8_t media,
10591115
const hash_cptr& hash) NOEXCEPT
@@ -1068,30 +1124,58 @@ bool protocol_explore::handle_get_address_balance(const code& ec,
10681124
return true;
10691125
}
10701126

1071-
// TODO: post queries to thread (both stopping() and this are stranded).
1127+
balance_handler complete = BIND(complete_get_address_balance, _1, _2, _3);
1128+
PARALLEL(do_get_address_balance, media, hash, std::move(complete));
1129+
return true;
1130+
}
1131+
1132+
void protocol_explore::do_get_address_balance(uint8_t media,
1133+
const system::hash_cptr& hash, const balance_handler& handler) NOEXCEPT
1134+
{
1135+
const auto& query = archive();
10721136

1137+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1138+
// TODO: change query to return code to differentiate cancel vs. integrity.
10731139
uint64_t balance{};
10741140
if (!query.get_confirmed_balance(stopping_, balance, *hash))
10751141
{
10761142
send_internal_server_error(database::error::integrity);
1077-
return true;
1143+
return;
1144+
}
1145+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1146+
1147+
handler(network::error::success, media, balance);
1148+
}
1149+
1150+
void protocol_explore::complete_get_address_balance(const code& ec,
1151+
uint8_t media, uint64_t balance) NOEXCEPT
1152+
{
1153+
BC_ASSERT(stranded());
1154+
1155+
// This suppresses the error response resulting from cancelation.
1156+
if (stopped())
1157+
return;
1158+
1159+
if (ec)
1160+
{
1161+
send_internal_server_error(ec);
1162+
return;
10781163
}
10791164

10801165
switch (media)
10811166
{
10821167
case data:
10831168
send_chunk(to_little_endian_size(balance));
1084-
return true;
1169+
return;
10851170
case text:
10861171
send_text(encode_base16(to_little_endian_size(balance)));
1087-
return true;
1172+
return;
10881173
case json:
10891174
send_json(balance, two * sizeof(balance));
1090-
return true;
1175+
return;
10911176
}
10921177

10931178
send_not_found();
1094-
return true;
10951179
}
10961180

10971181
// private

0 commit comments

Comments
 (0)