diff --git a/include/bitcoin/network/channels/channel_http.hpp b/include/bitcoin/network/channels/channel_http.hpp index 2e75ed2f1..a9fda2cd6 100644 --- a/include/bitcoin/network/channels/channel_http.hpp +++ b/include/bitcoin/network/channels/channel_http.hpp @@ -52,8 +52,8 @@ class BCT_API channel_http dispatcher_.subscribe(std::forward(handler)); } + // TODO: network.minimum_buffer is being overloaded here. /// response_buffer_ is initialized to default size, see set_buffer(). - /// Uses peer config for timeouts if not specified via other construct. /// Construct client channel to encapsulate and communicate on the socket. inline channel_http(const logger& log, const socket::ptr& socket, uint64_t identifier, const settings_t& settings, @@ -68,11 +68,10 @@ class BCT_API channel_http /// Resume reading from the socket (requires strand). void resume() NOEXCEPT override; - /// http is half-duplex, so reads must wait until send is completed. /// Must be called (only once) from protocol message handler (if no stop). virtual void read_request() NOEXCEPT; - /// Serialize and write response to peer (requires strand). + /// Serialize and write http response to peer (requires strand). /// Completion handler is always invoked on the channel strand. void send(http::response&& response, result_handler&& handler) NOEXCEPT; @@ -80,13 +79,15 @@ class BCT_API channel_http /// Stranded handler invoked from stop(). void stopping(const code& ec) NOEXCEPT override; + /// Read request buffer (not thread safe). virtual http::flat_buffer& request_buffer() NOEXCEPT; + + /// Initial http read handler (stranded). virtual void handle_read_request(const code& ec, size_t bytes, const http::request_cptr& request) NOEXCEPT; private: void do_stop(const code& ec) NOEXCEPT; - void set_buffer(http::response& response) NOEXCEPT; void handle_send(const code& ec, size_t bytes, http::response_ptr&, const result_handler& handler) NOEXCEPT; diff --git a/include/bitcoin/network/channels/channel_ws.hpp b/include/bitcoin/network/channels/channel_ws.hpp index 10e4475d4..31237d15b 100644 --- a/include/bitcoin/network/channels/channel_ws.hpp +++ b/include/bitcoin/network/channels/channel_ws.hpp @@ -29,7 +29,7 @@ namespace libbitcoin { namespace network { -/// Websocket tcp/ip channel, uses channel_http for upgrade/multiplex. +/// Abstract base websocket tcp/ip channel, on base http channel. class BCT_API channel_ws : public channel_http, protected tracker { @@ -37,70 +37,32 @@ class BCT_API channel_ws typedef std::shared_ptr ptr; using options_t = settings_t::websocket_server; - /// Subscribe to messages post-upgrade (requires strand). - /// Event handler is always invoked on the channel strand. - template - inline void subscribe(auto&& ) NOEXCEPT - { - BC_ASSERT(stranded()); - ////using message_handler = distributor_ws::handler; - ////ws_distributor_.subscribe(std::forward(handler)); - } - - /// Serialize and write websocket message to peer (requires strand). - /// Completion handler is always invoked on the channel strand. - inline void send(system::data_chunk&& message, bool binary, - result_handler&& handler) NOEXCEPT - { - BC_ASSERT(stranded()); - BC_ASSERT(upgraded_); - using namespace std::placeholders; - - // TODO: Serialize message. - const auto ptr = system::move_shared(std::move(message)); - count_handler complete = std::bind(&channel_ws::handle_send, - shared_from_base(), _1, _2, ptr, - std::move(handler)); - - if (!ptr) - { - complete(error::bad_alloc, {}); - return; - } - - // TODO: serialize message to send. - // TODO: websocket is full duplex, so writes must be queued. - ws_write(asio::const_buffer{ ptr->data(), ptr->size() }, - binary, std::move(complete)); - } - +protected: inline channel_ws(const logger& log, const socket::ptr& socket, uint64_t identifier, const settings_t& settings, const options_t& options) NOEXCEPT : channel_http(log, socket, identifier, settings, options), - ////distributor_(socket->strand()), tracker(log) { } - /// Half-duplex http until upgraded to full-duplex websockets. + /// Reads are never buffered, restart the reader. void read_request() NOEXCEPT override; -protected: + /// Pre-upgrade http read. void handle_read_request(const code& ec, size_t bytes, const http::request_cptr& request) NOEXCEPT override; + + /// Post-upgrade websocket read. virtual void handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT; -private: - inline void handle_send(const code& ec, size_t, const system::chunk_ptr&, - const result_handler& handler) NOEXCEPT - { - if (ec) stop(ec); - handler(ec); - } + /// Dispatch websocket buffer via derived handlers (override to handle). + /// Override to handle dispatch, must invoke read_request() on complete. + virtual void dispatch_websocket(const http::flat_buffer& buffer, + size_t bytes) NOEXCEPT; - // These are protected by strand. - ////distributor_rest distributor_; +private: + // This is protected by strand. bool upgraded_{ false }; }; diff --git a/src/channels/channel_ws.cpp b/src/channels/channel_ws.cpp index bbedc7fcf..649a3cbae 100644 --- a/src/channels/channel_ws.cpp +++ b/src/channels/channel_ws.cpp @@ -54,8 +54,7 @@ void channel_ws::read_request() NOEXCEPT // upgraded // ---------------------------------------------------------------------------- -void channel_ws::handle_read_websocket(const code& ec, - size_t) NOEXCEPT +void channel_ws::handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT { BC_ASSERT(stranded()); @@ -78,8 +77,18 @@ void channel_ws::handle_read_websocket(const code& ec, return; } - // TODO: deserialize message from request_buffer and dispatch. - ////distributor_.notify(message); + dispatch_websocket(request_buffer(), bytes); + request_buffer().consume(bytes); +} + +void channel_ws::dispatch_websocket(const http::flat_buffer&, + size_t LOG_ONLY(bytes)) NOEXCEPT +{ + LOGA("Websocket read of " << bytes << " bytes unhandled [" + << authority() << "]"); + + // Restart reader. + read_request(); } // pre-upgrade @@ -92,7 +101,7 @@ void channel_ws::handle_read_request(const code& ec, size_t bytes, if (upgraded_) { - LOGA("Websocket is in upgraded state [" << authority() << "]"); + LOGF("Http request in websocket state [" << authority() << "]"); stop(network::error::operation_failed); return; } @@ -105,15 +114,7 @@ void channel_ws::handle_read_request(const code& ec, size_t bytes, upgraded_ = true; LOGA("Websocket upgraded [" << authority() << "]"); - - const std::string welcome{ "Websocket libbitcoin/4.0" }; - send(to_chunk(welcome), false, [this](const code& ec) NOEXCEPT - { - // handle_send alread stops channel on ec. - // One and only one handler of message must restart read loop. - // In half duplex this happens only after send (ws full duplex). - if (!ec) read_request(); - }); + read_request(); } BC_POP_WARNING()