Skip to content

Commit 6c54b75

Browse files
authored
Add gRPC frame error handling (#19)
1 parent fd163f5 commit 6c54b75

File tree

7 files changed

+162
-27
lines changed

7 files changed

+162
-27
lines changed

src/include/grpc_transcoding/message_reader.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,22 @@ class MessageReader {
7272
// NextMessage() again.
7373
std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream> NextMessage();
7474

75+
::google::protobuf::util::Status Status() const { return status_; }
76+
7577
// Returns true if the stream has ended (this is permanent); otherwise returns
7678
// false.
77-
bool Finished() const { return finished_; }
79+
bool Finished() const { return finished_ || !status_.ok(); }
7880

7981
private:
8082
TranscoderInputStream* in_;
8183
// The size of the current message.
82-
unsigned int current_message_size_;
84+
uint32_t current_message_size_;
8385
// Whether we have read the current message size or not
8486
bool have_current_message_size_;
8587
// Are we all done?
8688
bool finished_;
89+
// Status
90+
::google::protobuf::util::Status status_;
8791

8892
MessageReader(const MessageReader&) = delete;
8993
MessageReader& operator=(const MessageReader&) = delete;

src/include/grpc_transcoding/transcoder_input_stream.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ class TranscoderInputStream
2626
public:
2727
// returns the number of bytes available to read at the moment.
2828
virtual int64_t BytesAvailable() const = 0;
29+
30+
// returns if the stream is finished. i.e. No more data will be added.
31+
virtual bool Finished() const = 0;
2932
};
3033

3134
} // namespace transcoding

src/message_reader.cc

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,9 @@ bool ReadStream(pbio::ZeroCopyInputStream* stream, unsigned char* buffer,
6060
return true;
6161
}
6262

63-
// Determines whether the stream is finished or not.
64-
bool IsStreamFinished(pbio::ZeroCopyInputStream* stream) {
65-
int size = 0;
66-
const void* data = nullptr;
67-
if (!stream->Next(&data, &size)) {
68-
return true;
69-
} else {
70-
stream->BackUp(size);
71-
return false;
72-
}
73-
}
74-
7563
// A helper function to extract the size from a gRPC wire format message
7664
// delimiter - see http://www.grpc.io/docs/guides/wire.html.
77-
unsigned DelimiterToSize(const unsigned char* delimiter) {
65+
uint32_t DelimiterToSize(const unsigned char* delimiter) {
7866
unsigned size = 0;
7967
// Bytes 1-4 are big-endian 32-bit message size
8068
size = size | static_cast<unsigned>(delimiter[1]);
@@ -90,9 +78,9 @@ unsigned DelimiterToSize(const unsigned char* delimiter) {
9078
} // namespace
9179

9280
std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
93-
if (finished_) {
81+
if (Finished()) {
9482
// The stream has ended
95-
return std::unique_ptr<pbio::ZeroCopyInputStream>();
83+
return nullptr;
9684
}
9785

9886
// Check if we have the current message size. If not try to read it.
@@ -101,35 +89,52 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
10189
if (in_->BytesAvailable() < static_cast<pb::int64>(kDelimiterSize)) {
10290
// We don't have 5 bytes available to read the length of the message.
10391
// Find out whether the stream is finished and return false.
104-
finished_ = IsStreamFinished(in_);
105-
return std::unique_ptr<pbio::ZeroCopyInputStream>();
92+
finished_ = in_->Finished();
93+
if (finished_ && in_->BytesAvailable() != 0) {
94+
status_ = google::protobuf::util::Status(
95+
google::protobuf::util::error::INTERNAL,
96+
"Incomplete gRPC frame header received");
97+
}
98+
return nullptr;
10699
}
107100

108101
// Try to read the delimiter
109102
unsigned char delimiter[kDelimiterSize] = {0};
110103
if (!ReadStream(in_, delimiter, sizeof(delimiter))) {
111104
finished_ = true;
112-
return std::unique_ptr<pbio::ZeroCopyInputStream>();
105+
return nullptr;
106+
}
107+
108+
if (delimiter[0] != 0) {
109+
status_ = google::protobuf::util::Status(
110+
google::protobuf::util::error::INTERNAL,
111+
"Unsupported gRPC frame flag: " + std::to_string(delimiter[0]));
112+
return nullptr;
113113
}
114114

115115
current_message_size_ = DelimiterToSize(delimiter);
116116
have_current_message_size_ = true;
117117
}
118118

119119
if (in_->BytesAvailable() < static_cast<pb::int64>(current_message_size_)) {
120+
if (in_->Finished()) {
121+
status_ = google::protobuf::util::Status(
122+
google::protobuf::util::error::INTERNAL,
123+
"Incomplete gRPC frame expected size: " +
124+
std::to_string(current_message_size_) + " actual size: " +
125+
std::to_string(in_->BytesAvailable()));
126+
}
120127
// We don't have a full message
121-
return std::unique_ptr<pbio::ZeroCopyInputStream>();
128+
return nullptr;
122129
}
123130

124-
// We have a message! Use LimitingInputStream to wrap the input stream and
125-
// limit it to current_message_size_ bytes to cover only the current message.
126-
auto result = std::unique_ptr<pbio::ZeroCopyInputStream>(
127-
new pbio::LimitingInputStream(in_, current_message_size_));
128-
129131
// Reset the have_current_message_size_ for the next message
130132
have_current_message_size_ = false;
131133

132-
return result;
134+
// We have a message! Use LimitingInputStream to wrap the input stream and
135+
// limit it to current_message_size_ bytes to cover only the current message.
136+
return std::unique_ptr<pbio::ZeroCopyInputStream>(
137+
new pbio::LimitingInputStream(in_, current_message_size_));
133138
}
134139

135140
} // namespace transcoding

