Skip to content

Commit 7dda4c0

Browse files
authored
Merge pull request #743 from evoskuil/master
Add notification writer to protocol_rpc.
2 parents 35d301c + cb74866 commit 7dda4c0

File tree

12 files changed

+278
-72
lines changed

12 files changed

+278
-72
lines changed

include/bitcoin/network/beast.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
#include <optional>
2525
#include <memory>
2626
#include <utility>
27+
28+
// The default is 4096, which hits the variant http_body.
29+
#define BOOST_BEAST_FILE_BUFFER_SIZE 1024
30+
2731
#include <boost/beast/core.hpp>
2832
#include <boost/beast/http.hpp>
2933
#include <boost/beast/websocket.hpp>

include/bitcoin/network/channels/channel_rpc.hpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,31 @@ class channel_rpc
6464
result_handler&& handler) NOEXCEPT;
6565
inline void send_result(rpc::value_t&& result, size_t size_hint,
6666
result_handler&& handler) NOEXCEPT;
67+
inline void send_notification(const rpc::string_t& method,
68+
rpc::params_t&& notification, size_t size_hint,
69+
result_handler&& handler) NOEXCEPT;
6770

6871
/// Resume reading from the socket (requires strand).
6972
inline void resume() NOEXCEPT override;
7073

7174
protected:
7275
/// Serialize and write response to client (requires strand).
7376
/// Completion handler is always invoked on the channel strand.
74-
inline void send(rpc::response_t&& message, size_t size_hint,
77+
template <typename Message>
78+
inline void send(Message&& message, size_t size_hint,
7579
result_handler&& handler) NOEXCEPT;
7680

81+
/// Size and assign response_buffer_ (value type is json-rpc::json).
82+
template <typename Message>
83+
inline rpc::message_ptr<Message> assign_message(Message&& message,
84+
size_t size_hint) NOEXCEPT;
85+
86+
/// Handle send<response> completion, invokes receive().
87+
template <typename Message>
88+
inline void handle_send(const code& ec, size_t bytes,
89+
const rpc::message_cptr<Message>& message,
90+
const result_handler& handler) NOEXCEPT;
91+
7792
/// Stranded handler invoked from stop().
7893
inline void stopping(const code& ec) NOEXCEPT override;
7994

@@ -83,22 +98,13 @@ class channel_rpc
8398
/// Override to dispatch request to subscribers by requested method.
8499
virtual inline void dispatch(const rpc::request_cptr& request) NOEXCEPT;
85100

86-
/// Size and assign response_buffer_ (value type is json-rpc::json).
87-
virtual inline rpc::response_ptr assign_message(rpc::response_t&& message,
88-
size_t size_hint) NOEXCEPT;
89-
90101
/// Must call after successful message handling if no stop.
91102
virtual inline void receive() NOEXCEPT;
92103

93104
/// Handle incoming messages.
94105
virtual inline void handle_receive(const code& ec, size_t bytes,
95106
const rpc::request_cptr& request) NOEXCEPT;
96107

97-
/// Handle send completion, invokes receive().
98-
virtual inline void handle_send(const code& ec, size_t bytes,
99-
const rpc::response_cptr& response,
100-
const result_handler& handler) NOEXCEPT;
101-
102108
private:
103109
// These are protected by strand.
104110
rpc::version version_;

include/bitcoin/network/impl/channels/channel_rpc.ipp

Lines changed: 79 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -140,40 +140,15 @@ inline http::flat_buffer& CLASS::request_buffer() NOEXCEPT
140140
// Send.
141141
// ----------------------------------------------------------------------------
142142

143-
TEMPLATE
144-
void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT
145-
{
146-
send_error({ .code = ec.value(), .message = ec.message() },
147-
std::move(handler));
148-
}
149-
150-
TEMPLATE
151-
void CLASS::send_error(rpc::result_t&& error,
152-
result_handler&& handler) NOEXCEPT
153-
{
154-
BC_ASSERT(stranded());
155-
const auto hint = two * error.message.size();
156-
send({ .jsonrpc = version_, .id = identity_, .error = std::move(error) },
157-
hint, std::move(handler));
158-
}
159-
160-
TEMPLATE
161-
void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
162-
result_handler&& handler) NOEXCEPT
163-
{
164-
BC_ASSERT(stranded());
165-
send({ .jsonrpc = version_, .id = identity_, .result = std::move(result) },
166-
size_hint, std::move(handler));
167-
}
168-
169143
// protected
170144
TEMPLATE
171-
inline void CLASS::send(rpc::response_t&& model, size_t size_hint,
145+
template <typename Message>
146+
inline void CLASS::send(Message&& model, size_t size_hint,
172147
result_handler&& handler) NOEXCEPT
173148
{
174149
BC_ASSERT(stranded());
175150
const auto out = assign_message(std::move(model), size_hint);
176-
count_handler complete = std::bind(&CLASS::handle_send,
151+
count_handler complete = std::bind(&CLASS::handle_send<Message>,
177152
shared_from_base<CLASS>(), _1, _2, out, std::move(handler));
178153

179154
if (!out)
@@ -187,33 +162,96 @@ inline void CLASS::send(rpc::response_t&& model, size_t size_hint,
187162

188163
// protected
189164
TEMPLATE
165+
template <typename Message>
166+
inline rpc::message_ptr<Message> CLASS::assign_message(Message&& message,
167+
size_t size_hint) NOEXCEPT
168+
{
169+
BC_ASSERT(stranded());
170+
response_buffer_->max_size(size_hint);
171+
const auto ptr = system::to_shared<rpc::message_value<Message>>();
172+
ptr->message = std::move(message);
173+
ptr->buffer = response_buffer_;
174+
return ptr;
175+
}
176+
177+
// protected
178+
TEMPLATE
179+
template <typename Message>
190180
inline void CLASS::handle_send(const code& ec, size_t bytes,
191-
const rpc::response_cptr& response, const result_handler& handler) NOEXCEPT
181+
const rpc::message_cptr<Message>& message,
182+
const result_handler& handler) NOEXCEPT
192183
{
193184
BC_ASSERT(stranded());
194185
if (ec) stop(ec);
195186

196187
// Typically a noop, but handshake may pause channel here.
197188
handler(ec);
198189

199-
LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] "
200-
<< response->message.error.value_or(rpc::result_t{}).message);
190+
if constexpr (is_same_type<Message, rpc::response_t>)
191+
{
192+
LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] "
193+
<< message->message.error.value_or(rpc::result_t{}).message);
201194

202-
// Continue read loop (does not unpause or restart channel).
203-
receive();
195+
// Continue the read loop (does not unpause or restart).
196+
receive();
197+
}
198+
else
199+
{
200+
LOGA("Rpc notification: (" << bytes << ") bytes [" << endpoint() << "] "
201+
<< message->message.method);
202+
}
204203
}
205204

