|
| 1 | +import json |
| 2 | +import subprocess |
| 3 | +import sys |
| 4 | +from pathlib import Path |
| 5 | +from queue import Empty |
| 6 | +from unittest import mock |
| 7 | + |
| 8 | +from murfey.cli import repost_failed_calls |
| 9 | +from tests.conftest import mock_security_config_name |
| 10 | + |
| 11 | + |
| 12 | +@mock.patch("murfey.cli.repost_failed_calls.PikaTransport") |
| 13 | +@mock.patch("murfey.cli.repost_failed_calls.Queue") |
| 14 | +def test_dlq_purge(mock_queue, mock_transport, tmp_path): |
| 15 | + """Test the dlq purging function. |
| 16 | + Currently doesn't test saving the message, as the subscribe is mocked out""" |
| 17 | + mock_queue().get.return_value = {"message": "dummy"} |
| 18 | + mock_queue().get.side_effect = [None, Empty] |
| 19 | + |
| 20 | + exported_messages = repost_failed_calls.dlq_purge( |
| 21 | + tmp_path / "DLQ", "dummy", tmp_path / "config_file" |
| 22 | + ) |
| 23 | + |
| 24 | + # The transport should be connected to and subscribes to the queue |
| 25 | + mock_transport.assert_called_once() |
| 26 | + mock_transport().load_configuration_file.assert_called_with( |
| 27 | + tmp_path / "config_file" |
| 28 | + ) |
| 29 | + mock_transport().connect.assert_called_once() |
| 30 | + mock_transport().subscribe.assert_called_with( |
| 31 | + "dlq.dummy", |
| 32 | + mock.ANY, |
| 33 | + acknowledgement=True, |
| 34 | + ) |
| 35 | + mock_transport().disconnect.assert_called_once() |
| 36 | + |
| 37 | + # Should read from the queue |
| 38 | + mock_queue().get.assert_any_call(True, 0.1) |
| 39 | + |
| 40 | + # Ideally this test would return the message, but the partial isn't called yet |
| 41 | + assert exported_messages == [] |
| 42 | + |
| 43 | + |
| 44 | +@mock.patch("murfey.cli.repost_failed_calls.PikaTransport") |
| 45 | +def test_handle_dlq_messages(mock_transport, tmp_path): |
| 46 | + """Reinject some example messages""" |
| 47 | + # Create two sample messages |
| 48 | + messages_paths_list: list[Path] = [tmp_path / "not_a_message"] |
| 49 | + messages_dict: dict[str, dict] = { |
| 50 | + "msg1": { |
| 51 | + "header": { |
| 52 | + "x-death": [{"queue": "queue_msg1"}], |
| 53 | + "message-id": 1, |
| 54 | + "routing_key": "dlq.queue_msg1", |
| 55 | + "redelivered": True, |
| 56 | + "exchange": "", |
| 57 | + "consumer_tag": "1", |
| 58 | + "delivery_mode": 2, |
| 59 | + "other_key": "value", |
| 60 | + }, |
| 61 | + "message": {"parameters": "msg1"}, |
| 62 | + }, |
| 63 | + "msg2": { |
| 64 | + "header": {"x-death": [{"queue": "queue_msg2"}]}, |
| 65 | + "message": {"content": "msg2"}, |
| 66 | + }, |
| 67 | + } |
| 68 | + for message in messages_dict.keys(): |
| 69 | + messages_paths_list.append(tmp_path / message) |
| 70 | + with open(tmp_path / message, "w") as msg_file: |
| 71 | + json.dump(messages_dict[message], msg_file) |
| 72 | + |
| 73 | + # Send the two messages, plus a file that is not a message |
| 74 | + repost_failed_calls.handle_dlq_messages( |
| 75 | + messages_path=messages_paths_list, |
| 76 | + rabbitmq_credentials=tmp_path / "config_file", |
| 77 | + ) |
| 78 | + |
| 79 | + mock_transport.assert_called_once() |
| 80 | + mock_transport().load_configuration_file.assert_called_with( |
| 81 | + tmp_path / "config_file" |
| 82 | + ) |
| 83 | + mock_transport().connect.assert_called_once() |
| 84 | + |
| 85 | + # Only two messages should have been sent, the rest are invalid so are skipped |
| 86 | + assert mock_transport().send.call_count == 2 |
| 87 | + mock_transport().send.assert_any_call( |
| 88 | + "queue_msg1", |
| 89 | + {"parameters": "msg1"}, |
| 90 | + headers={ |
| 91 | + "x-death": "[{'queue': 'queue_msg1'}]", |
| 92 | + "other_key": "value", |
| 93 | + "dlq-reinjected": "True", |
| 94 | + }, |
| 95 | + ) |
| 96 | + mock_transport().send.assert_any_call( |
| 97 | + "queue_msg2", |
| 98 | + {"content": "msg2"}, |
| 99 | + headers={"x-death": "[{'queue': 'queue_msg2'}]", "dlq-reinjected": "True"}, |
| 100 | + ) |
| 101 | + |
| 102 | + # Removal and waiting |
| 103 | + assert not (tmp_path / "msg1").is_file() |
| 104 | + assert not (tmp_path / "msg2").is_file() |
| 105 | + mock_transport().disconnect.assert_called_once() |
| 106 | + |
| 107 | + |
| 108 | +@mock.patch("murfey.cli.repost_failed_calls.requests") |
| 109 | +def test_handle_failed_posts(mock_requests, tmp_path): |
| 110 | + """Test that the API is called with any failed client post messages""" |
| 111 | + # Create some sample messages |
| 112 | + messages_paths_list: list[Path] = [] |
| 113 | + messages_dict: dict[str, dict] = { |
| 114 | + "msg1": { |
| 115 | + "message": {"url": "sample/url", "json": {"content": "msg1"}}, |
| 116 | + }, |
| 117 | + "msg2": { |
| 118 | + "message": {"url": "sample/url", "json": {"content": "msg2"}}, |
| 119 | + }, |
| 120 | + "msg3": { |
| 121 | + "message": {"content": "msg3"}, # not a failed client post |
| 122 | + }, |
| 123 | + "msg4": { |
| 124 | + "header": {"content": "msg3"}, # does not have a message |
| 125 | + }, |
| 126 | + } |
| 127 | + for message in messages_dict.keys(): |
| 128 | + messages_paths_list.append(tmp_path / message) |
| 129 | + with open(tmp_path / message, "w") as msg_file: |
| 130 | + json.dump(messages_dict[message], msg_file) |
| 131 | + |
| 132 | + class Response: |
| 133 | + def __init__(self, status_code): |
| 134 | + self.status_code = status_code |
| 135 | + |
| 136 | + mock_requests.post.side_effect = [Response(200), Response(300)] |
| 137 | + |
| 138 | + repost_failed_calls.handle_failed_posts(messages_paths_list, "dummy_token") |
| 139 | + |
| 140 | + # Check the failed posts were resent |
| 141 | + assert mock_requests.post.call_count == 2 |
| 142 | + mock_requests.post.assert_any_call( |
| 143 | + "sample/url", |
| 144 | + json={"content": "msg1"}, |
| 145 | + headers={"Authorization": "Bearer dummy_token"}, |
| 146 | + ) |
| 147 | + mock_requests.post.assert_any_call( |
| 148 | + "sample/url", |
| 149 | + json={"content": "msg2"}, |
| 150 | + headers={"Authorization": "Bearer dummy_token"}, |
| 151 | + ) |
| 152 | + |
| 153 | + # Check only the failed post which was successfully reinjected got deleted |
| 154 | + assert not (tmp_path / "msg1").is_file() # got resent |
| 155 | + assert (tmp_path / "msg2").is_file() # failed reinjection |
| 156 | + assert (tmp_path / "msg3").is_file() # not a failed client post |
| 157 | + assert (tmp_path / "msg4").is_file() # does not have a message |
| 158 | + |
| 159 | + |
| 160 | +@mock.patch("murfey.cli.repost_failed_calls.dlq_purge") |
| 161 | +@mock.patch("murfey.cli.repost_failed_calls.handle_failed_posts") |
| 162 | +@mock.patch("murfey.cli.repost_failed_calls.handle_dlq_messages") |
| 163 | +@mock.patch("murfey.cli.repost_failed_calls.jwt") |
| 164 | +def test_run_repost_failed_calls( |
| 165 | + mock_jwt, |
| 166 | + mock_reinject, |
| 167 | + mock_repost, |
| 168 | + mock_purge, |
| 169 | + mock_security_configuration, |
| 170 | + tmp_path, |
| 171 | +): |
| 172 | + mock_jwt.encode.return_value = "dummy_token" |
| 173 | + mock_purge.return_value = ["/path/to/msg1"] |
| 174 | + |
| 175 | + config_file = tmp_path / mock_security_config_name |
| 176 | + with open(config_file) as f: |
| 177 | + security_config = json.load(f) |
| 178 | + |
| 179 | + sys.argv = [ |
| 180 | + "murfey.repost_failed_calls", |
| 181 | + "--config", |
| 182 | + str(config_file), |
| 183 | + "--username", |
| 184 | + "user", |
| 185 | + "--dir", |
| 186 | + "DLQ_dir", |
| 187 | + ] |
| 188 | + repost_failed_calls.run() |
| 189 | + |
| 190 | + mock_jwt.encode.assert_called_with( |
| 191 | + {"user": "user"}, |
| 192 | + security_config["auth_key"], |
| 193 | + algorithm=security_config["auth_algorithm"], |
| 194 | + ) |
| 195 | + |
| 196 | + mock_purge.assert_called_once_with( |
| 197 | + Path("DLQ_dir"), |
| 198 | + "murfey_feedback", |
| 199 | + Path(security_config["rabbitmq_credentials"]), |
| 200 | + ) |
| 201 | + mock_repost.assert_called_once_with(["/path/to/msg1"], "dummy_token") |
| 202 | + mock_reinject.assert_called_once_with( |
| 203 | + ["/path/to/msg1"], Path(security_config["rabbitmq_credentials"]) |
| 204 | + ) |
| 205 | + |
| 206 | + |
| 207 | +def test_repost_failed_calls_exists(): |
| 208 | + """Test the CLI is made""" |
| 209 | + result = subprocess.run( |
| 210 | + [ |
| 211 | + "murfey.repost_failed_calls", |
| 212 | + "--help", |
| 213 | + ], |
| 214 | + capture_output=True, |
| 215 | + ) |
| 216 | + assert not result.returncode |
| 217 | + |
| 218 | + # Find the first line of the help and strip out all the spaces and newlines |
| 219 | + stdout_as_string = result.stdout.decode("utf8", "replace") |
| 220 | + cleaned_help_line = ( |
| 221 | + stdout_as_string.split("\n\n")[0].replace("\n", "").replace(" ", "") |
| 222 | + ) |
| 223 | + assert cleaned_help_line == ( |
| 224 | + "usage:murfey.repost_failed_calls[-h]-cCONFIG-uUSERNAME[-dDIR]" |
| 225 | + ) |
0 commit comments