Skip to content

Commit 63e7760

Browse files
author
ivan-skryabin
committed
feat grpc: enforce move semantics in stream Write
Removed lvalue Write overloads to prevent unsafe usage patterns where modified objects could be reused after writes. This addresses potential bugs when middlewares mutate request objects commit_hash:54cf13e1732c28afb24282355e81e38ee1ff54be
1 parent 670199e commit 63e7760

File tree

18 files changed

+122
-126
lines changed

18 files changed

+122
-126
lines changed

grpc/benchmarks/base.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ class UnitTestService final : public sample::ugrpc::UnitTestServiceBase {
4444
sample::ugrpc::StreamGreetingRequest&& request,
4545
ReadManyWriter& writer
4646
) override {
47-
sample::ugrpc::StreamGreetingResponse response;
48-
response.set_name("Hello again " + request.name());
47+
const std::string response_name = "Hello again " + request.name();
4948
for (int i = 0; i < request.number(); ++i) {
49+
sample::ugrpc::StreamGreetingResponse response;
50+
response.set_name(response_name);
5051
response.set_number(i);
51-
writer.Write(response);
52+
writer.Write(std::move(response));
5253
}
5354
return grpc::Status::OK;
5455
}
@@ -67,13 +68,13 @@ class UnitTestService final : public sample::ugrpc::UnitTestServiceBase {
6768

6869
ChatResult Chat(CallContext& /*context*/, ChatReaderWriter& stream) override {
6970
sample::ugrpc::StreamGreetingRequest request;
70-
sample::ugrpc::StreamGreetingResponse response;
7171
int count = 0;
7272
while (stream.Read(request)) {
7373
++count;
74+
sample::ugrpc::StreamGreetingResponse response;
7475
response.set_number(count);
7576
response.set_name("Hello " + request.name());
76-
stream.Write(response);
77+
stream.Write(std::move(response));
7778
}
7879
return grpc::Status::OK;
7980
}

grpc/functional_tests/basic_chaos/service.hpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ GreeterServiceComponent::SayHelloResponseStreamResult GreeterServiceComponent::S
7979
SayHelloResponseStreamWriter& writer
8080
) {
8181
std::string message = fmt::format("{}, {}", prefix_, request.name());
82-
api::GreetingResponse response;
8382
constexpr auto kCountSend = 5;
8483
constexpr std::chrono::milliseconds kTimeInterval{200};
8584
for (auto i = 0; i < kCountSend; ++i) {
85+
api::GreetingResponse response;
8686
message.push_back('!');
8787
response.set_greeting(grpc::string(message));
8888
engine::SleepFor(kTimeInterval);
89-
writer.Write(response);
89+
writer.Write(std::move(response));
9090
}
9191
return grpc::Status::OK;
9292
}
@@ -112,12 +112,12 @@ GreeterServiceComponent::SayHelloStreamsResult GreeterServiceComponent::SayHello
112112
constexpr std::chrono::milliseconds kTimeInterval{200};
113113
std::string income_message;
114114
api::GreetingRequest request;
115-
api::GreetingResponse response;
116115
while (stream.Read(request)) {
116+
api::GreetingResponse response;
117117
income_message.append(request.name());
118118
response.set_greeting(fmt::format("{}, {}", prefix_, income_message));
119119
engine::SleepFor(kTimeInterval);
120-
stream.Write(response);
120+
stream.Write(std::move(response));
121121
}
122122
return grpc::Status::OK;
123123
}
@@ -139,12 +139,12 @@ GreeterServiceComponent::SayHelloIndependentStreamsResult GreeterServiceComponen
139139
});
140140

141141
auto write_task = engine::AsyncNoSpan([&stream, prefix = prefix_, &kTimeIntervalWrite] {
142-
api::GreetingResponse response;
143142
const std::array names =
144143
{"Python", "C++", "linux", "userver", "grpc", "kernel", "developer", "core", "anonymous", "user"};
145144
for (const auto& name : names) {
145+
api::GreetingResponse response;
146146
response.set_greeting(fmt::format("{}, {}", prefix, name));
147-
stream.Write(response);
147+
stream.Write(std::move(response));
148148
engine::SleepFor(kTimeIntervalWrite);
149149
}
150150
});
@@ -154,7 +154,7 @@ GreeterServiceComponent::SayHelloIndependentStreamsResult GreeterServiceComponen
154154

