Skip to content

Commit 751a462

Browse files
committed
Update the dlq reposting tool to not require zocalo
1 parent 3742e0e commit 751a462

File tree

1 file changed

+146
-48
lines changed

1 file changed

+146
-48
lines changed

src/murfey/cli/dlq_resubmit.py

Lines changed: 146 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,122 @@
11
import argparse
22
import json
3-
import subprocess
3+
import os
4+
import time
5+
from datetime import datetime
6+
from functools import partial
47
from pathlib import Path
8+
from queue import Empty, Queue
59

610
import requests
11+
from jose import jwt
12+
from workflows.transport.pika_transport import PikaTransport
713

14+
dlq_dump_path = Path("./DLQ")
815

9-
def handle_failed_posts(json_folder: Path, token: str):
16+
17+
def dlq_purge(queue: str, rabbitmq_credentials: Path) -> list[Path]:
    """Drain the dead-letter queue for *queue* and export each message to disk.

    Connects to RabbitMQ using the given credentials file, subscribes to
    ``dlq.<queue>``, writes every message found as a timestamped JSON file
    under ``dlq_dump_path`` and acknowledges it, then disconnects once the
    queue has been idle briefly.

    Args:
        queue: Name of the queue whose dead-letter companion is purged.
        rabbitmq_credentials: Path to a PikaTransport configuration file.

    Returns:
        Paths of all exported message files (may be empty), for the caller
        to repost or reinject.
    """
    transport = PikaTransport()
    transport.load_configuration_file(rabbitmq_credentials)
    transport.connect()

    queue_to_purge = "dlq." + queue
    # Used purely as an idle detector: each handled message pushes a token,
    # and the drain loop below stops once no token arrives within a timeout.
    idlequeue: Queue = Queue()
    exported_messages: list[Path] = []

    def receive_dlq_message(header: dict, message: dict) -> None:
        idlequeue.put_nowait("start")
        # The x-death time arrives as a datetime; convert it to a POSIX
        # timestamp so the header stays JSON-serialisable below.
        header["x-death"][0]["time"] = datetime.timestamp(header["x-death"][0]["time"])

        timestamp = time.localtime(int(header["x-death"][0]["time"]))
        filepath = dlq_dump_path / time.strftime("%Y-%m-%d", timestamp)
        filepath.mkdir(parents=True, exist_ok=True)
        filename = filepath / (
            f"{queue}-"
            + time.strftime("%Y%m%d-%H%M%S", timestamp)
            + "-"
            + str(header["message-id"])
        )

        dlqmsg = {
            "exported": {
                "date": time.strftime("%Y-%m-%d"),
                "time": time.strftime("%H:%M:%S"),
            },
            "header": header,
            "message": message,
        }

        with filename.open("w") as fh:
            json.dump(dlqmsg, fh, indent=2, sort_keys=True)
        # Fix: report the actual export destination in the log line
        print(f"Message {header['message-id']} exported to {filename}")
        exported_messages.append(filename)
        transport.ack(header)
        idlequeue.put_nowait("done")

    print("Looking for DLQ messages in " + queue_to_purge)
    # Fix: partial() with no bound arguments was a no-op; pass the callback directly
    transport.subscribe(
        queue_to_purge,
        receive_dlq_message,
        acknowledgement=True,
    )
    try:
        # Wait up to 3 s for a first message, then keep draining until the
        # subscription has been idle for 0.1 s.
        idlequeue.get(True, 3)
        while True:
            idlequeue.get(True, 0.1)
    except Empty:
        print("Done.")
    transport.disconnect()
    return exported_messages
70+
71+
72+
def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
    """Reinject exported DLQ messages back onto their original queues.

    Each file in *messages_path* is expected to be a JSON export produced by
    ``dlq_purge`` containing ``header`` and ``message`` keys. Valid messages
    are sent back to the queue recorded in the header's ``x-death`` entry and
    the file is then deleted; missing or invalid files are reported and
    skipped (and kept on disk for inspection).

    Args:
        messages_path: JSON message files to reinject.
        rabbitmq_credentials: Path to a PikaTransport configuration file.
    """
    transport = PikaTransport()
    transport.load_configuration_file(rabbitmq_credentials)
    transport.connect()

    # Fix: the index previously bound via enumerate() was never used
    for dlqfile in messages_path:
        if not Path(dlqfile).is_file():
            print(f"Ignoring missing file {dlqfile}")
            continue
        with open(dlqfile) as fh:
            dlqmsg = json.load(fh)
        print(f"Parsing message from {dlqfile}")
        if (
            not isinstance(dlqmsg, dict)
            or not dlqmsg.get("header")
            or not dlqmsg.get("message")
        ):
            print(f"File {dlqfile} is not a valid DLQ message.")
            continue

        header = dlqmsg["header"]
        # Mark the message so downstream consumers can tell it was replayed
        header["dlq-reinjected"] = "True"

        # Broker/delivery bookkeeping that must not be replayed with the message
        drop_keys = {
            "message-id",
            "routing_key",
            "redelivered",
            "exchange",
            "consumer_tag",
            "delivery_mode",
        }
        clean_header = {k: str(v) for k, v in header.items() if k not in drop_keys}

        # The queue the message originally died on is recorded in x-death
        destination = header.get("x-death", [{}])[0].get("queue")
        if not destination:
            # Fix: don't send to a null destination; keep the file so the
            # message is not lost
            print(f"File {dlqfile} has no destination queue in its header.")
            continue
        transport.send(
            destination,
            dlqmsg["message"],
            headers=clean_header,
        )
        dlqfile.unlink()
        print(f"Done {dlqfile}\n")

    transport.disconnect()
