Skip to content

Commit 3499aa3

Browse files
committed
feature: split the WAL plugin into separate WAL and Replication plugins
1 parent 88c90e0 commit 3499aa3

File tree

6 files changed

+222
-181
lines changed

6 files changed

+222
-181
lines changed

mamonsu/plugins/pgsql/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import mamonsu.lib.platform as platform

# Public plugin list for the pgsql package; each name is a submodule that
# registers one monitoring plugin.
__all__ = ['bgwriter', 'connections', 'databases']
__all__ += ['health', 'instance', 'wal', 'replication']
__all__ += ['statements', 'pg_buffercache', 'wait_sampling']
__all__ += ['checkpoint', 'oldest', 'pg_locks']
__all__ += ['cfs']

mamonsu/plugins/pgsql/replication.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# -*- coding: utf-8 -*-

from distutils.version import LooseVersion

from mamonsu.lib.zbx_template import ZbxTemplate
from mamonsu.plugins.pgsql.plugin import PgsqlPlugin as Plugin

from .pool import Pooler

# Trigger threshold: fire when the number of non-active replication
# slots exceeds this value.
NUMBER_NON_ACTIVE_SLOTS = 0
11+
class Replication(Plugin):
    """Collect PostgreSQL streaming-replication metrics for Zabbix.

    Sends the replication lag in seconds, per-standby LSN lag stages
    (PostgreSQL 10+ only), a discovery rule naming each connected
    standby, and the count of non-active replication slots.
    """

    AgentPluginType = "pgsql"
    DEFAULT_CONFIG = {
        "lag_more_than_in_sec": str(60 * 5)
    }

    # Replication lag in seconds as observed on a standby.
    # Placeholders: {0} fallback interval in seconds; {1}/{2} the
    # receive/replay LSN function suffixes, which differ between
    # PostgreSQL 9.x (xlog_*_location) and 10+ (wal_*_lsn).
    query_agent_replication_lag = """
    SELECT CASE WHEN NOT pg_is_in_recovery() OR coalesce(pg_last_{1}(), '0/00000000') = coalesce(pg_last_{2}(), '0/00000000')
    THEN 0
    ELSE extract (epoch FROM now() - coalesce(pg_last_xact_replay_timestamp(), now() - INTERVAL '{0} seconds'))
    END;
    """

    # Discovery rule key: one entry per standby (application name).
    key_lsn_replication_discovery = "pgsql.replication.discovery{0}"
    key_total_lag = "pgsql.replication.total_lag{0}"
    # Per-stage lag item keys; populated on PostgreSQL 10 and higher only.
    key_flush = "pgsql.replication.flush_lag{0}"
    key_replay = "pgsql.replication.replay_lag{0}"
    key_write = "pgsql.replication.write_lag{0}"
    key_send = "pgsql.replication.send_lag{0}"
    key_receive = "pgsql.replication.receive_lag{0}"

    key_replication = "pgsql.replication_lag{0}"
    key_non_active_slots = "pgsql.replication.non_active_slots{0}"

    def run(self, zbx):
        """Query replication state and push all metrics to *zbx*."""
        # Replay lag in seconds on this server (0 when not in recovery).
        # The LSN function names depend on the server major version.
        if Pooler.server_version_greater("10.0"):
            receive_fn, replay_fn = "wal_receive_lsn", "wal_replay_lsn"
        else:
            receive_fn, replay_fn = "xlog_receive_location", "xlog_replay_location"
        lag = Pooler.run_sql_type("replication_lag_slave_query",
                                  args=[self.plugin_config("interval"), receive_fn, replay_fn])
        if lag[0][0] is not None:
            zbx.send("pgsql.replication_lag[sec]", float(lag[0][0]))

        if not Pooler.in_recovery():
            # Master-side query; its result is intentionally not consumed here.
            Pooler.run_sql_type("replication_lag_master_query")
            if Pooler.server_version_greater("10.0") and (Pooler.is_superuser() or Pooler.is_bootstraped()):
                # Bootstrapped installations already expose the per-stage lag
                # columns; otherwise derive them from pg_stat_replication LSNs.
                if Pooler.is_bootstraped():
                    lag_columns = " send_lag, receive_lag, write_lag, flush_lag, replay_lag, "
                else:
                    lag_columns = (" (pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn))::int AS send_lag, "
                                   "(pg_wal_lsn_diff(sent_lsn, flush_lsn))::int AS receive_lag, "
                                   "(pg_wal_lsn_diff(sent_lsn, write_lsn))::int AS write_lag, "
                                   "(pg_wal_lsn_diff(write_lsn, flush_lsn))::int AS flush_lag, "
                                   "(pg_wal_lsn_diff(flush_lsn, replay_lsn))::int AS replay_lag, ")
                result_lags = Pooler.run_sql_type("wal_lag_lsn", args=[lag_columns, "wal", "lsn"])
                if result_lags:
                    discovered = []
                    for row in result_lags:
                        app_name = row[0]
                        discovered.append({"{#APPLICATION_NAME}": app_name})
                        zbx.send("pgsql.replication.total_lag[{0}]".format(app_name), float(row[6]))
                        zbx.send("pgsql.replication.send_lag[{0}]".format(app_name), float(row[1]))
                        zbx.send("pgsql.replication.receive_lag[{0}]".format(app_name), float(row[2]))
                        zbx.send("pgsql.replication.write_lag[{0}]".format(app_name), float(row[3]))
                        zbx.send("pgsql.replication.flush_lag[{0}]".format(app_name), float(row[4]))
                        zbx.send("pgsql.replication.replay_lag[{0}]".format(app_name), float(row[5]))
                    zbx.send("pgsql.replication.discovery[]", zbx.json({"data": discovered}))
                    del discovered
            elif Pooler.is_superuser() or Pooler.is_bootstraped():
                # Pre-10 servers expose only the total lag per standby.
                result_lags = Pooler.run_sql_type("wal_lag_lsn", args=[" ", "xlog", "location"])
                if result_lags:
                    discovered = []
                    for row in result_lags:
                        discovered.append({"{#APPLICATION_NAME}": row[0]})
                        zbx.send("pgsql.replication.total_lag[{0}]".format(row[0]), float(row[1]))
                    zbx.send("pgsql.replication.discovery[]", zbx.json({"data": discovered}))
                    del discovered
            else:
                self.disable_and_exit_if_not_superuser()

        # Replication slots with no attached consumer.
        non_active_slots = Pooler.query("""
        SELECT count(*)
        FROM pg_replication_slots
        WHERE active = 'false';
        """)
        zbx.send(self.key_non_active_slots.format("[]"), int(non_active_slots[0][0]))

    def items(self, template, dashboard=False):
        """Return Zabbix item definitions, or [] when building a dashboard."""
        # NOTE(review): `delta` is computed but not referenced below —
        # looks vestigial; confirm before removing.
        if self.Type == "mamonsu":
            delta = Plugin.DELTA.as_is
        else:
            delta = Plugin.DELTA_SPEED
        result = template.item({
            "name": "PostgreSQL Replication: Streaming Replication Lag",
            "key": self.right_type(self.key_replication, "sec"),
            "delay": self.plugin_config("interval")
        })
        result += template.item({
            "name": "PostgreSQL Replication: Count Non-Active Replication Slots",
            "key": self.right_type(self.key_non_active_slots),
            "value_type": self.VALUE_TYPE.numeric_unsigned,
        })
        if dashboard:
            return []
        return result

    def triggers(self, template, dashboard=False):
        """Return triggers for excessive lag and non-active slots."""
        lag_trigger = template.trigger({
            "name": "PostgreSQL streaming lag too high on {HOSTNAME} (value={ITEM.LASTVALUE})",
            "expression": "{#TEMPLATE:" + self.right_type(self.key_replication, "sec")
                          + ".last()}>" + self.plugin_config("lag_more_than_in_sec")
        })
        slots_trigger = template.trigger({
            "name": "PostgreSQL number of non-active replication slots on {HOSTNAME} (value={ITEM.LASTVALUE})",
            "expression": "{#TEMPLATE:" + self.right_type(self.key_non_active_slots)
                          + ".last()}>" + str(NUMBER_NON_ACTIVE_SLOTS)
        })
        return lag_trigger + slots_trigger

    def discovery_rules(self, template, dashboard=False):
        """Return the per-standby discovery rule with its items and graphs."""
        rule = {
            "name": "Replication lag discovery",
            "key": self.key_lsn_replication_discovery.format("[{0}]".format(self.Macros[self.Type]))
        }
        # Older Zabbix versions use a string filter instead of conditions.
        if Plugin.old_zabbix:
            conditions = []
            rule["filter"] = "{#APPLICATION_NAME}:.*"
        else:
            conditions = [{
                "condition": [
                    {"macro": "{#APPLICATION_NAME}",
                     "value": ".*",
                     "operator": 8,
                     "formulaid": "A"}
                ]
            }]
        # (item key, item name) pairs; all items share the same value type,
        # delay and draw type, so they are expanded uniformly below.
        item_specs = [
            (self.key_send,
             "PostgreSQL WAL Send Lag: Time elapsed sending recent WAL locally on {#APPLICATION_NAME}"),
            (self.key_receive,
             "PostgreSQL WAL Receive Lag: Time elapsed between receiving recent WAL locally and receiving notification that "
             "this standby server {#APPLICATION_NAME} has flushed it"),
            (self.key_write,
             "PostgreSQL WAL Write Lag: Time elapsed between flushing recent WAL locally and receiving notification that "
             "this standby server {#APPLICATION_NAME} has written it"),
            (self.key_flush,
             "PostgreSQL WAL Flush Lag: Time elapsed between flushing recent WAL locally and receiving notification that "
             "this standby server {#APPLICATION_NAME} has written and flushed it"),
            (self.key_replay,
             "PostgreSQL WAL Replay Lag: Time elapsed between flushing recent WAL locally and receiving notification that "
             "this standby server {#APPLICATION_NAME} has written, flushed and applied"),
            (self.key_total_lag,
             "Delta of total lag for {#APPLICATION_NAME}"),
        ]
        items = [
            {"key": self.right_type(key, var_discovery="{#APPLICATION_NAME},"),
             "name": name,
             "value_type": Plugin.VALUE_TYPE.numeric_float,
             "delay": self.plugin_config("interval"),
             "drawtype": 2}
            for key, name in item_specs
        ]
        graphs = [
            {
                "name": "Delta of total lag for {#APPLICATION_NAME}",
                "items": [
                    {"color": "8B817C",
                     "key": self.right_type(self.key_total_lag, var_discovery="{#APPLICATION_NAME},")},
                ]
            }
        ]
        return template.discovery_rule(rule=rule, conditions=conditions, items=items, graphs=graphs)

    def keys_and_queries(self, template_zabbix):
        """Return agent-mode key/query pairs matched to the server version."""
        if LooseVersion(self.VersionPG) < LooseVersion("10"):
            receive_fn, replay_fn = "xlog_receive_location", "xlog_replay_location"
        else:
            receive_fn, replay_fn = "wal_receive_lsn", "wal_replay_lsn"
        query = self.query_agent_replication_lag.format(
            self.plugin_config("interval"), receive_fn, replay_fn)
        result = ["{0},$2 $1 -c \"{1}\"".format("pgsql.replication_lag.sec[*]", query)]
        return template_zabbix.key_and_query(result)

0 commit comments

Comments (0)