Skip to content

Commit cb74866

Browse files
committed
Generalize channel_rpc/protocol_rpc for notification write.
1 parent 91a43b1 commit cb74866

File tree

4 files changed

+107
-51
lines changed

4 files changed

+107
-51
lines changed

include/bitcoin/network/channels/channel_rpc.hpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,31 @@ class channel_rpc
6464
result_handler&& handler) NOEXCEPT;
6565
inline void send_result(rpc::value_t&& result, size_t size_hint,
6666
result_handler&& handler) NOEXCEPT;
67+
inline void send_notification(const rpc::string_t& method,
68+
rpc::params_t&& notification, size_t size_hint,
69+
result_handler&& handler) NOEXCEPT;
6770

6871
/// Resume reading from the socket (requires strand).
6972
inline void resume() NOEXCEPT override;
7073

7174
protected:
7275
/// Serialize and write response to client (requires strand).
7376
/// Completion handler is always invoked on the channel strand.
74-
inline void send(rpc::response_t&& message, size_t size_hint,
77+
template <typename Message>
78+
inline void send(Message&& message, size_t size_hint,
7579
result_handler&& handler) NOEXCEPT;
7680

81+
/// Size and assign response_buffer_ (value type is json-rpc::json).
82+
template <typename Message>
83+
inline rpc::message_ptr<Message> assign_message(Message&& message,
84+
size_t size_hint) NOEXCEPT;
85+
86+
/// Handle send<response> completion, invokes receive().
87+
template <typename Message>
88+
inline void handle_send(const code& ec, size_t bytes,
89+
const rpc::message_cptr<Message>& message,
90+
const result_handler& handler) NOEXCEPT;
91+
7792
/// Stranded handler invoked from stop().
7893
inline void stopping(const code& ec) NOEXCEPT override;
7994

@@ -83,22 +98,13 @@ class channel_rpc
8398
/// Override to dispatch request to subscribers by requested method.
8499
virtual inline void dispatch(const rpc::request_cptr& request) NOEXCEPT;
85100

86-
/// Size and assign response_buffer_ (value type is json-rpc::json).
87-
virtual inline rpc::response_ptr assign_message(rpc::response_t&& message,
88-
size_t size_hint) NOEXCEPT;
89-
90101
/// Must call after successful message handling if no stop.
91102
virtual inline void receive() NOEXCEPT;
92103

93104
/// Handle incoming messages.
94105
virtual inline void handle_receive(const code& ec, size_t bytes,
95106
const rpc::request_cptr& request) NOEXCEPT;
96107

97-
/// Handle send completion, invokes receive().
98-
virtual inline void handle_send(const code& ec, size_t bytes,
99-
const rpc::response_cptr& response,
100-
const result_handler& handler) NOEXCEPT;
101-
102108
private:
103109
// These are protected by strand.
104110
rpc::version version_;

include/bitcoin/network/impl/channels/channel_rpc.ipp

Lines changed: 79 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -140,40 +140,15 @@ inline http::flat_buffer& CLASS::request_buffer() NOEXCEPT
140140
// Send.
141141
// ----------------------------------------------------------------------------
142142

143-
TEMPLATE
144-
void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT
145-
{
146-
send_error({ .code = ec.value(), .message = ec.message() },
147-
std::move(handler));
148-
}
149-
150-
TEMPLATE
151-
void CLASS::send_error(rpc::result_t&& error,
152-
result_handler&& handler) NOEXCEPT
153-
{
154-
BC_ASSERT(stranded());
155-
const auto hint = two * error.message.size();
156-
send({ .jsonrpc = version_, .id = identity_, .error = std::move(error) },
157-
hint, std::move(handler));
158-
}
159-
160-
TEMPLATE
161-
void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
162-
result_handler&& handler) NOEXCEPT
163-
{
164-
BC_ASSERT(stranded());
165-
send({ .jsonrpc = version_, .id = identity_, .result = std::move(result) },
166-
size_hint, std::move(handler));
167-
}
168-
169143
// protected
170144
TEMPLATE
171-
inline void CLASS::send(rpc::response_t&& model, size_t size_hint,
145+
template <typename Message>
146+
inline void CLASS::send(Message&& model, size_t size_hint,
172147
result_handler&& handler) NOEXCEPT
173148
{
174149
BC_ASSERT(stranded());
175150
const auto out = assign_message(std::move(model), size_hint);
176-
count_handler complete = std::bind(&CLASS::handle_send,
151+
count_handler complete = std::bind(&CLASS::handle_send<Message>,
177152
shared_from_base<CLASS>(), _1, _2, out, std::move(handler));
178153

179154
if (!out)
@@ -187,33 +162,96 @@ inline void CLASS::send(rpc::response_t&& model, size_t size_hint,
187162

188163
// protected
189164
TEMPLATE
165+
template <typename Message>
166+
inline rpc::message_ptr<Message> CLASS::assign_message(Message&& message,
167+
size_t size_hint) NOEXCEPT
168+
{
169+
BC_ASSERT(stranded());
170+
response_buffer_->max_size(size_hint);
171+
const auto ptr = system::to_shared<rpc::message_value<Message>>();
172+
ptr->message = std::move(message);
173+
ptr->buffer = response_buffer_;
174+
return ptr;
175+
}
176+
177+
// protected
178+
TEMPLATE
179+
template <typename Message>
190180
inline void CLASS::handle_send(const code& ec, size_t bytes,
191-
const rpc::response_cptr& response, const result_handler& handler) NOEXCEPT
181+
const rpc::message_cptr<Message>& message,
182+
const result_handler& handler) NOEXCEPT
192183
{
193184
BC_ASSERT(stranded());
194185
if (ec) stop(ec);
195186

196187
// Typically a noop, but handshake may pause channel here.
197188
handler(ec);
198189

199-
LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] "
200-
<< response->message.error.value_or(rpc::result_t{}).message);
190+
if constexpr (is_same_type<Message, rpc::response_t>)
191+
{
192+
LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] "
193+
<< message->message.error.value_or(rpc::result_t{}).message);
201194

202-
// Continue read loop (does not unpause or restart channel).
203-
receive();
195+
// Continue the read loop (does not unpause or restart).
196+
receive();
197+
}
198+
else
199+
{
200+
LOGA("Rpc notification: (" << bytes << ") bytes [" << endpoint() << "] "
201+
<< message->message.method);
202+
}
204203
}
205204

