diff --git a/dk-installer.py b/dk-installer.py index 1fca6da..c100cc3 100755 --- a/dk-installer.py +++ b/dk-installer.py @@ -7,6 +7,7 @@ import datetime import functools import hashlib +import io import ipaddress import json import logging @@ -162,44 +163,55 @@ def get_installer_version(): return "N/A" -class StreamIterator: - def __init__(self, proc, stream, file_path): - self.proc = proc - self.stream = stream - self.file_path = file_path - self.file = None - self.bytes_written = 0 - - def __iter__(self): - return self - - def __next__(self): - for return_anyway in (False, True): - # We poll the process status before consuming the stream to make sure the StopIteration condition - # is not vulnerable to a race condition. - ret = self.proc.poll() - line = self.stream.readline() - if line: - if not self.file: - self.file = open(self.file_path, "wb") - self.file.write(line) - self.bytes_written += len(line) - return line - if ret is not None and not line: - raise StopIteration - if not return_anyway: - time.sleep(0.1) - return line +@contextlib.contextmanager +def stream_iterator(proc: subprocess.Popen, stream_name: str, file_path: pathlib.Path, timeout: float = 1.0): + comm_index, exc_attr = { + "stdout": (0, "output"), + "stderr": (1, "stderr"), + }[stream_name] + buffer = io.TextIOWrapper(io.BytesIO()) - def __enter__(self): - return self + def _iter(): + proc_exited = False + read_pos = 0 + while not proc_exited: + try: + partial = proc.communicate(timeout=timeout)[comm_index] + except subprocess.TimeoutExpired as exc: + partial = getattr(exc, exc_attr) + else: + proc_exited = True - def __exit__(self, exc_type, exc_val, exc_tb): - for _ in iter(self): + if partial is not None: + buffer.buffer.seek(0) + buffer.buffer.write(partial) + + buffer.seek(read_pos) + while True: + try: + line = buffer.readline() + # When some unicode char is incomplete, we skip yielding + except UnicodeDecodeError: + break + + # When the line is empty we skip yielding + # When the line 
is incomplete and the process is still running, we skip yielding + if not line or (not line.endswith(os.linesep) and not proc_exited): + break + + yield line.strip(os.linesep) + + read_pos = buffer.tell() + + iterator = _iter() + try: + yield iterator + finally: + # Making sure all output was consumed before writing the buffer to the file + for _ in iterator: pass - if self.file: - self.file.close() - return False + if buffer.buffer.tell(): + file_path.write_bytes(buffer.buffer.getvalue()) # @@ -651,17 +663,16 @@ def run_cmd( with self.start_cmd(*cmd, raise_on_non_zero=raise_on_non_zero, env=env, **popen_args) as (proc, stdout, stderr): if input: proc.stdin.write(input) - proc.stdin.close() if echo: for line in stdout: if line: - CONSOLE.msg(line.decode().strip()) + CONSOLE.msg(line) elif capture_text: - return b"".join(stdout).decode() + return "\n".join(stdout) elif capture_json: try: - return json.loads(b"".join(stdout).decode()) + return json.loads("".join(stdout)) except json.JSONDecodeError: LOG.warning("Error decoding JSON from stdout") return {} @@ -669,7 +680,7 @@ def run_cmd( json_lines = [] for idx, output_line in enumerate(stdout): try: - json_lines.append(json.loads(output_line.decode())) + json_lines.append(json.loads(output_line)) except json.JSONDecodeError: LOG.warning(f"Error decoding JSON from stdout line #{idx}") return json_lines @@ -701,15 +712,15 @@ def start_cmd(self, *cmd, raise_on_non_zero=True, env=None, **popen_args): slug_cmd = re.sub(r"[^a-zA-Z]+", "-", cmd_str)[:100].strip("-") - def get_stream_iterator(stream_name): - file_name = f"{self._cmd_idx:04d}-{stream_name}-{slug_cmd}.txt" - file_path = self.session_folder.joinpath(file_name) - return StreamIterator(proc, getattr(proc, stream_name), file_path) + stdout_path, stderr_path = [ + self.session_folder.joinpath(f"{self._cmd_idx:04d}-{stream_name}-{slug_cmd}.txt") + for stream_name in ("stdout", "stderr") + ] try: with ( - get_stream_iterator("stdout") as stdout_iter, - 
get_stream_iterator("stderr") as stderr_iter, + stream_iterator(proc, "stdout", stdout_path) as stdout_iter, + stream_iterator(proc, "stderr", stderr_path) as stderr_iter, ): try: yield proc, stdout_iter, stderr_iter @@ -724,12 +735,12 @@ def get_stream_iterator(stream_name): finally: elapsed = time.time() - started LOG.info( - "Command [%04d] returned [%d] in [%.3f] seconds. [%d] bytes in STDOUT, [%d] bytes in STDERR", + "Command [%04d] returned [%s] in [%.3f] seconds. [%d] bytes in STDOUT, [%d] bytes in STDERR", self._cmd_idx, proc.returncode, elapsed, - stdout_iter.bytes_written, - stderr_iter.bytes_written, + stdout_path.stat().st_size if stdout_path.exists() else 0, + stderr_path.stat().st_size if stderr_path.exists() else 0, ) @@ -1455,65 +1466,65 @@ def get_parser(self, sub_parsers): def execute(self, args): CONSOLE.title("Expose Observability ports") - try: - with self.start_cmd( - "minikube", - "kubectl", - "--profile", - args.profile, - "--", - "--namespace", - args.namespace, - "--address", - "0.0.0.0", - "port-forward", - "service/observability-ui", - f"{args.port}:http", - raise_on_non_zero=False, - ) as (proc, stdout, stderr): - for output in stdout: - if output: - break - - if proc.poll() is None: - url = f"http://localhost:{args.port}" - for service, label in SERVICES_LABELS.items(): - CONSOLE.msg(f"{label:>20}: {SERVICES_URLS[service].format(url)}") - CONSOLE.space() - CONSOLE.msg("Listening on all interfaces (0.0.0.0)") - CONSOLE.msg("Keep this process running while using the above URLs") - CONSOLE.msg("Press Ctrl + C to stop exposing the ports") + success = False + with self.start_cmd( + "minikube", + "kubectl", + "--profile", + args.profile, + "--", + "--namespace", + args.namespace, + "--address", + "0.0.0.0", + "port-forward", + "service/observability-ui", + f"{args.port}:http", + raise_on_non_zero=False, + ) as (proc, stdout, stderr): + for output in stdout: + if output: + break - try: - with open(self.data_folder / DEMO_CONFIG_FILE, "r") as 
file: - json_config = json.load(file) - json_config["api_host"] = BASE_API_URL_TPL.format( - f"http://host.docker.internal:{args.port}" - ) - - with open(self.data_folder / DEMO_CONFIG_FILE, "w") as file: - file.write(json.dumps(json_config)) - except Exception: - LOG.exception(f"Unable to update {DEMO_CONFIG_FILE} file with exposed port") - else: - for output in stderr: - if output: - CONSOLE.msg(output.decode().strip()) - raise CommandFailed + if proc.poll() is None: + url = f"http://localhost:{args.port}" + for service, label in SERVICES_LABELS.items(): + CONSOLE.msg(f"{label:>20}: {SERVICES_URLS[service].format(url)}") + CONSOLE.space() + CONSOLE.msg("Listening on all interfaces (0.0.0.0)") + CONSOLE.msg("Keep this process running while using the above URLs") + CONSOLE.msg("Press Ctrl + C to stop exposing the ports") try: - while proc.poll() is None: - time.sleep(10) - except KeyboardInterrupt: - # The empty print forces the terminal cursor to move to the first column - print() + with open(self.data_folder / DEMO_CONFIG_FILE, "r") as file: + json_config = json.load(file) + json_config["api_host"] = BASE_API_URL_TPL.format(f"http://host.docker.internal:{args.port}") + + with open(self.data_folder / DEMO_CONFIG_FILE, "w") as file: + file.write(json.dumps(json_config)) + except Exception: + LOG.exception(f"Unable to update {DEMO_CONFIG_FILE} file with exposed port") - proc.terminate() + while True: + try: + proc.wait(10) + except subprocess.TimeoutExpired: + continue + except KeyboardInterrupt: + # The empty print forces the terminal cursor to move to the first column + print() + proc.terminate() + success = True + break + else: + break + if success: CONSOLE.msg("The services are no longer exposed.") + else: + for output in stderr: + CONSOLE.msg(output) - except Exception as e: - LOG.exception("Something went wrong exposing the services ports") CONSOLE.space() CONSOLE.msg("The platform could not have its ports exposed.") CONSOLE.msg( @@ -1521,7 +1532,7 @@ def 
execute(self, args): ) CONSOLE.space() CONSOLE.msg(f"If port {args.port} is in use, use the command option --port to specify an alternate value.") - raise AbortAction from e + raise AbortAction class ObsDeleteAction(Action): @@ -1558,7 +1569,7 @@ class DemoContainerAction(Action): requirements = [REQ_DOCKER, REQ_DOCKER_DAEMON] def run_dk_demo_container(self, command: str): - with self.start_cmd( + self.run_cmd( "docker", "run", "--rm", @@ -1572,14 +1583,8 @@ def run_dk_demo_container(self, command: str): "host.docker.internal:host-gateway", DEMO_IMAGE, command, - ) as (proc, stdout, stderr): - try: - for line in stdout: - if line: - CONSOLE.msg(line.decode().strip()) - except KeyboardInterrupt: - print("") - proc.terminate() + echo=True, + ) class ObsRunDemoAction(DemoContainerAction): diff --git a/tests/conftest.py b/tests/conftest.py index a8c05f1..0714b5a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -67,7 +67,7 @@ def popen_mock(proc_mock): @pytest.fixture def stream_iter_mock(): - with patch("tests.installer.StreamIterator") as si_mock: + with patch("tests.installer.stream_iterator") as si_mock: si_mock.__enter__.return_value = si_mock yield si_mock diff --git a/tests/test_action.py b/tests/test_action.py index 40adcdf..c3c5053 100644 --- a/tests/test_action.py +++ b/tests/test_action.py @@ -76,7 +76,7 @@ def test_get_failed_cmd_log(action, exc_levels, glob_side_effect, expected_calls @pytest.mark.unit def test_run_cmd_text(action, start_cmd_mock, stdout_mock, console_msg_mock): - stdout_mock.return_value = [b"hi there"] + stdout_mock.return_value = ["hi there"] result = action.run_cmd("cmd", capture_text=True) assert result == "hi there" console_msg_mock.assert_not_called() @@ -85,7 +85,7 @@ def test_run_cmd_text(action, start_cmd_mock, stdout_mock, console_msg_mock): @pytest.mark.unit def test_run_cmd_json(action, start_cmd_mock, stdout_mock): - stdout_mock.return_value = [b'{"foo": 123}'] + stdout_mock.return_value = ['{"foo": 123}'] result = 
action.run_cmd("cmd", capture_json=True) assert result == {"foo": 123} start_cmd_mock.assert_called_once() @@ -93,7 +93,7 @@ def test_run_cmd_json(action, start_cmd_mock, stdout_mock): @pytest.mark.unit def test_run_cmd_invalid_json(action, start_cmd_mock, stdout_mock): - stdout_mock.return_value = [b"no JSON here"] + stdout_mock.return_value = ["no JSON here"] result = action.run_cmd("cmd", capture_json=True) assert result == {} start_cmd_mock.assert_called_once() @@ -101,7 +101,7 @@ def test_run_cmd_invalid_json(action, start_cmd_mock, stdout_mock): @pytest.mark.unit def test_run_cmd_json_lines(action, start_cmd_mock, stdout_mock): - stdout_mock.return_value = [b'{"foo": 123}', b"something else", b'{"foo": 321}'] + stdout_mock.return_value = ['{"foo": 123}', "something else", '{"foo": 321}'] result = action.run_cmd("cmd", capture_json_lines=True) assert result == [{"foo": 123}, {"foo": 321}] start_cmd_mock.assert_called_once() @@ -109,7 +109,7 @@ def test_run_cmd_json_lines(action, start_cmd_mock, stdout_mock): @pytest.mark.unit def test_run_cmd_echo(action, start_cmd_mock, stdout_mock, console_msg_mock): - stdout_mock.return_value = [b"some output", b"will be echoed"] + stdout_mock.return_value = ["some output", "will be echoed"] result = action.run_cmd("cmd", echo=True) assert result is None assert console_msg_mock.call_count == 2 @@ -155,8 +155,8 @@ def test_start_cmd(action, popen_mock, stream_iter_mock): stream_iter_mock.assert_has_calls( [ - call(popen_mock(), popen_mock().stdout, ANY), - call(popen_mock(), popen_mock().stderr, ANY), + call(popen_mock(), "stdout", action.session_folder.joinpath()), + call(popen_mock(), "stderr", action.session_folder.joinpath()), call().__enter__(), call().__enter__(), call().__exit__(None, None, None), diff --git a/tests/test_obs_demo.py b/tests/test_obs_demo.py index bd4f862..0f77ac1 100644 --- a/tests/test_obs_demo.py +++ b/tests/test_obs_demo.py @@ -38,6 +38,8 @@ def test_obs_demo_action(action_class, arg_action, 
demo_cmd, args_mock, start_cm "host.docker.internal:host-gateway", "datakitchen/data-observability-demo:latest", demo_cmd, + raise_on_non_zero=True, + env=None, ), ], any_order=True, diff --git a/tests/test_obs_expose.py b/tests/test_obs_expose.py index 3095caa..8457976 100644 --- a/tests/test_obs_expose.py +++ b/tests/test_obs_expose.py @@ -1,10 +1,11 @@ import json +import subprocess from functools import partial from unittest.mock import call, patch import pytest -from tests.installer import ObsExposeAction, CommandFailed, AbortAction +from tests.installer import ObsExposeAction, AbortAction @pytest.fixture @@ -18,8 +19,9 @@ def obs_expose_action(action_cls, args_mock, tmp_data_folder, start_cmd_mock): @pytest.mark.integration def test_obs_expose(obs_expose_action, start_cmd_mock, stdout_mock, proc_mock, demo_config_path, console_msg_mock): - proc_mock.poll.side_effect = [None, 0] - stdout_mock.return_value = [b"some output"] + proc_mock.poll.return_value = None + stdout_mock.return_value = ["some output"] + proc_mock.wait.side_effect = [subprocess.TimeoutExpired("x", 2), KeyboardInterrupt] obs_expose_action.execute() @@ -42,6 +44,7 @@ def test_obs_expose(obs_expose_action, start_cmd_mock, stdout_mock, proc_mock, d ), ] ) + assert proc_mock.wait.call_count == 2 assert json.loads(demo_config_path.read_text()) == { "api_host": "http://host.docker.internal:8501/api", "api_key": "demo-api-key", @@ -58,8 +61,8 @@ def test_obs_expose(obs_expose_action, start_cmd_mock, stdout_mock, proc_mock, d @pytest.mark.integration -def test_obs_expose_abort(obs_expose_action, start_cmd_mock): - start_cmd_mock.__exit__.side_effect = CommandFailed +def test_obs_expose_abort(obs_expose_action, start_cmd_mock, stderr_mock): + stderr_mock.return_value = ["error output"] with pytest.raises(AbortAction): obs_expose_action.execute() diff --git a/tests/test_obs_install.py b/tests/test_obs_install.py index e800e59..38096e5 100644 --- a/tests/test_obs_install.py +++ 
b/tests/test_obs_install.py @@ -25,11 +25,11 @@ def test_obs_install(obs_install_action, start_cmd_mock, tmp_data_folder, stdout def _stdout_side_effect(): for idx in count(): if idx == 0: - yield [b"{}"] + yield ["{}"] elif idx == 7: - yield [b'[{"Name": "observability-ui", "URLs": ["http://localhost:8501"]}]'] + yield ['[{"Name": "observability-ui", "URLs": ["http://localhost:8501"]}]'] elif idx == 8: - yield [b'{"service_account_key": "demo-account-key", "project_id": "test-project-id"}'] + yield ['{"service_account_key": "demo-account-key", "project_id": "test-project-id"}'] else: yield [] @@ -112,7 +112,7 @@ def _stdout_side_effect(): @pytest.mark.integration def test_obs_existing_install_abort(obs_install_action, stdout_mock): - stdout_mock.side_effect = [[b'{"Name":"dk-observability","Host":"Running","Kubelet":"Running"}']] + stdout_mock.side_effect = [['{"Name":"dk-observability","Host":"Running","Kubelet":"Running"}']] with patch.object(obs_install_action, "steps", new=[MinikubeProfileStep]): with pytest.raises(AbortAction): obs_install_action.execute() diff --git a/tests/test_stream_iterator.py b/tests/test_stream_iterator.py new file mode 100644 index 0000000..6f45054 --- /dev/null +++ b/tests/test_stream_iterator.py @@ -0,0 +1,53 @@ +import itertools +import pathlib +import subprocess + +import pytest + +from tests.installer import stream_iterator, AbortAction, CommandFailed + + +@pytest.fixture +def popen_stdout_buffer(popen_mock): + buffer = "\n".join(["🔷🔶🔺🔻"[i % 4] + " xxxx" * 20 for i in range(100)]).encode() + popen_mock.communicate.side_effect = [ + *[subprocess.TimeoutExpired("cmd", 1, output=buffer[:idx]) for idx in range(0, len(buffer), 38)], + (buffer, b""), + ] + return buffer + + +@pytest.mark.unit +def test_stream_iterator(popen_mock, popen_stdout_buffer, tmp_logs_folder): + cmd_log_path = pathlib.Path(tmp_logs_folder) / "cmd-log.txt" + + with stream_iterator(popen_mock, "stdout", cmd_log_path) as stdout_iter: + for stdout_line, 
buffer_line in itertools.zip_longest(stdout_iter, popen_stdout_buffer.splitlines()): + assert stdout_line == buffer_line.decode() + + assert cmd_log_path.read_bytes() == popen_stdout_buffer + + +@pytest.mark.unit +@pytest.mark.parametrize("exception", (CommandFailed(2, "cmd", 1), AbortAction(), RuntimeError())) +def test_stream_iterator_exception(exception, popen_mock, popen_stdout_buffer, tmp_logs_folder): + cmd_log_path = pathlib.Path(tmp_logs_folder) / "cmd-log.txt" + + with pytest.raises(exception.__class__): + with stream_iterator(popen_mock, "stdout", cmd_log_path) as stdout_iter: + for _ in itertools.islice(stdout_iter, 200): + pass + raise exception + + assert cmd_log_path.read_bytes() == popen_stdout_buffer + + +@pytest.mark.unit +def test_stream_iterator_partially_consumed(popen_mock, popen_stdout_buffer, tmp_logs_folder): + cmd_log_path = pathlib.Path(tmp_logs_folder) / "cmd-log.txt" + + with stream_iterator(popen_mock, "stdout", cmd_log_path) as stdout_iter: + for _ in itertools.islice(stdout_iter, 200): + pass + + assert cmd_log_path.read_bytes() == popen_stdout_buffer diff --git a/tests/test_tg_delete.py b/tests/test_tg_delete.py index 2c190ed..1a2b7b6 100644 --- a/tests/test_tg_delete.py +++ b/tests/test_tg_delete.py @@ -20,7 +20,7 @@ def tg_delete_action(action_cls, args_mock, tmp_data_folder, start_cmd_mock): def test_tg_delete(fail_network, tg_delete_action, start_cmd_mock, stdout_mock): stdout_mock.side_effect = [ [], - [b'{"Labels":"com.docker.compose.project=testgen,", "Status":"N/A", "Name": "postgresql"}'], + ['{"Labels":"com.docker.compose.project=testgen,", "Status":"N/A", "Name": "postgresql"}'], [], ] start_cmd_mock.__exit__.side_effect = [CommandFailed if fail_network else None, None, None] @@ -76,7 +76,7 @@ def test_tg_delete_compose( def test_tg_delete_abort(tg_delete_action, start_cmd_mock, compose_path, stdout_mock, console_msg_mock): stdout_mock.side_effect = [ [], - [b'{"Labels":"com.docker.compose.project=testgen,", 
"Status":"N/A", "Name": "postgresql"}'], + ['{"Labels":"com.docker.compose.project=testgen,", "Status":"N/A", "Name": "postgresql"}'], [], ] start_cmd_mock.__exit__.side_effect = [None, None, CommandFailed] diff --git a/tests/test_tg_install.py b/tests/test_tg_install.py index fb6d07d..227ee2e 100644 --- a/tests/test_tg_install.py +++ b/tests/test_tg_install.py @@ -47,8 +47,8 @@ def test_tg_install(tg_install_action, start_cmd_mock, stdout_mock, tmp_data_fol @pytest.mark.parametrize( "stdout_effect", ( - [[b'[{"Name":"testgen","Status":"running(2)"}]'], []], - [[], [b'{"Labels":"com.docker.compose.project=testgen,", "Status":"N/A"}']], + [['[{"Name":"testgen","Status":"running(2)"}]'], []], + [[], ['{"Labels":"com.docker.compose.project=testgen,", "Status":"N/A"}']], ), ids=("container", "volume"), ) diff --git a/tests/test_tg_run_demo.py b/tests/test_tg_run_demo.py index 56cee24..4d69b3c 100644 --- a/tests/test_tg_run_demo.py +++ b/tests/test_tg_run_demo.py @@ -19,7 +19,7 @@ def tg_run_demo_action(action_cls, args_mock, tmp_data_folder, start_cmd_mock): @pytest.mark.parametrize("obs_export", (False, True)) def test_tg_run_demo(obs_export, tg_run_demo_action, args_mock, start_cmd_mock, stdout_mock, compose_path, request): args_mock.obs_export = obs_export - stdout_mock.side_effect = [[b'[{"Name":"testgen","Status":"running(2)"}]']] + [[]] * 10 + stdout_mock.side_effect = [['[{"Name":"testgen","Status":"running(2)"}]']] + [[]] * 10 compose_args = ("docker", "compose", "-f", compose_path, "exec", "engine", "testgen") kwargs = dict(raise_on_non_zero=True, env=None) @@ -59,6 +59,8 @@ def test_tg_run_demo(obs_export, tg_run_demo_action, args_mock, start_cmd_mock, "host.docker.internal:host-gateway", "datakitchen/data-observability-demo:latest", "tg-run-demo", + raise_on_non_zero=True, + env=None, ), ] else: @@ -81,7 +83,7 @@ def test_tg_run_demo_abort_not_running(tg_run_demo_action, start_cmd_mock, conso @pytest.mark.integration def 
test_tg_run_demo_abort_missing_config(tg_run_demo_action, args_mock, start_cmd_mock, stdout_mock, console_msg_mock): - stdout_mock.side_effect = [[b'[{"Name":"testgen","Status":"running(2)"}]']] + [[]] * 10 + stdout_mock.side_effect = [['[{"Name":"testgen","Status":"running(2)"}]']] + [[]] * 10 args_mock.obs_export = True with pytest.raises(AbortAction): diff --git a/tests/test_tg_upgrade.py b/tests/test_tg_upgrade.py index d1accce..436696a 100644 --- a/tests/test_tg_upgrade.py +++ b/tests/test_tg_upgrade.py @@ -19,15 +19,15 @@ def tg_upgrade_action(action_cls, args_mock, tmp_data_folder, start_cmd_mock, re def tg_upgrade_stdout_side_effect(stdout_mock): side_effect = [ # Pre-execute calls - [b"TestGen 1.0.0\n"], # Version check + ["TestGen 1.0.0\n"], # Version check # Execute calls [], # Down [], # Pull [], # Up [], # Upgrade DB # Post-execute calls - [b"TestGen 1.1.0\n"], # Confirmation version check - [b"[]"], # Image data collection + ["TestGen 1.1.0\n"], # Confirmation version check + ["[]"], # Image data collection ] stdout_mock.side_effect = side_effect @@ -60,7 +60,7 @@ def get_compose_content(*extra_vars): def set_version_check_mock(version_check_mock, latest_version): version_check_mock.return_value.code = 200 - version_values = { "docker": {"datakitchen/dataops-testgen": latest_version } } + version_values = {"docker": {"datakitchen/dataops-testgen": latest_version}} version_check_mock.return_value.read.return_value = json.dumps(version_values).encode("utf-8")