-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathtest_containerd_integration.py
More file actions
320 lines (260 loc) · 15.1 KB
/
test_containerd_integration.py
File metadata and controls
320 lines (260 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import asyncio
import logging
import jinja2
from juju.unit import Unit
from juju.application import Application
from pathlib import Path
import pytest
import pytest_asyncio
import shlex
import toml
from typing import Dict
from pytest_operator.plugin import OpsTest
from tenacity import retry, stop_after_attempt, wait_exponential
from utils import JujuRun
import yaml
# Module-level logger, named after this module, shared by every test and fixture below.
log = logging.getLogger(__name__)
def format_kubectl_cmd(cmd: str) -> str:
    """Prefix ``cmd`` with ``kubectl`` and the kubeconfig option.

    The kubeconfig location is fixed to ``/root/.kube/config``; ``cmd`` is
    appended verbatim after a single space.
    """
    kubeconfig = "/root/.kube/config"
    return f"kubectl --kubeconfig {kubeconfig} {cmd}"
@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test):
    """Build (or reuse) the containerd charm and deploy it inside a bundle.

    Steps:
      1. Reuse a ``containerd*.charm`` found in the working directory, or
         build the charm from the current directory.
      2. Build the charm resources with ``build-resources.sh``; if that does
         not yield exactly the expected resources, download them instead.
      3. Render the ``kubernetes-core`` bundle plus the local overlay with the
         charm and resources as template context.
      4. Deploy the bundle with ``juju deploy`` and wait for every application
         named in the rendered files to become active.
    """
    charm = next(Path.cwd().glob("containerd*.charm"), None)
    if charm:
        log.info("Using pre-built charm %s", charm)
    else:
        log.info("Build Charm...")
        charm = await ops_test.build_charm(".")

    build_script = Path.cwd() / "build-resources.sh"
    resources = await ops_test.build_resources(build_script, with_sudo=False)
    expected_resources = {"containerd-multiarch"}

    if resources and all(rsc.stem in expected_resources for rsc in resources):
        # Template variables cannot contain dashes, so key by underscored name.
        resources = {rsc.stem.replace("-", "_"): rsc for rsc in resources}
    else:
        log.info("Failed to build resources, downloading from latest/edge")
        arch_resources = ops_test.arch_specific_resources(charm)
        resources = await ops_test.download_resources(charm, resources=arch_resources)
        resources = {name.replace("-", "_"): rsc for name, rsc in resources.items()}

    assert resources, "Failed to build or download charm resources."

    context = dict(charm=charm, **resources)
    overlays = [
        ops_test.Bundle("kubernetes-core", channel="edge"),
        Path("tests/data/charm.yaml"),
    ]

    log.info("Build Bundle...")
    bundle, *overlays = await ops_test.async_render_bundles(*overlays, **context)

    log.info("Deploy Bundle...")
    model = ops_test.model_full_name
    # Pass arguments as a list so paths containing whitespace are not re-split.
    deploy_cmd = ["juju", "deploy", "-m", model, str(bundle)]
    deploy_cmd += [f"--overlay={overlay}" for overlay in overlays]
    rc, stdout, stderr = await ops_test.run(*deploy_cmd)
    assert rc == 0, f"Bundle deploy failed: {(stderr or stdout).strip()}"

    # read_text() closes each rendered file after reading, unlike a bare open().
    apps = [
        app
        for fragment in (bundle, *overlays)
        for app in yaml.safe_load(fragment.read_text())["applications"]
    ]
    await ops_test.model.wait_for_idle(apps=apps, status="active", timeout=60 * 60)
async def test_status_messages(ops_test):
    """Check that every containerd unit is active with the expected status message."""
    containerd = ops_test.model.applications["containerd"]
    expected_status, expected_message = "active", "Container runtime available"
    for unit in containerd.units:
        assert unit.workload_status == expected_status
        assert unit.workload_status_message == expected_message
