Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions pacta/rules/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,11 @@ def parse_file(self, path: str | Path) -> RulesDocumentAst:
def parse_text(self, text: str, *, filename: str | None = None) -> RulesDocumentAst:
filename = filename or "<rules>"

blocks = self._split_rule_blocks(text)
rules: list[RuleAst] = []

for i, block in enumerate(blocks, start=1):
try:
rules.append(self._parse_rule_block(block, filename=filename))
except RulesParseError:
raise
except Exception as e:
raise RulesParseError(
message=f"Failed to parse rule block #{i}: {e}",
file=filename,
) from e

return RulesDocumentAst(rules=tuple(rules), span=SourceSpan(file=filename))

def _parse_rule_block(self, block: str, *, filename: str) -> RuleAst:
parsed_rules = self._parse_rule_block(text,filename=filename)

return RulesDocumentAst(rules=tuple(parsed_rules), span=SourceSpan(file=filename))

def _parse_rule_block(self, block: str, *, filename: str) -> list[RuleAst]:
try:
import yaml
except ImportError as e:
Expand All @@ -109,13 +97,26 @@ def _parse_rule_block(self, block: str, *, filename: str) -> RuleAst:
except Exception as e:
raise RulesParseError(message=f"Invalid YAML in rule block: {e}", file=filename) from e

if not isinstance(doc, Mapping) or doc.get("rule") is None:
raise RulesParseError(message="Rule block must contain top-level 'rule:' mapping", file=filename)
if not isinstance(doc, Mapping) or doc.get("rules") is None:
raise RulesParseError(message="Rule block must contain top-level 'rules:' list", file=filename)

rules_list = doc["rules"]
if not isinstance(rules_list, list):
raise RulesParseError(message="'rules list' must be a list", file=filename)

parsed_rules = []

root = doc["rule"]
if not isinstance(root, Mapping):
raise RulesParseError(message="'rule' must be a mapping", file=filename)
for root in rules_list:
if not isinstance(root, Mapping):
raise RulesParseError(message="'rule' must be a mapping", file=filename)
rule_item = self._add_rule_in_list(root, filename = filename)
parsed_rules.append(rule_item)

return parsed_rules



def _add_rule_in_list(self, root: Mapping, *, filename:str) -> RuleAst:
# Required fields
rid = self._req_str(root, "id", filename)
name = self._req_str(root, "name", filename)
Expand Down Expand Up @@ -152,10 +153,6 @@ def _parse_rule_block(self, block: str, *, filename: str) -> RuleAst:
metadata={"parser": "dsl-v0+pyyaml"},
)

# Keep your existing _split_rule_blocks, _parse_when_predicate, _parse_pred_item,
# _parse_inline_compare, _parse_literal, _req_str, _opt_str as-is (they already handle
# strings like "from.layer == domain" and nested any/all/not).

def _split_rule_blocks(self, text: str) -> list[str]:
lines = [ln.rstrip("\n") for ln in text.splitlines()]
blocks: list[list[str]] = []
Expand All @@ -177,7 +174,7 @@ def flush():
filtered = []
for b in blocks:
first = next((ln for ln in b if ln.strip() and not ln.strip().startswith("#")), "")
if first.strip() != "rule:":
if first.strip() != "rules:":
continue
filtered.append("\n".join(b))
return filtered
Expand Down
Loading
Loading