Skip to content

Commit 575d64b

Browse files
authored
Use TranscoderInputStream to reduce confusion around ByteCount() (#225)
* Add TranscoderInputStream to reduce confusion * fix_format
1 parent 272ff02 commit 575d64b

15 files changed

+125
-97
lines changed

BUILD

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ cc_library(
6767
"message_stream.h",
6868
],
6969
deps = [
70+
":transcoder_input_stream",
7071
"//external:protobuf",
7172
],
7273
)
@@ -125,6 +126,7 @@ cc_library(
125126
"message_reader.h",
126127
],
127128
deps = [
129+
":transcoder_input_stream",
128130
"//external:protobuf",
129131
],
130132
)
@@ -144,6 +146,17 @@ cc_library(
144146
],
145147
)
146148

149+
cc_library(
150+
name = "transcoder_input_stream",
151+
srcs = [
152+
"transcoder_input_stream.h",
153+
],
154+
visibility = ["//visibility:public"],
155+
deps = [
156+
"@protobuf_git//:protobuf",
157+
],
158+
)
159+
147160
cc_library(
148161
name = "transcoding",
149162
srcs = [
@@ -223,6 +236,7 @@ cc_library(
223236
srcs = ["test_common.cc"],
224237
hdrs = ["test_common.h"],
225238
deps = [
239+
":transcoder_input_stream",
226240
"//external:googletest",
227241
"//external:protobuf",
228242
"//external:service_config",

message_reader.cc

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
#include <memory>
2020

21-
#include "google/protobuf/io/zero_copy_stream.h"
2221
#include "google/protobuf/io/zero_copy_stream_impl.h"
2322

2423
namespace google {
@@ -29,7 +28,7 @@ namespace transcoding {
2928
namespace pb = ::google::protobuf;
3029
namespace pbio = ::google::protobuf::io;
3130

32-
MessageReader::MessageReader(pbio::ZeroCopyInputStream* in)
31+
MessageReader::MessageReader(TranscoderInputStream* in)
3332
: in_(in),
3433
current_message_size_(0),
3534
have_current_message_size_(false),
@@ -99,7 +98,7 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
9998
// Check if we have the current message size. If not try to read it.
10099
if (!have_current_message_size_) {
101100
const size_t kDelimiterSize = 5;
102-
if (in_->ByteCount() < static_cast<pb::int64>(kDelimiterSize)) {
101+
if (in_->BytesAvailable() < static_cast<pb::int64>(kDelimiterSize)) {
103102
// We don't have 5 bytes available to read the length of the message.
104103
// Find out whether the stream is finished and return false.
105104
finished_ = IsStreamFinished(in_);
@@ -117,10 +116,7 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
117116
have_current_message_size_ = true;
118117
}
119118

120-
// We interpret ZeroCopyInputStream::ByteCount() as the number of bytes
121-
// available for reading at the moment. Check if we have the full message
122-
// available to read.
123-
if (in_->ByteCount() < static_cast<pb::int64>(current_message_size_)) {
119+
if (in_->BytesAvailable() < static_cast<pb::int64>(current_message_size_)) {
124120
// We don't have a full message
125121
return std::unique_ptr<pbio::ZeroCopyInputStream>();
126122
}

message_reader.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
#include <memory>
1919

20-
#include "google/protobuf/io/zero_copy_stream.h"
20+
#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h"
2121
#include "google/protobuf/stubs/status.h"
2222

2323
namespace google {
@@ -49,11 +49,6 @@ namespace transcoding {
4949
// }
5050
// }
5151
//
52-
// NOTE: MesssageReader assumes that ZeroCopyInputStream::ByteCount() returns
53-
// the number of bytes available to read at the moment. That's what
54-
// MessageReader uses to determine whether there is a complete message
55-
// available or not.
56-
//
5752
// NOTE: MessageReader is unable to recognize the case when there is an
5853
// incomplete message at the end of the input. The callers will need to
5954
// detect it and act appropriately.
@@ -64,7 +59,7 @@ namespace transcoding {
6459
//
6560
class MessageReader {
6661
public:
67-
MessageReader(::google::protobuf::io::ZeroCopyInputStream* in);
62+
MessageReader(TranscoderInputStream* in);
6863

6964
// If a full message is available, NextMessage() returns a ZeroCopyInputStream
7065
// over the message. Otherwise returns nullptr - this might be temporary, the
@@ -82,7 +77,7 @@ class MessageReader {
8277
bool Finished() const { return finished_; }
8378

8479
private:
85-
::google::protobuf::io::ZeroCopyInputStream* in_;
80+
TranscoderInputStream* in_;
8681
// The size of the current message.
8782
unsigned int current_message_size_;
8883
// Whether we have read the current message size or not

message_stream.cc

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include <memory>
2020
#include <string>
2121

22-
#include "google/protobuf/io/zero_copy_stream.h"
2322
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
2423

2524
namespace google {
@@ -32,12 +31,12 @@ namespace pbio = ::google::protobuf::io;
3231
namespace {
3332

3433
// a ZeroCopyInputStream implementation over a MessageStream implementation
35-
class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {
34+
class InputStreamOverMessageStream : public TranscoderInputStream {
3635
public:
37-
// src - the underlying MessageStream. ZeroCopyStreamOverMessageStream doesn't
36+
// src - the underlying MessageStream. InputStreamOverMessageStream doesn't
3837
// maintain the ownership of src, the caller must make sure it exists
39-
// throughtout the lifetime of ZeroCopyStreamOverMessageStream.
40-
ZeroCopyStreamOverMessageStream(MessageStream* src)
38+
// throughtout the lifetime of InputStreamOverMessageStream.
39+
InputStreamOverMessageStream(MessageStream* src)
4140
: src_(src), message_(), position_(0) {}
4241

4342
// ZeroCopyInputStream implementation
@@ -72,19 +71,15 @@ class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {
7271

7372
bool Skip(int) { return false; } // Not implemented (no need)
7473

75-
::google::protobuf::int64 ByteCount() const {
76-
// NOTE: we are changing the ByteCount() interpretation. In our case
77-
// ByteCount() returns the number of bytes available for reading at this
78-
// moment. In the original interpretation it is supposed to be the number
79-
// of bytes read so far.
80-
// We need this such that the consumers are able to read the gRPC delimited
81-
// message stream only if there is a full message available.
74+
google::protobuf::int64 ByteCount() const { return 0; } // Not implemented
75+
76+
int64_t BytesAvailable() const {
8277
if (position_ >= message_.size()) {
8378
// If the current message is all done, try to read the next message
8479
// to make sure we return the correct byte count.
85-
const_cast<ZeroCopyStreamOverMessageStream*>(this)->ReadNextMessage();
80+
const_cast<InputStreamOverMessageStream*>(this)->ReadNextMessage();
8681
}
87-
return static_cast<::google::protobuf::int64>(message_.size() - position_);
82+
return static_cast<int64_t>(message_.size() - position_);
8883
}
8984

9085
private:
@@ -109,10 +104,9 @@ class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {
109104

110105
} // namespace
111106

112-
std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>
113-
MessageStream::CreateZeroCopyInputStream() {
114-
return std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>(
115-
new ZeroCopyStreamOverMessageStream(this));
107+
std::unique_ptr<TranscoderInputStream> MessageStream::CreateInputStream() {
108+
return std::unique_ptr<TranscoderInputStream>(
109+
new InputStreamOverMessageStream(this));
116110
}
117111

118112
} // namespace transcoding

message_stream.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <memory>
1919
#include <string>
2020

21+
#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h"
2122
#include "google/protobuf/io/zero_copy_stream.h"
2223
#include "google/protobuf/stubs/status.h"
2324

@@ -73,8 +74,7 @@ class MessageStream {
7374
// Virtual destructor
7475
virtual ~MessageStream() {}
7576
// Creates ZeroCopyInputStream implementation based on this stream
76-
std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>
77-
CreateZeroCopyInputStream();
77+
std::unique_ptr<TranscoderInputStream> CreateInputStream();
7878
};
7979

8080
} // namespace transcoding

message_stream_test.cc

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,14 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
7070

7171
bool Test(const Messages& messages) {
7272
TestMessageStream test_message_stream;
73-
auto zero_copy_stream = test_message_stream.CreateZeroCopyInputStream();
73+
auto input_stream = test_message_stream.CreateInputStream();
7474

7575
const void* data = nullptr;
7676
int size = 0;
7777

7878
// Check that Next() returns true and a 0-sized buffer meaning that
7979
// nothing is available at the moment.
80-
if (!zero_copy_stream->Next(&data, &size)) {
80+
if (!input_stream->Next(&data, &size)) {
8181
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
8282
return false;
8383
}
@@ -91,13 +91,13 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
9191
test_message_stream.AddMessage(message);
9292

9393
// message.size() bytes must be available for reading
94-
if (static_cast<int>(message.size()) != zero_copy_stream->ByteCount()) {
95-
EXPECT_EQ(message.size(), zero_copy_stream->ByteCount());
94+
if (static_cast<int>(message.size()) != input_stream->BytesAvailable()) {
95+
EXPECT_EQ(message.size(), input_stream->BytesAvailable());
9696
return false;
9797
}
9898

9999
// Now try to read & match the message
100-
if (!zero_copy_stream->Next(&data, &size)) {
100+
if (!input_stream->Next(&data, &size)) {
101101
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
102102
return false;
103103
}
@@ -120,16 +120,16 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
120120
// Not a valid test case
121121
continue;
122122
}
123-
zero_copy_stream->BackUp(backup_size);
123+
input_stream->BackUp(backup_size);
124124

125125
// backup_size bytes must be available for reading again
126-
if (static_cast<int>(backup_size) != zero_copy_stream->ByteCount()) {
127-
EXPECT_EQ(message.size(), zero_copy_stream->ByteCount());
126+
if (static_cast<int>(backup_size) != input_stream->BytesAvailable()) {
127+
EXPECT_EQ(message.size(), input_stream->BytesAvailable());
128128
return false;
129129
}
130130

131131
// Now Next() must return the backed up data again.
132-
if (!zero_copy_stream->Next(&data, &size)) {
132+
if (!input_stream->Next(&data, &size)) {
133133
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
134134
return false;
135135
}
@@ -143,7 +143,7 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
143143
}
144144

145145
// At this point no data should be available
146-
if (!zero_copy_stream->Next(&data, &size)) {
146+
if (!input_stream->Next(&data, &size)) {
147147
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
148148
return false;
149149
}
@@ -156,7 +156,7 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
156156
// Now finish the MessageStream & make sure the ZeroCopyInputStream has
157157
// ended.
158158
test_message_stream.Finish();
159-
if (zero_copy_stream->Next(&data, &size)) {
159+
if (input_stream->Next(&data, &size)) {
160160
ADD_FAILURE() << "The stream still hasn't finished" << std::endl;
161161
return false;
162162
}
@@ -201,14 +201,14 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DifferenteSizesOneStream) {
201201

202202
TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
203203
TestMessageStream test_message_stream;
204-
auto zero_copy_stream = test_message_stream.CreateZeroCopyInputStream();
204+
auto input_stream = test_message_stream.CreateInputStream();
205205

206206
const void* data = nullptr;
207207
int size = 0;
208208

209209
// Check that Next() returns true and a 0-sized buffer meaning that
210210
// nothing is available at the moment.
211-
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
211+
EXPECT_TRUE(input_stream->Next(&data, &size));
212212
EXPECT_EQ(0, size);
213213

214214
// Test messages
@@ -221,37 +221,37 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
221221
test_message_stream.AddMessage(message1);
222222

223223
// message1 is available for reading
224-
EXPECT_EQ(message1.size(), zero_copy_stream->ByteCount());
225-
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
224+
EXPECT_EQ(message1.size(), input_stream->BytesAvailable());
225+
EXPECT_TRUE(input_stream->Next(&data, &size));
226226
EXPECT_EQ(message1, std::string(reinterpret_cast<const char*>(data), size));
227227

228228
// Back up a bit
229-
zero_copy_stream->BackUp(5);
229+
input_stream->BackUp(5);
230230

231231
// Now read the backed up data again
232-
EXPECT_EQ(5, zero_copy_stream->ByteCount());
233-
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
232+
EXPECT_EQ(5, input_stream->BytesAvailable());
233+
EXPECT_TRUE(input_stream->Next(&data, &size));
234234
EXPECT_EQ(message1.substr(message1.size() - 5),
235235
std::string(reinterpret_cast<const char*>(data), size));
236236

237237
// Add message2 to the MessageStream
238238
test_message_stream.AddMessage(message2);
239239

240240
// message2 is available for reading
241-
EXPECT_EQ(message2.size(), zero_copy_stream->ByteCount());
242-
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
241+
EXPECT_EQ(message2.size(), input_stream->BytesAvailable());
242+
EXPECT_TRUE(input_stream->Next(&data, &size));
243243
EXPECT_EQ(message2, std::string(reinterpret_cast<const char*>(data), size));
244244

245245
// Back up all of message2
246-
zero_copy_stream->BackUp(message2.size());
246+
input_stream->BackUp(message2.size());
247247

248248
// Now read message2 again
249-
EXPECT_EQ(message2.size(), zero_copy_stream->ByteCount());
250-
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
249+
EXPECT_EQ(message2.size(), input_stream->BytesAvailable());
250+
EXPECT_TRUE(input_stream->Next(&data, &size));
251251
EXPECT_EQ(message2, std::string(reinterpret_cast<const char*>(data), size));
252252

253253
// At this point no data should be available
254-
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
254+
EXPECT_TRUE(input_stream->Next(&data, &size));
255255
EXPECT_EQ(0, size);
256256

257257
// Add both message3 & message4 & finish the MessageStream afterwards
@@ -260,16 +260,16 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
260260
test_message_stream.Finish();
261261

262262
// Read & match both message3 & message4
263-
EXPECT_EQ(message3.size(), zero_copy_stream->ByteCount());
264-
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
263+
EXPECT_EQ(message3.size(), input_stream->BytesAvailable());
264+
EXPECT_TRUE(input_stream->Next(&data, &size));
265265
EXPECT_EQ(message3, std::string(reinterpret_cast<const char*>(data), size));
266266

267-
EXPECT_EQ(message4.size(), zero_copy_stream->ByteCount());
268-
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
267+
EXPECT_EQ(message4.size(), input_stream->BytesAvailable());
268+
EXPECT_TRUE(input_stream->Next(&data, &size));
269269
EXPECT_EQ(message4, std::string(reinterpret_cast<const char*>(data), size));
270270

271271
// All done!
272-
EXPECT_FALSE(zero_copy_stream->Next(&data, &size));
272+
EXPECT_FALSE(input_stream->Next(&data, &size));
273273
}
274274

275275
} // namespace

response_to_json_translator.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
#include <string>
2020

21-
#include "google/protobuf/io/zero_copy_stream.h"
2221
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
2322
#include "google/protobuf/stubs/status.h"
2423
#include "google/protobuf/util/json_util.h"
@@ -31,7 +30,7 @@ namespace transcoding {
3130

3231
ResponseToJsonTranslator::ResponseToJsonTranslator(
3332
::google::protobuf::util::TypeResolver* type_resolver, std::string type_url,
34-
bool streaming, ::google::protobuf::io::ZeroCopyInputStream* in)
33+
bool streaming, TranscoderInputStream* in)
3534
: type_resolver_(type_resolver),
3635
type_url_(std::move(type_url)),
3736
streaming_(streaming),

response_to_json_translator.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ class ResponseToJsonTranslator : public MessageStream {
6666
// format (http://www.grpc.io/docs/guides/wire.html)
6767
ResponseToJsonTranslator(
6868
::google::protobuf::util::TypeResolver* type_resolver,
69-
std::string type_url, bool streaming,
70-
::google::protobuf::io::ZeroCopyInputStream* in);
69+
std::string type_url, bool streaming, TranscoderInputStream* in);
7170

7271
// MessageStream implementation
7372
bool NextMessage(std::string* message);

test_common.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void TestZeroCopyInputStream::BackUp(int count) {
8181
position_ -= count;
8282
}
8383

84-
pb::int64 TestZeroCopyInputStream::ByteCount() const {
84+
int64_t TestZeroCopyInputStream::BytesAvailable() const {
8585
auto total = current_.size() - position_;
8686
for (auto chunk : chunks_) {
8787
total += chunk.size();

0 commit comments

Comments
 (0)