async def process_elapsed_time(unit, process):
    """Return the elapsed running time of ``process`` on ``unit`` as an int.

    The value is the ``etimes`` column printed by ``ps`` for the PID that
    ``pidof`` reports for ``process``.
    """
    command = f"ps -p `pidof {process}` -o etimes="
    elapsed = await JujuRun.command(unit, command)
    return int(elapsed.stdout)
@retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(5))
async def pods_in_state(unit: Unit, selector: Dict[str, str], state: str = "Running"):
    """List the pods matching ``selector`` and require that all are in ``state``.

    ``selector`` is turned into a comma separated ``key=value`` label selector
    and passed to ``kubectl get pods``, run through ``unit``. An empty listing
    or any line lacking ``state`` raises ``AssertionError``, which the
    ``retry`` decorator retries up to five attempts with exponential back-off.

    Returns the raw listing lines, one per pod.
    """
    labels = ",".join("=".join(pair) for pair in selector.items())
    kubectl = format_kubectl_cmd(f"get pods -l={labels} --no-headers")
    listing = await JujuRun.command(unit, kubectl)
    pod_set = listing.stdout.splitlines()
    assert pod_set and all(state in pod_line for pod_line in pod_set)
    return pod_set
@pytest.mark.parametrize("which_action", ("containerd", "packages"))
async def test_upgrade_action(ops_test, which_action):
    """Run ``upgrade-<which_action>`` and check it is a no-op on a current install.

    The action must report containerd as already at the available version,
    with no upgrade performed, and the containerd process must not restart.
    """
    unit = ops_test.model.applications["containerd"].units[0]
    uptime_before = await process_elapsed_time(unit, "containerd")

    action = await JujuRun.action(unit, f"upgrade-{which_action}")
    results = action.results
    log.info(f"Upgrade results = '{results}'")

    containerd = results["containerd"]
    assert containerd["available"] == containerd["installed"]
    assert containerd["upgrade-available"] == "False"
    assert not containerd.get("upgrade-completed"), "No upgrade should have been run"

    # A restart would reset the process's elapsed time to below the first reading.
    uptime_after = await process_elapsed_time(unit, "containerd")
    assert uptime_after >= uptime_before, "containerd service shouldn't have been restarted"
@pytest.mark.parametrize("which_action", ("containerd", "packages"))
async def test_upgrade_dry_run_action(ops_test, which_action):
    """Run ``upgrade-<which_action>`` with ``dry-run`` set and check nothing changes.

    The action must report containerd as already at the available version and
    the containerd process must not restart.
    """
    unit = ops_test.model.applications["containerd"].units[0]
    uptime_before = await process_elapsed_time(unit, "containerd")

    # "dry-run" is not a valid Python identifier, hence the dict expansion.
    action_params = {"dry-run": True}
    action = await JujuRun.action(unit, f"upgrade-{which_action}", **action_params)
    results = action.results
    log.info(f"Upgrade dry-run results = '{results}'")

    containerd = results["containerd"]
    assert containerd["available"] == containerd["installed"]
    assert containerd["upgrade-available"] == "False"

    uptime_after = await process_elapsed_time(unit, "containerd")
    assert uptime_after >= uptime_before, "containerd service shouldn't have been restarted"
async def test_upgrade_action_containerd_force(ops_test):
    """Run ``upgrade-packages`` with ``force`` set and no GPU option.

    Even when forced, the action must report containerd as current, must not
    report a completed upgrade, and must not restart the containerd process.
    """
    unit = ops_test.model.applications["containerd"].units[0]
    uptime_before = await process_elapsed_time(unit, "containerd")

    forced = await JujuRun.action(unit, "upgrade-packages", force=True)
    results = forced.results
    log.info(f"Upgrade results = '{results}'")

    containerd = results["containerd"]
    assert containerd["available"] == containerd["installed"]
    assert containerd["upgrade-available"] == "False"
    assert not containerd.get("upgrade-completed"), "No upgrade should have been run"

    uptime_after = await process_elapsed_time(unit, "containerd")
    assert uptime_after >= uptime_before, "containerd service shouldn't have been restarted"
