Skip to content

Commit 067f163

Browse files
committed
test postgresql: u_clients table
commit_hash:22e090d729789289f0ddb47f46ed683f52004207
1 parent 9630356 commit 067f163

File tree

4 files changed

+302
-62
lines changed

4 files changed

+302
-62
lines changed

.mapping.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2938,6 +2938,7 @@
29382938
"postgresql/src/storages/postgres/tests/composite_types_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/composite_types_pgtest.cpp",
29392939
"postgresql/src/storages/postgres/tests/conn_stats_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/conn_stats_pgtest.cpp",
29402940
"postgresql/src/storages/postgres/tests/connection_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/connection_pgtest.cpp",
2941+
"postgresql/src/storages/postgres/tests/connlimit_watchdog_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/connlimit_watchdog_pgtest.cpp",
29412942
"postgresql/src/storages/postgres/tests/date_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/date_pgtest.cpp",
29422943
"postgresql/src/storages/postgres/tests/dsn_test.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/dsn_test.cpp",
29432944
"postgresql/src/storages/postgres/tests/enums_pgtest.cpp":"taxi/uservices/userver/postgresql/src/storages/postgres/tests/enums_pgtest.cpp",

postgresql/src/storages/postgres/connlimit_watchdog.cpp

Lines changed: 75 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include <storages/postgres/connlimit_watchdog.hpp>
22

33
#include <storages/postgres/detail/cluster_impl.hpp>
4-
#include <userver/hostinfo/blocking/get_hostname.hpp>
54
#include <userver/utils/from_string.hpp>
65

76
USERVER_NAMESPACE_BEGIN
@@ -16,75 +15,34 @@ constexpr size_t kReservedConn = 5;
1615
constexpr int kMaxStepsWithError = 3;
1716
constexpr size_t kFallbackConnlimit = 20;
1817

19-
// Check if u_clients exists and has the correct schema
20-
[[maybe_unused]] constexpr const char* kCreateUClients = R"(
21-
DO $$
22-
BEGIN
23-
IF EXISTS (
24-
SELECT 1
25-
FROM information_schema.columns
26-
WHERE table_name = 'u_clients'
27-
AND column_name = 'hostname'
28-
AND data_type = 'text'
29-
) AND EXISTS (
30-
SELECT 1
31-
FROM information_schema.columns
32-
WHERE table_name = 'u_clients'
33-
AND column_name = 'updated'
34-
AND data_type = 'timestamp with time zone'
35-
) AND EXISTS (
36-
SELECT 1
37-
FROM information_schema.columns
38-
WHERE table_name = 'u_clients'
39-
AND column_name = 'max_connections'
40-
AND data_type = 'integer'
41-
) AND EXISTS (
42-
SELECT 1
43-
FROM information_schema.columns
44-
WHERE table_name = 'u_clients'
45-
AND column_name = 'cur_user'
46-
AND data_type = 'text'
47-
) THEN
48-
RAISE NOTICE 'Drop u_clients';
49-
50-
DROP TABLE IF EXISTS u_clients;
51-
52-
CREATE TABLE u_clients (
53-
hostname TEXT PRIMARY KEY,
54-
updated TIMESTAMPTZ NOT NULL,
55-
max_connections INTEGER NOT NULL
56-
);
57-
ELSE
58-
RAISE NOTICE 'u_clients schema is ok';
59-
60-
CREATE TABLE IF NOT EXISTS u_clients (
61-
hostname TEXT PRIMARY KEY,
62-
updated TIMESTAMPTZ NOT NULL,
63-
max_connections INTEGER NOT NULL
64-
);
65-
END IF;
66-
END $$;
67-
)";
68-
6918
} // namespace
7019

7120
ConnlimitWatchdog::ConnlimitWatchdog(
7221
detail::ClusterImpl& cluster,
7322
testsuite::TestsuiteTasks& testsuite_tasks,
7423
int shard_number,
75-
std::function<void()> on_new_connlimit
24+
std::function<void()> on_new_connlimit,
25+
std::string host_name
7626
)
7727
: cluster_(cluster),
7828
connlimit_(0),
7929
on_new_connlimit_(std::move(on_new_connlimit)),
8030
testsuite_tasks_(testsuite_tasks),
81-
shard_number_(shard_number) {}
31+
shard_number_(shard_number),
32+
host_name_(std::move(host_name)) {}
8233

