|
6 | 6 | import subprocess |
7 | 7 | import sys |
8 | 8 | from concurrent.futures import Future, ThreadPoolExecutor |
9 | | -from typing import Any, Callable, Dict, List, Optional, Tuple |
| 9 | +from typing import Callable, Dict, Optional, Tuple |
10 | 10 |
|
11 | 11 | from PySide6.QtCore import QObject, Signal |
12 | 12 |
|
|
33 | 33 | ) |
34 | 34 | from qgitc.basemodel import ValidationError |
35 | 35 | from qgitc.gitutils import Git |
| 36 | +from qgitc.tools.applypatch import DiffError, process_patch |
36 | 37 |
|
37 | 38 |
|
38 | 39 | class AgentToolResult: |
@@ -78,184 +79,6 @@ def _normalize_patch_path(path: str) -> str: |
78 | 79 | p = (path or "").strip().strip('"').strip("'") |
79 | 80 | return p |
80 | 81 |
|
81 | | - @staticmethod |
82 | | - def _read_text_file_lines(path: str) -> Tuple[List[str], bool, Optional[bytes], str]: |
83 | | - """Return (lines_without_newlines, endswith_newline, bom_bytes, encoding).""" |
84 | | - bom, encoding = AgentToolExecutor._detect_bom(path) |
85 | | - with open(path, 'r', encoding=encoding) as f: |
86 | | - text = f.read() |
87 | | - endswith_newline = text.endswith('\n') |
88 | | - lines = text.splitlines() |
89 | | - return lines, endswith_newline, bom, encoding |
90 | | - |
91 | | - @staticmethod |
92 | | - def _write_text_file_lines(path: str, lines: List[str], endswith_newline: bool, bom: Optional[bytes], encoding: str): |
93 | | - text = "\n".join(lines) |
94 | | - if endswith_newline: |
95 | | - text += "\n" |
96 | | - if bom is not None: |
97 | | - encoded = text.encode(encoding.replace('-sig', '')) |
98 | | - with open(path, 'wb') as f: |
99 | | - f.write(bom) |
100 | | - f.write(encoded) |
101 | | - else: |
102 | | - with open(path, 'w', encoding='utf-8', newline='\n') as f: |
103 | | - f.write(text) |
104 | | - |
105 | | - @staticmethod |
106 | | - def _find_subsequence(haystack: List[str], needle: List[str]) -> List[int]: |
107 | | - if not needle: |
108 | | - return [] |
109 | | - hits: List[int] = [] |
110 | | - n = len(needle) |
111 | | - for i in range(0, len(haystack) - n + 1): |
112 | | - if haystack[i:i + n] == needle: |
113 | | - hits.append(i) |
114 | | - return hits |
115 | | - |
116 | | - @staticmethod |
117 | | - def _parse_v4a_patch(patch_text: str) -> Tuple[bool, str, List[Tuple[str, str, List[str]]]]: |
118 | | - """Parse V4A patch and return list of (action, file_path, block_lines).""" |
119 | | - text = (patch_text or "").lstrip("\ufeff") |
120 | | - if "*** Begin Patch" not in text or "*** End Patch" not in text: |
121 | | - return False, "Patch must include '*** Begin Patch' and '*** End Patch'.", [] |
122 | | - |
123 | | - lines = text.splitlines() |
124 | | - try: |
125 | | - start = lines.index("*** Begin Patch") + 1 |
126 | | - end = lines.index("*** End Patch") |
127 | | - except ValueError: |
128 | | - return False, "Patch markers not found or malformed.", [] |
129 | | - |
130 | | - content = lines[start:end] |
131 | | - ops: List[Tuple[str, str, List[str]]] = [] |
132 | | - current: Optional[Dict[str, Any]] = None |
133 | | - |
134 | | - def flush(): |
135 | | - nonlocal current |
136 | | - if current is not None: |
137 | | - ops.append(( |
138 | | - str(current.get("action") or ""), |
139 | | - str(current.get("file_path") or ""), |
140 | | - list(current.get("lines") or []), |
141 | | - )) |
142 | | - current = None |
143 | | - |
144 | | - for line in content: |
145 | | - if line.startswith("*** ") and " File:" in line: |
146 | | - flush() |
147 | | - # Format: *** Update File: /path/to/file |
148 | | - parts = line.split(" File:", 1) |
149 | | - action_part = parts[0].replace("***", "").strip() |
150 | | - action = action_part.split()[0] |
151 | | - file_path = AgentToolExecutor._normalize_patch_path( |
152 | | - parts[1].lstrip(":").strip()) |
153 | | - current = {"action": action, |
154 | | - "file_path": file_path, "lines": []} |
155 | | - continue |
156 | | - |
157 | | - if current is None: |
158 | | - # Ignore stray lines between headers. |
159 | | - continue |
160 | | - |
161 | | - current["lines"].append(line) |
162 | | - |
163 | | - flush() |
164 | | - |
165 | | - # Basic validation |
166 | | - for action, file_path, _ in ops: |
167 | | - if action not in ("Add", "Update", "Delete"): |
168 | | - return False, f"Unsupported patch action: {action}", [] |
169 | | - if not file_path: |
170 | | - return False, "Patch contains an empty file path.", [] |
171 | | - |
172 | | - if not ops: |
173 | | - return False, "Patch contains no file operations.", [] |
174 | | - |
175 | | - return True, "ok", ops |
176 | | - |
177 | | - @staticmethod |
178 | | - def _apply_v4a_update(abs_path: str, block_lines: List[str]) -> Tuple[bool, str]: |
179 | | - if not os.path.isfile(abs_path): |
180 | | - return False, f"File does not exist for update: {abs_path}" |
181 | | - |
182 | | - file_lines, endswith_newline, bom, encoding = AgentToolExecutor._read_text_file_lines( |
183 | | - abs_path) |
184 | | - |
185 | | - # Build edit blocks. |
186 | | - context: List[str] = [] |
187 | | - edits = [] # list of (pre, old, new, post) |
188 | | - in_change = False |
189 | | - pre: List[str] = [] |
190 | | - old: List[str] = [] |
191 | | - new: List[str] = [] |
192 | | - post: List[str] = [] |
193 | | - |
194 | | - def finalize(): |
195 | | - nonlocal in_change, pre, old, new, post |
196 | | - if in_change and (old or new): |
197 | | - edits.append((pre[-3:], old, new, post[:3])) |
198 | | - in_change = False |
199 | | - pre = [] |
200 | | - old = [] |
201 | | - new = [] |
202 | | - post = [] |
203 | | - |
204 | | - for line in block_lines: |
205 | | - if line.startswith("@@"): |
206 | | - continue |
207 | | - if line.startswith("-") or line.startswith("+"): |
208 | | - if not in_change: |
209 | | - pre = context[:] # snapshot |
210 | | - in_change = True |
211 | | - if post: |
212 | | - # A new change block starts after post-context. |
213 | | - finalize() |
214 | | - pre = context[:] |
215 | | - in_change = True |
216 | | - if line.startswith("-"): |
217 | | - old.append(line[1:]) |
218 | | - else: |
219 | | - new.append(line[1:]) |
220 | | - continue |
221 | | - |
222 | | - # Plain context line |
223 | | - if in_change and (old or new): |
224 | | - post.append(line) |
225 | | - context.append(line) |
226 | | - |
227 | | - finalize() |
228 | | - |
229 | | - if not edits: |
230 | | - return False, "No edits found in Update block (need '-'/'+' lines)." |
231 | | - |
232 | | - # Apply edits sequentially. |
233 | | - for pre3, old_lines, new_lines, post3 in edits: |
234 | | - target = pre3 + old_lines + post3 |
235 | | - hits = AgentToolExecutor._find_subsequence( |
236 | | - file_lines, target) if target else [] |
237 | | - if len(hits) == 1: |
238 | | - i = hits[0] + len(pre3) |
239 | | - file_lines[i:i + len(old_lines)] = new_lines |
240 | | - continue |
241 | | - |
242 | | - # Fallback: match old_lines only. |
243 | | - hits_old = AgentToolExecutor._find_subsequence( |
244 | | - file_lines, old_lines) |
245 | | - if len(hits_old) == 1: |
246 | | - i = hits_old[0] |
247 | | - file_lines[i:i + len(old_lines)] = new_lines |
248 | | - continue |
249 | | - |
250 | | - if not hits and not hits_old: |
251 | | - snippet = "\\n".join(old_lines[:10]) |
252 | | - return False, f"Failed to apply edit: old content not found. Snippet:\n{snippet}" |
253 | | - return False, "Failed to apply edit: patch context is ambiguous (multiple matches)." |
254 | | - |
255 | | - AgentToolExecutor._write_text_file_lines( |
256 | | - abs_path, file_lines, endswith_newline, bom, encoding) |
257 | | - return True, "Patch applied." |
258 | | - |
259 | 82 | @staticmethod |
260 | 83 | def _resolve_repo_path(repo_dir: str, file_path: str) -> Tuple[bool, str]: |
261 | 84 | """Resolve file_path against repo_dir and ensure it stays within the repo.""" |
@@ -716,43 +539,45 @@ def _handle_apply_patch(self, tool_name: str, params: Dict) -> AgentToolResult: |
716 | 539 | if not patch_text.strip(): |
717 | 540 | return AgentToolResult(tool_name, False, "Patch is empty.") |
718 | 541 |
|
719 | | - ok, msg, ops = self._parse_v4a_patch(patch_text) |
720 | | - if not ok: |
721 | | - return AgentToolResult(tool_name, False, msg) |
| 542 | + def _open_file(path: str) -> str: |
| 543 | + file_path = self._normalize_patch_path(path) |
| 544 | + ok, abs_path = self._resolve_repo_path(repo_dir, file_path) |
| 545 | + if not ok: |
| 546 | + raise DiffError(abs_path) |
| 547 | + _, encoding = AgentToolExecutor._detect_bom(abs_path) |
| 548 | + with open(abs_path, 'r', encoding=encoding) as f: |
| 549 | + return f.read() |
722 | 550 |
|
723 | | - applied = 0 |
724 | | - for action, path, block in ops: |
| 551 | + def _write_file(path: str, content: str): |
725 | 552 | file_path = self._normalize_patch_path(path) |
726 | | - ok2, abs_path = self._resolve_repo_path(repo_dir, file_path) |
727 | | - if not ok2: |
728 | | - return AgentToolResult(tool_name, False, abs_path) |
729 | | - |
730 | | - if action == "Add": |
731 | | - if os.path.exists(abs_path): |
732 | | - return AgentToolResult(tool_name, False, f"File already exists: {abs_path}") |
733 | | - parent = os.path.dirname(abs_path) |
734 | | - os.makedirs(parent, exist_ok=True) |
735 | | - # For Add, treat '+' lines as file content. |
736 | | - content_lines = [ln[1:] for ln in block if ln.startswith("+")] |
737 | | - with open(abs_path, 'w', encoding='utf-8', newline='\n') as f: |
738 | | - f.write("\n".join(content_lines) + |
739 | | - ("\n" if content_lines else "")) |
740 | | - applied += 1 |
741 | | - continue |
742 | | - |
743 | | - if action == "Delete": |
744 | | - if not os.path.exists(abs_path): |
745 | | - return AgentToolResult(tool_name, False, f"File does not exist for delete: {abs_path}") |
746 | | - if os.path.isdir(abs_path): |
747 | | - return AgentToolResult(tool_name, False, f"Refusing to delete a directory: {abs_path}") |
748 | | - os.remove(abs_path) |
749 | | - applied += 1 |
750 | | - continue |
751 | | - |
752 | | - # Update |
753 | | - ok3, msg3 = self._apply_v4a_update(abs_path, block) |
754 | | - if not ok3: |
755 | | - return AgentToolResult(tool_name, False, msg3) |
756 | | - applied += 1 |
757 | | - |
758 | | - return AgentToolResult(tool_name, True, f"Applied {applied} patch operation(s).") |
| 553 | + ok, abs_path = self._resolve_repo_path(repo_dir, file_path) |
| 554 | + if not ok: |
| 555 | + raise DiffError(abs_path) |
| 556 | + parent = os.path.dirname(abs_path) |
| 557 | + os.makedirs(parent, exist_ok=True) |
| 558 | + bom, encoding = AgentToolExecutor._detect_bom(abs_path) |
| 559 | + if bom: |
| 560 | + encoded = content.encode(encoding.replace('-sig', '')) |
| 561 | + with open(abs_path, 'wb') as f: |
| 562 | + f.write(bom) |
| 563 | + f.write(encoded) |
| 564 | + else: |
| 565 | + with open(abs_path, 'wt', encoding='utf-8') as f: |
| 566 | + f.write(content) |
| 567 | + |
| 568 | + def _remove_file(path: str): |
| 569 | + file_path = self._normalize_patch_path(path) |
| 570 | + ok, abs_path = self._resolve_repo_path(repo_dir, file_path) |
| 571 | + if not ok: |
| 572 | + raise DiffError(abs_path) |
| 573 | + if os.path.isfile(abs_path): |
| 574 | + os.unlink(abs_path) |
| 575 | + |
| 576 | + try: |
| 577 | + message = process_patch( |
| 578 | + patch_text, _open_file, _write_file, _remove_file) |
| 579 | + return AgentToolResult(tool_name, True, message) |
| 580 | + except DiffError as e: |
| 581 | + return AgentToolResult(tool_name, False, str(e)) |
| 582 | + except Exception as e: |
| 583 | + return AgentToolResult(tool_name, False, f"Failed to apply patch: {e}") |
0 commit comments