async def test_upgrade_action_gpu_uninstalled_but_gpu_forced(ops_test):
    """Force a GPU-only upgrade on a unit that has no GPU drivers installed.

    Running ``upgrade-packages`` with ``gpu`` and ``force`` both set (and
    ``containerd`` disabled) must not install ``cuda-drivers`` and must not
    restart the containerd process.
    """
    unit = ops_test.model.applications["containerd"].units[0]
    uptime_before = await process_elapsed_time(unit, "containerd")

    upgrade = await JujuRun.action(unit, "upgrade-packages", containerd=False, gpu=True, force=True)
    results = upgrade.results
    log.info(f"Upgrade results = '{results}'")

    # check=False: the query is expected to fail and name the package on stderr.
    query = await JujuRun.command(unit, "dpkg-query --list cuda-drivers", check=False)
    assert "cuda-drivers" in query.stderr, "cuda-drivers shouldn't be installed"

    uptime_after = await process_elapsed_time(unit, "containerd")
    assert uptime_after >= uptime_before, "containerd service shouldn't have been restarted"
@pytest_asyncio.fixture(scope="module")
async def juju_config(ops_test):
    """Yield a coroutine that applies application config; revert it at module teardown.

    The yielded ``setup(application, _timeout=..., **new_config)`` records the
    application's original config, applies ``new_config`` and waits for the
    application to become active. Once every test in the module has finished,
    each touched key is restored to its recorded value and the fixture waits
    for the touched applications to become active again, using the largest
    timeout any ``setup`` call requested.
    """
    # application name -> (config before the first change, keys to revert, timeout)
    to_revert = {}

    async def update_reverts(application, configs, _timeout):
        """Control what config is reverted per app during the test module teardown.

        Because juju_config is a module scoped fixture, it isn't torn down until all
        the tests in the module are completed. The `setup` method could be called
        multiple times by various tests, but only the first call should gather the
        original config. Subsequent calls should update which keys are reverted, and
        keep the greatest timeout selected to revert all keys.
        """
        reverts = to_revert.get(application)
        if not reverts:
            reverts = (await ops_test.model.applications[application].get_config(), set(configs), _timeout)
        else:
            reverts = (reverts[0], reverts[1] | set(configs), max(reverts[2], _timeout))
        to_revert[application] = reverts

    async def setup(application, _timeout=10 * 60, **new_config):
        """Apply config by application name and the config values.

        @param: str application: name of app to configure
        @param: dict new_config: configuration key=values to adjust
        @param: float _timeout: time in seconds to wait for applications to be stable
        """
        # Record the originals before changing anything so they can be restored.
        await update_reverts(application, new_config.keys(), _timeout)
        await ops_test.model.applications[application].set_config(new_config)
        await ops_test.model.wait_for_idle(apps=[application], status="active", timeout=_timeout)

    yield setup

    if not to_revert:
        # setup() was never called: nothing to restore and nothing to wait for.
        return

    for app, (pre_test, settable, _) in to_revert.items():
        revert_config = {key: pre_test[key]["value"] for key in settable}
        await ops_test.model.applications[app].set_config(revert_config)

    # Reverting may take as long as the slowest change did, so honour the
    # largest timeout requested instead of silently dropping it.
    revert_timeout = max(timeout for _, _, timeout in to_revert.values())
    await ops_test.model.wait_for_idle(apps=list(to_revert), status="active", timeout=revert_timeout)
@pytest_asyncio.fixture(scope="module", params=["v1", "v2"])
async def config_version(request, juju_config):
    """Configure containerd's ``config_version`` from the fixture parameter.

    Returns the applied version string so tests can branch on it.
    """
    version = request.param
    await juju_config("containerd", config_version=version)
    return version
async def containerd_config(unit):
    """Read ``/etc/containerd/config.toml`` from ``unit`` and parse it into a dict.

    Fails with an assertion if the file content comes back empty.
    """
    result = await JujuRun.command(unit, "cat /etc/containerd/config.toml")
    raw_toml = result.stdout
    assert raw_toml, "Containerd output was empty"
    return toml.loads(raw_toml)
