Skip to content

Commit 87e1cc2

Browse files
committed
[rpc] Optimize RPC message reception by moving JSON data instead of copying it
1 parent e448308 commit 87e1cc2

File tree

7 files changed

+63
-42
lines changed

7 files changed

+63
-42
lines changed

src/messages/GenericMessageSender.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ class GenericMessageSender
9797
if (!request_fifo || request_fifo->empty())
9898
{
9999
// Execute call
100-
rapidjson::Document resp;
101-
resp.Parse("{}");
102-
if (m_rpc.call(action, payload, resp, m_timeout))
100+
rapidjson::Document rpc_frame;
101+
rapidjson::Value resp;
102+
if (m_rpc.call(action, payload, rpc_frame, resp, m_timeout))
103103
{
104104
// Convert response
105105
const char* error_code = nullptr;
106106
std::string error_message;
107-
resp_converter->setAllocator(&resp.GetAllocator());
107+
resp_converter->setAllocator(&rpc_frame.GetAllocator());
108108
if (resp_converter->fromJson(resp, response, error_code, error_message))
109109
{
110110
ret = CallResult::Ok;
@@ -149,14 +149,14 @@ class GenericMessageSender
149149
if (resp_converter)
150150
{
151151
// Execute call
152-
rapidjson::Document resp;
153-
resp.Parse("{}");
154-
if (m_rpc.call(action, request, resp, m_timeout))
152+
rapidjson::Document rpc_frame;
153+
rapidjson::Value resp;
154+
if (m_rpc.call(action, request, rpc_frame, resp, m_timeout))
155155
{
156156
// Convert response
157157
const char* error_code = nullptr;
158158
std::string error_message;
159-
resp_converter->setAllocator(&resp.GetAllocator());
159+
resp_converter->setAllocator(&rpc_frame.GetAllocator());
160160
if (resp_converter->fromJson(resp, response, error_code, error_message))
161161
{
162162
ret = CallResult::Ok;

src/rpc/IRpc.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,15 @@ class IRpc
5050
* @brief Call a remote action and wait for its response
5151
* @param action Remote action
5252
* @param payload JSON payload for the action
53+
* @param rpc_frame Full JSON response received
5354
* @param response JSON response received
5455
* @param timeout Response timeout
5556
* @return true if a response has been received, false otherwise
5657
*/
5758
virtual bool call(const std::string& action,
5859
const rapidjson::Document& payload,
59-
rapidjson::Document& response,
60+
rapidjson::Document& rpc_frame,
61+
rapidjson::Value& response,
6062
std::chrono::milliseconds timeout = std::chrono::seconds(2)) = 0;
6163

6264
/**

src/rpc/RpcBase.cpp

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ RpcBase::~RpcBase()
4545
stop();
4646
}
4747

48-
/** @copydoc bool IRpc::call(const std::string&, const rapidjson::Document&, rapidjson::Document&, std::chrono::milliseconds) */
48+
/** @copydoc bool IRpc::call(const std::string&, const rapidjson::Document&, rapidjson::Document&, rapidjson::Value&, std::chrono::milliseconds) */
4949
bool RpcBase::call(const std::string& action,
5050
const rapidjson::Document& payload,
51-
rapidjson::Document& response,
51+
rapidjson::Document& rpc_frame,
52+
rapidjson::Value& response,
5253
std::chrono::milliseconds timeout)
5354
{
5455
bool ret = false;
@@ -110,7 +111,8 @@ bool RpcBase::call(const std::string& action,
110111
// Extract response
111112
if (rpc_message)
112113
{
113-
response.CopyFrom(rpc_message->payload, response.GetAllocator());
114+
rpc_frame.Swap(rpc_message->rpc_frame);
115+
response.Swap(rpc_message->payload);
114116
delete rpc_message;
115117
}
116118
else
@@ -233,14 +235,14 @@ void RpcBase::processReceivedData(const void* data, size_t size)
233235
switch (msg_type)
234236
{
235237
case MessageType::CALL:
236-
valid = decodeCall(unique_id, rpc_frame[2], rpc_frame[3]);
238+
valid = decodeCall(unique_id, rpc_frame, rpc_frame[2], rpc_frame[3]);
237239
break;
238240
case MessageType::CALLRESULT:
239-
valid = decodeCallResult(unique_id, rpc_frame[2]);
241+
valid = decodeCallResult(unique_id, rpc_frame, rpc_frame[2]);
240242
break;
241243
case MessageType::CALLERROR:
242244
default:
243-
valid = decodeCallError(unique_id, rpc_frame[2], rpc_frame[3], rpc_frame[4]);
245+
valid = decodeCallError(unique_id, rpc_frame, rpc_frame[2], rpc_frame[3], rpc_frame[4]);
244246
break;
245247
}
246248
if (!valid)
@@ -283,15 +285,18 @@ bool RpcBase::send(const std::string& msg)
283285
}
284286

285287
/** @brief Decode a CALL message */
286-
bool RpcBase::decodeCall(const std::string& unique_id, const rapidjson::Value& action, const rapidjson::Value& payload)
288+
bool RpcBase::decodeCall(const std::string& unique_id,
289+
rapidjson::Document& rpc_frame,
290+
const rapidjson::Value& action,
291+
rapidjson::Value& payload)
287292
{
288293
bool ret = false;
289294

290295
// Check types
291296
if (action.IsString() && payload.IsObject())
292297
{
293298
// Add request to the queue
294-
RpcMessage* msg = new RpcMessage(unique_id, action.GetString(), payload);
299+
RpcMessage* msg = new RpcMessage(unique_id, action.GetString(), rpc_frame, payload);
295300
m_requests_queue.push(msg);
296301

297302
ret = true;
@@ -301,15 +306,15 @@ bool RpcBase::decodeCall(const std::string& unique_id, const rapidjson::Value& a
301306
}
302307

303308
/** @brief Decode a CALLRESULT message */
304-
bool RpcBase::decodeCallResult(const std::string& unique_id, const rapidjson::Value& payload)
309+
bool RpcBase::decodeCallResult(const std::string& unique_id, rapidjson::Document& rpc_frame, rapidjson::Value& payload)
305310
{
306311
bool ret = false;
307312

308313
// Check types
309314
if (payload.IsObject())
310315
{
311316
// Add result to the queue
312-
RpcMessage* msg = new RpcMessage(unique_id, payload);
317+
RpcMessage* msg = new RpcMessage(unique_id, rpc_frame, payload);
313318
m_results_queue.push(msg);
314319

315320
ret = true;
@@ -320,6 +325,7 @@ bool RpcBase::decodeCallResult(const std::string& unique_id, const rapidjson::Va
320325

321326
/** @brief Decode a CALLERROR message */
322327
bool RpcBase::decodeCallError(const std::string& unique_id,
328+
rapidjson::Document& rpc_frame,
323329
const rapidjson::Value& error,
324330
const rapidjson::Value& message,
325331
const rapidjson::Value& payload)
@@ -330,6 +336,7 @@ bool RpcBase::decodeCallError(const std::string& unique_id,
330336
if (error.IsString() && message.IsString() && payload.IsObject())
331337
{
332338
(void)unique_id;
339+
(void)rpc_frame;
333340
ret = true;
334341
}
335342

src/rpc/RpcBase.h

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ class RpcBase : public IRpc
4444

4545
// IRpc interface
4646

47-
/** @copydoc bool IRpc::call(const std::string&, const rapidjson::Document&, rapidjson::Document&, std::chrono::milliseconds) */
47+
/** @copydoc bool IRpc::call(const std::string&, const rapidjson::Document&, rapidjson::Document&, rapidjson::Value&, std::chrono::milliseconds) */
4848
bool call(const std::string& action,
4949
const rapidjson::Document& payload,
50-
rapidjson::Document& response,
50+
rapidjson::Document& rpc_frame,
51+
rapidjson::Value& response,
5152
std::chrono::milliseconds timeout = std::chrono::seconds(2)) override;
5253

5354
/** @copydoc void IRpc::registerListener(IListener&) */
@@ -86,18 +87,20 @@ class RpcBase : public IRpc
8687
/** @brief RPC message */
8788
struct RpcMessage
8889
{
89-
RpcMessage(const std::string& _unique_id, const char* _action, const rapidjson::Value& _payload)
90-
: unique_id(_unique_id), action(_action), payload()
90+
RpcMessage(const std::string& _unique_id, const char* _action, rapidjson::Document& _rpc_frame, rapidjson::Value& _payload)
91+
: unique_id(_unique_id), action(_action), rpc_frame(std::move(_rpc_frame)), payload()
9192
{
92-
payload.CopyFrom(_payload, payload.GetAllocator());
93+
payload.Swap(_payload);
9394
}
94-
RpcMessage(const std::string& _unique_id, const rapidjson::Value& _payload) : unique_id(_unique_id), action(), payload()
95+
RpcMessage(const std::string& _unique_id, rapidjson::Document& _rpc_frame, rapidjson::Value& _payload)
96+
: unique_id(_unique_id), action(), rpc_frame(std::move(_rpc_frame)), payload()
9597
{
96-
payload.CopyFrom(_payload, payload.GetAllocator());
98+
payload.Swap(_payload);
9799
}
98100
const std::string unique_id;
99101
const std::string action;
100-
rapidjson::Document payload;
102+
rapidjson::Document rpc_frame;
103+
rapidjson::Value payload;
101104
};
102105

103106
/** @brief RPC listener */
@@ -119,13 +122,17 @@ class RpcBase : public IRpc
119122
bool send(const std::string& msg);
120123

121124
/** @brief Decode a CALL message */
122-
bool decodeCall(const std::string& unique_id, const rapidjson::Value& action, const rapidjson::Value& payload);
125+
bool decodeCall(const std::string& unique_id,
126+
rapidjson::Document& rpc_frame,
127+
const rapidjson::Value& action,
128+
rapidjson::Value& payload);
123129

124130
/** @brief Decode a CALLRESULT message */
125-
bool decodeCallResult(const std::string& unique_id, const rapidjson::Value& payload);
131+
bool decodeCallResult(const std::string& unique_id, rapidjson::Document& rpc_frame, rapidjson::Value& payload);
126132

127133
/** @brief Decode a CALLERROR message */
128134
bool decodeCallError(const std::string& unique_id,
135+
rapidjson::Document& rpc_frame,
129136
const rapidjson::Value& error,
130137
const rapidjson::Value& message,
131138
const rapidjson::Value& payload);

tests/rpc/test_rpc.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,16 @@ TEST_SUITE("CALL messages")
182182
rapidjson::Document payload;
183183
payload.Parse(CALL_PAYLOAD);
184184

185-
rapidjson::Document response;
185+
rapidjson::Document rpc_frame;
186+
rapidjson::Value response;
186187
rapidjson::StringBuffer buffer;
187188
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
188189

189-
CHECK_FALSE(client.call(ACTION, payload, response, std::chrono::milliseconds(0)));
190+
CHECK_FALSE(client.call(ACTION, payload, rpc_frame, response, std::chrono::milliseconds(0)));
190191
CHECK(websocket.sendCalled());
191192
CHECK_EQ(strcmp(reinterpret_cast<const char*>(websocket.sentData()), EXPECTED_CALL_MESSAGE_0), 0);
192193

193-
CHECK_FALSE(client.call(ACTION, payload, response, std::chrono::milliseconds(0)));
194+
CHECK_FALSE(client.call(ACTION, payload, rpc_frame, response, std::chrono::milliseconds(0)));
194195
CHECK_EQ(strcmp(reinterpret_cast<const char*>(websocket.sentData()), EXPECTED_CALL_MESSAGE_1), 0);
195196

196197
std::thread response_thread(
@@ -199,7 +200,7 @@ TEST_SUITE("CALL messages")
199200
std::this_thread::sleep_for(std::chrono::milliseconds(25u));
200201
websocket.notifyDataReceived(EXPECTED_CALLRESULT_MESSAGE_2, strlen(EXPECTED_CALLRESULT_MESSAGE_2));
201202
});
202-
CHECK(client.call(ACTION, payload, response, std::chrono::milliseconds(50)));
203+
CHECK(client.call(ACTION, payload, rpc_frame, response, std::chrono::milliseconds(50)));
203204
CHECK_EQ(strcmp(reinterpret_cast<const char*>(websocket.sentData()), EXPECTED_CALL_MESSAGE_2), 0);
204205
response.Accept(writer);
205206
CHECK_EQ(strcmp(buffer.GetString(), CALLRESULT_PAYLOAD), 0);
@@ -216,17 +217,18 @@ TEST_SUITE("CALL messages")
216217
websocket.setConnected();
217218

218219
rapidjson::Document payload;
219-
rapidjson::Document response;
220+
rapidjson::Document rpc_frame;
221+
rapidjson::Value response;
220222

221223
auto start = std::chrono::steady_clock::now();
222-
CHECK_FALSE(client.call(ACTION, payload, response, std::chrono::milliseconds(0)));
224+
CHECK_FALSE(client.call(ACTION, payload, rpc_frame, response, std::chrono::milliseconds(0)));
223225
auto end = std::chrono::steady_clock::now();
224226
std::chrono::duration<double> diff = end - start;
225227
CHECK_LT(diff, std::chrono::milliseconds(5u));
226228
CHECK(websocket.sendCalled());
227229

228230
start = std::chrono::steady_clock::now();
229-
CHECK_FALSE(client.call(ACTION, payload, response, std::chrono::milliseconds(100)));
231+
CHECK_FALSE(client.call(ACTION, payload, rpc_frame, response, std::chrono::milliseconds(100)));
230232
end = std::chrono::steady_clock::now();
231233
diff = end - start;
232234
CHECK_GT(diff, std::chrono::milliseconds(99u));
@@ -238,7 +240,7 @@ TEST_SUITE("CALL messages")
238240
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
239241
websocket.notifyDataReceived(EXPECTED_CALLRESULT_MESSAGE_2, strlen(EXPECTED_CALLRESULT_MESSAGE_2));
240242
});
241-
CHECK_FALSE(client.call(ACTION, payload, response, std::chrono::milliseconds(50)));
243+
CHECK_FALSE(client.call(ACTION, payload, rpc_frame, response, std::chrono::milliseconds(50)));
242244
response_thread.join();
243245
}
244246

tests/stubs/RpcStub.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ RpcStub::RpcStub() : m_connected(false), m_listener(nullptr), m_spy(nullptr), m_
2727
/** @brief Destructor */
2828
RpcStub::~RpcStub() { }
2929

30-
/** @copydoc bool IRpc::call(const std::string&, const rapidjson::Document&, rapidjson::Document&, std::chrono::milliseconds) */
30+
/** @copydoc bool IRpc::call(const std::string&, const rapidjson::Document&, rapidjson::Document&, rapidjson::Value&, std::chrono::milliseconds) */
3131
bool RpcStub::call(const std::string& action,
3232
const rapidjson::Document& payload,
33-
rapidjson::Document& response,
33+
rapidjson::Document& rpc_frame,
34+
rapidjson::Value& response,
3435
std::chrono::milliseconds timeout)
3536
{
3637
(void)timeout;
@@ -41,7 +42,8 @@ bool RpcStub::call(const std::string& action,
4142
rapidjson::Document* doc = new rapidjson::Document();
4243
doc->CopyFrom(payload, doc->GetAllocator());
4344
m_calls.emplace_back(action, doc);
44-
response.CopyFrom(m_response, response.GetAllocator());
45+
rpc_frame.CopyFrom(m_response, rpc_frame.GetAllocator());
46+
response.CopyFrom(m_response, rpc_frame.GetAllocator());
4547

4648
ret = !m_call_will_fail;
4749
}

tests/stubs/RpcStub.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ class RpcStub : public IRpc
4141
/** @copydoc bool IRpc::isConnected() */
4242
bool isConnected() const override { return m_connected; }
4343

44-
/** @copydoc bool IRpc::call(const std::string&, const rapidjson::Document&, rapidjson::Document&, std::chrono::milliseconds) */
44+
/** @copydoc bool IRpc::call(const std::string&, const rapidjson::Document&, rapidjson::Document&,rapidjson::Value&, std::chrono::milliseconds) */
4545
bool call(const std::string& action,
4646
const rapidjson::Document& payload,
47-
rapidjson::Document& response,
47+
rapidjson::Document& rpc_frame,
48+
rapidjson::Value& response,
4849
std::chrono::milliseconds timeout) override;
4950

5051
/** @copydoc void IRpc::registerListener(IListener&) */

0 commit comments

Comments
 (0)