Skip to content

Commit 2fb9116

Browse files
committed
Handle the async case
1 parent d117b48 commit 2fb9116

File tree

3 files changed

+96
-59
lines changed

3 files changed

+96
-59
lines changed

src/murfey/cli/repost_failed_calls.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import argparse
2+
import asyncio
23
import json
34
from datetime import datetime
45
from functools import partial
5-
from inspect import getfullargspec
6+
from inspect import getfullargspec, iscoroutinefunction
67
from pathlib import Path
78
from queue import Empty, Queue
89

@@ -106,9 +107,9 @@ def handle_failed_posts(messages_path: list[Path], murfey_db: Session):
106107
for json_file in messages_path:
107108
with open(json_file, "r") as json_data:
108109
message = json.load(json_data)
109-
router_name = message.get("router_name", "")
110+
router_name = message.get("message", {}).get("router_name", "")
110111
router_base = router_name.split(".")[0]
111-
function_name = message.get("function_name", "")
112+
function_name = message.get("message", {}).get("function_name", "")
112113
if not router_name or not function_name:
113114
print(
114115
f"Cannot repost {json_file} as it does not have a router or function name"
@@ -122,22 +123,31 @@ def handle_failed_posts(messages_path: list[Path], murfey_db: Session):
122123
except AttributeError:
123124
print(f"Cannot repost {json_file} as {function_name} does not exist")
124125
continue
125-
expected_args = getfullargspec(function_to_call).args
126+
expected_args = getfullargspec(function_to_call)
126127

127-
call_kwargs = message.get("kwargs", {})
128-
call_data = message.get("data", {})
128+
call_kwargs = message.get("message", {}).get("kwargs", {})
129+
call_data = message.get("message", {}).get("data", {})
129130
function_call_dict = {}
130131

131-
for call_arg in expected_args:
132-
if call_arg in call_kwargs.keys():
133-
function_call_dict[call_arg] = call_kwargs[call_arg]
134-
elif call_arg == "db":
135-
function_call_dict["db"] = murfey_db
136-
else:
137-
function_call_dict[call_arg] = call_data
132+
try:
133+
for call_arg in expected_args.args:
134+
call_arg_type = expected_args.annotations.get(call_arg, str)
135+
if call_arg in call_kwargs.keys():
136+
function_call_dict[call_arg] = call_arg_type(call_kwargs[call_arg])
137+
elif call_arg == "db":
138+
function_call_dict["db"] = murfey_db
139+
else:
140+
print(call_data, call_arg_type, call_arg)
141+
function_call_dict[call_arg] = call_arg_type(**call_data)
142+
except TypeError as e:
143+
print(f"Cannot repost {json_file} due to argument error: {e}")
144+
continue
138145

139146
try:
140-
function_to_call(**function_call_dict)
147+
if iscoroutinefunction(function_to_call):
148+
asyncio.run(function_to_call(**function_call_dict))
149+
else:
150+
function_to_call(**function_call_dict)
141151
print(f"Reposted {json_file}")
142152
json_file.unlink()
143153
except Exception as e:

tests/cli/test_repost_failed_calls.py

Lines changed: 68 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from unittest import mock
77

88
from murfey.cli import repost_failed_calls
9+
from murfey.util.config import security_from_file
10+
from murfey.util.db import Tilt
911

1012

1113
@mock.patch("murfey.cli.repost_failed_calls.PikaTransport")
@@ -104,101 +106,124 @@ def test_handle_dlq_messages(mock_transport, tmp_path):
104106
mock_transport().disconnect.assert_called_once()
105107

106108

107-
@mock.patch("murfey.cli.repost_failed_calls.requests")
108-
def test_handle_failed_posts(mock_requests, tmp_path):
109+
def test_handle_failed_posts(tmp_path):
109110
"""Test that the API is called with any failed client post messages"""
110111
# Create some sample messages
111112
messages_paths_list: list[Path] = []
112113
messages_dict: dict[str, dict] = {
113114
"msg1": {
114-
"message": {"url": "sample/url", "json": {"content": "msg1"}},
115-
},
115+
"message": {
116+
"router_name": "workflow.tomo_router",
117+
"function_name": "register_completed_tilt_series",
118+
"kwargs": {"visit_name": "cm12345-1", "session_id": 1},
119+
"data": {
120+
"tags": ["tag"],
121+
"source": "source",
122+
"tilt_series_lengths": [10],
123+
},
124+
},
125+
}, # normal example
116126
"msg2": {
117-
"message": {"url": "sample/url", "json": {"content": "msg2"}},
118-
},
127+
"message": {
128+
"router_name": "workflow.tomo_router",
129+
"function_name": "register_tilt",
130+
"kwargs": {"visit_name": "cm12345-1", "session_id": 1},
131+
"data": {
132+
"tilt_series_tag": "tag",
133+
"source": "source",
134+
"movie_path": "path",
135+
},
136+
},
137+
}, # async example
119138
"msg3": {
120-
"message": {"content": "msg3"}, # not a failed client post
139+
"message": {
140+
"router_name": "workflow.tomo_router",
141+
"function_name": "register_completed_tilt_series",
142+
"data": {"tags": ["tag"]},
143+
}
121144
},
122145
"msg4": {
123-
"header": {"content": "msg3"}, # does not have a message
146+
"message": {"function_name": "dummy"}, # does not have a router
147+
},
148+
"msg5": {
149+
"message": {"router_name": "workflow"}, # does not have a function
150+
},
151+
"msg6": {
152+
"message": {
153+
"router_name": "workflow",
154+
"function_name": "dummy",
155+
}, # function does not exist
124156
},
125157
}
126158
for file_name, message in messages_dict.items():
127159
messages_paths_list.append(tmp_path / file_name)
128160
with open(tmp_path / file_name, "w") as msg_file:
129161
json.dump(message, msg_file)
130162