155155
api::GreetingResponse response;
156156
response.set_greeting(grpc::string(final_string));
157-
stream.Write(response);
157+
stream.Write(std::move(response));
158158
return grpc::Status::OK;
159159
}
160160

grpc/functional_tests/error_status/src/error_status_server.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ ErrorStatusServiceComponent::ReturnStreamErrorStatusResult ErrorStatusServiceCom
4343
api::ErrorStatusRequest&& request,
4444
ReturnStreamErrorStatusWriter& writer
4545
) {
46-
api::ErrorStatusResponse response;
4746
for (int i = 0; i < kCountSend; ++i) {
48-
writer.Write(response);
47+
writer.Write(api::ErrorStatusResponse{});
4948
}
5049

5150
const grpc::StatusCode request_status_code = ugrpc::StatusCodeFromString(request.status_code());
@@ -66,9 +65,8 @@ ErrorStatusServiceComponent::ThrowStreamErrorWithStatusException(
6665
api::ErrorStatusRequest&& request,
6766
ThrowStreamErrorWithStatusExceptionWriter& writer
6867
) {
69-
api::ErrorStatusResponse response;
7068
for (int i = 0; i < kCountSend; ++i) {
71-
writer.Write(response);
69+
writer.Write(api::ErrorStatusResponse{});
7270
}
7371

7472
const grpc::StatusCode request_status_code = ugrpc::StatusCodeFromString(request.status_code());

grpc/functional_tests/middleware_server/src/service.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ GreeterServiceComponent::SayHelloResponseStreamResult GreeterServiceComponent::S
2525
SayHelloResponseStreamWriter& writer
2626
) {
2727
std::string message = fmt::format("{}, {}", "Hello", request.name());
28-
samples::api::GreetingResponse response;
2928
constexpr auto kCountSend = 5;
3029
constexpr std::chrono::milliseconds kTimeInterval{200};
3130
for (auto i = 0; i < kCountSend; ++i) {
31+
samples::api::GreetingResponse response;
3232
message.push_back('!');
3333
response.set_greeting(grpc::string(message));
3434
engine::SleepFor(kTimeInterval);
35-
writer.Write(response);
35+
writer.Write(std::move(response));
3636
}
3737
return grpc::Status::OK;
3838
}
@@ -58,12 +58,12 @@ GreeterServiceComponent::SayHelloStreamsResult GreeterServiceComponent::SayHello
5858
constexpr std::chrono::milliseconds kTimeInterval{200};
5959
std::string income_message;
6060
samples::api::GreetingRequest request;
61-
samples::api::GreetingResponse response;
6261
while (stream.Read(request)) {
62+
samples::api::GreetingResponse response;
6363
income_message.append(request.name());
6464
response.set_greeting(fmt::format("{}, {}", "Hello", income_message));
6565
engine::SleepFor(kTimeInterval);
66-
stream.Write(response);
66+
stream.Write(std::move(response));
6767
}
6868
return grpc::Status::OK;
6969
}

grpc/include/userver/ugrpc/server/impl/stream_adapter.hpp

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,10 @@ class ReaderAdapter : public Reader<typename CallTraits::Request> {
2525
template <typename CallTraits>
2626
class WriterAdapter : public Writer<typename CallTraits::Response> {
2727
public:
28-
void Write(typename CallTraits::Response& response) final { Write(response, grpc::WriteOptions{}); }
29-
30-
void Write(typename CallTraits::Response& response, const grpc::WriteOptions& options) override {
31-
static_cast<Responder<CallTraits>&>(*this).DoWrite(response, options);
32-
}
33-
34-
void Write(typename CallTraits::Response&& response) final { Write(response, grpc::WriteOptions{}); }
28+
void Write(typename CallTraits::Response&& response) final { Write(std::move(response), grpc::WriteOptions{}); }
3529

3630
void Write(typename CallTraits::Response&& response, const grpc::WriteOptions& options) override {
37-
Write(response, options);
31+
static_cast<Responder<CallTraits>&>(*this).DoWrite(response, options);
3832
}
3933
};
4034

@@ -45,16 +39,10 @@ class ReaderWriterAdapter : public ReaderWriter<typename CallTraits::Request, ty
4539
return static_cast<Responder<CallTraits>&>(*this).DoRead(request);
4640
}
4741

48-
void Write(typename CallTraits::Response& response) final { Write(response, grpc::WriteOptions{}); }
49-
50-
void Write(typename CallTraits::Response& response, const grpc::WriteOptions& options) override {
51-
static_cast<Responder<CallTraits>&>(*this).DoWrite(response, options);
52-
}
53-
54-
void Write(typename CallTraits::Response&& response) final { Write(response, grpc::WriteOptions{}); }
42+
void Write(typename CallTraits::Response&& response) final { Write(std::move(response), grpc::WriteOptions{}); }
5543