async def test_containerd_registry_has_dockerio_mirror(config_version, ops_test):
    """Check each containerd unit lists the docker.io mirror in its registry config."""
    # The CRI plugin section is named differently depending on the config version.
    if config_version == "v1":
        plugin = "cri"
    else:
        plugin = "io.containerd.grpc.v1.cri"

    for unit in ops_test.model.applications["containerd"].units:
        parsed = await containerd_config(unit)
        registry = parsed["plugins"][plugin]["registry"]
        mirrors = registry["mirrors"]
        assert "docker.io" in mirrors, "docker.io missing from containerd config"
        assert mirrors["docker.io"]["endpoint"] == ["https://registry-1.docker.io"]
async def test_containerd_registry_with_private_registry(config_version, ops_test):
    """Check the private registry appears, with TLS settings, in containerd's config.

    Exactly one registry config entry is expected on every containerd unit, and
    its name must appear in the docker-registry unit's status message.
    """
    registry_unit = ops_test.model.applications.get("docker-registry").units[0]
    # The CRI plugin section is named differently depending on the config version.
    if config_version == "v1":
        plugin = "cri"
    else:
        plugin = "io.containerd.grpc.v1.cri"

    for unit in ops_test.model.applications["containerd"].units:
        parsed = await containerd_config(unit)
        configs = parsed["plugins"][plugin]["registry"]["configs"]
        assert len(configs) == 1, "registry config isn't represented in config.toml"
        (docker_registry,) = tuple(configs)
        assert configs[docker_registry]["tls"], "TLS config isn't represented in the config.toml"
        assert docker_registry in registry_unit.workload_status_message
async def test_containerd_disable_gpu_support(ops_test, juju_config):
    """Set ``gpu_driver`` to ``none`` and check no NVIDIA artifacts exist on any unit."""
    await juju_config("containerd", gpu_driver="none")

    containerd = ops_test.model.applications["containerd"]
    for unit in containerd.units:
        # Both commands are expected to fail, so run them with check=False
        # and inspect stderr instead.
        sources = await JujuRun.command(unit, "cat /etc/apt/sources.list.d/nvidia.list", check=False)
        assert "No such file " in sources.stderr, "NVIDIA sources list was populated"

        drivers = await JujuRun.command(unit, "dpkg-query --list cuda-drivers", check=False)
        assert "cuda-drivers" in drivers.stderr, "cuda-drivers shouldn't be installed"
async def test_containerd_nvidia_gpu_support(ops_test, juju_config):
    """Set ``gpu_driver`` to ``nvidia`` and check the NVIDIA artifacts exist on every unit."""
    # A longer timeout than the fixture default is requested for this change.
    await juju_config("containerd", gpu_driver="nvidia", _timeout=15 * 60)

    containerd = ops_test.model.applications["containerd"]
    for unit in containerd.units:
        sources = await JujuRun.command(unit, "cat /etc/apt/sources.list.d/nvidia.list")
        assert sources.stdout, "NVIDIA sources list was empty"

        drivers = await JujuRun.command(unit, "dpkg-query --list cuda-drivers")
        assert "cuda-drivers" in drivers.stdout, "cuda-drivers not installed"
@pytest.mark.xfail(reason="No apt repo for nvidia-container-runtime")
async def test_upgrade_action_gpu_force(ops_test):
    """Force a GPU-only ``upgrade-packages`` run and check the cuda-drivers results.

    The action must report cuda-drivers as current with a completed upgrade,
    and the containerd process must not restart.
    """
    unit = ops_test.model.applications["containerd"].units[0]
    uptime_before = await process_elapsed_time(unit, "containerd")

    upgrade = await JujuRun.action(unit, "upgrade-packages", containerd=False, gpu=True, force=True)
    results = upgrade.results
    log.info(f"Upgrade results = '{results}'")

    cuda = results["cuda-drivers"]
    assert cuda["available"] == cuda["installed"]
    assert cuda["upgrade-available"] == "False"
    # NOTE(review): other tests in this module read "upgrade-completed";
    # confirm which key the action actually emits.
    assert cuda["upgrade-complete"] == "True"

    uptime_after = await process_elapsed_time(unit, "containerd")
    assert uptime_after >= uptime_before, "containerd service shouldn't have been restarted"
