Skip to content

Commit e0e4b59

Browse files
committed
Add utility to find code blocks in RST files.
1 parent 9214ee7 commit e0e4b59

File tree

2 files changed

+474
-0
lines changed

2 files changed

+474
-0
lines changed
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
# Author: Felix Fontein <[email protected]>
2+
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or
3+
# https://www.gnu.org/licenses/gpl-3.0.txt)
4+
# SPDX-License-Identifier: GPL-3.0-or-later
5+
# SPDX-FileCopyrightText: 2024, Ansible Project
6+
7+
"""
8+
Find code blocks in RST files.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import io
14+
import os
15+
import typing as t
16+
from collections.abc import Mapping
17+
from dataclasses import dataclass
18+
19+
from docutils import nodes
20+
from docutils.core import Publisher
21+
from docutils.io import StringInput
22+
from docutils.parsers.rst import Directive
23+
from docutils.parsers.rst.directives import register_directive
24+
from docutils.parsers.rst.directives import unchanged as directive_param_unchanged
25+
from docutils.utils import Reporter, SystemMessage
26+
27+
28+
class IgnoreDirective(Directive):
    """
    Directive that simply ignores its content.

    It accepts a content block and produces no nodes for it.
    """

    # The directive may have a content block.
    has_content = True

    def run(self) -> list:
        """
        Return an empty list of nodes.
        """
        return []
37+
38+
39+
def mark_antsibull_code_block(
    node: nodes.literal_block, *, language: str | None, line: int
) -> None:
    """
    Mark a literal block as an Antsibull code block with given language and line number.

    The information is stored on ``node`` under the keys ``antsibull-code-language``,
    ``antsibull-code-block`` (always ``True``), and ``antsibull-code-lineno``.
    """
    node["antsibull-code-language"] = language
    node["antsibull-code-block"] = True
    node["antsibull-code-lineno"] = line
48+
49+
50+
class CodeBlockDirective(Directive):
    """
    Fake code block directive.

    Acts similar to Sphinx's code block directives, except that it calls
    ``mark_antsibull_code_block()`` on the generated literal blocks.
    """

    has_content = True
    # The single optional argument is the code block's language.
    optional_arguments = 1

    # These are all options Sphinx allows for code blocks.
    # We need to have them here so that docutils successfully parses this extension.
    option_spec = {
        "caption": directive_param_unchanged,
        "class": directive_param_unchanged,
        "dedent": directive_param_unchanged,
        "emphasize-lines": directive_param_unchanged,
        "name": directive_param_unchanged,
        "force": directive_param_unchanged,
        "linenos": directive_param_unchanged,
        "lineno-start": directive_param_unchanged,
    }

    def run(self) -> list[nodes.literal_block]:
        """
        Create a literal block from the directive's content and mark it.

        The block gets the class ``code-block`` and is marked with the directive's
        argument as language (``None`` if no argument was given) and with the
        directive's line number. The directive's options are not evaluated.
        """
        code = "\n".join(self.content)
        literal = nodes.literal_block(code, code)
        literal["classes"].append("code-block")
        mark_antsibull_code_block(
            literal,
            language=self.arguments[0] if self.arguments else None,
            line=self.lineno,
        )
        return [literal]
84+
85+
86+
class CodeBlockVisitor(nodes.SparseNodeVisitor):
    """
    Visitor that calls callbacks for all code blocks.

    Literal blocks that carry the ``antsibull-code-block`` attribute are reported
    through ``callback``. Other literal blocks that have classes are reported
    through ``warn_unknown_block``. System messages and errors are skipped.
    """

    def __init__(
        self,
        document: nodes.document,
        content: str,
        callback: t.Callable[[str, int, int, bool, str], None],
        warn_unknown_block: t.Callable[[int | str, int, nodes.literal_block], None],
    ):
        """
        Create a visitor for ``document``.

        ``content`` is split into lines, which are used to locate the code blocks.

        ``callback`` is called with the language, the row offset, the column offset,
        whether the code was found as-is at that position, and the code itself.

        ``warn_unknown_block`` is called with the node's line (or ``"unknown"``),
        the column ``0``, and the node.
        """
        super().__init__(document)
        self.__content_lines = content.splitlines()
        self.__callback = callback
        self.__warn_unknown_block = warn_unknown_block

    def visit_system_message(self, node: nodes.system_message) -> None:
        """
        Ignore system messages.
        """
        raise nodes.SkipNode

    def visit_error(self, node: nodes.error) -> None:
        """
        Ignore errors.
        """
        raise nodes.SkipNode

    @staticmethod
    def _find_indent(content: str) -> int | None:
        """
        Return the smallest indentation of all non-blank lines of ``content``.

        Returns ``None`` if ``content`` contains no non-blank line.
        """
        min_indent = None
        for line in content.split("\n"):
            stripped_line = line.lstrip()
            if stripped_line:
                indent = len(line) - len(stripped_line)
                if min_indent is None or min_indent > indent:
                    min_indent = indent
        return min_indent

    def _find_offset(self, lineno: int, content: str) -> tuple[int, int]:
        """
        Determine the row and column offset of ``content`` in the source lines.

        Scanning starts at index ``lineno`` of the source lines. The returned row
        offset is an index into the source lines. The column offset is the
        difference between the smallest indentation of the scanned source lines
        and the smallest indentation of ``content``, but never negative.
        """
        row_offset = lineno
        found_empty_line = False
        found_content_lines = False
        # Number of source lines that still have to be consumed for the content
        content_lines = content.count("\n") + 1
        min_indent = None
        for offset, line in enumerate(self.__content_lines[lineno:]):
            stripped_line = line.strip()
            if not stripped_line:
                if not found_empty_line:
                    # First blank line: the content can start on the next line
                    row_offset = lineno + offset + 1
                    found_empty_line = True
            elif not found_content_lines:
                # First non-blank line: start counting content lines here
                found_content_lines = True
                row_offset = lineno + offset

            if found_content_lines and content_lines > 0:
                if stripped_line:
                    indent = len(line) - len(line.lstrip())
                    if min_indent is None or min_indent > indent:
                        min_indent = indent
                content_lines -= 1
            elif not content_lines:
                # All content lines have been consumed
                break

        min_source_indent = self._find_indent(content)
        col_offset = max(0, (min_indent or 0) - (min_source_indent or 0))
        return row_offset, col_offset

    def _find_in_code(self, row_offset: int, col_offset: int, content: str) -> bool:
        """
        Check whether ``content`` can be found as-is at the given offsets.

        Every line of ``content`` must appear in the corresponding source line
        starting at column ``col_offset``. Before and after it, the source line may
        only contain whitespace.
        """
        for index, line in enumerate(content.split("\n")):
            if row_offset + index >= len(self.__content_lines):
                return False
            found_line = self.__content_lines[row_offset + index]
            if found_line[:col_offset].strip():
                return False
            eol = found_line[col_offset:]
            if eol[: len(line)] != line:
                return False
            if eol[len(line) :].strip():
                return False
        return True

    def visit_literal_block(self, node: nodes.literal_block) -> None:
        """
        Visit a code block.

        Marked blocks are passed to the callback together with their position in
        the source. The node's children are never visited.
        """
        if "antsibull-code-block" not in node.attributes:
            if node.attributes["classes"]:
                # This could be a `::` block, or something else (unknown)
                self.__warn_unknown_block(node.line or "unknown", 0, node)
            raise nodes.SkipNode

        language = node.attributes["antsibull-code-language"]
        lineno = node.attributes["antsibull-code-lineno"]
        row_offset, col_offset = self._find_offset(lineno, node.rawsource)
        self.__callback(
            language,
            row_offset,
            col_offset,
            self._find_in_code(row_offset, col_offset, node.rawsource),
            # Normalize the code so that it ends with exactly one newline
            node.rawsource.rstrip() + "\n",
        )
        raise nodes.SkipNode
190+
191+
192+
# Directives that are always registered before parsing a document.
_DIRECTIVES: dict[str, t.Type[Directive]] = {
    # Replace Sphinx code blocks with our code block directive:
    "code": CodeBlockDirective,
    "code-block": CodeBlockDirective,
    "sourcecode": CodeBlockDirective,
    # The following docutils directives should better be ignored:
    "parsed-literal": IgnoreDirective,
}
200+
201+
202+
def _parse_document(
    content: str,
    *,
    path: str | os.PathLike[str] | None,
    root_prefix: str | os.PathLike[str] | None,
    directives: dict[str, t.Type[Directive]],
) -> nodes.document:
    """
    Parse the RST document ``content`` with docutils and return the document node.

    All entries of ``directives`` are registered with docutils before parsing.
    They stay registered afterwards. ``path`` is used as the source path of the
    document, and ``root_prefix`` is passed on as the setting of the same name.

    Raises ``ValueError`` if the document cannot be parsed.
    """
    # pylint: disable-next=fixme
    # TODO: figure out how to register a directive only temporarily
    for directive_name, directive_class in directives.items():
        register_directive(directive_name, directive_class)

    # Normalize path-like objects to plain paths before handing them to docutils.
    source_path = None if path is None else os.fspath(path)
    root_prefix_path = None if root_prefix is None else os.fspath(root_prefix)

    # We create a Publisher only to have a mechanism which gives us the settings object.
    # Doing this more explicit is a bad idea since the classes used are deprecated and will
    # eventually get replaced. Publisher.get_settings() looks like a stable enough API that
    # we can 'just use'.
    publisher = Publisher(source_class=StringInput)
    publisher.set_components("standalone", "restructuredtext", "pseudoxml")
    override = {
        "root_prefix": root_prefix_path,
        "input_encoding": "utf-8",
        "file_insertion_enabled": False,
        "raw_enabled": False,
        "_disable_config": True,
        "report_level": Reporter.ERROR_LEVEL,
        # Collect warnings in memory; the stream is never read
        "warning_stream": io.StringIO(),
    }
    publisher.process_programmatic_settings(None, override, None)
    publisher.set_source(content, source_path)

    # Parse the document
    try:
        return publisher.reader.read(
            publisher.source, publisher.parser, publisher.settings
        )
    except SystemMessage as exc:
        raise ValueError(f"Cannot parse document: {exc}") from exc
    except Exception as exc:
        raise ValueError(f"Unexpected error while parsing document: {exc}") from exc
241+
242+
243+
@dataclass
class CodeBlockInfo:
    """
    Information on a code block
    """

    # The code block's language (if known)
    language: str | None

    # The code block's line and column offset
    row_offset: int
    col_offset: int

    # Whether the code block's contents can be found as-is in the RST file,
    # only indented by whitespace, and with potentially trailing whitespace
    directly_in_content: bool

    # The code block's contents
    content: str
262+
263+
264+
def find_code_blocks(
    content: str,
    *,
    path: str | os.PathLike[str] | None = None,
    root_prefix: str | os.PathLike[str] | None = None,
    extra_directives: Mapping[str, t.Type[Directive]] | None = None,
    warn_unknown_block: t.Callable[[int | str, int, str], None] | None = None,
) -> t.Generator[CodeBlockInfo]:
    """
    Given a RST document, finds all code blocks.

    ``path`` and ``root_prefix`` are passed on to the parser.

    ``extra_directives`` are registered in addition to the default directives;
    an entry with the same name replaces the default one.

    ``warn_unknown_block`` is called with the line (or ``"unknown"``), the
    column, and the raw source of every literal block that has classes but is
    not a marked code block.

    Raises ``ValueError`` if the document cannot be parsed or processed. If
    processing fails, the code blocks found so far are still yielded before
    the error is raised.
    """
    directives = _DIRECTIVES.copy()
    if extra_directives:
        directives.update(extra_directives)

    doc = _parse_document(
        content, directives=directives, path=path, root_prefix=root_prefix
    )

    # If someone can figure out how to yield from a sub-function, we can avoid
    # using this ugly list
    results: list[CodeBlockInfo] = []

    def callback(
        language: str | None,
        row_offset: int,
        col_offset: int,
        directly_in_content: bool,
        block_content: str,
    ) -> None:
        results.append(
            CodeBlockInfo(
                language=language,
                row_offset=row_offset,
                col_offset=col_offset,
                directly_in_content=directly_in_content,
                content=block_content,
            )
        )

    def warn_unknown_block_cb(
        line: int | str,
        col: int,
        node: nodes.literal_block,
    ) -> None:
        # The user-provided callback gets the node's raw source, not the node
        if warn_unknown_block:
            warn_unknown_block(line, col, node.rawsource)

    # Process the document
    try:
        visitor = CodeBlockVisitor(doc, content, callback, warn_unknown_block_cb)
        doc.walk(visitor)
    except Exception as exc:
        raise ValueError(f"Cannot process document: {exc}") from exc
    finally:
        # This also runs on errors: the blocks collected so far are yielded
        # first, and a pending exception is raised afterwards.
        yield from results
320+
321+
322+
# Names exported by this module.
__all__ = (
    "CodeBlockInfo",
    "mark_antsibull_code_block",
    "find_code_blocks",
)

0 commit comments

Comments
 (0)