Skip to content

Commit 47105c3

Browse files
committed
bug grpc: enable Congestion Control for grpc services (rebase)
origin: **bug grpc: enable Congestion Control for grpc services** <https://nda.ya.ru/t/Ajxwo_qT7WdSzs> commit_hash:e5b223f405e65d1843713fcf0195135eccad56aa
1 parent 50e6dd1 commit 47105c3

File tree

13 files changed

+105
-25
lines changed

13 files changed

+105
-25
lines changed

core/include/userver/congestion_control/limiter.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ USERVER_NAMESPACE_BEGIN
88
namespace congestion_control {
99

1010
struct Limit {
11-
std::optional<size_t> load_limit;
12-
size_t current_load{0};
11+
std::optional<std::size_t> load_limit;
12+
std::size_t current_load{0};
1313

14-
std::string ToLogString() { return "limit=" + (load_limit ? std::to_string(*load_limit) : std::string("(none)")); }
14+
std::string ToLogString() const {
15+
return "limit=" + (load_limit ? std::to_string(*load_limit) : std::string("(none)"));
16+
}
1517
};
1618

1719
class Limiter {

core/include/userver/server/congestion_control/limiter.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,16 @@ namespace server::congestion_control {
1414
class Limitee {
1515
public:
1616
virtual void SetLimit(std::optional<size_t> new_limit) = 0;
17+
18+
virtual std::size_t GetLimitableHandlersCount() const = 0;
1719
};
1820

1921
class Limiter final : public USERVER_NAMESPACE::congestion_control::Limiter {
2022
public:
2123
void SetLimit(const USERVER_NAMESPACE::congestion_control::Limit& new_limit) override;
2224

25+
std::size_t GetLimitableHandlersCount() const;
26+
2327
void RegisterLimitee(Limitee& limitee);
2428

2529
private:

core/include/userver/server/server.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ class Server final : public congestion_control::Limitee, public congestion_contr
5050

5151
void AddHandler(const handlers::HttpHandlerBase& handler, engine::TaskProcessor& task_processor);
5252

53-
size_t GetThrottlableHandlersCount() const;
54-
5553
const http::HttpRequestHandler& GetHttpRequestHandler(bool is_monitor = false) const;
5654

5755
void StartMonitorPort();
@@ -63,6 +61,8 @@ class Server final : public congestion_control::Limitee, public congestion_contr
6361

6462
void SetLimit(std::optional<size_t> new_limit) override;
6563

64+
size_t GetLimitableHandlersCount() const override;
65+
6666
void SetRpsRatelimit(std::optional<size_t> rps);
6767

6868
void SetRpsRatelimitStatusCode(http::HttpStatus status_code);

core/src/congestion_control/component.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,12 @@ void Component::OnConfigUpdate(const dynamic_config::Snapshot& cfg) {
149149
}
150150

151151
void Component::OnAllComponentsLoaded() {
152-
LOG_DEBUG()
153-
<< "Found " << pimpl_->server.GetThrottlableHandlersCount()
154-
<< " registered HTTP handlers with enabled throttling";
155-
if (pimpl_->server.GetThrottlableHandlersCount() == 0) {
152+
const auto limitable_handlers_count = pimpl_->server_limiter.GetLimitableHandlersCount();
153+
LOG_DEBUG() << "Found " << limitable_handlers_count << " registered handlers with enabled throttling";
154+
155+
if (limitable_handlers_count == 0) {
156156
pimpl_->force_disabled = true;
157-
LOG_WARNING() << "No throttlable HTTP handlers registered, disabling";
157+
LOG_WARNING() << "No throttlable handlers registered, disabling";
158158

159159
// apply fake_mode
160160
OnConfigUpdate(pimpl_->dynamic_config.GetSnapshot());

core/src/server/congestion_control/limiter.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include <userver/server/congestion_control/limiter.hpp>
22

3+
#include <boost/range/adaptor/transformed.hpp>
4+
#include <boost/range/numeric.hpp>
5+
36
USERVER_NAMESPACE_BEGIN
47

58
namespace server::congestion_control {
@@ -13,6 +16,14 @@ void Limiter::SetLimit(const USERVER_NAMESPACE::congestion_control::Limit& new_l
1316
}
1417
}
1518

19+
std::size_t Limiter::GetLimitableHandlersCount() const {
20+
auto lock = limitees_.Lock();
21+
return boost::accumulate(
22+
*lock | boost::adaptors::transformed([](auto ptr) { return ptr->GetLimitableHandlersCount(); }),
23+
std::size_t{0}
24+
);
25+
}
26+
1627
void Limiter::RegisterLimitee(Limitee& limitee) {
1728
auto lock = limitees_.Lock();
1829
lock->emplace_back(&limitee);

core/src/server/server.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,6 @@ RequestsView& ServerImpl::GetRequestsView() {
292292
return requests_view_;
293293
}
294294

295-
void Server::SetLimit(std::optional<size_t> new_limit) { SetRpsRatelimit(new_limit); }
296-
297295
void ServerImpl::WriteTotalHandlerStatistics(utils::statistics::Writer& writer) const {
298296
handlers::HttpHandlerStatisticsSnapshot total;
299297

@@ -381,8 +379,6 @@ void Server::AddHandler(const handlers::HttpHandlerBase& handler, engine::TaskPr
381379
pimpl_->AddHandler(handler, task_processor);
382380
}
383381

384-
size_t Server::GetThrottlableHandlersCount() const { return pimpl_->GetThrottlableHandlersCount(); }
385-
386382
const http::HttpRequestHandler& Server::GetHttpRequestHandler(bool is_monitor) const {
387383
return pimpl_->GetHttpRequestHandler(is_monitor);
388384
}
@@ -404,6 +400,10 @@ void Server::Stop() { pimpl_->Stop(); }
404400

405401
RequestsView& Server::GetRequestsView() { return pimpl_->GetRequestsView(); }
406402

403+
void Server::SetLimit(std::optional<size_t> new_limit) { SetRpsRatelimit(new_limit); }
404+
405+
size_t Server::GetLimitableHandlersCount() const { return pimpl_->GetThrottlableHandlersCount(); }
406+
407407
void Server::SetRpsRatelimit(std::optional<size_t> rps) { pimpl_->SetRpsRatelimit(rps); }
408408

409409
void Server::SetRpsRatelimitStatusCode(http::HttpStatus status_code) { pimpl_->SetRpsRatelimitStatusCode(status_code); }

grpc/functional_tests/middleware_server/src/service.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,38 @@
22

33
#include <fmt/format.h>
44

5+
#include <userver/components/component_context.hpp>
6+
#include <userver/congestion_control/component.hpp>
57
#include <userver/engine/async.hpp>
68
#include <userver/engine/sleep.hpp>
9+
#include <userver/utils/assert.hpp>
710
#include <userver/yaml_config/merge_schemas.hpp>
811

912
#include <userver/ugrpc/server/service_component_base.hpp>
1013

1114
namespace functional_tests {
1215

16+
namespace {
17+
18+
void EnsureCongestionControlEnbled(const congestion_control::Controller& congestion_control_controller) {
19+
UINVARIANT(congestion_control_controller.IsEnabled(), "CongestionControl Controller should be enabled");
20+
}
21+
22+
} // namespace
23+
24+
GreeterServiceComponent::GreeterServiceComponent(
25+
const components::ComponentConfig& config,
26+
const components::ComponentContext& context
27+
)
28+
: samples::api::GreeterServiceBase::Component(config, context),
29+
congestion_control_controller_{context.FindComponent<congestion_control::Component>().GetServerController()}
30+
{}
31+
1332
GreeterServiceComponent::SayHelloResult GreeterServiceComponent::SayHello(
1433
CallContext& /*context*/,
1534
samples::api::GreetingRequest&& request
1635
) {
36+
EnsureCongestionControlEnbled(congestion_control_controller_);
1737
samples::api::GreetingResponse response;
1838
response.set_greeting(fmt::format("Hello, {}!", request.name()));
1939
return response;
@@ -24,6 +44,7 @@ GreeterServiceComponent::SayHelloResponseStreamResult GreeterServiceComponent::S
2444
samples::api::GreetingRequest&& request,
2545
SayHelloResponseStreamWriter& writer
2646
) {
47+
EnsureCongestionControlEnbled(congestion_control_controller_);
2748
std::string message = fmt::format("{}, {}", "Hello", request.name());
2849
constexpr auto kCountSend = 5;
2950
constexpr std::chrono::milliseconds kTimeInterval{200};
@@ -41,6 +62,7 @@ GreeterServiceComponent::SayHelloRequestStreamResult GreeterServiceComponent::Sa
4162
CallContext& /*context*/,
4263
SayHelloRequestStreamReader& reader
4364
) {
65+
EnsureCongestionControlEnbled(congestion_control_controller_);
4466
std::string income_message;
4567
samples::api::GreetingRequest request;
4668
while (reader.Read(request)) {
@@ -55,6 +77,7 @@ GreeterServiceComponent::SayHelloStreamsResult GreeterServiceComponent::SayHello
5577
CallContext& /*context*/,
5678
SayHelloStreamsReaderWriter& stream
5779
) {
80+
EnsureCongestionControlEnbled(congestion_control_controller_);
5881
constexpr std::chrono::milliseconds kTimeInterval{200};
5982
std::string income_message;
6083
samples::api::GreetingRequest request;

grpc/functional_tests/middleware_server/src/service.hpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#pragma once
22
#include <userver/utest/using_namespace_userver.hpp>
33

4+
#include <userver/congestion_control/controller.hpp>
5+
46
#include <samples/greeter_client.usrv.pb.hpp>
57
#include <samples/greeter_service.usrv.pb.hpp>
68

@@ -10,8 +12,7 @@ class GreeterServiceComponent final : public samples::api::GreeterServiceBase::C
1012
public:
1113
static constexpr std::string_view kName = "greeter-service";
1214

13-
GreeterServiceComponent(const components::ComponentConfig& config, const components::ComponentContext& context)
14-
: samples::api::GreeterServiceBase::Component(config, context) {}
15+
GreeterServiceComponent(const components::ComponentConfig& config, const components::ComponentContext& context);
1516

1617
SayHelloResult SayHello(CallContext& context, samples::api::GreetingRequest&& request) final;
1718

@@ -26,6 +27,9 @@ class GreeterServiceComponent final : public samples::api::GreeterServiceBase::C
2627
SayHelloStreamsResult SayHelloStreams(CallContext& context, SayHelloStreamsReaderWriter& stream) final;
2728

2829
static yaml_config::Schema GetStaticConfigSchema();
30+
31+
private:
32+
const congestion_control::Controller& congestion_control_controller_;
2933
};
3034

3135
} // namespace functional_tests
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,27 @@
1+
from typing import Any
2+
13
import pytest
24

35
import samples.greeter_pb2_grpc as greeter_services
46

57
pytest_plugins = ['pytest_userver.plugins.grpc']
68

79

10+
@pytest.fixture(scope='session')
11+
def dynamic_config_fallback_patch() -> dict[str, Any]:
12+
return {'USERVER_RPS_CCONTROL_ENABLED': True}
13+
14+
15+
@pytest.fixture(scope='session')
16+
def congestion_control_fake_mode() -> bool:
17+
return False
18+
19+
20+
@pytest.fixture(scope='session')
21+
def service_env():
22+
return {'CPU_LIMIT': '1c'}
23+
24+
825
@pytest.fixture
926
def grpc_client(grpc_channel):
1027
return greeter_services.GreeterServiceStub(grpc_channel)

grpc/include/userver/ugrpc/server/middlewares/congestion_control/component.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
/// @file userver/ugrpc/server/middlewares/congestion_control/component.hpp
44
/// @brief @copybrief ugrpc::server::middlewares::congestion_control::Component
55

6+
#include <atomic>
7+
#include <cstddef>
8+
69
#include <userver/server/congestion_control/limiter.hpp>
710
#include <userver/ugrpc/server/middlewares/base.hpp>
811
#include <userver/utils/token_bucket.hpp>
@@ -51,10 +54,13 @@ class Component final
5154

5255
void SetLimit(std::optional<size_t> new_limit) override;
5356

57+
std::size_t GetLimitableHandlersCount() const override;
58+
5459
private:
5560
std::shared_ptr<utils::TokenBucket> rate_limit_{
5661
std::make_shared<utils::TokenBucket>(utils::TokenBucket::MakeUnbounded())
5762
};
63+
mutable std::atomic<std::size_t> limitable_handlers_count_{0};
5864
};
5965

6066
} // namespace ugrpc::server::middlewares::congestion_control

0 commit comments

Comments
 (0)