|
19 | 19 | import subprocess |
20 | 20 | import sys |
21 | 21 | import time |
| 22 | +from enum import Enum |
22 | 23 | from typing import Any |
23 | 24 |
|
24 | 25 | from re import Pattern |
25 | 26 |
|
| 27 | +# Import handlers |
| 28 | +try: |
| 29 | + from handlers import ( |
| 30 | + apply_json_suggestion, validate_json_suggestion, |
| 31 | + apply_yaml_suggestion, validate_yaml_suggestion, |
| 32 | + apply_toml_suggestion, validate_toml_suggestion |
| 33 | + ) |
| 34 | + HANDLERS_AVAILABLE = True |
| 35 | +except ImportError: |
| 36 | + HANDLERS_AVAILABLE = False |
| 37 | + |
26 | 38 | CR_DIR = pathlib.Path(".cr") |
27 | 39 | COMMENTS_JSON = CR_DIR / "pr-review-comments.json" |
28 | 40 | PATCH_DIR = CR_DIR / "patches" |
|
40 | 52 | RE_OPTION_HEADER = re.compile(r'\*\*([^*]+)\*\*\s*$', re.MULTILINE) |
41 | 53 |
|
42 | 54 |
|
| 55 | +class FileType(Enum): |
| 56 | + """File type enumeration for routing suggestions to appropriate handlers.""" |
| 57 | + PYTHON = "python" |
| 58 | + TYPESCRIPT = "typescript" |
| 59 | + JSON = "json" |
| 60 | + YAML = "yaml" |
| 61 | + TOML = "toml" |
| 62 | + PLAINTEXT = "plaintext" |
| 63 | + |
| 64 | + |
| 65 | +def detect_file_type(path: str) -> FileType: |
| 66 | + """Detect file type from extension.""" |
| 67 | + suffix = pathlib.Path(path).suffix.lower() |
| 68 | + mapping = { |
| 69 | + ".py": FileType.PYTHON, |
| 70 | + ".ts": FileType.TYPESCRIPT, |
| 71 | + ".tsx": FileType.TYPESCRIPT, |
| 72 | + ".js": FileType.TYPESCRIPT, |
| 73 | + ".jsx": FileType.TYPESCRIPT, |
| 74 | + ".json": FileType.JSON, |
| 75 | + ".yaml": FileType.YAML, |
| 76 | + ".yml": FileType.YAML, |
| 77 | + ".toml": FileType.TOML, |
| 78 | + } |
| 79 | + return mapping.get(suffix, FileType.PLAINTEXT) |
| 80 | + |
| 81 | + |
| 82 | +def route_suggestion(file_type: FileType, path: str, suggestion: str, |
| 83 | + start_line: int, end_line: int) -> bool: |
| 84 | + """Route suggestion to appropriate handler.""" |
| 85 | + if not HANDLERS_AVAILABLE: |
| 86 | + print(f"Warning: Handlers not available, using plaintext fallback for {path}") |
| 87 | + return apply_plaintext_suggestion(path, suggestion, start_line, end_line) |
| 88 | + |
| 89 | + if file_type == FileType.JSON: |
| 90 | + return apply_json_suggestion(path, suggestion, start_line, end_line) |
| 91 | + elif file_type == FileType.YAML: |
| 92 | + return apply_yaml_suggestion(path, suggestion, start_line, end_line) |
| 93 | + elif file_type == FileType.TOML: |
| 94 | + return apply_toml_suggestion(path, suggestion, start_line, end_line) |
| 95 | + else: |
| 96 | + return apply_plaintext_suggestion(path, suggestion, start_line, end_line) |
| 97 | + |
| 98 | + |
| 99 | +def validate_suggestion(file_type: FileType, path: str, suggestion: str, |
| 100 | + start_line: int, end_line: int) -> tuple[bool, str]: |
| 101 | + """Validate suggestion without applying it.""" |
| 102 | + if not HANDLERS_AVAILABLE: |
| 103 | + return True, "No validation available" |
| 104 | + |
| 105 | + if file_type == FileType.JSON: |
| 106 | + return validate_json_suggestion(path, suggestion, start_line, end_line) |
| 107 | + elif file_type == FileType.YAML: |
| 108 | + return validate_yaml_suggestion(path, suggestion, start_line, end_line) |
| 109 | + elif file_type == FileType.TOML: |
| 110 | + return validate_toml_suggestion(path, suggestion, start_line, end_line) |
| 111 | + else: |
| 112 | + return True, "No validation available" |
| 113 | + |
| 114 | + |
| 115 | +def apply_plaintext_suggestion(path: str, suggestion: str, |
| 116 | + start_line: int, end_line: int) -> bool: |
| 117 | + """Apply suggestion using the original plaintext method.""" |
| 118 | + file_path = pathlib.Path(path) |
| 119 | + if not file_path.exists(): |
| 120 | + return False |
| 121 | + |
| 122 | + original = read_lines(file_path) |
| 123 | + replacement = suggestion.split("\n") |
| 124 | + a = max(0, start_line - 1) |
| 125 | + b = max(0, end_line) |
| 126 | + |
| 127 | + if a > len(original): |
| 128 | + return False |
| 129 | + if b > len(original): |
| 130 | + b = len(original) |
| 131 | + |
| 132 | + new = original[:] |
| 133 | + new[a:b] = replacement |
| 134 | + write_lines(file_path, new) |
| 135 | + return True |
| 136 | + |
| 137 | + |
43 | 138 | def load_review_comments() -> list[dict[str, Any]]: |
44 | 139 | """Load review comments from JSON file.""" |
45 | 140 | if not COMMENTS_JSON.exists(): |
@@ -583,16 +678,37 @@ def main() -> None: |
583 | 678 | report.append(f"- SKIPPED overlapping change at L{start}-{end} (conflicts with earlier change)") |
584 | 679 | continue |
585 | 680 |
|
586 | | - replacement = text.split("\n") |
587 | | - a = max(0, start - 1); b = max(0, end) |
588 | | - if a > len(new): |
589 | | - report.append(f"- Skip suggestion at L{start}-{end}: out of bounds.") |
590 | | - continue |
591 | | - if b > len(new): |
592 | | - b = len(new) |
593 | | - before = new[:] |
594 | | - new[a:b] = replacement |
595 | | - changed = (before != new) |
| 681 | + # Detect file type and route to appropriate handler |
| 682 | + file_type = detect_file_type(path) |
| 683 | + |
| 684 | + # Validate suggestion first if in validation mode |
| 685 | + if args.validate: |
| 686 | + is_valid, validation_msg = validate_suggestion(file_type, path, text, start, end) |
| 687 | + if not is_valid: |
| 688 | + report.append(f"- VALIDATION FAILED at L{start}-{end}: {validation_msg}") |
| 689 | + continue |
| 690 | + else: |
| 691 | + report.append(f"- VALIDATION PASSED at L{start}-{end}: {validation_msg}") |
| 692 | + |
| 693 | + # Apply suggestion using appropriate handler |
| 694 | + if file_type in [FileType.JSON, FileType.YAML, FileType.TOML]: |
| 695 | + # Use specialized handler for structured files |
| 696 | + changed = route_suggestion(file_type, path, text, start, end) |
| 697 | + if changed: |
| 698 | + # Reload file content after handler modification |
| 699 | + new = read_lines(file_path) |
| 700 | + else: |
| 701 | + # Use original plaintext method for other files |
| 702 | + replacement = text.split("\n") |
| 703 | + a = max(0, start - 1); b = max(0, end) |
| 704 | + if a > len(new): |
| 705 | + report.append(f"- Skip suggestion at L{start}-{end}: out of bounds.") |
| 706 | + continue |
| 707 | + if b > len(new): |
| 708 | + b = len(new) |
| 709 | + before = new[:] |
| 710 | + new[a:b] = replacement |
| 711 | + changed = (before != new) |
596 | 712 | url = meta.get("url", "") |
597 | 713 | who = meta.get("author", "") |
598 | 714 | src = meta.get("source", "suggestion") |
|
0 commit comments