Skip to content

Commit 639a8fd

Browse files
committed
Merged recent changes from 'main' branch
2 parents 6994141 + 400765c commit 639a8fd

File tree

17 files changed

+523
-172
lines changed

17 files changed

+523
-172
lines changed

.bumpclient.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[tool.bumpversion]
2-
current_version = "0.16.10"
2+
current_version = "0.16.12"
33
commit = true
44
tag = false
55

.bumpversion.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[tool.bumpversion]
2-
current_version = "0.16.10"
2+
current_version = "0.16.12"
33
commit = true
44
tag = true
55

Helm/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
apiVersion: v1
22
name: murfey-services
33
description: Umbrella Helm chart for deploying the servers and daemons needed to enable Murfey to transfer and process data
4-
version: 0.16.10
4+
version: 0.16.12
55
dependencies:
66
- name: murfey-instrument-server-clem
77
- name: murfey-server
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
apiVersion: v1
22
name: murfey-instrument-server-clem
33
description: Helm chart for deploying a Murfey instrument server, which executes orders to detect, modify, and transfer files on the instrument PC, and notifies the backend server about transferred files
4-
version: 0.16.10
4+
version: 0.16.12
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
apiVersion: v1
22
name: murfey-rsync
33
description: Helm chart for deploying an rsync daemon, which is responsible for executing the transfer of files from the client storage directory to the server storage system
4-
version: 0.16.10
4+
version: 0.16.12
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
apiVersion: v1
22
name: murfey-server
33
description: Helm chart for deploying a Murfey backend server, which is responsible for orchestrating the data transfer and processing workflow between the client PC and the storage system
4-
version: 0.16.10
4+
version: 0.16.12

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ requires = [
77

88
[project]
99
name = "murfey"
10-
version = "0.16.10"
10+
version = "0.16.12"
1111
description = "Client-Server architecture hauling Cryo-EM data"
1212
readme = "README.md"
1313
keywords = [
@@ -86,10 +86,10 @@ murfey = "murfey.client:run"
8686
"murfey.create_db" = "murfey.cli.create_db:run"
8787
"murfey.db_sql" = "murfey.cli.murfey_db_sql:run"
8888
"murfey.decrypt_password" = "murfey.cli.decrypt_db_password:run"
89-
"murfey.dlq_murfey" = "murfey.cli.dlq_resubmit:run"
9089
"murfey.generate_key" = "murfey.cli.generate_crypto_key:run"
9190
"murfey.generate_password" = "murfey.cli.generate_db_password:run"
9291
"murfey.instrument_server" = "murfey.instrument_server:run"
92+
"murfey.repost_failed_calls" = "murfey.cli.repost_failed_calls:run"
9393
"murfey.server" = "murfey.server:run"
9494
"murfey.sessions" = "murfey.cli.db_sessions:run"
9595
"murfey.simulate" = "murfey.cli.dummy:run"

src/murfey/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from __future__ import annotations
22

3-
__version__ = "0.16.10"
4-
__supported_client_version__ = "0.16.10"
3+
__version__ = "0.16.12"
4+
__supported_client_version__ = "0.16.12"

src/murfey/cli/dlq_resubmit.py

Lines changed: 0 additions & 95 deletions
This file was deleted.
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import argparse
2+
import json
3+
from datetime import datetime
4+
from functools import partial
5+
from pathlib import Path
6+
from queue import Empty, Queue
7+
8+
import requests
9+
from jose import jwt
10+
from workflows.transport.pika_transport import PikaTransport
11+
12+
from murfey.util.config import security_from_file
13+
14+
15+
def dlq_purge(
16+
dlq_dump_path: Path, queue: str, rabbitmq_credentials: Path
17+
) -> list[Path]:
18+
transport = PikaTransport()
19+
transport.load_configuration_file(rabbitmq_credentials)
20+
transport.connect()
21+
22+
queue_to_purge = f"dlq.{queue}"
23+
idlequeue: Queue = Queue()
24+
exported_messages = []
25+
26+
def receive_dlq_message(header: dict, message: dict) -> None:
27+
idlequeue.put_nowait("start")
28+
header["x-death"][0]["time"] = datetime.timestamp(header["x-death"][0]["time"])
29+
filename = dlq_dump_path / f"{queue}-{header['message-id']}"
30+
dlqmsg = {"header": header, "message": message}
31+
with filename.open("w") as fh:
32+
json.dump(dlqmsg, fh, indent=2, sort_keys=True)
33+
print(f"Message {header['message-id']} exported to {filename}")
34+
exported_messages.append(filename)
35+
transport.ack(header)
36+
idlequeue.put_nowait("done")
37+
38+
print("Looking for DLQ messages in " + queue_to_purge)
39+
transport.subscribe(
40+
queue_to_purge,
41+
partial(receive_dlq_message),
42+
acknowledgement=True,
43+
)
44+
try:
45+
while True:
46+
idlequeue.get(True, 0.1)
47+
except Empty:
48+
print("Done dlq purge")
49+
transport.disconnect()
50+
return exported_messages
51+
52+
53+
def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
54+
transport = PikaTransport()
55+
transport.load_configuration_file(rabbitmq_credentials)
56+
transport.connect()
57+
58+
for f, dlqfile in enumerate(messages_path):
59+
if not dlqfile.is_file():
60+
continue
61+
with open(dlqfile) as fh:
62+
dlqmsg = json.load(fh)
63+
header = dlqmsg["header"]
64+
header["dlq-reinjected"] = "True"
65+
66+
drop_keys = {
67+
"message-id",
68+
"routing_key",
69+
"redelivered",
70+
"exchange",
71+
"consumer_tag",
72+
"delivery_mode",
73+
}
74+
clean_header = {k: str(v) for k, v in header.items() if k not in drop_keys}
75+
76+
destination = header.get("x-death", [{}])[0].get("queue")
77+
transport.send(
78+
destination,
79+
dlqmsg["message"],
80+
headers=clean_header,
81+
)
82+
dlqfile.unlink()
83+
print(f"Reinjected {dlqfile}\n")
84+
85+
transport.disconnect()
86+
87+
88+
def handle_failed_posts(messages_path: list[Path], token: str):
89+
"""Deal with any messages that have been sent as failed client posts"""
90+
for json_file in messages_path:
91+
with open(json_file, "r") as json_data:
92+
message = json.load(json_data)
93+
94+
if not message.get("message") or not message["message"].get("url"):
95+
print(f"{json_file} is not a failed client post")
96+
continue
97+
dest = message["message"]["url"]
98+
message_json = message["message"]["json"]
99+
100+
response = requests.post(
101+
dest, json=message_json, headers={"Authorization": f"Bearer {token}"}
102+
)
103+
if response.status_code != 200:
104+
print(f"Failed to repost {json_file}")
105+
else:
106+
print(f"Reposted {json_file}")
107+
json_file.unlink()
108+
109+
110+
def run():
111+
"""
112+
Method of checking and purging murfey queues on rabbitmq
113+
Two types of messages are possible:
114+
- failed client posts which need reposting to the murfey server API
115+
- feedback messages that can be sent back to rabbitmq
116+
"""
117+
parser = argparse.ArgumentParser(
118+
description="Purge and reinject failed murfey messages"
119+
)
120+
parser.add_argument(
121+
"-c",
122+
"--config",
123+
help="Security config file",
124+
required=True,
125+
)
126+
parser.add_argument(
127+
"-u",
128+
"--username",
129+
help="Token username",
130+
required=True,
131+
)
132+
parser.add_argument(
133+
"-d", "--dir", default="DLQ", help="Directory to export messages to"
134+
)
135+
args = parser.parse_args()
136+
137+
# Read the security config file
138+
security_config = security_from_file(args.config)
139+
140+
# Get the token to post to the api with
141+
token = jwt.encode(
142+
{"user": args.username},
143+
security_config.auth_key,
144+
algorithm=security_config.auth_algorithm,
145+
)
146+
147+
# Purge the queue and repost/reinject any messages found
148+
dlq_dump_path = Path(args.dir)
149+
dlq_dump_path.mkdir(parents=True, exist_ok=True)
150+
exported_messages = dlq_purge(
151+
dlq_dump_path,
152+
security_config.feedback_queue,
153+
security_config.rabbitmq_credentials,
154+
)
155+
handle_failed_posts(exported_messages, token)
156+
handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials)
157+
158+
# Clean up any created directories
159+
try:
160+
dlq_dump_path.rmdir()
161+
except OSError:
162+
print(f"Cannot remove {dlq_dump_path} as it is not empty")
163+
print("Done")

0 commit comments

Comments
 (0)