5644
void Write(typename CallTraits::Response&& response, const grpc::WriteOptions& options) override {
57-
Write(response, options);
45+
static_cast<Responder<CallTraits>&>(*this).DoWrite(response, options);
5846
}
5947
};
6048

grpc/include/userver/ugrpc/server/stream.hpp

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,26 +44,23 @@ class Writer {
4444
/// @param response the next message to write
4545
/// @throws ugrpc::server::RpcError on an RPC error
4646
/// @throws std::exception (internal) on error from middlewares
47-
virtual void Write(Response& response) = 0;
48-
49-
/// @brief Write the next outgoing message
50-
/// @param response the next message to write
51-
/// @param options the write options
52-
/// @throws ugrpc::server::RpcError on an RPC error
53-
/// @throws std::exception (internal) on error from middlewares
54-
virtual void Write(Response& response, const grpc::WriteOptions& options) = 0;
55-
56-
/// @brief Write the next outgoing message
57-
/// @param response the next message to write
58-
/// @throws ugrpc::server::RpcError on an RPC error
59-
/// @throws std::exception (internal) on error from middlewares
47+
///
48+
/// This method uses move-only semantics for safety:
49+
/// - Middlewares may modify responses during writes, making reuse dangerous
50+
/// - After the call, `response` is in moved-from state and must not be reused
51+
/// - Explicit std::move ensures ownership transfer and prevents accidental reuse
6052
virtual void Write(Response&& response) = 0;
6153

6254
/// @brief Write the next outgoing message
6355
/// @param response the next message to write
6456
/// @param options the write options
6557
/// @throws ugrpc::server::RpcError on an RPC error
6658
/// @throws std::exception (internal) on error from middlewares
59+
///
60+
/// This method uses move-only semantics for safety:
61+
/// - Middlewares may modify responses during writes, making reuse dangerous
62+
/// - After the call, `response` is in moved-from state and must not be reused
63+
/// - Explicit std::move ensures ownership transfer and prevents accidental reuse
6764
virtual void Write(Response&& response, const grpc::WriteOptions& options) = 0;
6865
};
6966

grpc/tests/base_test.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,12 @@ class UnitTestService final : public sample::ugrpc::UnitTestServiceBase {
6767
ReadManyWriter& writer
6868
) override {
6969
CheckServerContext(context.GetServerContext());
70-
sample::ugrpc::StreamGreetingResponse response;
71-
response.set_name("Hello again " + request.name());
70+
const std::string response_name = "Hello again " + request.name();
7271
for (int i = 0; i < request.number(); ++i) {
72+
sample::ugrpc::StreamGreetingResponse response;
73+
response.set_name(response_name);
7374
response.set_number(i);
74-
writer.Write(response);
75+
writer.Write(std::move(response));
7576
}
7677
return grpc::Status::OK;
7778
}
@@ -92,13 +93,13 @@ class UnitTestService final : public sample::ugrpc::UnitTestServiceBase {
9293
ChatResult Chat(CallContext& context, ChatReaderWriter& stream) override {
9394
CheckServerContext(context.GetServerContext());
9495
sample::ugrpc::StreamGreetingRequest request;
95-
sample::ugrpc::StreamGreetingResponse response;
9696
int count = 0;
9797
while (stream.Read(request)) {
98+
sample::ugrpc::StreamGreetingResponse response;
9899
++count;
99100
response.set_number(count);
100101
response.set_name("Hello " + request.name());
101-
stream.Write(response);
102+
stream.Write(std::move(response));
102103
}
103104
return grpc::Status::OK;
104105
}

grpc/tests/cancel_test.cpp

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,17 @@ class UnitTestServiceCancelEcho final : public sample::ugrpc::UnitTestServiceBas
3030
public:
3131
ChatResult Chat(CallContext& /*context*/, ChatReaderWriter& stream) override {
3232
sample::ugrpc::StreamGreetingRequest request;
33-
sample::ugrpc::StreamGreetingResponse response{};
34-
35-
EXPECT_TRUE(stream.Read(request));
36-
// NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.UninitializedObject)
37-
UEXPECT_NO_THROW(stream.Write(response));
38-
39-
EXPECT_FALSE(stream.Read(request));
40-
// NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.UninitializedObject)
41-
UEXPECT_THROW(stream.Write(response), ugrpc::server::RpcInterruptedError);
4233

34+
{
35+
sample::ugrpc::StreamGreetingResponse response{};
36+
EXPECT_TRUE(stream.Read(request));
37+
UEXPECT_NO_THROW(stream.Write(std::move(response)));
38+
}
39+
{
40+
sample::ugrpc::StreamGreetingResponse response{};
41+
EXPECT_FALSE(stream.Read(request));
42+
UEXPECT_THROW(stream.Write(std::move(response)), ugrpc::server::RpcInterruptedError);
43+
}
4344
return grpc::Status::OK;
4445
}
4546
};
@@ -75,8 +76,7 @@ class UnitTestServiceCancelEchoInf final : public sample::ugrpc::UnitTestService
7576
return grpc::Status::OK;
7677
}
7778
sample::ugrpc::StreamGreetingResponse response{};
78-
// NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.UninitializedObject)
79-
stream.Write(response);
79+
stream.Write(std::move(response));
8080
return grpc::Status::OK;
8181
}
8282
}
@@ -120,10 +120,8 @@ class UnitTestServiceCancelEchoInfWrites final : public sample::ugrpc::UnitTestS
120120
sample::ugrpc::StreamGreetingRequest request;
121121
EXPECT_TRUE(stream.Read(request));
122122

