Skip to content

Commit 03defc0

Browse files
committed
Add graylog system notification cleaner
1 parent 48fbbca commit 03defc0

File tree

1 file changed

+161
-0
lines changed

1 file changed

+161
-0
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import json
2+
import warnings
3+
from typing import Optional
4+
5+
import requests
6+
7+
#!/usr/bin/env -S uv --quiet run --script
8+
# /// script
9+
# requires-python = ">=3.13"
10+
# dependencies = [
11+
# "typer",
12+
# "requests",
13+
# ]
14+
# ///
15+
import typer
16+
from requests.auth import HTTPBasicAuth
17+
18+
warnings.filterwarnings(
19+
"ignore",
20+
".*Adding certificate verification is strongly advised.*",
21+
)
22+
23+
24+
app = typer.Typer(help="Graylog Notification Cleanup Tool")
25+
26+
27+
def delete_notification(
28+
session: requests.Session,
29+
base_url: str,
30+
notification_id: str,
31+
notification_type: str,
32+
) -> bool:
33+
url = f"{base_url}/api/system/notifications/{notification_type}/{notification_id}"
34+
35+
headers = {
36+
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:138.0) Gecko/20100101 Firefox/138.0",
37+
"Accept": "application/json",
38+
"Accept-Language": "en-US,en;q=0.5",
39+
"X-Requested-With": "XMLHttpRequest",
40+
"X-Requested-By": "XMLHttpRequest",
41+
"Content-Type": "application/json",
42+
"Origin": base_url,
43+
"Referer": f"{base_url}/system/overview",
44+
}
45+
46+
try:
47+
response = session.delete(url, headers=headers)
48+
response.raise_for_status()
49+
return True
50+
except requests.RequestException as e:
51+
typer.echo(f"Error deleting notification {notification_id}: {str(e)}", err=True)
52+
typer.echo(f"Response content: {response.text}", err=True)
53+
return False
54+
55+
56+
@app.command()
57+
def cleanup(
58+
graylog_url: str = typer.Option(
59+
default=...,
60+
envvar="GRAYLOG_HTTP_EXTERNAL_URI",
61+
prompt=False,
62+
help="Base Graylog URL",
63+
),
64+
username: Optional[str] = typer.Option(
65+
None,
66+
"--username",
67+
"-u",
68+
envvar="SERVICES_USER",
69+
prompt=False,
70+
help="Graylog username",
71+
),
72+
password: Optional[str] = typer.Option(
73+
None,
74+
"--password",
75+
"-p",
76+
envvar="SERVICES_PASSWORD",
77+
prompt=False,
78+
hide_input=True,
79+
help="Graylog password",
80+
),
81+
dry_run: bool = typer.Option(False, help="Simulate cleanup without deleting"),
82+
force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation prompt"),
83+
):
84+
# Validate authentication options
85+
if not (username and password):
86+
typer.echo(
87+
"Error: You must provide either username/password or an API token", err=True
88+
)
89+
raise typer.Exit(code=1)
90+
graylog_url = graylog_url.rstrip("/")
91+
92+
# Fetch notifications
93+
auth = HTTPBasicAuth(username, password) if username else None
94+
headers = {
95+
"Referer": f"{graylog_url}/system/overview",
96+
"X-Requested-With": "XMLHttpRequest",
97+
"X-Requested-By": "XMLHttpRequest",
98+
"X-Graylog-No-Session-Extension": "true",
99+
"Content-Type": "application/json",
100+
"Connection": "keep-alive",
101+
# Existing headers if there are any; otherwise, remove this comment
102+
}
103+
notifications_url = f"{graylog_url}/api/system/notifications"
104+
session = requests.Session()
105+
session.verify = False
106+
session.auth = auth # Graylog username is always "admin"
107+
# Modify the notification fetching section:
108+
try:
109+
response = session.get(
110+
notifications_url, auth=(username, password), headers=headers, verify=True
111+
)
112+
response.raise_for_status()
113+
114+
# Check for empty response
115+
if not response.content.strip():
116+
typer.echo("Error: Received empty response from server", err=True)
117+
raise typer.Exit(code=1)
118+
119+
notifications = response.json()["notifications"]
120+
except json.JSONDecodeError as exc:
121+
typer.echo(f"Invalid JSON response: {response.text[:200]}", err=True)
122+
raise typer.Exit(code=1) from exc
123+
124+
# Display summary
125+
typer.echo(f"Found {len(notifications)} notifications to process")
126+
if dry_run:
127+
typer.echo("Dry run enabled - no changes will be made")
128+
129+
if not force and not dry_run:
130+
typer.confirm("Are you sure you want to delete all notifications?", abort=True)
131+
132+
# Process notifications
133+
success_count = 0
134+
# print(notifications[0:5])
135+
# exit(1)
136+
for notification in notifications:
137+
n_type = notification.get("type")
138+
n_id = notification.get("key")
139+
140+
if not n_type or not n_id:
141+
typer.echo(
142+
f"Skipping notification due to missing type or id: {notification}"
143+
)
144+
continue
145+
146+
if dry_run:
147+
typer.echo(f"[DRY RUN] Would delete {n_type} notification {n_id}")
148+
success_count += 1
149+
continue
150+
151+
if delete_notification(session, graylog_url, n_id, n_type):
152+
typer.echo(f"Deleted {n_type} notification {n_id}")
153+
success_count += 1
154+
155+
typer.echo(
156+
f"Successfully processed {success_count}/{len(notifications)} notifications"
157+
)
158+
159+
160+
if __name__ == "__main__":
161+
app()

0 commit comments

Comments
 (0)