206-
// private
207205
TEMPLATE
208-
inline rpc::response_ptr CLASS::assign_message(rpc::response_t&& message,
209-
size_t size_hint) NOEXCEPT
206+
inline void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT
207+
{
208+
send_error(
209+
{
210+
.code = ec.value(),
211+
.message = ec.message()
212+
},
213+
std::move(handler));
214+
}
215+
216+
TEMPLATE
217+
inline void CLASS::send_error(rpc::result_t&& error,
218+
result_handler&& handler) NOEXCEPT
210219
{
211220
BC_ASSERT(stranded());
212-
response_buffer_->max_size(size_hint);
213-
const auto ptr = system::to_shared<rpc::response>();
214-
ptr->message = std::move(message);
215-
ptr->buffer = response_buffer_;
216-
return ptr;
221+
const auto hint = two * error.message.size();
222+
send(rpc::response_t
223+
{
224+
.jsonrpc = version_,
225+
.id = identity_,
226+
.error = std::move(error)
227+
}, hint, std::move(handler));
228+
}
229+
230+
TEMPLATE
231+
inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
232+
result_handler&& handler) NOEXCEPT
233+
{
234+
BC_ASSERT(stranded());
235+
send(rpc::response_t
236+
{
237+
.jsonrpc = version_,
238+
.id = identity_,
239+
.result = std::move(result)
240+
}, size_hint, std::move(handler));
241+
}
242+
243+
TEMPLATE
244+
inline void CLASS::send_notification(const rpc::string_t& method,
245+
rpc::params_t&& notification, size_t size_hint,
246+
result_handler&& handler) NOEXCEPT
247+
{
248+
BC_ASSERT(stranded());
249+
send(rpc::request_t
250+
{
251+
.jsonrpc = version_,
252+
.method = method,
253+
.params = std::move(notification)
254+
}, size_hint, std::move(handler));
217255
}
218256

219257
BC_POP_WARNING()

include/bitcoin/network/impl/protocols/protocol_rpc.ipp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
6969
channel_->send_result(std::move(result), size_hint, std::move(handler));
7070
}
7171

72+
TEMPLATE
73+
inline void CLASS::send_notification(const rpc::string_t& method,
74+
rpc::params_t&& notification, size_t size_hint,
75+
result_handler&& handler) NOEXCEPT
76+
{
77+
channel_->send_notification(method, std::move(notification), size_hint,
78+
std::move(handler));
79+
}
80+
7281
} // namespace network
7382
} // namespace libbitcoin
7483

include/bitcoin/network/protocols/protocol_rpc.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ class protocol_rpc
6161
result_handler&& handler) NOEXCEPT;
6262
virtual inline void send_result(rpc::value_t&& result, size_t size_hint,
6363
result_handler&& handler) NOEXCEPT;
64+
virtual inline void send_notification(const rpc::string_t& method,
65+
rpc::params_t&& notification, size_t size_hint,
66+
result_handler&& handler) NOEXCEPT;
6467

6568
/// Default noop completion handler.
6669
virtual inline void complete(const code&) NOEXCEPT {};

0 commit comments

Comments
 (0)