Skip to content

Commit 6498138

Browse files
authored
dlq_purge: Show what queues messages came from (#264)
Also, strip dlq. prefix from rabbitmq queues.
1 parent 3dc4880 commit 6498138

File tree

2 files changed

+30
-8
lines changed

2 files changed

+30
-8
lines changed

HISTORY.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Unreleased
66
----------
77
- Minimum supported python is now 3.11.
88
- Switch python packaging backend back to setuptools. (`#263 <https://github.com/DiamondLightSource/python-zocalo/pull/263>`_)
9+
- ``zocalo.dlq_purge``: Show which queues DLQ messages came from, and accept queue names with prefix. (`#264 <https://github.com/DiamondLightSource/python-zocalo/pull/264>`_)
910

1011
1.2.0 (2024-11-14)
1112
------------------

src/zocalo/cli/dlq_purge.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import time
1616
from datetime import datetime
1717
from functools import partial
18+
from typing import Literal
1819

1920
import workflows
2021

@@ -60,15 +61,17 @@ def run() -> None:
6061

6162
args = parser.parse_args(["--stomp-prfx=DLQ"] + sys.argv[1:])
6263
if args.transport == "PikaTransport":
63-
queues = ["dlq." + a for a in args.queues]
64+
queues = ["dlq." + a.removeprefix("dlq.") for a in args.queues]
6465
else:
6566
queues = args.queues
6667
transport = workflows.transport.lookup(args.transport)()
6768

6869
characterfilter = re.compile(r"[^a-zA-Z0-9._-]+", re.UNICODE)
69-
idlequeue: queue.Queue = queue.Queue()
70+
idlequeue: queue.Queue[Literal["start", "done"] | tuple[str, str]] = queue.Queue()
7071

71-
def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:
72+
def receive_dlq_message(
73+
header: dict, message: dict, *, queue_name: str, rabbitmq=False
74+
) -> None:
7275
idlequeue.put_nowait("start")
7376
if rabbitmq:
7477
msg_time = int(datetime.timestamp(header["x-death"][0]["time"])) * 1000
@@ -99,8 +102,11 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:
99102

100103
with filename.open("w") as fh:
101104
json.dump(dlqmsg, fh, indent=2, sort_keys=True)
102-
print(
103-
f"Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}"
105+
idlequeue.put_nowait(
106+
(
107+
queue_name,
108+
f" Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}",
109+
)
104110
)
105111
transport.ack(header)
106112
idlequeue.put_nowait("done")
@@ -112,17 +118,32 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:
112118
elif args.transport == "PikaTransport":
113119
rmq = RabbitMQAPI.from_zocalo_configuration(zc)
114120
queues = [q.name for q in rmq.queues() if q.name.startswith("dlq.")]
121+
print(f"Looking for DLQ messages in {len(queues)} queues...")
115122
for queue_ in queues:
116-
print("Looking for DLQ messages in " + queue_)
117123
transport.subscribe(
118124
queue_,
119-
partial(receive_dlq_message, rabbitmq=args.transport == "PikaTransport"),
125+
partial(
126+
receive_dlq_message,
127+
rabbitmq=args.transport == "PikaTransport",
128+
queue_name=queue_,
129+
),
120130
acknowledgement=True,
121131
)
132+
messages: dict[str, list[str]] = {}
122133
try:
123134
idlequeue.get(True, args.wait or 3)
124135
while True:
125-
idlequeue.get(True, args.wait or 0.1)
136+
result = idlequeue.get(True, args.wait or 0.1)
137+
if isinstance(result, tuple):
138+
queuename, message = result
139+
messages.setdefault(queuename, []).append(message)
140+
126141
except queue.Empty:
142+
# Print out what we found, per queue
143+
for queuename, q_messages in messages.items():
144+
print(f"Found {len(q_messages)} DLQ messages in {queuename}")
145+
for message in q_messages:
146+
print(message)
147+
127148
print("Done.")
128149
transport.disconnect()

0 commit comments

Comments
 (0)