Skip to content

Commit 15a47db

Browse files
Add a tool to reinject murfey dlq messages (#358)
Here's the outlines of a tool to pick up murfey messages from rabbitmq, and either post them to the server or reinject to rabbitmq. It's probably functional in this form but feel free to suggest better ways to do this. Currently it is a bit clunky as it requires the token to be given then subprocess runs some zocalo commands. I'd suggest we leave this as an open PR until I've had the opportunity to test this on a real case.
1 parent 157f985 commit 15a47db

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ murfey = "murfey.client:run"
8383
"murfey.create_db" = "murfey.cli.create_db:run"
8484
"murfey.db_sql" = "murfey.cli.murfey_db_sql:run"
8585
"murfey.decrypt_password" = "murfey.cli.decrypt_db_password:run"
86+
"murfey.dlq_murfey" = "murfey.cli.dlq_resubmit:run"
8687
"murfey.generate_key" = "murfey.cli.generate_crypto_key:run"
8788
"murfey.generate_password" = "murfey.cli.generate_db_password:run"
8889
"murfey.instrument_server" = "murfey.instrument_server:run"

src/murfey/cli/dlq_resubmit.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import argparse
2+
import json
3+
import subprocess
4+
from pathlib import Path
5+
6+
import requests
7+
8+
9+
def handle_failed_posts(json_folder: Path, token: str):
10+
"""Deal with any messages that have been sent as failed client posts"""
11+
for json_file in json_folder.glob("*"):
12+
with open(json_file, "r") as json_data:
13+
message = json.load(json_data)
14+
15+
if not message.get("message") or not message["message"].get("url"):
16+
print(f"{json_file} is not a failed client post")
17+
continue
18+
dest = message["message"]["url"]
19+
message_json = message["message"]["json"]
20+
21+
response = requests.post(
22+
dest, json=message_json, headers={"Authorization": f"Bearer {token}"}
23+
)
24+
if response.status_code != 200:
25+
print(f"Failed to repost {json_file}")
26+
else:
27+
print(f"Reposted {json_file}")
28+
json_file.unlink()
29+
30+
31+
def handle_dlq_messages(json_folder: Path):
32+
"""Reinjected to the queue"""
33+
for json_file in json_folder.glob("*"):
34+
reinject_result = subprocess.run(
35+
["zocalo.dlq_reinject", "-e", "devrmq", str(json_file)],
36+
capture_output=True,
37+
)
38+
if reinject_result.returncode == 0:
39+
print(f"Reinjected {json_file}")
40+
json_file.unlink()
41+
else:
42+
print(f"Failed to reinject {json_file}")
43+
44+
45+
def run():
46+
"""
47+
Method of checking and purging murfey queues on rabbitmq
48+
Two types of messages are possible:
49+
- failed client posts which need reposting to the murfey server API
50+
- feedback messages that can be sent back to rabbitmq
51+
"""
52+
parser = argparse.ArgumentParser(
53+
description="Purge and reinject failed murfey messages"
54+
)
55+
parser.add_argument(
56+
"--queue",
57+
help="Queue to check and purge",
58+
required=True,
59+
)
60+
parser.add_argument(
61+
"--token",
62+
help="Murfey token",
63+
required=True,
64+
)
65+
args = parser.parse_args()
66+
67+
purge_result = subprocess.run(
68+
["zocalo.dlq_purge", "-e", "devrmq", args.queue],
69+
capture_output=True,
70+
)
71+
if purge_result.returncode != 0:
72+
print(f"Failed to purge {args.queue}")
73+
return
74+
purge_stdout = purge_result.stdout.decode("utf8")
75+
export_directories = []
76+
if "exported" in purge_stdout:
77+
for line in purge_stdout.split("\n"):
78+
if line.strip().startswith("DLQ/"):
79+
dlq_dir = "DLQ/" + line.split("/")[1]
80+
if dlq_dir not in export_directories:
81+
print(f"Found messages in {dlq_dir}")
82+
export_directories.append(dlq_dir)
83+
84+
if not export_directories:
85+
print("No exported messages found")
86+
return
87+
88+
for json_dir in export_directories:
89+
handle_failed_posts(Path(json_dir), args.token)
90+
handle_dlq_messages(Path(json_dir))
91+
print("Done")
92+
93+
94+
if __name__ == "__main__":
95+
run()

0 commit comments

Comments
 (0)