Skip to content

Commit 836d615

Browse files
committed
[#28665] DocDB: Add CRC to RPC response
Summary: An RPC call response can be corrupted at lower layers, which could lead to actual data corruption. To avoid this, we add a CRC checksum to RPC responses, as was already done for RPC requests, so that corrupted responses are filtered out. **Upgrade/Rollback safety:** The CRC field IDs were changed, so different versions will not see each other's CRC. As a result, the CRC is not checked between old and new versions. Jira: DB-18359 Test Plan: Jenkins Reviewers: mlillibridge Reviewed By: mlillibridge Subscribers: kannan, hbhanawat, rthallam, ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D46647
1 parent bb329fc commit 836d615

File tree

9 files changed

+91
-40
lines changed

9 files changed

+91
-40
lines changed

src/yb/rpc/constants.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,9 @@
3434

3535
#include <stdint.h>
3636

37-
namespace yb {
38-
namespace rpc {
37+
namespace yb::rpc {
3938

4039
// There is a 4-byte length prefix before any packet.
41-
const uint8_t kMsgLengthPrefixLength = 4;
40+
constexpr size_t kMsgLengthPrefixLength = 4;
4241

43-
} // namespace rpc
44-
} // namespace yb
42+
} // namespace yb::rpc

src/yb/rpc/outbound_call.cc

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -403,13 +403,7 @@ Status OutboundCall::SetRequestParam(
403403
}
404404
RETURN_NOT_OK(SerializeMessage(req, req_size, buffer_, sidecars_size, header_size));
405405
if (use_crc) {
406-
auto crc = crc::Crc64c(buffer_.udata() + header_size, message_size, 0);
407-
if (sidecars_size) {
408-
sidecars_->buffer().IterateBlocks([&crc](Slice block) {
409-
crc = crc::Crc64c(block.data(), block.size(), crc);
410-
});
411-
}
412-
LittleEndian::Store32(buffer_.udata() + header_size - 4, static_cast<uint32_t>(crc));
406+
StoreCrc(buffer_, header_size, message_size, sidecars_.get());
413407
}
414408

415409
if (method_metrics_) {

src/yb/rpc/rpc-test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1184,6 +1184,7 @@ void TestMaxSizeRpcResponse(CalculatorServiceProxy* proxy) {
11841184
ResponseHeader resp_header;
11851185
resp_header.set_call_id(1);
11861186
resp_header.set_is_error(false);
1187+
resp_header.set_crc(0);
11871188

11881189
const size_t header_pb_len = resp_header.ByteSize();
11891190
const size_t header_tot_len = OutboundCall::HeaderTotalLength(header_pb_len);

src/yb/rpc/rpc_header.proto

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ message RemoteMethodPB {
5656

5757
// The header for the RPC request frame.
5858
message RequestHeader {
59+
reserved 6;
60+
5961
// A sequence number that is sent back in the Response. Hadoop specifies a uint32 and
6062
// casts it to a signed int. That is counterintuitive, so we use an int32 instead.
6163
// Allowed values (inherited from Hadoop):
@@ -78,11 +80,13 @@ message RequestHeader {
7880

7981
optional AshMetadataPB metadata = 5;
8082

81-
// CRC32C for message body.
82-
optional fixed32 crc = 6;
83+
// CRC32C of the message body and the header, excluding the CRC field itself. Field number 15 is used because it is the largest field number whose tag fits in 1 byte.
84+
optional fixed32 crc = 15;
8385
}
8486

8587
message ResponseHeader {
88+
reserved 4;
89+
8690
required int32 call_id = 1;
8791

8892
// If this is set, then this is an error response and the
@@ -95,7 +99,8 @@ message ResponseHeader {
9599
// is the first byte after the bytes for this protobuf.
96100
repeated uint32 sidecar_offsets = 3;
97101

98-
optional fixed32 crc = 4;
102+
// CRC32C of the message body and the header, excluding the CRC field itself. Field number 15 is used because it is the largest field number whose tag fits in 1 byte.
103+
optional fixed32 crc = 15;
99104
}
100105

101106
// An empty message. Since CQL RPC server bypasses protobuf to handle requests and responses but

src/yb/rpc/serialization.cc

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
#include "yb/rpc/call_data.h"
4545
#include "yb/rpc/rpc_header.pb.h"
46+
#include "yb/rpc/sidecars.h"
4647

4748
#include "yb/util/crc.h"
4849
#include "yb/util/faststring.h"
@@ -57,8 +58,10 @@ using google::protobuf::MessageLite;
5758
using google::protobuf::io::CodedInputStream;
5859
using google::protobuf::io::CodedOutputStream;
5960

60-
namespace yb {
61-
namespace rpc {
61+
namespace yb::rpc {
62+
63+
// Size of the serialized CRC field: 1-byte tag + 4-byte value.
64+
constexpr size_t kCrcFieldSize = 5;
6265

6366
size_t SerializedMessageSize(size_t body_size, size_t additional_size) {
6467
auto full_size = body_size + additional_size;
@@ -133,7 +136,7 @@ Status SerializeHeader(const MessageLite& header,
133136
return Status::OK();
134137
}
135138

136-
Result<RefCntBuffer> SerializeRequest(
139+
Result<std::pair<RefCntBuffer, size_t>> SerializeResponse(
137140
size_t body_size, size_t additional_size, const google::protobuf::Message& header,
138141
AnyMessageConstPtr body) {
139142
auto message_size = SerializedMessageSize(body_size, additional_size);
@@ -143,7 +146,7 @@ Result<RefCntBuffer> SerializeRequest(
143146
header, message_size + additional_size, &result, message_size, &header_size));
144147

145148
RETURN_NOT_OK(SerializeMessage(body, body_size, result, additional_size, header_size));
146-
return result;
149+
return std::pair{result, header_size};
147150
}
148151

149152
bool SkipField(uint8_t type, CodedInputStream* in) {
@@ -267,11 +270,12 @@ Result<Slice> ParseYBHeader(Slice buf, Header* parsed_header) {
267270
in.PopLimit(l);
268271

269272
if (auto crc = GetCrc(*parsed_header)) {
270-
auto offset = in.CurrentPosition();
271-
auto msg_crc = crc::Crc32c(buf.data() + offset, buf.size() - offset);
272-
if (msg_crc != *crc) {
273+
crc::Crc32Accumulator msg_crc;
274+
msg_crc.Feed(buf.Prefix(in.CurrentPosition() - kCrcFieldSize));
275+
msg_crc.Feed(buf.WithoutPrefix(in.CurrentPosition()));
276+
if (msg_crc.result() != *crc) {
273277
auto status = STATUS_FORMAT(
274-
Corruption, "Invalid CRC $0 for request $1", msg_crc, *parsed_header);
278+
Corruption, "Invalid CRC $0 for $1", msg_crc.result(), *parsed_header);
275279
LOG(DFATAL) << status;
276280
return status;
277281
}
@@ -411,5 +415,20 @@ Status ParseMetadataFromSharedMemory(
411415
return ParseMetadata(metadata, out);
412416
}
413417

414-
} // namespace rpc
415-
} // namespace yb
418+
void StoreCrc(
419+
const RefCntBuffer& buffer, size_t header_size, size_t message_size, Sidecars* sidecars) {
420+
crc::Crc32Accumulator crc;
421+
// The last 5 bytes of the header hold the CRC field, so they are excluded from the checksum.
422+
crc.Feed(
423+
buffer.AsSlice().Prefix(header_size - kCrcFieldSize).WithoutPrefix(kMsgLengthPrefixLength));
424+
crc.Feed(buffer.udata() + header_size, message_size);
425+
if (sidecars) {
426+
sidecars->buffer().IterateBlocks([&crc](Slice block) {
427+
crc.Feed(block.data(), block.size());
428+
});
429+
}
430+
// Expect CRC to be located at header end.
431+
LittleEndian::Store32(buffer.udata() + header_size - 4, crc.result());
432+
}
433+
434+
} // namespace yb::rpc

src/yb/rpc/serialization.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class Status;
6060

6161
namespace rpc {
6262

63-
Result<RefCntBuffer> SerializeRequest(
63+
Result<std::pair<RefCntBuffer, size_t>> SerializeResponse(
6464
size_t body_size, size_t additional_size, const google::protobuf::Message& header,
6565
AnyMessageConstPtr body);
6666

@@ -105,5 +105,8 @@ Result<ParsedRemoteMethod> ParseRemoteMethod(const Slice& buf);
105105
Status ParseMetadata(Slice buf, AnyMessagePtr out);
106106
Status ParseMetadataFromSharedMemory(uint8_t** input, size_t length, AnyMessagePtr out);
107107

108+
void StoreCrc(
109+
const RefCntBuffer& buffer, size_t header_size, size_t message_size, Sidecars* sidecars);
110+
108111
} // namespace rpc
109112
} // namespace yb

src/yb/rpc/yb_rpc.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "yb/rpc/rpc_introspection.pb.h"
3030
#include "yb/rpc/serialization.h"
3131

32+
#include "yb/util/crc.h"
3233
#include "yb/util/debug/trace_event.h"
3334
#include "yb/util/flags.h"
3435
#include "yb/util/format.h"
@@ -49,6 +50,7 @@ DEFINE_test_flag(uint64, yb_inbound_big_calls_parse_delay_ms, false,
4950
"rpc_throttle_threshold_bytes");
5051

5152
DECLARE_bool(rpc_dump_all_traces);
53+
DECLARE_bool(rpc_enable_crc);
5254
DECLARE_bool(ysql_yb_enable_ash);
5355
DECLARE_int32(print_trace_every);
5456
DECLARE_int32(rpc_slow_query_threshold_ms);
@@ -325,9 +327,19 @@ Status YBInboundCall::SerializeResponseBuffer(AnyMessageConstPtr response, bool
325327
ResponseHeader resp_hdr;
326328
resp_hdr.set_call_id(header_.call_id);
327329
resp_hdr.set_is_error(!is_success);
330+
auto use_crc = FLAGS_rpc_enable_crc;
331+
if (use_crc) {
332+
resp_hdr.set_crc(0);
333+
}
328334
sidecars_.MoveOffsetsTo(body_size, resp_hdr.mutable_sidecar_offsets());
329335

330-
response_buf_ = VERIFY_RESULT(SerializeRequest(body_size, sidecars_.size(), resp_hdr, response));
336+
size_t header_size;
337+
std::tie(response_buf_, header_size) = VERIFY_RESULT(SerializeResponse(
338+
body_size, sidecars_.size(), resp_hdr, response));
339+
if (use_crc) {
340+
StoreCrc(response_buf_, header_size, response_buf_.size() - header_size, &sidecars_);
341+
}
342+
331343
response_data_memory_usage_ = response_buf_.size();
332344

333345
return Status::OK();

src/yb/util/crc.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@
3636
#include "yb/gutil/once.h"
3737
#include "yb/util/debug/leakcheck_disabler.h"
3838

39-
namespace yb {
40-
namespace crc {
39+
namespace yb::crc {
4140

4241
using debug::ScopedLeakCheckDisabler;
4342

@@ -58,13 +57,12 @@ Crc* GetCrc32cInstance() {
5857
uint32_t Crc32c(const void* data, size_t length) {
5958
uint64_t crc32 = 0;
6059
GetCrc32cInstance()->Compute(data, length, &crc32);
61-
return static_cast<uint32_t>(crc32); // Only uses lower 32 bits.
60+
// Only uses lower 32 bits, since top 32 bits are always zero.
61+
return static_cast<uint32_t>(crc32);
6262
}
6363

64-
uint64_t Crc64c(const void* data, size_t length, uint64_t start) {
65-
GetCrc32cInstance()->Compute(data, length, &start);
66-
return start;
64+
void Crc32Accumulator::Feed(const void* data, size_t length) {
65+
GetCrc32cInstance()->Compute(data, length, &state_);
6766
}
6867

69-
} // namespace crc
70-
} // namespace yb
68+
} // namespace yb::crc

src/yb/util/crc.h

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@
3636

3737
#include <crcutil/interface.h>
3838

39-
namespace yb {
40-
namespace crc {
39+
#include "yb/util/slice.h"
40+
41+
namespace yb::crc {
4142

4243
typedef crcutil_interface::CRC Crc;
4344

@@ -47,7 +48,27 @@ Crc* GetCrc32cInstance();
4748
// Helper function to simply calculate a CRC32C of the given data.
4849
uint32_t Crc32c(const void* data, size_t length);
4950

50-
uint64_t Crc64c(const void* data, size_t length, uint64_t start);
51+
inline uint32_t Crc32c(Slice slice) {
52+
return Crc32c(slice.data(), slice.size());
53+
}
54+
55+
class Crc32Accumulator {
56+
public:
57+
explicit Crc32Accumulator(uint64_t state = 0) : state_(state) {}
58+
59+
void Feed(Slice slice) {
60+
Feed(slice.data(), slice.size());
61+
}
62+
63+
void Feed(const void* data, size_t length);
64+
65+
uint32_t result() const {
66+
return static_cast<uint32_t>(state_);
67+
}
68+
69+
private:
70+
// CRC32C has 64 bits state, but top 32 bits are always zero.
71+
uint64_t state_;
72+
};
5173

52-
} // namespace crc
53-
} // namespace yb
74+
} // namespace yb::crc

0 commit comments

Comments
 (0)