206-
// private
207205
TEMPLATE
208-
inline rpc::response_ptr CLASS::assign_message(rpc::response_t&& message,
209-
size_t size_hint) NOEXCEPT
206+
inline void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT
207+
{
208+
send_error(
209+
{
210+
.code = ec.value(),
211+
.message = ec.message()
212+
},
213+
std::move(handler));
214+
}
215+
216+
TEMPLATE
217+
inline void CLASS::send_error(rpc::result_t&& error,
218+
result_handler&& handler) NOEXCEPT
210219
{
211220
BC_ASSERT(stranded());
212-
response_buffer_->max_size(size_hint);
213-
const auto ptr = system::to_shared<rpc::response>();
214-
ptr->message = std::move(message);
215-
ptr->buffer = response_buffer_;
216-
return ptr;
221+
const auto hint = two * error.message.size();
222+
send(rpc::response_t
223+
{
224+
.jsonrpc = version_,
225+
.id = identity_,
226+
.error = std::move(error)
227+
}, hint, std::move(handler));
228+
}
229+
230+
TEMPLATE
231+
inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
232+
result_handler&& handler) NOEXCEPT
233+
{
234+
BC_ASSERT(stranded());
235+
send(rpc::response_t
236+
{
237+
.jsonrpc = version_,
238+
.id = identity_,
239+
.result = std::move(result)
240+
}, size_hint, std::move(handler));
241+
}
242+
243+
TEMPLATE
244+
inline void CLASS::send_notification(const rpc::string_t& method,
245+
rpc::params_t&& notification, size_t size_hint,
246+
result_handler&& handler) NOEXCEPT
247+
{
248+
BC_ASSERT(stranded());
249+
send(rpc::request_t
250+
{
251+
.jsonrpc = version_,
252+
.method = method,
253+
.params = std::move(notification)
254+
}, size_hint, std::move(handler));
217255
}
218256

219257
BC_POP_WARNING()

include/bitcoin/network/impl/protocols/protocol_rpc.ipp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
6969
channel_->send_result(std::move(result), size_hint, std::move(handler));
7070
}
7171

72+
TEMPLATE
73+
inline void CLASS::send_notification(const rpc::string_t& method,
74+
rpc::params_t&& notification, size_t size_hint,
75+
result_handler&& handler) NOEXCEPT
76+
{
77+
channel_->send_notification(method, std::move(notification), size_hint,
78+
std::move(handler));
79+
}
80+
7281
} // namespace network
7382
} // namespace libbitcoin
7483

include/bitcoin/network/messages/http_body.hpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ using body_reader = std::variant
5151
rpc::reader
5252
>;
5353

