Skip to content

Commit 275b5d8

Browse files
feat: add Charmed Kafka plugins
Signed-off-by: Marc Oppenheimer <marcaoppenheimer@gmail.com>
1 parent 4fff766 commit 275b5d8

File tree

3 files changed

+515
-0
lines changed

3 files changed

+515
-0
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
from datetime import datetime, timedelta
2+
import json
3+
from sos.report.plugins import Plugin, UbuntuPlugin, PluginOpt
4+
import glob
5+
import re
6+
7+
PATHS = {
8+
"CONF": f"/var/snap/charmed-kafka/current/etc/cruise-control",
9+
"LOGS": f"/var/snap/charmed-kafka/common/var/log/cruise-control",
10+
}
11+
12+
DATE_FORMAT = "%Y-%m-%d-%H"
13+
14+
15+
class CharmedCruiseControl(Plugin, UbuntuPlugin):
16+
17+
short_desc = "Cruise Control (from Charmed Kafka)"
18+
plugin_name = "charmed_cruise_control"
19+
20+
current_date = datetime.now()
21+
default_date_from = "1970-01-01-00"
22+
default_date_to = (current_date + timedelta(hours=1)).strftime(DATE_FORMAT)
23+
24+
option_list = [
25+
PluginOpt(
26+
"date-from",
27+
default="1970-01-01-00",
28+
desc="date from which to filter logs, in format YYYY-MM-DD-HH",
29+
val_type=str,
30+
),
31+
PluginOpt(
32+
"date-to",
33+
default=default_date_to,
34+
desc="date to which to filter logs, in format YYYY-MM-DD-HH",
35+
val_type=str,
36+
),
37+
]
38+
39+
@property
40+
def credentials_args(self) -> str:
41+
with open(f"{PATHS['CONF']}/cruisecontrol.credentials") as f:
42+
content = f.read().strip()
43+
44+
if match := re.match(r"balancer: (?P<pwd>\w+),ADMIN", content):
45+
pwd = match.group("pwd")
46+
return f"-u balancer:{pwd}"
47+
else:
48+
return ""
49+
50+
def setup(self):
51+
# --- FILE EXCLUSIONS ---
52+
53+
for file in glob.glob(f"{PATHS['LOGS']}/*"):
54+
date = re.search(
55+
pattern=r"([0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2})", string=file
56+
)
57+
58+
# include files without date, aka current files
59+
if not date:
60+
continue
61+
62+
file_dt = datetime.strptime(date.group(1), DATE_FORMAT)
63+
64+
if file_dt < datetime.strptime(
65+
str(self.get_option("date-from")), DATE_FORMAT
66+
) or file_dt > datetime.strptime(
67+
str(self.get_option("date-to")), DATE_FORMAT
68+
):
69+
# skip files outside given range
70+
self.add_forbidden_path(file)
71+
72+
# hide keys/stores
73+
self.add_forbidden_path(
74+
[
75+
f"{PATHS['CONF']}/*.pem",
76+
f"{PATHS['CONF']}/*.key",
77+
f"{PATHS['CONF']}/*.p12",
78+
f"{PATHS['CONF']}/*.jks",
79+
]
80+
)
81+
82+
# --- FILE INCLUSIONS ---
83+
84+
self.add_copy_spec(
85+
[
86+
f"{PATHS['CONF']}",
87+
f"{PATHS['LOGS']}",
88+
"/var/log/juju",
89+
]
90+
)
91+
92+
# --- SNAP LOGS ---
93+
94+
self.add_cmd_output(
95+
"snap logs charmed-kafka.cruise-control -n 100000",
96+
suggest_filename="snap-logs",
97+
)
98+
99+
# --- CRUISE CONTROL STATE ---
100+
101+
self.add_cmd_output(
102+
f"curl {self.credentials_args} localhost:9090/kafkacruisecontrol/state?super_verbose=true",
103+
"cruise-control-state",
104+
)
105+
106+
# --- CLUSTER STATE ---
107+
108+
self.add_cmd_output(
109+
f"curl {self.credentials_args} localhost:9090/kafkacruisecontrol/kafka_cluster_state?verbose=true",
110+
"cluster-state",
111+
)
112+
113+
# --- PARTITION LOAD ---
114+
115+
self.add_cmd_output(
116+
f"curl {self.credentials_args} localhost:9090/kafkacruisecontrol/partition_load",
117+
"partition-load",
118+
)
119+
120+
# --- USER TASKS ---
121+
122+
self.add_cmd_output(
123+
f"curl {self.credentials_args} localhost:9090/kafkacruisecontrol/user_tasks",
124+
"user-tasks",
125+
)
126+
127+
# --- JMX METRICS ---
128+
129+
self.add_cmd_output("curl localhost:9102/metrics", "jmx-metrics")
130+
131+
def postproc(self):
132+
# --- SCRUB PASSWORDS ---
133+
134+
for scrub_pattern in [r'(password=")[^"]*', r"(balancer: )[^,]*"]:
135+
self.do_path_regex_sub(
136+
f"{PATHS['CONF']}/*",
137+
scrub_pattern,
138+
r"\1*********",
139+
)
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
from datetime import datetime, timedelta
2+
import json
3+
from sos.report.plugins import Plugin, UbuntuPlugin, PluginOpt
4+
import glob
5+
import re
6+
7+
PATHS = {
8+
"CONF": "/var/snap/charmed-kafka/current/etc/kafka",
9+
"LOGS": "/var/snap/charmed-kafka/common/var/log/kafka",
10+
}
11+
12+
DATE_FORMAT = "%Y-%m-%d-%H"
13+
14+
15+
class CharmedKafka(Plugin, UbuntuPlugin):
16+
short_desc = "Charmed Kafka"
17+
plugin_name = "charmed_kafka"
18+
19+
current_date = datetime.now()
20+
default_date_from = "1970-01-01-00"
21+
default_date_to = (current_date + timedelta(hours=1)).strftime(DATE_FORMAT)
22+
23+
option_list = [
24+
PluginOpt(
25+
"date-from",
26+
default="1970-01-01-00",
27+
desc="date from which to filter logs, in format YYYY-MM-DD-HH",
28+
val_type=str,
29+
),
30+
PluginOpt(
31+
"date-to",
32+
default=default_date_to,
33+
desc="date to which to filter logs, in format YYYY-MM-DD-HH",
34+
val_type=str,
35+
),
36+
]
37+
38+
@property
39+
def bootstrap_server(self) -> str | None:
40+
lines = []
41+
with open(f"{PATHS['CONF']}/client.properties") as f:
42+
lines = f.readlines()
43+
44+
for line in lines:
45+
if "bootstrap" in line:
46+
# ensure using internal address if not set in client.properties
47+
return re.sub(r":(?!1)(\d+)", r":1\1", line.split("=")[1])
48+
49+
@property
50+
def default_bin_args(self) -> str:
51+
if not self.bootstrap_server:
52+
return ""
53+
54+
return f"--bootstrap-server {self.bootstrap_server} --command-config {PATHS['CONF']}/client.properties"
55+
56+
def setup(self):
57+
# --- FILE EXCLUSIONS ---
58+
59+
for file in glob.glob(f"{PATHS['LOGS']}/*"):
60+
date = re.search(
61+
pattern=r"([0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2})", string=file
62+
)
63+
64+
# include files without date, aka current files
65+
if not date:
66+
continue
67+
68+
file_dt = datetime.strptime(date.group(1), DATE_FORMAT)
69+
70+
if file_dt < datetime.strptime(
71+
str(self.get_option("date-from")), DATE_FORMAT
72+
) or file_dt > datetime.strptime(
73+
str(self.get_option("date-to")), DATE_FORMAT
74+
):
75+
# skip files outside given range
76+
self.add_forbidden_path(file)
77+
78+
# hide keys/stores
79+
self.add_forbidden_path(
80+
[
81+
f"{PATHS['CONF']}/*.pem",
82+
f"{PATHS['CONF']}/*.key",
83+
f"{PATHS['CONF']}/*.p12",
84+
f"{PATHS['CONF']}/*.jks",
85+
]
86+
)
87+
88+
# --- FILE INCLUSIONS ---
89+
90+
self.add_copy_spec(
91+
[
92+
f"{PATHS['CONF']}",
93+
f"{PATHS['LOGS']}",
94+
"/var/log/juju",
95+
]
96+
)
97+
98+
# --- SNAP LOGS ---
99+
100+
self.add_cmd_output(
101+
"snap logs charmed-kafka.daemon -n 100000", suggest_filename="snap-logs"
102+
)
103+
104+
# --- TOPICS ---
105+
106+
self.add_cmd_output(
107+
f"charmed-kafka.topics --describe {self.default_bin_args}",
108+
env={"KAFKA_OPTS": ""},
109+
suggest_filename="kafka-topics",
110+
)
111+
112+
# --- CONFIGS ---
113+
114+
for entity in ["topics", "clients", "users", "brokers", "ips"]:
115+
self.add_cmd_output(
116+
f"charmed-kafka.configs --describe --all --entity-type {entity} {self.default_bin_args}",
117+
env={"KAFKA_OPTS": ""},
118+
suggest_filename=f"kafka-configs-{entity}",
119+
)
120+
121+
# --- ACLs ---
122+
123+
self.add_cmd_output(
124+
f"charmed-kafka.acls --list {self.default_bin_args}",
125+
env={"KAFKA_OPTS": ""},
126+
suggest_filename="kafka-acls",
127+
)
128+
129+
# --- LOG DIRS ---
130+
131+
log_dirs_output = self.exec_cmd(
132+
f"charmed-kafka.log-dirs --describe {self.default_bin_args}",
133+
env={"KAFKA_OPTS": ""},
134+
)
135+
log_dirs = {}
136+
137+
if log_dirs_output and log_dirs_output["status"] == 0:
138+
for line in log_dirs_output["output"].splitlines():
139+
try:
140+
log_dirs = json.loads(line)
141+
break
142+
except json.JSONDecodeError:
143+
continue
144+
145+
with self.collection_file("kafka-log-dirs") as f:
146+
f.write(json.dumps(log_dirs, indent=4))
147+
148+
# --- TRANSACTIONS ---
149+
150+
transactions_list = self.exec_cmd(
151+
f"charmed-kafka.transactions {self.default_bin_args} list",
152+
env={"KAFKA_OPTS": ""},
153+
)
154+
transactional_ids = []
155+
156+
if transactions_list and transactions_list["status"] == 0:
157+
transactional_ids = transactions_list["output"].splitlines()[1:]
158+
159+
transactions_outputs = []
160+
for transactional_id in transactional_ids:
161+
transactions_describe = self.exec_cmd(
162+
f"charmed-kafka.transactions {self.default_bin_args} describe --transactional-id {transactional_id}"
163+
)
164+
165+
if transactions_describe and transactions_describe["status"] == 0:
166+
transactions_outputs.append(transactions_describe["output"])
167+
168+
with self.collection_file("kafka-transactions") as f:
169+
f.write("\n".join(transactions_outputs))
170+
171+
# --- JMX METRICS ---
172+
173+
self.add_cmd_output("curl localhost:9101/metrics", "jmx-metrics")
174+
175+
def postproc(self):
176+
# --- SCRUB PASSWORDS ---
177+
178+
self.do_path_regex_sub(
179+
f"{PATHS['CONF']}/*",
180+
r'(password=")[^"]*',
181+
r"\1*********",
182+
)

0 commit comments

Comments
 (0)