-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathtest_integration.py
More file actions
186 lines (146 loc) · 5.92 KB
/
test_integration.py
File metadata and controls
186 lines (146 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import os
import subprocess
from collections.abc import Callable
from pathlib import Path
from time import sleep
from typing import Any
from unittest import mock
import kombu
import pulse_utils
import pytest
from git import Repo
from mozlog import get_proxy_logger
from utils import hg_cat, hg_log, hg_rev
from git_hg_sync.__main__ import get_connection, get_queue, start_app
from git_hg_sync.config import Config, PulseConfig
from git_hg_sync.events import Event
from git_hg_sync.pulse_worker import PulseWorker
# The RabbitMQ-backed tests in this module are skipped unless the RABBITMQ
# environment variable is set to exactly "true".
NO_RABBITMQ = os.environ.get("RABBITMQ") != "true"

# Directory containing this test module; test data files are resolved from it.
HERE = Path(__file__).parents[0]
@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
def test_send_and_receive(pulse_config: PulseConfig, get_payload: Callable) -> None:
    """Publish one Pulse message and check that the same payload is consumed.

    Received payloads are recorded by the consumer callback and asserted once
    draining is done, so the test cannot pass without a message actually
    having been delivered.
    """
    payload = get_payload()
    received: list[Any] = []

    def callback(body: Any, message: kombu.Message) -> None:
        # Acknowledge before recording so the message does not stay on the
        # queue whatever the outcome of the final assertion.
        message.ack()
        received.append(body["payload"])

    pulse_utils.send_pulse_message(pulse_config, payload, purge=True)

    # The context manager releases the connection even if draining times out
    # or the assertion fails.
    with get_connection(pulse_config) as connection:
        queue = get_queue(pulse_config)
        with connection.Consumer(queue, auto_declare=False, callbacks=[callback]):
            # drain_events() handles a single event and raises on timeout;
            # keep draining until the callback has fired at least once.
            while not received:
                connection.drain_events(timeout=5)

    assert received == [payload]
@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
def test_full_app(
    tmp_path: Path,
    get_payload: Callable,
) -> None:
    """Run the app once on a real message and check the Mercurial result.

    A git repository and an empty Mercurial repository are created under
    ``tmp_path``. A Pulse message announcing a new commit and a tag is sent,
    the app is started in one-shot mode, and the Mercurial repository is then
    inspected for the new file, the tag entry and the tagging commit message.
    """
    # With the test configuration, our local branch and tags should map to those
    # destinations.
    local_branch = "esr128"
    local_tag = "FIREFOX_128_0esr_RELEASE"
    destination_branch = "default"
    destination_tags_branch = "tags-esr128"

    # Create a remote mercurial repository
    hg_remote_repo_path = tmp_path / "hg-remotes" / "mozilla-esr128"
    hg_remote_repo_path.mkdir(parents=True)
    subprocess.run(["hg", "init"], cwd=hg_remote_repo_path, check=True)

    # Create a remote git repository
    git_remote_repo_path = tmp_path / "git-remotes" / "firefox-releases"

    # Create an initial commit on git
    repo = Repo.init(git_remote_repo_path, b=local_branch)
    foo_path = git_remote_repo_path / "foo.txt"
    foo_path.write_text("FOO CONTENT")
    repo.index.add([foo_path])
    repo.index.commit("add foo.txt")

    # Push the base to mercurial repository
    subprocess.run(
        [
            "git",
            "push",
            f"hg::{hg_remote_repo_path}",
            f"{local_branch}:refs/heads/branches/{destination_branch}/tip",
        ],
        cwd=git_remote_repo_path,
        check=True,
    )
    assert "FOO CONTENT" in hg_cat(hg_remote_repo_path, "foo.txt", destination_branch)

    # Second commit: this is the one the app is expected to sync.
    bar_path = git_remote_repo_path / "bar.txt"
    bar_path.write_text("BAR CONTENT")
    repo.index.add([bar_path])
    git_commit_sha = repo.index.commit("add bar.txt").hexsha

    # modify config file to match the tmp dirs
    config_path = tmp_path / "config.toml"
    config_template = (HERE / "data" / "config.toml").read_text()
    config_path.write_text(config_template.replace("{directory}", str(tmp_path)))

    # send message
    config = Config.from_file(config_path)
    payload = get_payload(
        repo_url=str(git_remote_repo_path),
        branches={local_branch: git_commit_sha},
        tags={local_tag: git_commit_sha},
    )
    pulse_utils.send_pulse_message(config.pulse, payload, purge=True)

    # execute app
    start_app(config, get_proxy_logger("test"), one_shot=True)

    # test
    assert "BAR CONTENT" in hg_cat(hg_remote_repo_path, "bar.txt", destination_branch)
    assert local_tag in hg_cat(
        hg_remote_repo_path, ".hgtags", destination_tags_branch
    )

    # test tag commit message
    tag_log = hg_log(hg_remote_repo_path, destination_tags_branch, ["-T", "{desc}"])
    assert "No bug - Tagging" in tag_log
    assert local_tag in tag_log
    assert hg_rev(hg_remote_repo_path, destination_branch) in tag_log
@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
def test_no_duplicated_ack_messages(
    test_config: Config,
    get_payload: Callable,
) -> None:
    """This test checks that long-running messages are not processed more than once.

    It may also timeout, which is likely indicative of the same issue.
    """
    payload = get_payload()
    # Seconds the fake handler blocks for, to simulate a long-running message.
    wait = 30

    # The context manager releases the connection when the test ends, whether
    # it passes or fails.
    with get_connection(test_config.pulse) as connection:
        queue = get_queue(test_config.pulse)
        bound_queue = queue(connection)
        bound_queue.queue_declare()
        bound_queue.queue_bind()

        worker = PulseWorker(connection, queue, one_shot=True)

        # Replace the real handler with a mock that only sleeps, so the number
        # of times the worker invoked it can be checked afterwards.
        callback = mock.MagicMock(side_effect=lambda _event: sleep(wait))
        worker.event_handler = callback

        pulse_utils.send_pulse_message(test_config.pulse, payload, purge=True)
        worker.run()

    callback.assert_called_once()
@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
def test_messages_in_order(
    test_config: Config,
    get_payload: Callable,
) -> None:
    """Check that a failed message is handled again before any later message.

    Two messages (push ids 0 and 1) are sent. The handler raises the first
    time it sees each push id, so the expected sequence of handled push ids
    is ``[0, 0, 1, 1]``.
    """
    expected_events = [0, 0, 1, 1]
    events_log: list[int] = []

    # The context manager releases the connection when the test ends, whether
    # it passes or fails.
    with get_connection(test_config.pulse) as connection:
        queue = get_queue(test_config.pulse)
        bound_queue = queue(connection)
        bound_queue.queue_declare()
        bound_queue.queue_bind()

        worker = PulseWorker(connection, queue, one_shot=False)

        def event_handler(event: Event) -> None:
            push_id = event.push_id
            already_seen = push_id in events_log
            events_log.append(push_id)
            # Terminate the worker after processing the expected number of messages.
            if len(events_log) == len(expected_events):
                worker.should_stop = True
            # Fail the first delivery of every push id.
            if not already_seen:
                raise Exception("Not seen yet")

        worker.event_handler = event_handler

        # Only the first send purges the queue, so both messages are pending
        # when the worker starts.
        pulse_utils.send_pulse_message(
            test_config.pulse, get_payload(push_id=0), purge=True
        )
        pulse_utils.send_pulse_message(
            test_config.pulse, get_payload(push_id=1), purge=False
        )
        worker.run()

    assert events_log == expected_events