54+
// TODO: file_writer is eating 4k stack for each type.
55+
// BOOST_BEAST_FILE_BUFFER_SIZE set to 1024 in beast.hpp.
5456
using empty_writer = http::empty_body::writer;
5557
using data_writer = http::chunk_body::writer;
5658
using file_writer = http::file_body::writer;
@@ -60,15 +62,15 @@ using string_writer = http::string_body::writer;
6062
using json_writer = http::json_body::writer;
6163
using body_writer = std::variant
6264
<
63-
std::monostate,
64-
empty_writer,
65-
data_writer,
66-
file_writer,
67-
span_writer,
68-
buffer_writer,
69-
string_writer,
70-
json_writer,
71-
rpc::writer
65+
std::monostate, // 1 byte
66+
empty_writer, // 1 byte
67+
data_writer, // 8 bytes
68+
file_writer, // 1,040 bytes! (4,112 bytes by default)
69+
span_writer, // 8 bytes
70+
buffer_writer, // 16 bytes
71+
string_writer, // 8 bytes
72+
json_writer, // 136 bytes!
73+
rpc::writer // 144 bytes!
7274
>;
7375

7476
using empty_value = http::empty_body::value_type;

include/bitcoin/network/messages/rpc/body.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ using response_cptr = std::shared_ptr<const response>;
114114
using response_ptr = std::shared_ptr<response>;
115115
using writer = response_body::writer;
116116

117+
// Allows request to be sent (for notifications).
118+
template <typename Message>
119+
using message_value = typename body<Message>::value_type;
120+
template <typename Message>
121+
using message_ptr = std::shared_ptr<message_value<Message>>;
122+
template <typename Message>
123+
using message_cptr = std::shared_ptr<const message_value<Message>>;
124+
117125
} // namespace rpc
118126
} // namespace network
119127
} // namespace libbitcoin

include/bitcoin/network/net/proxy.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ class BCT_API proxy
143143
virtual void write(rpc::response& response,
144144
count_handler&& handler) NOEXCEPT;
145145

146+
/// Write rpc notification (request) to the socket (json buffer in body).
147+
virtual void write(rpc::request& notification,
148+
count_handler&& handler) NOEXCEPT;
149+
146150
/// WS (generic).
147151
/// -----------------------------------------------------------------------
148152

include/bitcoin/network/net/socket.hpp

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ class BCT_API socket
132132
virtual void rpc_write(rpc::response& response,
133133
count_handler&& handler) NOEXCEPT;
134134

135+
/// Write rpc notification to the socket, handler posted to socket strand.
136+
virtual void rpc_notify(rpc::request& notification,
137+
count_handler&& handler) NOEXCEPT;
138+
135139
/// WS (generic).
136140
/// -----------------------------------------------------------------------
137141

@@ -230,7 +234,7 @@ class BCT_API socket
230234
}
231235

232236
rpc::request& value;
233-
rpc::reader reader;
237+
rpc::request_body::reader reader;
234238
http::flat_buffer& buffer;
235239
};
236240

@@ -245,7 +249,21 @@ class BCT_API socket
245249
}
246250

247251
rpc::response& value;
248-
rpc::writer writer;
252+
rpc::response_body::writer writer;
253+
};
254+
255+
struct notify_rpc
256+
{
257+
typedef std::shared_ptr<notify_rpc> ptr;
258+
using out_buffer = rpc::writer::out_buffer;
259+
260+
notify_rpc(rpc::request& request) NOEXCEPT
261+
: value{ request }, writer{ value }
262+
{
263+
}
264+
265+
rpc::request& value;
266+
rpc::request_body::writer writer;
249267
};
250268

251269
// do
@@ -276,6 +294,8 @@ class BCT_API socket
276294
const count_handler& handler) NOEXCEPT;
277295
void do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out,
278296
const count_handler& handler) NOEXCEPT;
297+
void do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out,
298+
const count_handler& handler) NOEXCEPT;
279299

280300
// ws (generic)
281301
void do_ws_read(ref<http::flat_buffer> out,
@@ -322,6 +342,8 @@ class BCT_API socket
322342
const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT;
323343
void handle_rpc_write(boost_code ec, size_t size, size_t total,
324344
const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT;
345+
void handle_rpc_notify(boost_code ec, size_t size, size_t total,
346+
const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT;
325347

326348
// ws (generic)
327349
void handle_ws_read(const boost_code& ec, size_t size,

include/bitcoin/network/protocols/protocol_rpc.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ class protocol_rpc
6161
result_handler&& handler) NOEXCEPT;
6262
virtual inline void send_result(rpc::value_t&& result, size_t size_hint,
6363
result_handler&& handler) NOEXCEPT;
64+
virtual inline void send_notification(const rpc::string_t& method,
65+
rpc::params_t&& notification, size_t size_hint,
66+
result_handler&& handler) NOEXCEPT;
6467

6568
/// Default noop completion handler.
6669
virtual inline void complete(const code&) NOEXCEPT {};

0 commit comments

Comments
 (0)