Skip to content

Commit 0fc76a7

Browse files
committed
Make address queries cancellable.
1 parent ad2dc36 commit 0fc76a7

File tree

2 files changed

+170
-58
lines changed

2 files changed

+170
-58
lines changed

include/bitcoin/node/protocols/protocol_explore.hpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@
2222
#include <atomic>
2323
#include <memory>
2424
#include <optional>
25+
#include <set>
2526
#include <bitcoin/node/define.hpp>
2627
#include <bitcoin/node/protocols/protocol_html.hpp>
2728

2829
namespace libbitcoin {
2930
namespace node {
3031

32+
// TODO: establish a place for endpoint types.
33+
using point_set = std::set<system::chain::point>;
34+
using outpoint_set = std::set<system::chain::outpoint>;
35+
3136
class BCN_API protocol_explore
3237
: public node::protocol_html,
3338
protected network::tracker<protocol_explore>
@@ -146,6 +151,26 @@ class BCN_API protocol_explore
146151
const system::hash_cptr& hash) NOEXCEPT;
147152

148153
private:
154+
using outpoints_cptr = std::shared_ptr<outpoint_set>;
155+
using balance_handler = std::function<void(code, uint8_t, uint64_t)>;
156+
using address_handler = std::function<void(code, uint8_t, outpoints_cptr)>;
157+
158+
void do_get_address(uint8_t media, const system::hash_cptr& hash,
159+
const address_handler& handler) NOEXCEPT;
160+
void do_get_address_confirmed(uint8_t media, const system::hash_cptr& hash,
161+
const address_handler& handler) NOEXCEPT;
162+
void do_get_address_unconfirmed(uint8_t media,
163+
const system::hash_cptr& hash,
164+
const address_handler& handler) NOEXCEPT;
165+
166+
void complete_get_address(const code& ec, uint8_t media,
167+
const outpoints_cptr& set) NOEXCEPT;
168+
169+
void do_get_address_balance(uint8_t media, const system::hash_cptr& hash,
170+
const balance_handler& handler) NOEXCEPT;
171+
void complete_get_address_balance(const code& ec, uint8_t media,
172+
const uint64_t balance) NOEXCEPT;
173+
149174
void inject(boost::json::value& out, std::optional<uint32_t> height,
150175
const database::header_link& link) const NOEXCEPT;
151176

src/protocols/protocol_explore.cpp

Lines changed: 145 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include <algorithm>
2222
#include <optional>
2323
#include <ranges>
24-
#include <set>
2524
#include <utility>
2625
#include <bitcoin/node/define.hpp>
2726
#include <bitcoin/node/parse/parse.hpp>
@@ -41,9 +40,6 @@ using namespace network::messages::peer;
4140
using namespace std::placeholders;
4241
using namespace boost::json;
4342

44-
using point_set = std::set<chain::point>;
45-
using outpoint_set = std::set<chain::outpoint>;
46-
4743
DEFINE_JSON_TO_TAG(point_set)
4844
{
4945
point_set out{};
@@ -941,119 +937,180 @@ bool protocol_explore::handle_get_output_spenders(const code& ec,
941937
return true;
942938
}
943939

940+
// handle_get_address
941+
// ----------------------------------------------------------------------------
942+
944943
bool protocol_explore::handle_get_address(const code& ec, interface::address,
945944
uint8_t, uint8_t media, const hash_cptr& hash) NOEXCEPT
946945
{
947946
if (stopped(ec))
948947
return false;
949948

950-
const auto& query = archive();
951-
if (!query.address_enabled())
949+
if (!archive().address_enabled())
952950
{
953951
send_not_implemented();
954952
return true;
955953
}
956954

957-
// TODO: post queries to thread (both stopping() and this are stranded).
955+
address_handler complete = BIND(complete_get_address, _1, _2, _3);
956+
PARALLEL(do_get_address, media, hash, std::move(complete));
957+
return true;
958+
}
959+
960+
// private
961+
void protocol_explore::do_get_address(uint8_t media, const hash_cptr& hash,
962+
const address_handler& handler) NOEXCEPT
963+
{
964+
// Not stranded, query is threadsafe.
965+
const auto& query = archive();
958966

967+
// TODO: push into database as single call, generalize outpoint_set.
968+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
969+
// TODO: change query to return code to differentiate cancel vs. integrity.
959970
database::output_links outputs{};
960971
if (!query.to_address_outputs(stopping_, outputs, *hash))
961972
{
962-
send_internal_server_error(database::error::integrity);
963-
return true;
973+
handler(network::error::operation_canceled, {}, {});
974+
return;
964975
}
965976

966-
if (outputs.empty())
977+
const auto set = to_shared<outpoint_set>();
978+
for (const auto& output: outputs)
967979
{
968-
send_not_found();
969-
return true;
980+
if (stopping_)
981+
{
982+
handler(network::error::operation_canceled, {}, {});
983+
return;
984+
}
985+
986+
set->insert(query.get_spent(output));
970987
}
988+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
971989

972-
outpoint_set out{};
973-
for (const auto& output: outputs)
974-
out.insert(query.get_spent(output));
990+
handler(network::error::success, media, set);
991+
}
992+
993+
// This is shared by the tree get_address.. methods.
994+
void protocol_explore::complete_get_address(const code& ec, uint8_t media,
995+
const outpoints_cptr& set) NOEXCEPT
996+
{
997+
BC_ASSERT(stranded());
998+
999+
// This suppresses the error response resulting from cancelation.
1000+
if (stopped())
1001+
return;
9751002

976-
const auto size = out.size() * chain::outpoint::serialized_size();
1003+
if (ec)
1004+
{
1005+
send_internal_server_error(ec);
1006+
return;
1007+
}
1008+
1009+
if (set->empty())
1010+
{
1011+
send_not_found();
1012+
return;
1013+
}
1014+
1015+
const auto size = set->size() * chain::outpoint::serialized_size();
9771016
switch (media)
9781017
{
9791018
case data:
980-
send_chunk(to_bin_array(out, size));
981-
return true;
1019+
send_chunk(to_bin_array(*set, size));
1020+
return;
9821021
case text:
983-
send_text(to_hex_array(out, size));
984-
return true;
1022+
send_text(to_hex_array(*set, size));
1023+
return;
9851024
case json:
986-
send_json(value_from(out), two * size);
987-
return true;
1025+
send_json(value_from(*set), two * size);
1026+
return;
9881027
}
9891028

9901029
send_not_found();
991-
return true;
9921030
}
9931031

1032+
// handle_get_address_confirmed
1033+
// ----------------------------------------------------------------------------
1034+
9941035
bool protocol_explore::handle_get_address_confirmed(const code& ec,
9951036
interface::address_confirmed, uint8_t, uint8_t media,
9961037
const hash_cptr& hash) NOEXCEPT
9971038
{
9981039
if (stopped(ec))
9991040
return false;
10001041

1001-
const auto& query = archive();
1002-
if (!query.address_enabled())
1042+
if (!archive().address_enabled())
10031043
{
10041044
send_not_implemented();
10051045
return true;
10061046
}
10071047

1008-
// TODO: post queries to thread (both stopping() and this are stranded).
1048+
address_handler complete = BIND(complete_get_address, _1, _2, _3);
1049+
PARALLEL(do_get_address_confirmed, media, hash, std::move(complete));
1050+
return true;
1051+
}
1052+
1053+
// private
1054+
void protocol_explore::do_get_address_confirmed(uint8_t media,
1055+
const hash_cptr& hash, const address_handler& handler) NOEXCEPT
1056+
{
1057+
// Not stranded, query is threadsafe.
1058+
const auto& query = archive();
10091059

1060+
// TODO: push into database as single call, generalize outpoint_set.
1061+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1062+
// TODO: change query to return code to differentiate cancel vs. integrity.
10101063
database::output_links outputs{};
10111064
if (!query.to_confirmed_unspent_outputs(stopping_, outputs, *hash))
10121065
{
1013-
send_internal_server_error(database::error::integrity);
1014-
return true;
1066+
handler(network::error::operation_canceled, {}, {});
1067+
return;
10151068
}
10161069

1017-
if (outputs.empty())
1070+
const auto set = to_shared<outpoint_set>();
1071+
for (const auto& output : outputs)
10181072
{
1019-
send_not_found();
1020-
return true;
1021-
}
1022-
1023-
outpoint_set out{};
1024-
for (const auto& output: outputs)
1025-
out.insert(query.get_spent(output));
1073+
if (stopping_)
1074+
{
1075+
handler(network::error::operation_canceled, {}, {});
1076+
return;
1077+
}
10261078

1027-
const auto size = out.size() * chain::outpoint::serialized_size();
1028-
switch (media)
1029-
{
1030-
case data:
1031-
send_chunk(to_bin_array(out, size));
1032-
return true;
1033-
case text:
1034-
send_text(to_hex_array(out, size));
1035-
return true;
1036-
case json:
1037-
send_json(value_from(out), two * size);
1038-
return true;
1079+
set->insert(query.get_spent(output));
10391080
}
1081+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
10401082

1041-
send_not_found();
1042-
return true;
1083+
handler(network::error::success, media, set);
10431084
}
10441085

1086+
// handle_get_address_unconfirmed
1087+
// ----------------------------------------------------------------------------
1088+
10451089
bool protocol_explore::handle_get_address_unconfirmed(const code& ec,
1046-
interface::address_unconfirmed, uint8_t, uint8_t, const hash_cptr&) NOEXCEPT
1090+
interface::address_unconfirmed, uint8_t, uint8_t media,
1091+
const hash_cptr& hash) NOEXCEPT
10471092
{
10481093
if (stopped(ec))
10491094
return false;
10501095

10511096
// TODO: there are currently no unconfirmed txs.
1052-
10531097
send_not_implemented();
10541098
return true;
1099+
1100+
address_handler complete = BIND(complete_get_address, _1, _2, _3);
1101+
PARALLEL(do_get_address_unconfirmed, media, hash, std::move(complete));
1102+
return true;
10551103
}
10561104

1105+
void protocol_explore::do_get_address_unconfirmed(uint8_t media,
1106+
const system::hash_cptr&, const address_handler& handler) NOEXCEPT
1107+
{
1108+
handler(network::error::success, media, {});
1109+
}
1110+
1111+
// handle_get_address_balance
1112+
// ----------------------------------------------------------------------------
1113+
10571114
bool protocol_explore::handle_get_address_balance(const code& ec,
10581115
interface::address_balance, uint8_t, uint8_t media,
10591116
const hash_cptr& hash) NOEXCEPT
@@ -1068,30 +1125,60 @@ bool protocol_explore::handle_get_address_balance(const code& ec,
10681125
return true;
10691126
}
10701127

1071-
// TODO: post queries to thread (both stopping() and this are stranded).
1128+
balance_handler complete = BIND(complete_get_address_balance, _1, _2, _3);
1129+
PARALLEL(do_get_address_balance, media, hash, std::move(complete));
1130+
return true;
1131+
}
1132+
1133+
void protocol_explore::do_get_address_balance(uint8_t media,
1134+
const system::hash_cptr& hash, const balance_handler& handler) NOEXCEPT
1135+
{
1136+
// Not stranded, query is threadsafe.
1137+
const auto& query = archive();
10721138

1139+
// TODO: push into database as single call, generalize outpoint_set.
1140+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1141+
// TODO: change query to return code to differentiate cancel vs. integrity.
10731142
uint64_t balance{};
10741143
if (!query.get_confirmed_balance(stopping_, balance, *hash))
10751144
{
10761145
send_internal_server_error(database::error::integrity);
1077-
return true;
1146+
return;
1147+
}
1148+
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1149+
1150+
handler(network::error::success, media, balance);
1151+
}
1152+
1153+
void protocol_explore::complete_get_address_balance(const code& ec,
1154+
uint8_t media, uint64_t balance) NOEXCEPT
1155+
{
1156+
BC_ASSERT(stranded());
1157+
1158+
// This suppresses the error response resulting from cancelation.
1159+
if (stopped())
1160+
return;
1161+
1162+
if (ec)
1163+
{
1164+
send_internal_server_error(ec);
1165+
return;
10781166
}
10791167

10801168
switch (media)
10811169
{
10821170
case data:
10831171
send_chunk(to_little_endian_size(balance));
1084-
return true;
1172+
return;
10851173
case text:
10861174
send_text(encode_base16(to_little_endian_size(balance)));
1087-
return true;
1175+
return;
10881176
case json:
10891177
send_json(balance, two * sizeof(balance));
1090-
return true;
1178+
return;
10911179
}
10921180

10931181
send_not_found();
1094-
return true;
10951182
}
10961183

10971184
// private

0 commit comments

Comments
 (0)