Skip to content

Commit 7dde47e

Browse files
committed
refactor grpc: refactor server::impl::CallProcessor
commit_hash:f0bd4b271eaee251f31980cf36da1fd376870523
1 parent 7d56d86 commit 7dde47e

File tree

9 files changed

+117
-134
lines changed

9 files changed

+117
-134
lines changed

grpc/include/userver/ugrpc/client/impl/unary_call.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <userver/engine/sleep.hpp>
88
#include <userver/server/request/task_inherited_data.hpp>
99
#include <userver/tracing/tags.hpp>
10+
#include <userver/utils/fast_scope_guard.hpp>
1011
#include <userver/utils/impl/internal_tag.hpp>
1112

1213
#include <userver/ugrpc/client/call_context.hpp>

grpc/include/userver/ugrpc/server/impl/call_processor.hpp

Lines changed: 55 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ void SetupSpan(
3535
std::string_view method_name
3636
);
3737

38-
grpc::Status ReportHandlerError(const std::exception& ex, CallState& state) noexcept;
39-
40-
void ReportRpcInterruptedError(CallState& state) noexcept;
41-
4238
grpc::Status ReportCustomError(const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex, CallState& state)
4339
noexcept;
4440

45-
void ReportFinish(bool finish_op_succeeded, const grpc::Status& status, CallState& state) noexcept;
41+
grpc::Status ReportHandlerError(const std::exception& ex, CallState& state) noexcept;
42+
43+
void ReportFinished(const grpc::Status& status, CallState& state) noexcept;
44+
45+
void ReportInterrupted(CallState& state) noexcept;
4646

4747
template <typename Response>
4848
void UnpackResult(Result<Response>&& result, std::optional<Response>& response, grpc::Status& status) {
@@ -64,6 +64,28 @@ void UnpackResult(StreamingResult<Response>&& result, std::optional<Response>& r
6464
}
6565
}
6666

