Skip to content

Commit 8cbc3e3

Browse files
authored
Merge pull request #659 from evoskuil/master
Change channel_ws to base (move concrete to node).
2 parents 79c0aa3 + 6bbd778 commit 8cbc3e3

File tree

3 files changed

+32
-68
lines changed

3 files changed

+32
-68
lines changed

include/bitcoin/network/channels/channel_http.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ class BCT_API channel_http
5252
dispatcher_.subscribe(std::forward<signature>(handler));
5353
}
5454

55+
// TODO: network.minimum_buffer is being overloaded here.
5556
/// response_buffer_ is initialized to default size, see set_buffer().
56-
/// Uses peer config for timeouts if not specified via other construct.
5757
/// Construct client channel to encapsulate and communicate on the socket.
5858
inline channel_http(const logger& log, const socket::ptr& socket,
5959
uint64_t identifier, const settings_t& settings,
@@ -68,25 +68,26 @@ class BCT_API channel_http
6868
/// Resume reading from the socket (requires strand).
6969
void resume() NOEXCEPT override;
7070

71-
/// http is half-duplex, so reads must wait until send is completed.
7271
/// Must be called (only once) from protocol message handler (if no stop).
7372
virtual void read_request() NOEXCEPT;
7473

75-
/// Serialize and write response to peer (requires strand).
74+
/// Serialize and write http response to peer (requires strand).
7675
/// Completion handler is always invoked on the channel strand.
7776
void send(http::response&& response, result_handler&& handler) NOEXCEPT;
7877

7978
protected:
8079
/// Stranded handler invoked from stop().
8180
void stopping(const code& ec) NOEXCEPT override;
8281

82+
/// Read request buffer (not thread safe).
8383
virtual http::flat_buffer& request_buffer() NOEXCEPT;
84+
85+
/// Initial http read handler (stranded).
8486
virtual void handle_read_request(const code& ec, size_t bytes,
8587
const http::request_cptr& request) NOEXCEPT;
8688

8789
private:
8890
void do_stop(const code& ec) NOEXCEPT;
89-
9091
void set_buffer(http::response& response) NOEXCEPT;
9192
void handle_send(const code& ec, size_t bytes, http::response_ptr&,
9293
const result_handler& handler) NOEXCEPT;

include/bitcoin/network/channels/channel_ws.hpp

Lines changed: 12 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -29,78 +29,40 @@
2929
namespace libbitcoin {
3030
namespace network {
3131

32-
/// Websocket tcp/ip channel, uses channel_http for upgrade/multiplex.
32+
/// Abstract base websocket tcp/ip channel, on base http channel.
3333
class BCT_API channel_ws
3434
: public channel_http, protected tracker<channel_ws>
3535
{
3636
public:
3737
typedef std::shared_ptr<channel_ws> ptr;
3838
using options_t = settings_t::websocket_server;
3939

40-
/// Subscribe to messages post-upgrade (requires strand).
41-
/// Event handler is always invoked on the channel strand.
42-
template <class Message>
43-
inline void subscribe(auto&& ) NOEXCEPT
44-
{
45-
BC_ASSERT(stranded());
46-
////using message_handler = distributor_ws::handler<Message>;
47-
////ws_distributor_.subscribe(std::forward<message_handler>(handler));
48-
}
49-
50-
/// Serialize and write websocket message to peer (requires strand).
51-
/// Completion handler is always invoked on the channel strand.
52-
inline void send(system::data_chunk&& message, bool binary,
53-
result_handler&& handler) NOEXCEPT
54-
{
55-
BC_ASSERT(stranded());
56-
BC_ASSERT(upgraded_);
57-
using namespace std::placeholders;
58-
59-
// TODO: Serialize message.
60-
const auto ptr = system::move_shared(std::move(message));
61-
count_handler complete = std::bind(&channel_ws::handle_send,
62-
shared_from_base<channel_ws>(), _1, _2, ptr,
63-
std::move(handler));
64-
65-
if (!ptr)
66-
{
67-
complete(error::bad_alloc, {});
68-
return;
69-
}
70-
71-
// TODO: serialize message to send.
72-
// TODO: websocket is full duplex, so writes must be queued.
73-
ws_write(asio::const_buffer{ ptr->data(), ptr->size() },
74-
binary, std::move(complete));
75-
}
76-
40+
protected:
7741
inline channel_ws(const logger& log, const socket::ptr& socket,
7842
uint64_t identifier, const settings_t& settings,
7943
const options_t& options) NOEXCEPT
8044
: channel_http(log, socket, identifier, settings, options),
81-
////distributor_(socket->strand()),
8245
tracker<channel_ws>(log)
8346
{
8447
}
8548

86-
/// Half-duplex http until upgraded to full-duplex websockets.
49+
/// Reads are never buffered, restart the reader.
8750
void read_request() NOEXCEPT override;
8851

89-
protected:
52+
/// Pre-upgrade http read.
9053
void handle_read_request(const code& ec, size_t bytes,
9154
const http::request_cptr& request) NOEXCEPT override;
55+
56+
/// Post-upgrade websocket read.
9257
virtual void handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT;
9358

94-
private:
95-
inline void handle_send(const code& ec, size_t, const system::chunk_ptr&,
96-
const result_handler& handler) NOEXCEPT
97-
{
98-
if (ec) stop(ec);
99-
handler(ec);
100-
}
59+
/// Dispatch websocket buffer via derived handlers (override to handle).
60+
/// Override to handle dispatch, must invoke read_request() on complete.
61+
virtual void dispatch_websocket(const http::flat_buffer& buffer,
62+
size_t bytes) NOEXCEPT;
10163

102-
// These are protected by strand.
103-
////distributor_rest distributor_;
64+
private:
65+
// This is protected by strand.
10466
bool upgraded_{ false };
10567
};
10668

src/channels/channel_ws.cpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ void channel_ws::read_request() NOEXCEPT
5454
// upgraded
5555
// ----------------------------------------------------------------------------
5656

57-
void channel_ws::handle_read_websocket(const code& ec,
58-
size_t) NOEXCEPT
57+
void channel_ws::handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT
5958
{
6059
BC_ASSERT(stranded());
6160

@@ -78,8 +77,18 @@ void channel_ws::handle_read_websocket(const code& ec,
7877
return;
7978
}
8079

81-
// TODO: deserialize message from request_buffer and dispatch.
82-
////distributor_.notify(message);
80+
dispatch_websocket(request_buffer(), bytes);
81+
request_buffer().consume(bytes);
82+
}
83+
84+
void channel_ws::dispatch_websocket(const http::flat_buffer&,
85+
size_t LOG_ONLY(bytes)) NOEXCEPT
86+
{
87+
LOGA("Websocket read of " << bytes << " bytes unhandled ["
88+
<< authority() << "]");
89+
90+
// Restart reader.
91+
read_request();
8392
}
8493

8594
// pre-upgrade
@@ -92,7 +101,7 @@ void channel_ws::handle_read_request(const code& ec, size_t bytes,
92101

93102
if (upgraded_)
94103
{
95-
LOGA("Websocket is in upgraded state [" << authority() << "]");
104+
LOGF("Http request in websocket state [" << authority() << "]");
96105
stop(network::error::operation_failed);
97106
return;
98107
}
@@ -105,15 +114,7 @@ void channel_ws::handle_read_request(const code& ec, size_t bytes,
105114

106115
upgraded_ = true;
107116
LOGA("Websocket upgraded [" << authority() << "]");
108-
109-
const std::string welcome{ "Websocket libbitcoin/4.0" };
110-
send(to_chunk(welcome), false, [this](const code& ec) NOEXCEPT
111-
{
112-
// handle_send alread stops channel on ec.
113-
// One and only one handler of message must restart read loop.
114-
// In half duplex this happens only after send (ws full duplex).
115-
if (!ec) read_request();
116-
});
117+
read_request();
117118
}
118119

119120
BC_POP_WARNING()

0 commit comments

Comments
 (0)