Skip to content

Commit 18a4170

Browse files
committed
refactor grpc: refactor server Responder
commit_hash:0317eaa3f81fb9510d403fab69d338bb5004b478
1 parent 26ded17 commit 18a4170

File tree

4 files changed

+36
-47
lines changed

4 files changed

+36
-47
lines changed

grpc/include/userver/ugrpc/server/impl/async_methods.hpp

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,49 +16,47 @@ namespace ugrpc::server::impl {
1616
extern const grpc::Status kUnimplementedStatus;
1717
extern const grpc::Status kUnknownErrorStatus;
1818

19-
void CheckInvocationSucceeded(bool ok, std::string_view call_name, std::string_view stage);
20-
2119
template <typename GrpcStream, typename Response>
2220
[[nodiscard]] bool Finish(GrpcStream& stream, const Response& response, const grpc::Status& status) {
23-
ugrpc::impl::AsyncMethodInvocation finish;
24-
stream.Finish(response, status, finish.GetCompletionTag());
25-
return finish.WaitNonCancellable();
21+
ugrpc::impl::AsyncMethodInvocation invocation;
22+
stream.Finish(response, status, invocation.GetCompletionTag());
23+
return invocation.WaitNonCancellable();
2624
}
2725

2826
template <typename GrpcStream>
2927
[[nodiscard]] bool Finish(GrpcStream& stream, const grpc::Status& status) {
30-
ugrpc::impl::AsyncMethodInvocation finish;
31-
stream.Finish(status, finish.GetCompletionTag());
32-
return finish.WaitNonCancellable();
28+
ugrpc::impl::AsyncMethodInvocation invocation;
29+
stream.Finish(status, invocation.GetCompletionTag());
30+
return invocation.WaitNonCancellable();
3331
}
3432

3533
template <typename GrpcStream>
3634
[[nodiscard]] bool FinishWithError(GrpcStream& stream, const grpc::Status& status) {
3735
UASSERT(!status.ok());
38-
ugrpc::impl::AsyncMethodInvocation finish;
39-
stream.FinishWithError(status, finish.GetCompletionTag());
40-
return finish.WaitNonCancellable();
36+
ugrpc::impl::AsyncMethodInvocation invocation;
37+
stream.FinishWithError(status, invocation.GetCompletionTag());
38+
return invocation.WaitNonCancellable();
4139
}
4240

4341
template <typename GrpcStream>
44-
void SendInitialMetadata(GrpcStream& stream, std::string_view call_name) {
45-
ugrpc::impl::AsyncMethodInvocation metadata;
46-
stream.SendInitialMetadata(metadata.GetCompletionTag());
47-
CheckInvocationSucceeded(metadata.WaitNonCancellable(), call_name, "SendInitialMetadata");
42+
[[nodiscard]] bool SendInitialMetadata(GrpcStream& stream) {
43+
ugrpc::impl::AsyncMethodInvocation invocation;
44+
stream.SendInitialMetadata(invocation.GetCompletionTag());
45+
return invocation.WaitNonCancellable();
4846
}
4947

5048
template <typename GrpcStream, typename Request>
5149
[[nodiscard]] bool Read(GrpcStream& stream, Request& request) {
52-
ugrpc::impl::AsyncMethodInvocation read;
53-
stream.Read(&request, read.GetCompletionTag());
54-
return read.WaitNonCancellable();
50+
ugrpc::impl::AsyncMethodInvocation invocation;
51+
stream.Read(&request, invocation.GetCompletionTag());
52+
return invocation.WaitNonCancellable();
5553
}
5654

5755
template <typename GrpcStream, typename Response>
58-
void Write(GrpcStream& stream, const Response& response, grpc::WriteOptions options, std::string_view call_name) {
59-
ugrpc::impl::AsyncMethodInvocation write;
60-
stream.Write(response, options, write.GetCompletionTag());
61-
CheckInvocationSucceeded(write.WaitNonCancellable(), call_name, "Write");
56+
[[nodiscard]] bool Write(GrpcStream& stream, const Response& response, grpc::WriteOptions options) {
57+
ugrpc::impl::AsyncMethodInvocation invocation;
58+
stream.Write(response, options, invocation.GetCompletionTag());
59+
return invocation.WaitNonCancellable();
6260
}
6361

6462
template <typename GrpcStream, typename Response>
@@ -68,9 +66,9 @@ template <typename GrpcStream, typename Response>
6866
grpc::WriteOptions options,
6967
const grpc::Status& status
7068
) {
71-
ugrpc::impl::AsyncMethodInvocation write_and_finish;
72-
stream.WriteAndFinish(response, options, status, write_and_finish.GetCompletionTag());
73-
return write_and_finish.WaitNonCancellable();
69+
ugrpc::impl::AsyncMethodInvocation invocation;
70+
stream.WriteAndFinish(response, options, status, invocation.GetCompletionTag());
71+
return invocation.WaitNonCancellable();
7472
}
7573

7674
} // namespace ugrpc::server::impl

grpc/include/userver/ugrpc/server/impl/call_processor.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,9 @@ class CallProcessor final {
119119
impl::UnpackResult(std::move(result), final_response, Status());
120120
});
121121

