Skip to content

Commit 7b65be7

Browse files
authored
New zocalo.pickup command (#223)
* New zocalo.pickup command This reads dropfiles from the zocalo.go.fallback_location directory which is where zocalo.go writes messages if the message broker is unavailable. When the message broker becomes available again, zocalo.pickup can be used to re-submit messages to the processing_recipe queue. * Sort dropfiles by mtime
1 parent a0f4a00 commit 7b65be7

File tree

4 files changed

+174
-0
lines changed

4 files changed

+174
-0
lines changed

HISTORY.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ History
55
Unreleased
66
----------
77
* Add Dockerfile and build-and-push-docker-image GitHub workflow
8+
* Add ``zocalo.pickup`` command for re-submitting messages stored in the ``zocalo.go.fallback_location`` while the message broker is unavailable
89

910
0.26.0 (2022-11-04)
1011
-------------------

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ console_scripts =
4747
zocalo.dlq_purge = zocalo.cli.dlq_purge:run
4848
zocalo.dlq_reinject = zocalo.cli.dlq_reinject:run
4949
zocalo.go = zocalo.cli.go:run
50+
zocalo.pickup = zocalo.cli.pickup:run
5051
zocalo.queue_drain = zocalo.cli.queue_drain:run
5152
zocalo.service = zocalo.service:start_service
5253
zocalo.shutdown = zocalo.cli.shutdown:run

src/zocalo/cli/pickup.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
from __future__ import annotations
2+
3+
import argparse
4+
import json
5+
import pathlib
6+
import sys
7+
import time
8+
9+
import workflows.transport
10+
11+
import zocalo.configuration
12+
13+
14+
def run():
15+
zc = zocalo.configuration.from_file()
16+
zc.activate()
17+
dropdir = pathlib.Path(zc.storage["zocalo.go.fallback_location"])
18+
19+
parser = argparse.ArgumentParser(
20+
usage="zocalo.pickup [options]", description="Processes zocalo.go backlog"
21+
)
22+
23+
parser.add_argument("-?", action="help", help=argparse.SUPPRESS)
24+
parser.add_argument(
25+
"-d",
26+
"--delay",
27+
dest="delay",
28+
action="store",
29+
type=int,
30+
default=2,
31+
help="Number of seconds to wait between message dispatches",
32+
)
33+
parser.add_argument(
34+
"-w",
35+
"--wait",
36+
dest="wait",
37+
action="store",
38+
type=int,
39+
default=60,
40+
help="Number of seconds to wait initially",
41+
)
42+
parser.add_argument(
43+
"-v",
44+
"--verbose",
45+
dest="verbose",
46+
action="store_true",
47+
default=False,
48+
help="Show raw message before sending",
49+
)
50+
zc.add_command_line_options(parser)
51+
workflows.transport.add_command_line_options(parser, transport_argument=True)
52+
args = parser.parse_args()
53+
54+
try:
55+
files = list(dropdir.iterdir())
56+
except OSError:
57+
sys.exit("This program is only available to privileged users")
58+
59+
print(f"Found {len(files)} files")
60+
if not files:
61+
sys.exit()
62+
63+
if args.wait:
64+
print(f"Waiting {args.wait} seconds")
65+
time.sleep(args.wait)
66+
67+
print(f"Connecting to {args.transport}...")
68+
transport = workflows.transport.lookup(args.transport)()
69+
transport.connect()
70+
71+
file_info = {f: {} for f in files}
72+
73+
for f, finfo in file_info.items():
74+
with f.open() as fh:
75+
data = json.load(fh)
76+
finfo["message"] = data["message"]
77+
finfo["headers"] = data["headers"]
78+
finfo["originating-host"] = finfo["headers"].get("zocalo.go.host")
79+
finfo["recipes"] = ",".join(finfo["message"].get("recipes", []))
80+
finfo["mtime"] = f.stat().st_mtime
81+
82+
count = 0
83+
file_count = len(file_info)
84+
for f, finfo in dict(
85+
sorted(file_info.items(), key=lambda item: item[1]["mtime"])
86+
).items():
87+
print(
88+
f"Sending {f} from host {file_info[f]['originating-host']}"
89+
f" with recipes {file_info[f]['recipes']}"
90+
)
91+
assert f.exists()
92+
transport.send(
93+
"processing_recipe",
94+
file_info[f]["message"],
95+
headers=file_info[f]["headers"],
96+
)
97+
f.unlink()
98+
count = count + 1
99+
print(f"Done ({count} of {file_count})")
100+
try:
101+
time.sleep(args.delay)
102+
except KeyboardInterrupt:
103+
print("CTRL+C - stopping")
104+
time.sleep(0.5)
105+
sys.exit(1)
106+
107+
transport.disconnect()

tests/cli/test_pickup.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from __future__ import annotations
2+
3+
import json
4+
import sys
5+
import time
6+
import uuid
7+
from unittest import mock
8+
9+
import pytest
10+
import workflows.transport
11+
from workflows.transport.common_transport import CommonTransport
12+
13+
import zocalo.configuration
14+
from zocalo.cli import pickup
15+
16+
17+
@pytest.fixture
18+
def mock_zocalo_configuration(tmp_path):
19+
mock_zc = mock.MagicMock(zocalo.configuration.Configuration)
20+
mock_zc.storage = {
21+
"zocalo.go.fallback_location": str(tmp_path),
22+
}
23+
return mock_zc
24+
25+
26+
def test_pickup_empty_filelist_raises_system_exit(mocker, mock_zocalo_configuration):
27+
mocker.patch.object(
28+
zocalo.configuration, "from_file", return_value=mock_zocalo_configuration
29+
)
30+
with mock.patch.object(sys, "argv", ["prog"]), pytest.raises(SystemExit) as e:
31+
pickup.run()
32+
assert e.code == 0
33+
34+
35+
def test_pickup_sends_to_processing_recipe(mocker, mock_zocalo_configuration, tmp_path):
36+
mocked_transport = mocker.MagicMock(CommonTransport)
37+
mocker.patch.object(workflows.transport, "lookup", return_value=mocked_transport)
38+
mocker.patch.object(
39+
zocalo.configuration, "from_file", return_value=mock_zocalo_configuration
40+
)
41+
for i in range(10):
42+
msg = {
43+
"headers": {"zocalo.go.user": "foobar", "zocalo.go.host": "example.com"},
44+
"message": {
45+
"recipes": [f"thing{i}"],
46+
"parameters": {"foo": i},
47+
},
48+
}
49+
(tmp_path / str(uuid.uuid4())).write_text(json.dumps(msg))
50+
time.sleep(0.1)
51+
with mock.patch.object(sys, "argv", ["prog", "--wait", "0", "--delay", "0"]):
52+
pickup.run()
53+
mocked_transport().send.assert_has_calls(
54+
[
55+
mock.call(
56+
"processing_recipe",
57+
{
58+
"recipes": [f"thing{i}"],
59+
"parameters": {"foo": i},
60+
},
61+
headers=mock.ANY,
62+
)
63+
for i in range(10)
64+
]
65+
)

0 commit comments

Comments
 (0)