115+
116+
117+
def handle_failed_posts(messages_path: list[Path], token: str):
10118
"""Deal with any messages that have been sent as failed client posts"""
11-
for json_file in json_folder.glob("*"):
119+
for json_file in messages_path:
12120
with open(json_file, "r") as json_data:
13121
message = json.load(json_data)
14122

@@ -28,20 +136,6 @@ def handle_failed_posts(json_folder: Path, token: str):
28136
json_file.unlink()
29137

30138

31-
def handle_dlq_messages(json_folder: Path):
32-
"""Reinjected to the queue"""
33-
for json_file in json_folder.glob("*"):
34-
reinject_result = subprocess.run(
35-
["zocalo.dlq_reinject", "-e", "devrmq", str(json_file)],
36-
capture_output=True,
37-
)
38-
if reinject_result.returncode == 0:
39-
print(f"Reinjected {json_file}")
40-
json_file.unlink()
41-
else:
42-
print(f"Failed to reinject {json_file}")
43-
44-
45139
def run():
46140
"""
47141
Method of checking and purging murfey queues on rabbitmq
@@ -53,43 +147,47 @@ def run():
53147
description="Purge and reinject failed murfey messages"
54148
)
55149
parser.add_argument(
56-
"--queue",
57-
help="Queue to check and purge",
58-
required=True,
150+
"-c",
151+
"--config",
152+
help="Security config file",
153+
required=False,
59154
)
60155
parser.add_argument(
61-
"--token",
62-
help="Murfey token",
156+
"-u",
157+
"--username",
158+
help="Token username",
63159
required=True,
64160
)
65161
args = parser.parse_args()
66162

67-
purge_result = subprocess.run(
68-
["zocalo.dlq_purge", "-e", "devrmq", args.queue],
69-
capture_output=True,
163+
# Set the environment variable then read it by importing the security config
164+
os.environ["MURFEY_SECURITY_CONFIGURATION"] = args.config
165+
from murfey.util.config import get_security_config
166+
167+
security_config = get_security_config()
168+
169+
# Get the token to post to the api with
170+
token = jwt.encode(
171+
{"user": args.username},
172+
security_config.auth_key,
173+
algorithm=security_config.auth_algorithm,
70174
)
71-
if purge_result.returncode != 0:
72-
print(f"Failed to purge {args.queue}")
73-
return
74-
purge_stdout = purge_result.stdout.decode("utf8")
75-
export_directories = []
76-
if "exported" in purge_stdout:
77-
for line in purge_stdout.split("\n"):
78-
if line.strip().startswith("DLQ/"):
79-
dlq_dir = "DLQ/" + line.split("/")[1]
80-
if dlq_dir not in export_directories:
81-
print(f"Found messages in {dlq_dir}")
82-
export_directories.append(dlq_dir)
83-
84-
if not export_directories:
85-
print("No exported messages found")
86-
return
87-
88-
for json_dir in export_directories:
89-
handle_failed_posts(Path(json_dir), args.token)
90-
handle_dlq_messages(Path(json_dir))
91-
print("Done")
92175

176+
# Purge the queue and repost/reinject any messages found
177+
exported_messages = dlq_purge(
178+
security_config.feedback_queue, security_config.rabbitmq_credentials
179+
)
180+
handle_failed_posts(exported_messages, token)
181+
handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials)
93182

94-
if __name__ == "__main__":
95-
run()
183+
# Clean up any created directories
184+
for date_directory in dlq_dump_path.glob("*"):
185+
try:
186+
date_directory.rmdir()
187+
except OSError:
188+
print(f"Cannot remove {date_directory} as it is not empty")
189+
try:
190+
dlq_dump_path.rmdir()
191+
except OSError:
192+
print(f"Cannot remove {dlq_dump_path} as it is not empty")
193+
print("Done")

0 commit comments

Comments
 (0)