122-
// Streaming handler can detect RPC breakage during a network interaction => IsFinished.
122+
// Streaming handler can detect RPC breakage during a network interaction => IsInterrupted().
123123
// RpcFinishedEvent can signal RPC interruption while in the handler => ShouldCancel.
124-
if (responder_.IsFinished() || engine::current_task::ShouldCancel()) {
124+
if (responder_.IsInterrupted() || engine::current_task::ShouldCancel()) {
125125
impl::ReportRpcInterruptedError(state_);
126126
// Don't run OnCallFinish.
127127
return;
@@ -223,7 +223,7 @@ class CallProcessor final {
223223
} catch (const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex) {
224224
Status() = impl::ReportCustomError(ex, state_);
225225
} catch (const RpcInterruptedError& /*ex*/) {
226-
UASSERT(responder_.IsFinished());
226+
UASSERT(responder_.IsInterrupted());
227227
// RPC interruption will be reported below.
228228
} catch (const std::exception& ex) {
229229
Status() = impl::ReportHandlerError(ex, state_);

grpc/include/userver/ugrpc/server/impl/rpc.hpp

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class Responder final : public ResponderBase, public CallTraits::StreamAdapter {
6666
Responder& operator=(Responder&&) = delete;
6767
~Responder() override;
6868

69-
bool IsFinished() const { return is_finished_; }
69+
bool IsInterrupted() const { return is_interrupted_; }
7070

7171
/// @brief Await and read the next incoming message. Only makes sense for client-streaming RPCs.
7272
/// @param request where to put the request on success
@@ -113,6 +113,7 @@ class Responder final : public ResponderBase, public CallTraits::StreamAdapter {
113113
RawResponder& raw_responder_;
114114
// Separate flags are required to be able to set them in parallel in Read and Write.
115115
bool are_reads_done_{kCallKind == CallKind::kUnaryCall};
116+
bool is_interrupted_{false};
116117
bool is_finished_{false};
117118
};
118119

@@ -124,7 +125,7 @@ Responder<CallTraits>::Responder(CallState& state, RawResponder& raw_responder)
124125

125126
template <typename CallTraits>
126127
Responder<CallTraits>::~Responder() {
127-
UASSERT(is_finished_ || engine::current_task::ShouldCancel());
128+
UASSERT(is_finished_ || is_interrupted_ || engine::current_task::ShouldCancel());
128129
}
129130

130131
template <typename CallTraits>
@@ -146,7 +147,7 @@ bool Responder<CallTraits>::DoRead(Request& request) {
146147
template <typename CallTraits>
147148
void Responder<CallTraits>::DoWrite(Response& response, const grpc::WriteOptions& options) {
148149
static_assert(impl::IsServerStreaming(kCallKind));
149-
UINVARIANT(!is_finished_, "'Write' called on a finished stream");
150+
UINVARIANT(!is_interrupted_, "'Write' called on an interrupted stream");
150151

151152
if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
152153
ApplyResponseHook(response);
@@ -156,20 +157,16 @@ void Responder<CallTraits>::DoWrite(Response& response, const grpc::WriteOptions
156157
// For some reason, gRPC requires explicit 'SendInitialMetadata' in output streams.
157158
if (!are_reads_done_) {
158159
are_reads_done_ = true;
159-
try {
160-
impl::SendInitialMetadata(raw_responder_, GetCallName());
161-
} catch (const RpcInterruptedError&) {
162-
is_finished_ = true;
163-
throw;
160+
if (!impl::SendInitialMetadata(raw_responder_)) {
161+
is_interrupted_ = true;
162+
throw RpcInterruptedError(GetCallName(), "SendInitialMetadata");
164163
}
165164
}
166165
}
167166

168-
try {
169-
impl::Write(raw_responder_, response, options, GetCallName());
170-
} catch (const RpcInterruptedError&) {
171-
is_finished_ = true;
172-
throw;
167+
if (!impl::Write(raw_responder_, response, options)) {
168+
is_interrupted_ = true;
169+
throw RpcInterruptedError(GetCallName(), "Write");
173170
}
174171
}
175172

grpc/src/ugrpc/server/impl/async_methods.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,6 @@ const grpc::Status kUnknownErrorStatus{
1313
"The service method has exited unexpectedly, without providing a status"
1414
};
1515

16-
void CheckInvocationSucceeded(bool ok, std::string_view call_name, std::string_view stage) {
17-
if (!ok) {
18-
throw RpcInterruptedError(call_name, stage);
19-
}
20-
}
21-
2216
} // namespace ugrpc::server::impl
2317

2418
USERVER_NAMESPACE_END

0 commit comments

Comments
 (0)