src/message_stream.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ class InputStreamOverMessageStream : public TranscoderInputStream {
8282
return static_cast<int64_t>(message_.size() - position_);
8383
}
8484

85+
bool Finished() const { return src_->Finished(); }
86+
8587
private:
8688
// Updates the current message and creates an ArrayInputStream over it.
8789
void ReadNextMessage() {

src/response_to_json_translator.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,14 @@ bool ResponseToJsonTranslator::NextMessage(std::string* message) {
4444
// All done
4545
return false;
4646
}
47+
4748
// Try to read a message
4849
auto proto_in = reader_.NextMessage();
50+
status_ = reader_.Status();
51+
if (!status_.ok()) {
52+
return false;
53+
}
54+
4955
if (proto_in) {
5056
std::string json_out;
5157
if (TranslateMessage(proto_in.get(), &json_out)) {

test/message_reader_test.cc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class MessageReaderTestRun {
9595
}
9696
// Read the message
9797
auto stream = reader_->NextMessage();
98+
EXPECT_TRUE(reader_->Status().ok());
9899
if (!stream) {
99100
ADD_FAILURE() << "No message available" << std::endl;
100101
return false;
@@ -296,13 +297,15 @@ TEST_F(MessageReaderTest, DirectTest) {
296297
// Nothing yet
297298
EXPECT_EQ(nullptr, reader.NextMessage().get());
298299
EXPECT_FALSE(reader.Finished());
300+
EXPECT_TRUE(reader.Status().ok());
299301

300302
// Add the delimiter for the first message
301303
input_stream.AddChunk(SizeToDelimiter(message1.size()));
302304

303305
// Still nothing
304306
EXPECT_EQ(nullptr, reader.NextMessage().get());
305307
EXPECT_FALSE(reader.Finished());
308+
EXPECT_TRUE(reader.Status().ok());
306309

307310
// Add the message itself
308311
input_stream.AddChunk(message1);
@@ -313,6 +316,7 @@ TEST_F(MessageReaderTest, DirectTest) {
313316
EXPECT_EQ(message1, ReadAllFromStream(message1_stream.get()));
314317
EXPECT_EQ(nullptr, reader.NextMessage().get());
315318
EXPECT_FALSE(reader.Finished());
319+
EXPECT_TRUE(reader.Status().ok());
316320

317321
// Add part of the message2
318322
input_stream.AddChunk(SizeToDelimiter(message2.size()));
@@ -321,6 +325,7 @@ TEST_F(MessageReaderTest, DirectTest) {
321325
// No message should be available
322326
EXPECT_EQ(nullptr, reader.NextMessage().get());
323327
EXPECT_FALSE(reader.Finished());
328+
EXPECT_TRUE(reader.Status().ok());
324329

325330
// Add the rest of the second message
326331
input_stream.AddChunk(message2.substr(10));
@@ -331,6 +336,7 @@ TEST_F(MessageReaderTest, DirectTest) {
331336
EXPECT_EQ(message2, ReadAllFromStream(message2_stream.get()));
332337
EXPECT_EQ(nullptr, reader.NextMessage().get());
333338
EXPECT_FALSE(reader.Finished());
339+
EXPECT_TRUE(reader.Status().ok());
334340

335341
// Adding both message3 & message4 in one shot and Finish the stream
336342
input_stream.AddChunk(SizeToDelimiter(message3.size()));
@@ -341,6 +347,7 @@ TEST_F(MessageReaderTest, DirectTest) {
341347

342348
// Not finished yet as we still have messages to read
343349
EXPECT_FALSE(reader.Finished());
350+
EXPECT_TRUE(reader.Status().ok());
344351

345352
// Now both message3 & message4 must be available
346353
auto message3_stream = reader.NextMessage();
@@ -355,6 +362,45 @@ TEST_F(MessageReaderTest, DirectTest) {
355362
// All done, the reader must be finished
356363
EXPECT_EQ(nullptr, reader.NextMessage().get());
357364
EXPECT_TRUE(reader.Finished());
365+
EXPECT_TRUE(reader.Status().ok());
366+
}
367+
368+
TEST_F(MessageReaderTest, IncompleteFrameHeader) {
369+
TestZeroCopyInputStream input_stream;
370+
MessageReader reader(&input_stream);
371+
372+
input_stream.AddChunk("\x0a");
373+
input_stream.Finish();
374+
375+
EXPECT_EQ(nullptr, reader.NextMessage().get());
376+
EXPECT_FALSE(reader.Status().ok());
377+
EXPECT_EQ(reader.Status().error_message(),
378+
"Incomplete gRPC frame header received");
379+
}
380+
381+
TEST_F(MessageReaderTest, InvalidFrameFlag) {
382+
TestZeroCopyInputStream input_stream;
383+
MessageReader reader(&input_stream);
384+
385+
input_stream.AddChunk(std::string("\x0A\x00\x00\x00\x00", 5));
386+
input_stream.Finish();
387+
388+
EXPECT_EQ(nullptr, reader.NextMessage().get());
389+
EXPECT_FALSE(reader.Status().ok());
390+
EXPECT_EQ(reader.Status().error_message(), "Unsupported gRPC frame flag: 10");
391+
}
392+
393+
TEST_F(MessageReaderTest, IncompleteFrame) {
394+
TestZeroCopyInputStream input_stream;
395+
MessageReader reader(&input_stream);
396+
397+
input_stream.AddChunk(std::string("\x00\x00\x00\x00\x05\x00", 6));
398+
input_stream.Finish();
399+
400+
EXPECT_EQ(nullptr, reader.NextMessage().get());
401+
EXPECT_FALSE(reader.Status().ok());
402+
EXPECT_EQ(reader.Status().error_message(),
403+
"Incomplete gRPC frame expected size: 5 actual size: 1");
358404
}
359405

360406
} // namespace

test/response_to_json_translator_test.cc

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,75 @@ TEST_F(ResponseToJsonTranslatorTest, Streaming5KMessages) {
825825
EXPECT_TRUE(ExpectJsonArrayEq(expected_json_array, actual_json_array));
826826
}
827827

828+
TEST_F(ResponseToJsonTranslatorTest, IncompleteFrameHeader) {
829+
// Load the service config
830+
::google::api::Service service;
831+
ASSERT_TRUE(
832+
transcoding::testing::LoadService("bookstore_service.pb.txt", &service));
833+
834+
// Create a TypeHelper using the service config
835+
TypeHelper type_helper(service.types(), service.enums());
836+
837+
TestZeroCopyInputStream input_stream;
838+
ResponseToJsonTranslator translator(
839+
type_helper.Resolver(), "type.googleapis.com/Shelf", true, &input_stream);
840+
841+
input_stream.AddChunk(std::string("\x0A", 1));
842+
input_stream.Finish();
843+
844+
std::string actual;
845+
EXPECT_FALSE(translator.NextMessage(&actual));
846+
EXPECT_FALSE(translator.Status().ok());
847+
EXPECT_EQ(translator.Status().error_message(),
848+
"Incomplete gRPC frame header received");
849+
}
850+
851+
TEST_F(ResponseToJsonTranslatorTest, InvalidFrameFlag) {
852+
// Load the service config
853+
::google::api::Service service;
854+
ASSERT_TRUE(
855+
transcoding::testing::LoadService("bookstore_service.pb.txt", &service));
856+
857+
// Create a TypeHelper using the service config
858+
TypeHelper type_helper(service.types(), service.enums());
859+
860+
TestZeroCopyInputStream input_stream;
861+
ResponseToJsonTranslator translator(
862+
type_helper.Resolver(), "type.googleapis.com/Shelf", true, &input_stream);
863+
864+
input_stream.AddChunk(std::string("\x0A\x00\x00\x00\x00", 5));
865+
input_stream.Finish();
866+
867+
std::string actual;
868+
EXPECT_FALSE(translator.NextMessage(&actual));
869+
EXPECT_FALSE(translator.Status().ok());
870+
EXPECT_EQ(translator.Status().error_message(),
871+
"Unsupported gRPC frame flag: 10");
872+
}
873+
874+
TEST_F(ResponseToJsonTranslatorTest, IncompleteFrame) {
875+
// Load the service config
876+
::google::api::Service service;
877+
ASSERT_TRUE(
878+
transcoding::testing::LoadService("bookstore_service.pb.txt", &service));
879+
880+
// Create a TypeHelper using the service config
881+
TypeHelper type_helper(service.types(), service.enums());
882+
883+
TestZeroCopyInputStream input_stream;
884+
ResponseToJsonTranslator translator(
885+
type_helper.Resolver(), "type.googleapis.com/Shelf", true, &input_stream);
886+
887+
input_stream.AddChunk(std::string("\x00\x00\x00\x00\x05\x00", 6));
888+
input_stream.Finish();
889+
890+
std::string actual;
891+
EXPECT_FALSE(translator.NextMessage(&actual));
892+
EXPECT_FALSE(translator.Status().ok());
893+
EXPECT_EQ(translator.Status().error_message(),
894+
"Incomplete gRPC frame expected size: 5 actual size: 1");
895+
}
896+
828897
} // namespace
829898
} // namespace testing
830899
} // namespace transcoding

0 commit comments

Comments
 (0)