131-
class Response:
132-
def __init__(self, status_code):
133-
self.status_code = status_code
134-
135-
mock_requests.post.side_effect = [Response(200), Response(300)]
136-
137-
repost_failed_calls.handle_failed_posts(messages_paths_list, "dummy_token")
163+
mock_db = mock.Mock()
164+
mock_exec_return = mock.Mock()
165+
mock_exec_return.all.return_value = []
166+
mock_db.exec.return_value = mock_exec_return
167+
repost_failed_calls.handle_failed_posts(messages_paths_list, mock_db)
138168

139169
# Check the failed posts were resent
140-
assert mock_requests.post.call_count == 2
141-
mock_requests.post.assert_any_call(
142-
"sample/url",
143-
json={"content": "msg1"},
144-
headers={"Authorization": "Bearer dummy_token"},
145-
)
146-
mock_requests.post.assert_any_call(
147-
"sample/url",
148-
json={"content": "msg2"},
149-
headers={"Authorization": "Bearer dummy_token"},
170+
assert mock_db.exec.call_count == 3
171+
assert mock_db.exec().one.call_count == 1
172+
assert mock_db.exec().all.call_count == 2
173+
assert mock_exec_return.one.call_count == 1
174+
assert mock_exec_return.all.call_count == 2
175+
assert mock_db.commit.call_count == 3
176+
mock_db.add.assert_called_once_with(
177+
Tilt(movie_path="path", tilt_series_id=mock.ANY, motion_corrected=False)
150178
)
151179

152180
# Check only the failed post which was successfully reinjected got deleted
153181
assert not (tmp_path / "msg1").is_file() # got resent
154-
assert (tmp_path / "msg2").is_file() # failed reinjection
155-
assert (tmp_path / "msg3").is_file() # not a failed client post
156-
assert (tmp_path / "msg4").is_file() # does not have a message
182+
assert not (tmp_path / "msg2").is_file() # got resent
183+
assert (tmp_path / "msg3").is_file() # failed reinjection
184+
assert (tmp_path / "msg4").is_file() # does not have a router
185+
assert (tmp_path / "msg5").is_file() # does not have a function
186+
assert (tmp_path / "msg6").is_file() # function does not exist
157187

158188

159189
@mock.patch("murfey.cli.repost_failed_calls.dlq_purge")
160190
@mock.patch("murfey.cli.repost_failed_calls.handle_failed_posts")
161191
@mock.patch("murfey.cli.repost_failed_calls.handle_dlq_messages")
162-
@mock.patch("murfey.cli.repost_failed_calls.jwt")
192+
@mock.patch("murfey.cli.repost_failed_calls.get_murfey_db_session")
163193
def test_run_repost_failed_calls(
164-
mock_jwt,
194+
mock_db,
165195
mock_reinject,
166196
mock_repost,
167197
mock_purge,
168198
mock_security_configuration,
169199
):
170-
mock_jwt.encode.return_value = "dummy_token"
200+
mock_db.return_value = "db"
171201
mock_purge.return_value = ["/path/to/msg1"]
172202

173203
config_file = mock_security_configuration
174204
with open(config_file) as f:
175-
security_config = json.load(f)
205+
security_config_dict = json.load(f)
176206

177207
sys.argv = [
178208
"murfey.repost_failed_calls",
179209
"--config",
180210
str(config_file),
181-
"--username",
182-
"user",
183211
"--dir",
184212
"DLQ_dir",
185213
]
186214
repost_failed_calls.run()
187215

188-
mock_jwt.encode.assert_called_with(
189-
{"user": "user"},
190-
security_config["auth_key"],
191-
algorithm=security_config["auth_algorithm"],
192-
)
216+
security_config_class = security_from_file(config_file)
217+
mock_db.assert_called_with(security_config_class)
193218

194219
mock_purge.assert_called_once_with(
195220
Path("DLQ_dir"),
196221
"murfey_feedback",
197-
Path(security_config["rabbitmq_credentials"]),
222+
Path(security_config_dict["rabbitmq_credentials"]),
198223
)
199-
mock_repost.assert_called_once_with(["/path/to/msg1"], "dummy_token")
224+
mock_repost.assert_called_once_with(["/path/to/msg1"], "db")
200225
mock_reinject.assert_called_once_with(
201-
["/path/to/msg1"], Path(security_config["rabbitmq_credentials"])
226+
["/path/to/msg1"], Path(security_config_dict["rabbitmq_credentials"])
202227
)
203228

204229

@@ -218,6 +243,4 @@ def test_repost_failed_calls_exists():
218243
cleaned_help_line = (
219244
stdout_as_string.split("\n\n")[0].replace("\n", "").replace(" ", "")
220245
)
221-
assert cleaned_help_line == (
222-
"usage:murfey.repost_failed_calls[-h]-cCONFIG-uUSERNAME[-dDIR]"
223-
)
246+
assert cleaned_help_line == ("usage:murfey.repost_failed_calls[-h]-cCONFIG[-dDIR]")

tests/client/test_context.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def test_tomography_context_add_tomo_tilt(mock_post, mock_get, tmp_path):
2626
default_destinations={tmp_path: str(tmp_path)},
2727
instrument_name="",
2828
visit="test",
29+
murfey_session=1,
2930
)
3031
context = TomographyContext("tomo", tmp_path)
3132
(tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch()
@@ -82,6 +83,7 @@ def test_tomography_context_add_tomo_tilt_out_of_order(mock_post, mock_get, tmp_
8283
default_destinations={tmp_path: str(tmp_path)},
8384
instrument_name="",
8485
visit="test",
86+
murfey_session=1,
8587
)
8688
context = TomographyContext("tomo", tmp_path)
8789
(tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch()
@@ -166,6 +168,7 @@ def test_tomography_context_add_tomo_tilt_delayed_tilt(mock_post, mock_get, tmp_
166168
default_destinations={tmp_path: str(tmp_path)},
167169
instrument_name="",
168170
visit="test",
171+
murfey_session=1,
169172
)
170173
context = TomographyContext("tomo", tmp_path)
171174
(tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch()
@@ -230,6 +233,7 @@ def test_setting_tilt_series_size_and_completion_from_mdoc_parsing(
230233
default_destinations={tmp_path: str(tmp_path)},
231234
instrument_name="",
232235
visit="test",
236+
murfey_session=1,
233237
)
234238
context = TomographyContext("tomo", tmp_path)
235239
assert len(context._tilt_series_sizes) == 0

0 commit comments

Comments
 (0)