Skip to content

Commit 8315dc9

Browse files
committed
feature: new Replication metrics
1 parent 4f648b1 commit 8315dc9

File tree

4 files changed

+49
-24
lines changed

4 files changed

+49
-24
lines changed

mamonsu/plugins/pgsql/driver/pool.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,7 @@ class Pool(object):
6565
"""
6666
SELECT application_name,
6767
{0}
68-
pg_{1}_{2}_diff(pg_current_{1}_{2}(), replay_{2}) AS total_lag
68+
(pg_{1}_{2}_diff(pg_current_{1}_{2}(), replay_lsn))::int AS total_lag
6969
FROM pg_stat_replication;
7070
""",
7171
"""

mamonsu/plugins/pgsql/wal.py

Lines changed: 34 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@
99

1010

1111
class Wal(Plugin):
12-
AgentPluginType = "pg"
12+
AgentPluginType = "pgsql"
1313
DEFAULT_CONFIG = {
1414
"lag_more_than_in_sec": str(60 * 5)
1515
}
@@ -53,6 +53,8 @@ class Wal(Plugin):
5353
key_flush = "pgsql.replication.flush_lag{0}"
5454
key_replay = "pgsql.replication.replay_lag{0}"
5555
key_write = "pgsql.replication.write_lag{0}"
56+
key_send = "pgsql.replication.send_lag{0}"
57+
key_receive = "pgsql.replication.receive_lag{0}"
5658

5759
# keys for PG 14 and higher
5860
key_wal_records = "pgsql.wal.records.count{0}"
@@ -84,15 +86,23 @@ def run(self, zbx):
8486
if Pooler.server_version_greater("10.0"):
8587
result = Pooler.query(self.query_wal_lsn_diff)
8688
result_lags = Pooler.run_sql_type("wal_lag_lsn",
87-
args=[" flush_lag, replay_lag, write_lag, ", "wal", "lsn"])
89+
args=[" (pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn))::int AS send_lag, "
90+
"(pg_wal_lsn_diff(sent_lsn, flush_lsn))::int AS receive_lag, "
91+
"(pg_wal_lsn_diff(sent_lsn, write_lsn))::int AS write_lag, "
92+
"(pg_wal_lsn_diff(write_lsn, flush_lsn))::int AS flush_lag, "
93+
"(pg_wal_lsn_diff(flush_lsn, replay_lsn))::int AS replay_lag, " if not Pooler.is_bootstraped() else
94+
" send_lag, receive_lag, write_lag, flush_lag, replay_lag, ",
95+
"wal", "lsn"])
8896
if result_lags:
8997
lags = []
9098
for info in result_lags:
9199
lags.append({"{#APPLICATION_NAME}": info[0]})
92-
zbx.send("pgsql.replication.flush_lag[{0}]".format(info[0]), info[1])
93-
zbx.send("pgsql.replication.replay_lag[{0}]".format(info[0]), info[2])
100+
zbx.send("pgsql.replication.send_lag[{0}]".format(info[0]), info[1])
101+
zbx.send("pgsql.replication.receive_lag[{0}]".format(info[0]), info[2])
94102
zbx.send("pgsql.replication.write_lag[{0}]".format(info[0]), info[3])
95-
zbx.send("pgsql.replication.total_lag[{0}]".format(info[0]), float(info[4]))
103+
zbx.send("pgsql.replication.flush_lag[{0}]".format(info[0]), info[4])
104+
zbx.send("pgsql.replication.replay_lag[{0}]".format(info[0]), info[5])
105+
zbx.send("pgsql.replication.total_lag[{0}]".format(info[0]), float(info[6]))
96106
zbx.send("pgsql.replication.discovery[]", zbx.json({"data": lags}))
97107
del lags
98108
else:
@@ -242,22 +252,33 @@ def discovery_rules(self, template, dashboard=False):
242252
]
243253
}]
244254
items = [
255+
{"key": self.right_type(self.key_send, var_discovery="{#APPLICATION_NAME},"),
256+
"name": "Time elapsed sending recent WAL locally on {#APPLICATION_NAME}",
257+
"value_type": Plugin.VALUE_TYPE.numeric_float,
258+
"delay": self.plugin_config("interval"),
259+
"drawtype": 2},
260+
{"key": self.right_type(self.key_receive, var_discovery="{#APPLICATION_NAME},"),
261+
"name": "Time elapsed between receiving recent WAL locally and receiving notification that "
262+
"this standby server {#APPLICATION_NAME} has flushed it",
263+
"value_type": Plugin.VALUE_TYPE.numeric_float,
264+
"delay": self.plugin_config("interval"),
265+
"drawtype": 2},
266+
{"key": self.right_type(self.key_write, var_discovery="{#APPLICATION_NAME},"),
267+
"name": "Time elapsed between flushing recent WAL locally and receiving notification that "
268+
"this standby server {#APPLICATION_NAME} has written it",
269+
"value_type": Plugin.VALUE_TYPE.numeric_float,
270+
"delay": self.plugin_config("interval"),
271+
"drawtype": 2},
245272
{"key": self.right_type(self.key_flush, var_discovery="{#APPLICATION_NAME},"),
246273
"name": "Time elapsed between flushing recent WAL locally and receiving notification that "
247274
"this standby server {#APPLICATION_NAME} has written and flushed it",
248-
"value_type": Plugin.VALUE_TYPE.text,
275+
"value_type": Plugin.VALUE_TYPE.numeric_float,
249276
"delay": self.plugin_config("interval"),
250277
"drawtype": 2},
251278
{"key": self.right_type(self.key_replay, var_discovery="{#APPLICATION_NAME},"),
252279
"name": "Time elapsed between flushing recent WAL locally and receiving notification that "
253280
"this standby server {#APPLICATION_NAME} has written, flushed and applied",
254-
"value_type": Plugin.VALUE_TYPE.text,
255-
"delay": self.plugin_config("interval"),
256-
"drawtype": 2},
257-
{"key": self.right_type(self.key_write, var_discovery="{#APPLICATION_NAME},"),
258-
"name": "Time elapsed between flushing recent WAL locally and receiving notification that "
259-
"this standby server {#APPLICATION_NAME} has written it",
260-
"value_type": Plugin.VALUE_TYPE.text,
281+
"value_type": Plugin.VALUE_TYPE.numeric_float,
261282
"delay": self.plugin_config("interval"),
262283
"drawtype": 2},
263284
{"key": self.right_type(self.key_total_lag, var_discovery="{#APPLICATION_NAME},"),

mamonsu/tools/bootstrap/sql.py

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -133,7 +133,7 @@
133133
$$ LANGUAGE SQL SECURITY DEFINER;
134134
135135
CREATE OR REPLACE FUNCTION mamonsu.archive_command_files()
136-
RETURNS TABLE(COUNT_FILES BIGINT, SIZE_FILES BIGINT) AS $$
136+
RETURNS TABLE(files_count BIGINT, files_size BIGINT) AS $$
137137
WITH values AS (
138138
SELECT
139139
4096/(ceil(pg_settings.setting::numeric/1024/1024)) AS segment_parts_count,
@@ -146,13 +146,13 @@
146146
('x' || substring(pg_{10}_name(pg_current_{4}()) from 17 for 8))::bit(32)::int END AS current_wal_mod
147147
FROM pg_settings, pg_stat_archiver
148148
WHERE pg_settings.name = 'wal_segment_size')
149-
SELECT greatest(coalesce((segment_parts_count - last_wal_mod) + ((current_wal_div - last_wal_div - 1) * segment_parts_count) + current_wal_mod - 1, 0), 0)::bigint AS count_files,
150-
greatest(coalesce(((segment_parts_count - last_wal_mod) + ((current_wal_div - last_wal_div - 1) * segment_parts_count) + current_wal_mod - 1) * segment_size, 0), 0)::bigint AS size_files
149+
SELECT greatest(coalesce((segment_parts_count - last_wal_mod) + ((current_wal_div - last_wal_div - 1) * segment_parts_count) + current_wal_mod - 1, 0), 0)::bigint AS files_count,
150+
greatest(coalesce(((segment_parts_count - last_wal_mod) + ((current_wal_div - last_wal_div - 1) * segment_parts_count) + current_wal_mod - 1) * segment_size, 0), 0)::bigint AS files_size
151151
FROM values
152152
$$ LANGUAGE SQL SECURITY DEFINER;
153153
154154
CREATE OR REPLACE FUNCTION mamonsu.archive_stat()
155-
RETURNS TABLE(ARCHIVED_COUNT BIGINT, FAILED_COUNT BIGINT) AS $$
155+
RETURNS TABLE(archived_count BIGINT, failed_count BIGINT) AS $$
156156
SELECT archived_count, failed_count from pg_stat_archiver
157157
$$ LANGUAGE SQL SECURITY DEFINER;
158158
@@ -169,9 +169,9 @@
169169
170170
CREATE OR REPLACE FUNCTION mamonsu.count_{3}_lag_lsn()
171171
RETURNS TABLE(application_name TEXT, {8} total_lag NUMERIC ) AS $$
172-
SELECT
173-
CONCAT(application_name, ' ', pid) as application_name,
174-
{6} coalesce(pg_{7}_diff(pg_current_{7}(), replay_{9}), 0) AS total_lag
172+
SELECT application_name,
173+
{6}
174+
(pg_{7}_diff(pg_current_{7}(), replay_{9}))::int AS total_lag
175175
FROM pg_stat_replication
176176
$$ LANGUAGE SQL SECURITY DEFINER;
177177
"""
@@ -208,7 +208,7 @@
208208
extension_schema text;
209209
BEGIN
210210
CREATE EXTENSION IF NOT EXISTS pgpro_stats WITH SCHEMA mamonsu;
211-
211+
212212
WITH tb_type AS (SELECT exists(SELECT * FROM pg_proc WHERE proname = 'pgpro_version'))
213213
SELECT
214214
CASE

