Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions include/bitcoin/node/chasers/chaser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,6 @@ class BCN_API chaser
#define SUBSCRIBE_EVENTS(method, ...) \
subscribe_events(BIND(method, __VA_ARGS__))

#define PARALLEL(method, ...) \
boost::asio::post(threadpool_.service(), BIND(method, __VA_ARGS__));

} // namespace node
} // namespace libbitcoin

Expand Down
8 changes: 8 additions & 0 deletions include/bitcoin/node/chasers/chaser_validate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ class BCN_API chaser_validate
void stop() NOEXCEPT override;

protected:
/// Post a method in base or derived class in parallel (use PARALLEL).
template <class Derived, typename Method, typename... Args>
inline auto parallel(Method&& method, Args&&... args) NOEXCEPT
{
return boost::asio::post(threadpool_.service(),
BIND_THIS(method, args));
}

typedef network::race_unity<const code&, const database::tx_link&> race;

virtual bool handle_event(const code& ec, chase event_,
Expand Down
24 changes: 24 additions & 0 deletions include/bitcoin/node/protocols/protocol_explore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
#include <atomic>
#include <memory>
#include <optional>
#include <set>
#include <bitcoin/node/define.hpp>
#include <bitcoin/node/protocols/protocol_html.hpp>

namespace libbitcoin {
namespace node {

// TODO: establish a place for endpoint types.
using point_set = std::set<system::chain::point>;
using outpoint_set = std::set<system::chain::outpoint>;

class BCN_API protocol_explore
: public node::protocol_html,
protected network::tracker<protocol_explore>
Expand Down Expand Up @@ -146,6 +151,25 @@ class BCN_API protocol_explore
const system::hash_cptr& hash) NOEXCEPT;

private:
using balance_handler = std::function<void(code, uint8_t, uint64_t)>;
using address_handler = std::function<void(code, uint8_t, outpoint_set&&)>;

void do_get_address(uint8_t media, const system::hash_cptr& hash,
const address_handler& handler) NOEXCEPT;
void do_get_address_confirmed(uint8_t media, const system::hash_cptr& hash,
const address_handler& handler) NOEXCEPT;
void do_get_address_unconfirmed(uint8_t media,
const system::hash_cptr& hash,
const address_handler& handler) NOEXCEPT;

void complete_get_address(const code& ec, uint8_t media,
const outpoint_set& set) NOEXCEPT;

void do_get_address_balance(uint8_t media, const system::hash_cptr& hash,
const balance_handler& handler) NOEXCEPT;
void complete_get_address_balance(const code& ec, uint8_t media,
const uint64_t balance) NOEXCEPT;

void inject(boost::json::value& out, std::optional<uint32_t> height,
const database::header_link& link) const NOEXCEPT;

Expand Down
204 changes: 144 additions & 60 deletions src/protocols/protocol_explore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <algorithm>
#include <optional>
#include <ranges>
#include <set>
#include <utility>
#include <bitcoin/node/define.hpp>
#include <bitcoin/node/parse/parse.hpp>
Expand All @@ -41,9 +40,6 @@ using namespace network::messages::peer;
using namespace std::placeholders;
using namespace boost::json;

using point_set = std::set<chain::point>;
using outpoint_set = std::set<chain::outpoint>;

DEFINE_JSON_TO_TAG(point_set)
{
point_set out{};
Expand Down Expand Up @@ -555,8 +551,9 @@ bool protocol_explore::handle_get_tx(const code& ec, interface::tx, uint8_t,
return true;
}

bool protocol_explore::handle_get_tx_header(const code& ec, interface::tx_header,
uint8_t, uint8_t media, const hash_cptr& hash) NOEXCEPT
bool protocol_explore::handle_get_tx_header(const code& ec,
interface::tx_header, uint8_t, uint8_t media,
const hash_cptr& hash) NOEXCEPT
{
if (stopped(ec))
return false;
Expand Down Expand Up @@ -941,119 +938,178 @@ bool protocol_explore::handle_get_output_spenders(const code& ec,
return true;
}

// handle_get_address
// ----------------------------------------------------------------------------

bool protocol_explore::handle_get_address(const code& ec, interface::address,
uint8_t, uint8_t media, const hash_cptr& hash) NOEXCEPT
{
if (stopped(ec))
return false;

const auto& query = archive();
if (!query.address_enabled())
if (!archive().address_enabled())
{
send_not_implemented();
return true;
}

// TODO: post queries to thread (both stopping() and this are stranded).
address_handler complete = BIND(complete_get_address, _1, _2, _3);
PARALLEL(do_get_address, media, hash, std::move(complete));
return true;
}

// private
void protocol_explore::do_get_address(uint8_t media, const hash_cptr& hash,
const address_handler& handler) NOEXCEPT
{
// Not stranded, query is threadsafe.
const auto& query = archive();

// TODO: push into database as single call, generalize outpoint_set.
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// TODO: change query to return code to differentiate cancel vs. integrity.
database::output_links outputs{};
if (!query.to_address_outputs(stopping_, outputs, *hash))
{
send_internal_server_error(database::error::integrity);
return true;
handler(network::error::operation_canceled, {}, {});
return;
}

if (outputs.empty())
outpoint_set set{};
for (const auto& output: outputs)
{
send_not_found();
return true;
if (stopping_)
{
handler(network::error::operation_canceled, {}, {});
return;
}

set.insert(query.get_spent(output));
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

outpoint_set out{};
for (const auto& output: outputs)
out.insert(query.get_spent(output));
handler(network::error::success, media, std::move(set));
}

// This is shared by the tree get_address.. methods.
void protocol_explore::complete_get_address(const code& ec, uint8_t media,
const outpoint_set& set) NOEXCEPT
{
BC_ASSERT(stranded());

const auto size = out.size() * chain::outpoint::serialized_size();
if (stopped())
return;

if (ec)
{
send_internal_server_error(ec);
return;
}

if (set.empty())
{
send_not_found();
return;
}

const auto size = set.size() * chain::outpoint::serialized_size();
switch (media)
{
case data:
send_chunk(to_bin_array(out, size));
return true;
send_chunk(to_bin_array(set, size));
return;
case text:
send_text(to_hex_array(out, size));
return true;
send_text(to_hex_array(set, size));
return;
case json:
send_json(value_from(out), two * size);
return true;
send_json(value_from(set), two * size);
return;
}

send_not_found();
return true;
}

// handle_get_address_confirmed
// ----------------------------------------------------------------------------

bool protocol_explore::handle_get_address_confirmed(const code& ec,
interface::address_confirmed, uint8_t, uint8_t media,
const hash_cptr& hash) NOEXCEPT
{
if (stopped(ec))
return false;

const auto& query = archive();
if (!query.address_enabled())
if (!archive().address_enabled())
{
send_not_implemented();
return true;
}

// TODO: post queries to thread (both stopping() and this are stranded).
address_handler complete = BIND(complete_get_address, _1, _2, _3);
PARALLEL(do_get_address_confirmed, media, hash, std::move(complete));
return true;
}

// private
void protocol_explore::do_get_address_confirmed(uint8_t media,
const hash_cptr& hash, const address_handler& handler) NOEXCEPT
{
const auto& query = archive();

// TODO: push into database as single call, generalize outpoint_set.
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// TODO: change query to return code to differentiate cancel vs. integrity.
database::output_links outputs{};
if (!query.to_confirmed_unspent_outputs(stopping_, outputs, *hash))
{
send_internal_server_error(database::error::integrity);
return true;
handler(network::error::operation_canceled, {}, {});
return;
}

if (outputs.empty())
outpoint_set set{};
for (const auto& output : outputs)
{
send_not_found();
return true;
}

outpoint_set out{};
for (const auto& output: outputs)
out.insert(query.get_spent(output));
if (stopping_)
{
handler(network::error::operation_canceled, {}, {});
return;
}

const auto size = out.size() * chain::outpoint::serialized_size();
switch (media)
{
case data:
send_chunk(to_bin_array(out, size));
return true;
case text:
send_text(to_hex_array(out, size));
return true;
case json:
send_json(value_from(out), two * size);
return true;
set.insert(query.get_spent(output));
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

send_not_found();
return true;
handler(network::error::success, media, std::move(set));
}

// handle_get_address_unconfirmed
// ----------------------------------------------------------------------------

bool protocol_explore::handle_get_address_unconfirmed(const code& ec,
interface::address_unconfirmed, uint8_t, uint8_t, const hash_cptr&) NOEXCEPT
interface::address_unconfirmed, uint8_t, uint8_t media,
const hash_cptr& hash) NOEXCEPT
{
if (stopped(ec))
return false;

// TODO: there are currently no unconfirmed txs.

send_not_implemented();
return true;

address_handler complete = BIND(complete_get_address, _1, _2, _3);
PARALLEL(do_get_address_unconfirmed, media, hash, std::move(complete));
return true;
}

void protocol_explore::do_get_address_unconfirmed(uint8_t media,
const system::hash_cptr&, const address_handler& handler) NOEXCEPT
{
handler(network::error::success, media, {});
}

// handle_get_address_balance
// ----------------------------------------------------------------------------

bool protocol_explore::handle_get_address_balance(const code& ec,
interface::address_balance, uint8_t, uint8_t media,
const hash_cptr& hash) NOEXCEPT
Expand All @@ -1068,30 +1124,58 @@ bool protocol_explore::handle_get_address_balance(const code& ec,
return true;
}

// TODO: post queries to thread (both stopping() and this are stranded).
balance_handler complete = BIND(complete_get_address_balance, _1, _2, _3);
PARALLEL(do_get_address_balance, media, hash, std::move(complete));
return true;
}

void protocol_explore::do_get_address_balance(uint8_t media,
const system::hash_cptr& hash, const balance_handler& handler) NOEXCEPT
{
const auto& query = archive();

// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// TODO: change query to return code to differentiate cancel vs. integrity.
uint64_t balance{};
if (!query.get_confirmed_balance(stopping_, balance, *hash))
{
send_internal_server_error(database::error::integrity);
return true;
return;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

handler(network::error::success, media, balance);
}

void protocol_explore::complete_get_address_balance(const code& ec,
uint8_t media, uint64_t balance) NOEXCEPT
{
BC_ASSERT(stranded());

// This suppresses the error response resulting from cancelation.
if (stopped())
return;

if (ec)
{
send_internal_server_error(ec);
return;
}

switch (media)
{
case data:
send_chunk(to_little_endian_size(balance));
return true;
return;
case text:
send_text(encode_base16(to_little_endian_size(balance)));
return true;
return;
case json:
send_json(balance, two * sizeof(balance));
return true;
return;
}

send_not_found();
return true;
}

// private
Expand Down
Loading