@pytest_asyncio.fixture()
async def microbots(ops_test: OpsTest, tmp_path: Path):
    """Deploy the microbot workload through a kubernetes-worker; delete it afterwards.

    The manifest is rendered from ``tests/data/microbot.yaml.j2`` with one
    replica per kubernetes-worker unit, copied to the first worker, and
    applied there with kubectl. Yields the number of microbot pods found
    running. The manifest is deleted again on teardown, including when setup
    fails part-way.
    """
    workers: Application = ops_test.model.applications["kubernetes-worker"]
    any_worker: Unit = workers.units[0]
    hardware = any_worker.machine.safe_data["hardware-characteristics"]
    arch = hardware["arch"]
    context = dict(
        public_address=any_worker.public_address,
        replicas=len(workers.units),
        arch=arch,
    )

    rendered = str(tmp_path / "microbot.yaml")
    template_text = Path("tests/data/microbot.yaml.j2").read_text()
    jinja2.Template(template_text).stream(**context).dump(rendered)

    # "{command}" is left as a placeholder and filled in with apply/delete below.
    kubectl = format_kubectl_cmd("{command} -f /tmp/microbot.yaml")
    try:
        copy_args = shlex.split(f"scp {rendered} {any_worker.name}:/tmp/microbot.yaml")
        await ops_test.juju(*copy_args, check=True)
        await JujuRun.command(any_worker, kubectl.format(command="apply"))
        running = await pods_in_state(any_worker, {"app": "microbot"}, "Running")
        yield len(running)
    finally:
        await JujuRun.command(any_worker, kubectl.format(command="delete"))
async def test_restart_containerd(microbots, ops_test: OpsTest):
    """Stop containerd everywhere and check the microbot pods keep running.

    With containerd stopped the application should become blocked and every
    node should report NotReady, yet the microbot pods must still be listed as
    Running and the microbot service must answer over its cluster IP.
    containerd is always started again and awaited at the end.
    """
    containerds = ops_test.model.applications["containerd"]
    num_units = len(containerds.units)
    any_containerd = containerds.units[0]
    try:
        stop_all = [JujuRun.command(unit, "service containerd stop") for unit in containerds.units]
        await asyncio.gather(*stop_all)
        async with ops_test.fast_forward():
            await ops_test.model.wait_for_idle(apps=["containerd"], status="blocked", timeout=6 * 60)

        node_listing = await JujuRun.command(any_containerd, format_kubectl_cmd("get nodes"))
        not_ready = node_listing.stdout.count("NotReady")
        assert not_ready == num_units, "Ensure all nodes aren't ready"

        # test that pods are still running while containerd is offline
        pod_listing = await JujuRun.command(any_containerd, format_kubectl_cmd("get pods -l=app=microbot"))
        listed = pod_listing.stdout
        assert listed.count("microbot") == microbots, f"Ensure {microbots} pod(s) are installed"
        assert listed.count("Running") == microbots, f"Ensure {microbots} pod(s) are running with containerd down"

        # The service must still answer on its cluster IP.
        service_query = format_kubectl_cmd("get service -l=app=microbot -ojsonpath='{.items[*].spec.clusterIP}'")
        cluster_ip = await JujuRun.command(any_containerd, service_query)
        endpoint = f"http://{cluster_ip.stdout.strip()}"
        await JujuRun.command(any_containerd, f"curl {endpoint}")
    finally:
        # Bring containerd back regardless of the outcome so later tests see a healthy model.
        start_all = [JujuRun.command(unit, "service containerd start") for unit in containerds.units]
        await asyncio.gather(*start_all)
        async with ops_test.fast_forward():
            await ops_test.model.wait_for_idle(apps=["containerd"], status="active", timeout=6 * 60)