|
| 1 | +"""String-manipulation utility functions for the notebook project.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import textwrap |
| 6 | +from string import templatelib |
| 7 | +from typing import TYPE_CHECKING |
| 8 | + |
| 9 | +if TYPE_CHECKING: |
| 10 | + from os import PathLike |
| 11 | + |
| 12 | + import pytest_subtests |
| 13 | + from pyfakefs.fake_filesystem import FakeFilesystem |
| 14 | + |
| 15 | + |
| 16 | +def indent(template: templatelib.Template) -> str: |
| 17 | + """Template-rendering function to render a t-string while preserving indentation, |
| 18 | + processing all the accustomed f-string formatters. |
| 19 | + References: https://www.pythonmorsels.com/t-strings-in-python/ |
| 20 | + https://github.com/t-strings/pep750-examples/blob/main/pep/fstring.py""" |
| 21 | + parts = [] |
| 22 | + indent = 0 |
| 23 | + for item in template: |
| 24 | + match item: |
| 25 | + case templatelib.Interpolation(value, _expression, conversion, format_spec): |
| 26 | + value = templatelib.convert(value, conversion) |
| 27 | + value = format(value, format_spec) |
| 28 | + for i, line in enumerate(value.splitlines(keepends=True)): |
| 29 | + parts.append(line if i == 0 else " " * indent + line) |
| 30 | + case str() as item: |
| 31 | + parts.extend(item.splitlines(keepends=True)) |
| 32 | + indent = len(parts[-1]) - len(parts[-1].lstrip()) |
| 33 | + case _: |
| 34 | + raise ValueError(f"Cannot happen: Unsupported item type: {type(item)}") |
| 35 | + return "".join(parts) |
| 36 | + |
| 37 | + |
| 38 | +class TestProcessTemplateWithIndents: |
| 39 | + def test_process_template_with_indents(self, subtests: pytest_subtests.plugin.SubTests) -> None: |
| 40 | + a = "a\na" |
| 41 | + b = "b\n b" |
| 42 | + test_cases = [ |
| 43 | + (t"", "", "empty string"), |
| 44 | + (t"a", "a", "single line"), |
| 45 | + (t"{a}", a, "single multiline substitution"), |
| 46 | + (t" {a}", textwrap.indent(a, prefix=" "), "substitution with leading whitespace"), |
| 47 | + (t" {b}", textwrap.indent(b, prefix=" "), "substitution whitespace before as well as inside"), |
| 48 | + ] |
| 49 | + for inp, expected, description in test_cases: |
| 50 | + with subtests.test(description): |
| 51 | + assert process_template_with_indents(inp) == expected |
| 52 | + |
| 53 | + |
| 54 | +def blockinfile( |
| 55 | + filename: str | PathLike, |
| 56 | + contents: str, |
| 57 | + prefix: str | None = None, |
| 58 | + *, |
| 59 | + comment: str = "#", |
| 60 | +) -> None: |
| 61 | + """This is similar to the functions in |
| 62 | + * https://homely.readthedocs.io/en/latest/ref/files.html#homely-files-blockinfile-1 |
| 63 | + * ansible.modules.lineinfile |
| 64 | + """ |
| 65 | + begin_marker = f"{comment * 3} BEGIN{' ' + prefix if prefix else ''}" |
| 66 | + end_marker = f"{comment * 3} END{' ' + prefix if prefix else ''}" |
| 67 | + |
| 68 | + begin = end = -1 |
| 69 | + try: |
| 70 | + with open(filename, "rt") as fp: |
| 71 | + original_lines = fp.readlines() |
| 72 | + except OSError as e: |
| 73 | + raise RuntimeError(f"Failed to read {filename}: {e}") from e |
| 74 | + for line_no, line in enumerate(original_lines): |
| 75 | + if line.rstrip() == begin_marker: |
| 76 | + begin = line_no |
| 77 | + elif line.rstrip() == end_marker: |
| 78 | + end = line_no |
| 79 | + |
| 80 | + if begin != -1 and end == -1: |
| 81 | + raise ValueError(f"Found begin marker but no matching end marker in {filename}") |
| 82 | + if begin == -1 and end != -1: |
| 83 | + raise ValueError(f"Found end marker but no matching begin marker in {filename}") |
| 84 | + if begin > end: |
| 85 | + raise ValueError(f"Begin marker appears after end marker in {filename}") |
| 86 | + |
| 87 | + lines = original_lines[:] |
| 88 | + # NOTE: textwrap.dedent() with raw strings leaves leading and trailing newline |
| 89 | + # we want to preserve the trailing one because HEREDOC has to have an empty trailing line for hadolint |
| 90 | + new_contents = contents.lstrip("\n").splitlines(keepends=True) |
| 91 | + if new_contents and new_contents[-1] == "\n": |
| 92 | + new_contents = new_contents[:-1] |
| 93 | + if begin == end == -1: |
| 94 | + # no markers found |
| 95 | + return |
| 96 | + else: |
| 97 | + lines[begin: end + 1] = [f"{begin_marker}\n", *new_contents, f"\n{end_marker}\n"] |
| 98 | + |
| 99 | + if lines == original_lines: |
| 100 | + return |
| 101 | + with open(filename, "wt") as fp: |
| 102 | + fp.writelines(lines) |
| 103 | + |
| 104 | + |
| 105 | +class TestBlockinfile: |
| 106 | + def test_adding_new_block(self, fs: FakeFilesystem): |
| 107 | + """the file should not be modified if there is no block already""" |
| 108 | + fs.create_file("/config.txt", contents="hello\nworld") |
| 109 | + |
| 110 | + blockinfile("/config.txt", "key=value") |
| 111 | + |
| 112 | + assert fs.get_object("/config.txt").contents == "hello\nworld" |
| 113 | + |
| 114 | + def test_updating_value_in_block(self, fs: FakeFilesystem): |
| 115 | + fs.create_file("/config.txt", contents="hello\nworld\n### BEGIN\nkey=value1\n### END\n") |
| 116 | + |
| 117 | + blockinfile("/config.txt", "key=value2") |
| 118 | + |
| 119 | + assert fs.get_object("/config.txt").contents == "hello\nworld\n### BEGIN\nkey=value2\n### END\n" |
| 120 | + |
| 121 | + def test_lastnewline_removal(self, fs: FakeFilesystem): |
| 122 | + fs.create_file("/config.txt", contents="hello\nworld\n### BEGIN\n### END\n") |
| 123 | + |
| 124 | + blockinfile("/config.txt", "key=value\n\n") |
| 125 | + |
| 126 | + assert fs.get_object("/config.txt").contents == "hello\nworld\n### BEGIN\nkey=value\n\n### END\n" |
0 commit comments