mamonsu/tools/bootstrap/start.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -166,9 +166,13 @@ def fill_query_params(queries):
166166
'wal_lsn' if Pooler.server_version_greater('10.0') else 'xlog_location',
167167
'waiting' if Pooler.server_version_less('9.6.0') else 'case when wait_event_type is null then false '
168168
' else true end as waiting',
169-
'flush_lag, replay_lag, write_lag,' if Pooler.server_version_greater('10.0') else '',
169+
'(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn))::int AS send_lag, '
170+
'(pg_wal_lsn_diff(sent_lsn, flush_lsn))::int AS receive_lag, '
171+
'(pg_wal_lsn_diff(sent_lsn, write_lsn))::int AS write_lag, '
172+
'(pg_wal_lsn_diff(write_lsn, flush_lsn))::int AS flush_lag, '
173+
'(pg_wal_lsn_diff(flush_lsn, replay_lsn))::int AS replay_lag,' if Pooler.server_version_greater('10.0') else '',
170174
'wal_lsn' if Pooler.server_version_greater('10.0') else 'xlog_location',
171-
'flush_lag INTERVAL, replay_lag INTERVAL, write_lag INTERVAL,' if Pooler.server_version_greater('10.0')
175+
'send_lag INTEGER, receive_lag INTEGER, write_lag INTEGER, flush_lag INTEGER, replay_lag INTEGER,' if Pooler.server_version_greater('10.0')
172176
else '',
173177
'lsn' if Pooler.server_version_greater('10.0') else 'location',
174178
'walfile' if Pooler.server_version_greater('10.0') else 'xlogfile',

0 commit comments

Comments
 (0)