Skip to content

Commit 8823cc2

Browse files
Update the dlq reposting tool to not require zocalo (#493)
Modifies the tool to repost failed client posts to the murfey api, by removing the need for zocalo. Also no longer needs the token to be provided, it takes the security configuration file as an argument and works out the token from that. Uses some code copied across from cryoem-services, which originates from `python-zocalo`
1 parent 1236de2 commit 8823cc2

File tree

5 files changed

+408
-96
lines changed

5 files changed

+408
-96
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ murfey = "murfey.client:run"
8686
"murfey.create_db" = "murfey.cli.create_db:run"
8787
"murfey.db_sql" = "murfey.cli.murfey_db_sql:run"
8888
"murfey.decrypt_password" = "murfey.cli.decrypt_db_password:run"
89-
"murfey.dlq_murfey" = "murfey.cli.dlq_resubmit:run"
9089
"murfey.generate_key" = "murfey.cli.generate_crypto_key:run"
9190
"murfey.generate_password" = "murfey.cli.generate_db_password:run"
9291
"murfey.instrument_server" = "murfey.instrument_server:run"
92+
"murfey.repost_failed_calls" = "murfey.cli.repost_failed_calls:run"
9393
"murfey.server" = "murfey.server:run"
9494
"murfey.sessions" = "murfey.cli.db_sessions:run"
9595
"murfey.simulate" = "murfey.cli.dummy:run"

src/murfey/cli/dlq_resubmit.py

Lines changed: 0 additions & 95 deletions
This file was deleted.
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import argparse
2+
import json
3+
from datetime import datetime
4+
from functools import partial
5+
from pathlib import Path
6+
from queue import Empty, Queue
7+
8+
import requests
9+
from jose import jwt
10+
from workflows.transport.pika_transport import PikaTransport
11+
12+
from murfey.util.config import security_from_file
13+
14+
15+
def dlq_purge(
16+
dlq_dump_path: Path, queue: str, rabbitmq_credentials: Path
17+
) -> list[Path]:
18+
transport = PikaTransport()
19+
transport.load_configuration_file(rabbitmq_credentials)
20+
transport.connect()
21+
22+
queue_to_purge = f"dlq.{queue}"
23+
idlequeue: Queue = Queue()
24+
exported_messages = []
25+
26+
def receive_dlq_message(header: dict, message: dict) -> None:
27+
idlequeue.put_nowait("start")
28+
header["x-death"][0]["time"] = datetime.timestamp(header["x-death"][0]["time"])
29+
filename = dlq_dump_path / f"{queue}-{header['message-id']}"
30+
dlqmsg = {"header": header, "message": message}
31+
with filename.open("w") as fh:
32+
json.dump(dlqmsg, fh, indent=2, sort_keys=True)
33+
print(f"Message {header['message-id']} exported to {filename}")
34+
exported_messages.append(filename)
35+
transport.ack(header)
36+
idlequeue.put_nowait("done")
37+
38+
print("Looking for DLQ messages in " + queue_to_purge)
39+
transport.subscribe(
40+
queue_to_purge,
41+
partial(receive_dlq_message),
42+
acknowledgement=True,
43+
)
44+
try:
45+
while True:
46+
idlequeue.get(True, 0.1)
47+
except Empty:
48+
print("Done dlq purge")
49+
transport.disconnect()
50+
return exported_messages
51+
52+
53+
def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
54+
transport = PikaTransport()
55+
transport.load_configuration_file(rabbitmq_credentials)
56+
transport.connect()
57+
58+
for f, dlqfile in enumerate(messages_path):
59+
if not dlqfile.is_file():
60+
continue
61+
with open(dlqfile) as fh:
62+
dlqmsg = json.load(fh)
63+
header = dlqmsg["header"]
64+
header["dlq-reinjected"] = "True"
65+
66+
drop_keys = {
67+
"message-id",
68+
"routing_key",
69+
"redelivered",
70+
"exchange",
71+
"consumer_tag",
72+
"delivery_mode",
73+
}
74+
clean_header = {k: str(v) for k, v in header.items() if k not in drop_keys}
75+
76+
destination = header.get("x-death", [{}])[0].get("queue")
77+
transport.send(
78+
destination,
79+
dlqmsg["message"],
80+
headers=clean_header,
81+
)
82+
dlqfile.unlink()
83+
print(f"Reinjected {dlqfile}\n")
84+
85+
transport.disconnect()
86+
87+
88+
def handle_failed_posts(messages_path: list[Path], token: str):
89+
"""Deal with any messages that have been sent as failed client posts"""
90+
for json_file in messages_path:
91+
with open(json_file, "r") as json_data:
92+
message = json.load(json_data)
93+
94+
if not message.get("message") or not message["message"].get("url"):
95+
print(f"{json_file} is not a failed client post")
96+
continue
97+
dest = message["message"]["url"]
98+
message_json = message["message"]["json"]
99+
100+
response = requests.post(
101+
dest, json=message_json, headers={"Authorization": f"Bearer {token}"}
102+
)
103+
if response.status_code != 200:
104+
print(f"Failed to repost {json_file}")
105+
else:
106+
print(f"Reposted {json_file}")
107+
json_file.unlink()
108+
109+
110+
def run():
111+
"""
112+
Method of checking and purging murfey queues on rabbitmq
113+
Two types of messages are possible:
114+
- failed client posts which need reposting to the murfey server API
115+
- feedback messages that can be sent back to rabbitmq
116+
"""
117+
parser = argparse.ArgumentParser(
118+
description="Purge and reinject failed murfey messages"
119+
)
120+
parser.add_argument(
121+
"-c",
122+
"--config",
123+
help="Security config file",
124+
required=True,
125+
)
126+
parser.add_argument(
127+
"-u",
128+
"--username",
129+
help="Token username",
130+
required=True,
131+
)
132+
parser.add_argument(
133+
"-d", "--dir", default="DLQ", help="Directory to export messages to"
134+
)
135+
args = parser.parse_args()
136+
137+
# Read the security config file
138+
security_config = security_from_file(args.config)
139+
140+
# Get the token to post to the api with
141+
token = jwt.encode(
142+
{"user": args.username},
143+
security_config.auth_key,
144+
algorithm=security_config.auth_algorithm,
145+
)
146+
147+
# Purge the queue and repost/reinject any messages found
148+
dlq_dump_path = Path(args.dir)
149+
dlq_dump_path.mkdir(parents=True, exist_ok=True)
150+
exported_messages = dlq_purge(
151+
dlq_dump_path,
152+
security_config.feedback_queue,
153+
security_config.rabbitmq_credentials,
154+
)
155+
handle_failed_posts(exported_messages, token)
156+
handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials)
157+
158+
# Clean up any created directories
159+
try:
160+
dlq_dump_path.rmdir()
161+
except OSError:
162+
print(f"Cannot remove {dlq_dump_path} as it is not empty")
163+
print("Done")

0 commit comments

Comments
 (0)