67+
template <typename CallTraits>
68+
bool Finish(
69+
impl::Responder<CallTraits>& responder,
70+
const std::optional<typename CallTraits::Response>& response,
71+
grpc::Status& status
72+
) {
73+
if (status.ok()) {
74+
if constexpr (IsServerStreaming(CallTraits::kCallKind)) {
75+
if (response.has_value()) {
76+
return responder.Finish(*response);
77+
} else {
78+
return responder.Finish();
79+
}
80+
} else {
81+
UINVARIANT(response.has_value(), "response should not be empty");
82+
return responder.Finish(*response);
83+
}
84+
} else {
85+
return responder.FinishWithError(status);
86+
}
87+
}
88+
6789
template <typename CallTraits>
6890
class CallProcessor final {
6991
public:
@@ -84,7 +106,7 @@ class CallProcessor final {
84106
)
85107
: state_(std::move(params), CallTraits::kCallKind),
86108
responder_(state_, raw_responder),
87-
middleware_call_context_(utils::impl::InternalTag{}, state_),
109+
middleware_call_context_(utils::impl::InternalTag{}, state_, status_),
88110
initial_request_(initial_request),
89111
service_(service),
90112
service_method_(service_method)
@@ -102,52 +124,29 @@ class CallProcessor final {
102124
void DoCall() {
103125
RunOnCallStart();
104126

127+
bool finished = false;
128+
105129
// Don't keep the config snapshot for too long, especially for streaming RPCs.
106130
state_.config_snapshot.reset();
107131

108-
// Final response is the response sent to the client together with status in the final batch.
109-
std::optional<Response> final_response;
110-
111-
if (!Status().ok()) {
112-
RunOnCallFinish(final_response);
113-
impl::ReportFinish(responder_.FinishWithError(Status()), Status(), state_);
114-
return;
132+
std::optional<Response> response;
133+
if (!engine::current_task::ShouldCancel() && status_.ok()) {
134+
RunWithCatch([this, &response] {
135+
auto result = CallHandler();
136+
impl::UnpackResult(std::move(result), response, status_);
137+
});
115138
}
116139

117-
RunWithCatch([this, &final_response] {
118-
auto result = CallHandler();
119-
impl::UnpackResult(std::move(result), final_response, Status());
120-
});
121-
122-
// Streaming handler can detect RPC breakage during a network interaction => IsInterrupted().
123-
// RpcFinishedEvent can signal RPC interruption while in the handler => ShouldCancel.
124-
if (responder_.IsInterrupted() || engine::current_task::ShouldCancel()) {
125-
impl::ReportRpcInterruptedError(state_);
126-
// Don't run OnCallFinish.
127-
return;
140+
if (!engine::current_task::ShouldCancel() && !responder_.IsInterrupted()) {
141+
RunOnCallFinish(response);
142+
finished = impl::Finish(responder_, response, status_);
128143
}
129144

130-
if (!Status().ok()) {
131-
RunOnCallFinish(final_response);
132-
impl::ReportFinish(responder_.FinishWithError(Status()), Status(), state_);
133-
return;
134-
}
135-
136-
RunOnCallFinish(final_response);
137-
138-
if (!Status().ok()) {
139-
impl::ReportFinish(responder_.FinishWithError(Status()), Status(), state_);
140-
return;
141-
}
142-
143-
if constexpr (IsServerStreaming(CallTraits::kCallKind)) {
144-
if (!final_response) {
145-
impl::ReportFinish(responder_.Finish(), Status(), state_);
146-
return;
147-
}
145+
if (finished) {
146+
impl::ReportFinished(status_, state_);
147+
} else {
148+
impl::ReportInterrupted(state_);
148149
}
149-
UASSERT(final_response);
150-
impl::ReportFinish(responder_.Finish(*final_response), Status(), state_);
151150
}
152151

153152
private:
@@ -169,37 +168,37 @@ class CallProcessor final {
169168
UASSERT(success_pre_hooks_count_ == 0);
170169
for (const auto& m : state_.middlewares) {
171170
RunWithCatch([this, &m] { m->OnCallStart(middleware_call_context_); });
172-
if (!Status().ok()) {
171+
if (!status_.ok()) {
173172
return;
174173
}
175174
// On fail, we must call OnRpcFinish only for middlewares for which OnRpcStart has been called successfully.
176175
// So, we watch to count of these middlewares.
177176
++success_pre_hooks_count_;
178177
if constexpr (std::is_base_of_v<google::protobuf::Message, InitialRequest>) {
179178
RunWithCatch([this, &m] { m->PostRecvMessage(middleware_call_context_, initial_request_); });
180-
if (!Status().ok()) {
179+
if (!status_.ok()) {
181180
return;
182181
}
183182
}
184183
}
185184
}
186185

187-
void RunOnCallFinish(std::optional<Response>& final_response) {
186+
void RunOnCallFinish(std::optional<Response>& response) {
188187
const auto& mids = state_.middlewares;
189188
const auto rbegin = mids.rbegin() + (mids.size() - success_pre_hooks_count_);
190189
for (auto it = rbegin; it != mids.rend(); ++it) {
191190
const auto& middleware = *it;
192191

193192
if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
194-
if (Status().ok() && final_response.has_value()) {
195-
RunWithCatch([this, &middleware, &final_response] {
196-
middleware->PreSendMessage(middleware_call_context_, *final_response);
193+
if (status_.ok() && response.has_value()) {
194+
RunWithCatch([this, &middleware, &response] {
195+
middleware->PreSendMessage(middleware_call_context_, *response);
197196
});
198197
}
199198
}
200199

201200
// We must call all OnRpcFinish despite the failures. So, don't check the status.
202-
RunWithCatch([this, &middleware] { middleware->OnCallFinish(middleware_call_context_, Status()); });
201+
RunWithCatch([this, &middleware] { middleware->OnCallFinish(middleware_call_context_, status_); });
203202
}
204203
}
205204

@@ -208,23 +207,22 @@ class CallProcessor final {
208207
try {
209208
func();
210209
} catch (MiddlewareRpcInterruptionError& ex) {
211-
Status() = ex.ExtractStatus();
210+
status_ = ex.ExtractStatus();
212211
} catch (ErrorWithStatus& ex) {
213-
Status() = ex.ExtractStatus();
212+
status_ = ex.ExtractStatus();
214213
} catch (const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex) {
215-
Status() = impl::ReportCustomError(ex, state_);
214+
status_ = impl::ReportCustomError(ex, state_);
216215
} catch (const RpcInterruptedError& /*ex*/) {
217216
UASSERT(responder_.IsInterrupted());
218217
// RPC interruption will be reported below.
219218
} catch (const std::exception& ex) {
220-
Status() = impl::ReportHandlerError(ex, state_);
219+
status_ = impl::ReportHandlerError(ex, state_);
221220
}
222221
}
223222

224-
grpc::Status& Status() { return middleware_call_context_.GetStatus(utils::impl::InternalTag{}); }
225-
226223
CallState state_;
227224
Responder responder_;
225+
grpc::Status status_;
228226
MiddlewareCallContext middleware_call_context_;
229227
// Initial request is the request which is sent to the service together with RPC initiation.
230228
// Unary-request RPCs have an initial request, client-streaming RPCs don't.

grpc/include/userver/ugrpc/server/impl/call_state.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include <optional>
44
#include <string_view>
55

6+
#include <boost/container/flat_map.hpp>
7+
68
#include <grpcpp/completion_queue.h>
79
#include <grpcpp/server_context.h>
810

grpc/include/userver/ugrpc/server/middlewares/base.hpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class MiddlewareCallContext final : public CallContextBase {
4343
public:
4444
/// @cond
4545
// For internal use only
46-
MiddlewareCallContext(utils::impl::InternalTag, impl::CallState& state);
46+
MiddlewareCallContext(utils::impl::InternalTag, impl::CallState& state, grpc::Status& status);
4747
/// @endcond
4848

4949
/// @brief Aborts the RPC, returning the specified status to the upstream client, see details below.
@@ -80,13 +80,10 @@ class MiddlewareCallContext final : public CallContextBase {
8080
/// @cond
8181
// For internal use only.
8282
ugrpc::impl::RpcStatisticsScope& GetStatistics(utils::impl::InternalTag);
83-
84-
// For internal use only.
85-
grpc::Status& GetStatus(utils::impl::InternalTag) { return status_; }
8683
/// @endcond
8784

8885
private:
89-
grpc::Status status_;
86+
grpc::Status& status_;
9087
};
9188

9289
/// @ingroup userver_base_classes userver_grpc_server_middlewares

grpc/src/ugrpc/server/impl/call_processor.cpp

Lines changed: 42 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,6 @@ namespace ugrpc::server::impl {
2222

2323
namespace {
2424

25-
void ReportFinishSuccess(const grpc::Status& status, CallState& state) noexcept {
26-
try {
27-
state.statistics_scope.OnExplicitFinish(status.error_code());
28-
29-
auto& span = state.GetSpan();
30-
span.AddNonInheritableTag(tracing::kGrpcCode, ugrpc::ToString(status.error_code()));
31-
if (!status.ok()) {
32-
span.AddNonInheritableTag(tracing::kErrorFlag, true);
33-
span.AddNonInheritableTag(tracing::kErrorMessage, status.error_message());
34-
const auto default_error_log_level =
35-
IsServerError(status.error_code()) ? logging::Level::kError : logging::Level::kWarning;
36-
const auto error_log_level =
37-
utils::FindOrDefault(state.status_codes_log_level, status.error_code(), default_error_log_level);
38-
span.SetLogLevel(error_log_level);
39-
}
40-
} catch (const std::exception& ex) {
41-
LOG_ERROR() << "Error in ReportFinishSuccess: " << ex;
42-
}
43-
}
44-
4525
logging::Level AdjustLogLevelForCancellations(logging::Level level) {
4626
return engine::current_task::ShouldCancel() ? std::min(level, logging::Level::kWarning) : level;
4727
}
@@ -98,6 +78,26 @@ void SetupSpan(
9878
span.AddNonInheritableTag(tracing::kRpcMethod, std::string{method_name});
9979
}
10080

81+
grpc::Status ReportCustomError(const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex, CallState& state)
82+
noexcept {
83+
try {
84+
grpc::Status status{CustomStatusToGrpc(ex.GetCode()), ex.GetExternalErrorBody()};
85+
86+
const auto log_level = AdjustLogLevelForCancellations(
87+
IsServerError(status.error_code()) ? logging::Level::kError : logging::Level::kWarning
88+
);
89+
LOG(log_level) << "Error in " << state.call_name << ": " << ex;
90+
auto& span = state.GetSpan();
91+
span.AddNonInheritableTag(tracing::kErrorFlag, true);
92+
span.AddNonInheritableTag(tracing::kErrorMessage, ex.what());
93+
span.SetLogLevel(log_level);
94+
return status;
95+
} catch (const std::exception& new_ex) {
96+
LOG_ERROR() << "Error in ReportCustomError: " << new_ex;
97+
return grpc::Status{grpc::StatusCode::INTERNAL, ""};
98+
}
99+
}
100+
101101
grpc::Status ReportHandlerError(const std::exception& ex, CallState& state) noexcept {
102102
try {
103103
auto& span = state.GetSpan();
@@ -113,7 +113,26 @@ grpc::Status ReportHandlerError(const std::exception& ex, CallState& state) noex
113113
}
114114
}
115115

116-
void ReportRpcInterruptedError(CallState& state) noexcept {
116+
void ReportFinished(const grpc::Status& status, CallState& state) noexcept {
117+
try {
118+
state.statistics_scope.OnExplicitFinish(status.error_code());
119+
auto& span = state.GetSpan();
120+
span.AddNonInheritableTag(tracing::kGrpcCode, ugrpc::ToString(status.error_code()));
121+
if (!status.ok()) {
122+
span.AddNonInheritableTag(tracing::kErrorFlag, true);
123+
span.AddNonInheritableTag(tracing::kErrorMessage, status.error_message());
124+
const auto default_error_log_level =
125+
IsServerError(status.error_code()) ? logging::Level::kError : logging::Level::kWarning;
126+
const auto error_log_level =
127+
utils::FindOrDefault(state.status_codes_log_level, status.error_code(), default_error_log_level);
128+
span.SetLogLevel(error_log_level);
129+
}
130+
} catch (const std::exception& ex) {
131+
LOG_ERROR() << "Error in ReportFinished: " << ex;
132+
}
133+
}
134+
135+
void ReportInterrupted(CallState& state) noexcept {
117136
try {
118137
// RPC interruption leads to asynchronous task cancellation by RpcFinishedEvent,
119138
// so the task either is already cancelled, or is going to be cancelled.
@@ -122,39 +141,11 @@ void ReportRpcInterruptedError(CallState& state) noexcept {
122141
<< "'. The previously logged cancellation or network exception, if any, is likely caused by it.";
123142
state.statistics_scope.OnNetworkError();
124143
auto& span = state.GetSpan();
125-
span.AddNonInheritableTag(tracing::kErrorMessage, "RPC interrupted");
126144
span.AddNonInheritableTag(tracing::kErrorFlag, true);
145+
span.AddNonInheritableTag(tracing::kErrorMessage, "RPC interrupted");
127146
span.SetLogLevel(logging::Level::kWarning);
128147
} catch (const std::exception& ex) {
129-
LOG_ERROR() << "Error in ReportRpcInterruptedError: " << ex;
130-
}
131-
}
132-
133-
grpc::Status ReportCustomError(const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex, CallState& state)
134-
noexcept {
135-
try {
136-
grpc::Status status{CustomStatusToGrpc(ex.GetCode()), ex.GetExternalErrorBody()};
137-
138-
const auto log_level = AdjustLogLevelForCancellations(
139-
IsServerError(status.error_code()) ? logging::Level::kError : logging::Level::kWarning
140-
);
141-
LOG(log_level) << "Error in " << state.call_name << ": " << ex;
142-
auto& span = state.GetSpan();
143-
span.AddNonInheritableTag(tracing::kErrorFlag, true);
144-
span.AddNonInheritableTag(tracing::kErrorMessage, ex.what());
145-
span.SetLogLevel(log_level);
146-
return status;
147-
} catch (const std::exception& new_ex) {
148-
LOG_ERROR() << "Error in ReportCustomError: " << new_ex;
149-
return grpc::Status{grpc::StatusCode::INTERNAL, ""};
150-
}
151-
}
152-
153-
void ReportFinish(bool finish_op_succeeded, const grpc::Status& status, CallState& state) noexcept {
154-
if (finish_op_succeeded) {
155-
ReportFinishSuccess(status, state);
156-
} else {
157-
ReportRpcInterruptedError(state);
148+
LOG_ERROR() << "Error in ReportInterrupted: " << ex;
158149
}
159150
}
160151

grpc/src/ugrpc/server/impl/rpc.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,10 @@ std::unique_lock<engine::SingleWaitingTaskMutex> ResponderBase::TakeMutexIfBidir
3131

3232
void ResponderBase::ApplyRequestHook(google::protobuf::Message& request) {
3333
auto lock = TakeMutexIfBidirectional();
34-
MiddlewareCallContext middleware_call_context{utils::impl::InternalTag{}, state_};
35-
34+
grpc::Status status;
35+
MiddlewareCallContext middleware_call_context{utils::impl::InternalTag{}, state_, status};
3636
for (const auto& middleware : state_.middlewares) {
3737
middleware->PostRecvMessage(middleware_call_context, request);
38-
auto& status = middleware_call_context.GetStatus(utils::impl::InternalTag{});
3938
if (!status.ok()) {
4039
throw impl::MiddlewareRpcInterruptionError(std::move(status));
4140
}
@@ -44,11 +43,10 @@ void ResponderBase::ApplyRequestHook(google::protobuf::Message& request) {
4443

4544
void ResponderBase::ApplyResponseHook(google::protobuf::Message& response) {
4645
auto lock = TakeMutexIfBidirectional();
47-
MiddlewareCallContext middleware_call_context{utils::impl::InternalTag{}, state_};
48-
46+
grpc::Status status;
47+
MiddlewareCallContext middleware_call_context{utils::impl::InternalTag{}, state_, status};
4948
for (const auto& middleware : boost::adaptors::reverse(state_.middlewares)) {
5049
middleware->PreSendMessage(middleware_call_context, response);
51-
auto& status = middleware_call_context.GetStatus(utils::impl::InternalTag{});
5250
if (!status.ok()) {
5351
throw impl::MiddlewareRpcInterruptionError(std::move(status));
5452
}

0 commit comments

Comments
 (0)