Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions include/bitcoin/network/channels/channel_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class BCT_API channel_http
dispatcher_.subscribe(std::forward<signature>(handler));
}

// TODO: network.minimum_buffer is being overloaded here.
/// response_buffer_ is initialized to default size, see set_buffer().
/// Uses peer config for timeouts if not specified via other construct.
/// Construct client channel to encapsulate and communicate on the socket.
inline channel_http(const logger& log, const socket::ptr& socket,
uint64_t identifier, const settings_t& settings,
Expand All @@ -68,25 +68,26 @@ class BCT_API channel_http
/// Resume reading from the socket (requires strand).
void resume() NOEXCEPT override;

/// http is half-duplex, so reads must wait until send is completed.
/// Must be called (only once) from protocol message handler (if no stop).
virtual void read_request() NOEXCEPT;

/// Serialize and write response to peer (requires strand).
/// Serialize and write http response to peer (requires strand).
/// Completion handler is always invoked on the channel strand.
void send(http::response&& response, result_handler&& handler) NOEXCEPT;

protected:
/// Stranded handler invoked from stop().
void stopping(const code& ec) NOEXCEPT override;

/// Read request buffer (not thread safe).
virtual http::flat_buffer& request_buffer() NOEXCEPT;

/// Initial http read handler (stranded).
virtual void handle_read_request(const code& ec, size_t bytes,
const http::request_cptr& request) NOEXCEPT;

private:
void do_stop(const code& ec) NOEXCEPT;

void set_buffer(http::response& response) NOEXCEPT;
void handle_send(const code& ec, size_t bytes, http::response_ptr&,
const result_handler& handler) NOEXCEPT;
Expand Down
62 changes: 12 additions & 50 deletions include/bitcoin/network/channels/channel_ws.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,78 +29,40 @@
namespace libbitcoin {
namespace network {

/// Websocket tcp/ip channel, uses channel_http for upgrade/multiplex.
/// Abstract base websocket tcp/ip channel, on base http channel.
class BCT_API channel_ws
: public channel_http, protected tracker<channel_ws>
{
public:
typedef std::shared_ptr<channel_ws> ptr;
using options_t = settings_t::websocket_server;

/// Subscribe to messages post-upgrade (requires strand).
/// Event handler is always invoked on the channel strand.
template <class Message>
inline void subscribe(auto&& ) NOEXCEPT
{
BC_ASSERT(stranded());
////using message_handler = distributor_ws::handler<Message>;
////ws_distributor_.subscribe(std::forward<message_handler>(handler));
}

/// Serialize and write websocket message to peer (requires strand).
/// Completion handler is always invoked on the channel strand.
inline void send(system::data_chunk&& message, bool binary,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
BC_ASSERT(upgraded_);
using namespace std::placeholders;

// TODO: Serialize message.
const auto ptr = system::move_shared(std::move(message));
count_handler complete = std::bind(&channel_ws::handle_send,
shared_from_base<channel_ws>(), _1, _2, ptr,
std::move(handler));

if (!ptr)
{
complete(error::bad_alloc, {});
return;
}

// TODO: serialize message to send.
// TODO: websocket is full duplex, so writes must be queued.
ws_write(asio::const_buffer{ ptr->data(), ptr->size() },
binary, std::move(complete));
}

protected:
inline channel_ws(const logger& log, const socket::ptr& socket,
uint64_t identifier, const settings_t& settings,
const options_t& options) NOEXCEPT
: channel_http(log, socket, identifier, settings, options),
////distributor_(socket->strand()),
tracker<channel_ws>(log)
{
}

/// Half-duplex http until upgraded to full-duplex websockets.
/// Reads are never buffered, restart the reader.
void read_request() NOEXCEPT override;

protected:
/// Pre-upgrade http read.
void handle_read_request(const code& ec, size_t bytes,
const http::request_cptr& request) NOEXCEPT override;

/// Post-upgrade websocket read.
virtual void handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT;

private:
inline void handle_send(const code& ec, size_t, const system::chunk_ptr&,
const result_handler& handler) NOEXCEPT
{
if (ec) stop(ec);
handler(ec);
}
/// Dispatch websocket buffer via derived handlers (override to handle).
/// Override to handle dispatch, must invoke read_request() on complete.
virtual void dispatch_websocket(const http::flat_buffer& buffer,
size_t bytes) NOEXCEPT;

// These are protected by strand.
////distributor_rest distributor_;
private:
// This is protected by strand.
bool upgraded_{ false };
};

Expand Down
29 changes: 15 additions & 14 deletions src/channels/channel_ws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ void channel_ws::read_request() NOEXCEPT
// upgraded
// ----------------------------------------------------------------------------

void channel_ws::handle_read_websocket(const code& ec,
size_t) NOEXCEPT
void channel_ws::handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT
{
BC_ASSERT(stranded());

Expand All @@ -78,8 +77,18 @@ void channel_ws::handle_read_websocket(const code& ec,
return;
}

// TODO: deserialize message from request_buffer and dispatch.
////distributor_.notify(message);
dispatch_websocket(request_buffer(), bytes);
request_buffer().consume(bytes);
}

void channel_ws::dispatch_websocket(const http::flat_buffer&,
size_t LOG_ONLY(bytes)) NOEXCEPT
{
LOGA("Websocket read of " << bytes << " bytes unhandled ["
<< authority() << "]");

// Restart reader.
read_request();
}

// pre-upgrade
Expand All @@ -92,7 +101,7 @@ void channel_ws::handle_read_request(const code& ec, size_t bytes,

if (upgraded_)
{
LOGA("Websocket is in upgraded state [" << authority() << "]");
LOGF("Http request in websocket state [" << authority() << "]");
stop(network::error::operation_failed);
return;
}
Expand All @@ -105,15 +114,7 @@ void channel_ws::handle_read_request(const code& ec, size_t bytes,

upgraded_ = true;
LOGA("Websocket upgraded [" << authority() << "]");

const std::string welcome{ "Websocket libbitcoin/4.0" };
send(to_chunk(welcome), false, [this](const code& ec) NOEXCEPT
{
// handle_send alread stops channel on ec.
// One and only one handler of message must restart read loop.
// In half duplex this happens only after send (ws full duplex).
if (!ec) read_request();
});
read_request();
}

BC_POP_WARNING()
Expand Down
Loading