8334
void ConnlimitWatchdog::Start() {
8435
try {
8536
auto trx = cluster_.Begin({ClusterHostType::kMaster}, {}, kCommandControl);
86-
current_user_ = trx.Execute("SELECT current_user").AsSingleRow<std::string>();
87-
trx.Execute(kCreateUClients);
37+
trx.Execute(R"(
38+
CREATE TABLE IF NOT EXISTS u_clients (
39+
hostname TEXT PRIMARY KEY,
40+
updated TIMESTAMPTZ NOT NULL,
41+
max_connections INTEGER NOT NULL
42+
);
43+
)");
44+
trx.Execute("ALTER TABLE u_clients ADD COLUMN IF NOT EXISTS cur_user TEXT");
45+
// Beware! Do **not** change queries in StepV*, but rather provide a new StepV* to avoid migration issues.
8846
trx.Commit();
8947
} catch (const storages::postgres::AccessRuleViolation& e) {
9048
// Possible in some CREATE TABLE IF NOT EXISTS races with other services
@@ -97,18 +55,18 @@ void ConnlimitWatchdog::Start() {
9755
if (testsuite_tasks_.IsEnabled()) {
9856
connlimit_ = kTestsuiteConnlimit;
9957
testsuite_tasks_.RegisterTask(
100-
fmt::format("connlimit_watchdog_{}_{}", cluster_.GetDbName(), shard_number_), [this] { Step(); }
58+
fmt::format("connlimit_watchdog_{}_{}", cluster_.GetDbName(), shard_number_), [this] { StepV1(); }
10159
);
10260
} else {
10361
periodic_.Start(
10462
"connlimit_watchdog",
10563
{std::chrono::seconds(2), {}, {USERVER_NAMESPACE::utils::PeriodicTask::Flags::kNow}},
106-
[this] { Step(); }
64+
[this] { StepV1(); }
10765
);
10866
}
10967
}
11068

111-
void ConnlimitWatchdog::Step() {
69+
void ConnlimitWatchdog::StepV1() {
11270
static auto kHostname = hostinfo::blocking::GetRealHostName();
11371
try {
11472
auto trx = cluster_.Begin({ClusterHostType::kMaster}, {}, kCommandControl);
@@ -166,6 +124,65 @@ void ConnlimitWatchdog::Step() {
166124
on_new_connlimit_();
167125
}
168126

127+
void ConnlimitWatchdog::StepV2() {
128+
static auto kHostname = hostinfo::blocking::GetRealHostName();
129+
try {
130+
auto trx = cluster_.Begin({ClusterHostType::kMaster}, {}, kCommandControl);
131+
132+
auto max_connections1 = USERVER_NAMESPACE::utils::FromString<ssize_t>(
133+
trx.Execute("SHOW max_connections;").AsSingleRow<std::string>()
134+
);
135+
auto max_connections2 =
136+
trx.Execute("SELECT rolconnlimit FROM pg_roles WHERE rolname = current_user").AsSingleRow<ssize_t>();
137+
if (max_connections2 < 0) max_connections2 = max_connections1;
138+
size_t max_connections = std::min(max_connections1, max_connections2);
139+
140+
if (max_connections > kReservedConn)
141+
max_connections -= kReservedConn;
142+
else
143+
max_connections = 1;
144+
145+
trx.Execute(
146+
R"(
147+
INSERT INTO u_clients (hostname, updated, max_connections, cur_user) VALUES
148+
($1, NOW(), $2, current_user) ON CONFLICT (hostname) DO UPDATE SET updated = NOW(), max_connections = $2, cur_user = current_user
149+
)",
150+
host_name_,
151+
static_cast<int>(GetConnlimit())
152+
);
153+
154+
auto instances =
155+
trx.Execute(
156+
R"(SELECT count(*) FROM u_clients WHERE updated >= NOW() - make_interval(secs => 15) AND (cur_user = current_user OR cur_user is NULL))"
157+
)
158+
.AsSingleRow<int>();
159+
if (instances == 0) instances = 1;
160+
161+
auto connlimit = max_connections / instances;
162+
if (connlimit == 0) connlimit = 1;
163+
LOG((connlimit_ == connlimit) ? logging::Level::kDebug : logging::Level::kWarning)
164+
<< "max_connections = " << max_connections << ", instances = " << instances
165+
<< ", connlimit = " << connlimit;
166+
connlimit_ = connlimit;
167+
168+
trx.Commit();
169+
steps_with_errors_ = 0;
170+
} catch (const Error& e) {
171+
if (++steps_with_errors_ > kMaxStepsWithError) {
172+
/*
173+
* Something's wrong with PG server. Try to lower the load by lowering
174+
* max connection to a small value. Active connections will be gracefully
175+
* closed. When the server returns the response, we'll get the real
176+
* connlimit value. The period with "too low max_connections" should be
177+
* relatively small.
178+
*/
179+
connlimit_ = kFallbackConnlimit;
180+
}
181+
}
182+
183+
on_new_connlimit_();
184+
}
185+
169186
void ConnlimitWatchdog::Stop() { periodic_.Stop(); }
170187

171188
size_t ConnlimitWatchdog::GetConnlimit() const { return connlimit_.load(); }

postgresql/src/storages/postgres/connlimit_watchdog.hpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <atomic>
44

5+
#include <userver/hostinfo/blocking/get_hostname.hpp>
56
#include <userver/testsuite/tasks.hpp>
67
#include <userver/utils/periodic_task.hpp>
78

@@ -15,18 +16,21 @@ class ClusterImpl;
1516

1617
class ConnlimitWatchdog final {
1718
public:
18-
explicit ConnlimitWatchdog(
19+
ConnlimitWatchdog(
1920
detail::ClusterImpl& cluster,
2021
testsuite::TestsuiteTasks& testsuite_tasks,
2122
int shard_number,
22-
std::function<void()> on_new_connlimit
23+
std::function<void()> on_new_connlimit,
24+
std::string host_name = hostinfo::blocking::GetRealHostName()
2325
);
2426

2527
void Start();
2628

2729
void Stop();
2830

29-
void Step();
31+
// Beware! Do **not** change queries in StepV*, but rather provide a new StepV* to avoid migration issues.
32+
void StepV1();
33+
void StepV2();
3034

3135
size_t GetConnlimit() const;
3236

@@ -36,9 +40,9 @@ class ConnlimitWatchdog final {
3640
std::function<void()> on_new_connlimit_;
3741
testsuite::TestsuiteTasks& testsuite_tasks_;
3842
int steps_with_errors_{0};
39-
std::string current_user_;
4043
USERVER_NAMESPACE::utils::PeriodicTask periodic_;
4144
int shard_number_;
45+
std::string host_name_;
4246
};
4347

4448
} // namespace storages::postgres

0 commit comments

Comments
 (0)