123-
sample::ugrpc::StreamGreetingResponse response{};
124123
for (;;) {
125-
// NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.UninitializedObject)
126-
stream.Write(response);
124+
stream.Write(sample::ugrpc::StreamGreetingResponse{});
127125
}
128126

129127
return grpc::Status::OK;
@@ -166,10 +164,8 @@ class UnitTestServiceCancelEchoNoSecondWrite final : public sample::ugrpc::UnitT
166164
ChatResult Chat(CallContext& /*context*/, ChatReaderWriter& stream) override {
167165
sample::ugrpc::StreamGreetingRequest request;
168166
EXPECT_TRUE(stream.Read(request));
169-
sample::ugrpc::StreamGreetingResponse response{};
170167

171-
// NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.UninitializedObject)
172-
stream.Write(response);
168+
stream.Write(sample::ugrpc::StreamGreetingResponse{});
173169
return grpc::Status::OK;
174170
}
175171
};
@@ -197,11 +193,11 @@ class UnitTestServiceEcho final : public sample::ugrpc::UnitTestServiceBase {
197193
public:
198194
ChatResult Chat(CallContext& /*context*/, ChatReaderWriter& stream) override {
199195
sample::ugrpc::StreamGreetingRequest request;
200-
sample::ugrpc::StreamGreetingResponse response;
201196
while (stream.Read(request)) {
197+
sample::ugrpc::StreamGreetingResponse response;
202198
response.set_name(request.name());
203199
response.set_number(request.number());
204-
stream.Write(response);
200+
stream.Write(std::move(response));
205201
}
206202
return grpc::Status::OK;
207203
}

grpc/tests/client_cancel_test.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ class UnitTestService final : public sample::ugrpc::UnitTestServiceBase {
3333
sample::ugrpc::StreamGreetingRequest&& request,
3434
ReadManyWriter& writer
3535
) override {
36-
sample::ugrpc::StreamGreetingResponse response;
37-
response.set_name("Hello again " + request.name());
36+
const std::string response_name = "Hello again " + request.name();
3837
for (int i = 0; i < request.number(); ++i) {
38+
sample::ugrpc::StreamGreetingResponse response;
39+
response.set_name(response_name);
3940
response.set_number(i);
40-
writer.Write(response);
41+
writer.Write(std::move(response));
4142
}
4243
return grpc::Status::OK;
4344
}
@@ -56,13 +57,13 @@ class UnitTestService final : public sample::ugrpc::UnitTestServiceBase {
5657

5758
ChatResult Chat(CallContext& /*context*/, ChatReaderWriter& stream) override {
5859
sample::ugrpc::StreamGreetingRequest request;
59-
sample::ugrpc::StreamGreetingResponse response;
6060
int count = 0;
6161
while (stream.Read(request)) {
62+
sample::ugrpc::StreamGreetingResponse response;
6263
++count;
6364
response.set_number(count);
6465
response.set_name("Hello " + request.name());
65-
stream.Write(response);
66+
stream.Write(std::move(response));
6667
}
6768
return grpc::Status::OK;
6869
}

0 commit comments

Comments
 (0)