2
2
3
3
import hashlib
4
4
import os
5
- from typing import Any , Dict , List , Optional , Tuple , TypedDict
5
+ from typing import Any , Dict , List , Optional , Tuple
6
6
7
-
8
- class Range (TypedDict ):
9
- """Represents a line range in a file."""
10
-
11
- start : int
12
- end : Optional [int ]
13
-
14
-
15
- class FileRanges (TypedDict ):
16
- """Represents a file and its line ranges."""
17
-
18
- file_path : str
19
- ranges : List [Range ]
20
-
21
-
22
- class EditPatch (TypedDict ):
23
- """Represents a patch to be applied to a file."""
24
-
25
- contents : str
26
- line_start : int
27
- line_end : Optional [int ]
7
+ from mcp_text_editor .models import EditPatch , FileRanges
28
8
29
9
30
10
class TextEditor :
@@ -106,21 +86,22 @@ async def _read_file(
106
86
) from err
107
87
108
88
async def read_multiple_ranges (
109
- self , ranges : List [FileRanges ], encoding : str = "utf-8"
89
+ self , ranges : List [Dict [ str , Any ] ], encoding : str = "utf-8"
110
90
) -> Dict [str , Dict [str , Any ]]:
111
91
result : Dict [str , Dict [str , Any ]] = {}
112
92
113
- for file_range in ranges :
114
- file_path = file_range ["file_path" ]
93
+ for file_range_dict in ranges :
94
+ file_range = FileRanges .model_validate (file_range_dict )
95
+ file_path = file_range .file_path
115
96
lines , file_content , total_lines = await self ._read_file (
116
97
file_path , encoding = encoding
117
98
)
118
99
file_hash = self .calculate_hash (file_content )
119
100
result [file_path ] = {"ranges" : [], "file_hash" : file_hash }
120
101
121
- for range_spec in file_range [ " ranges" ] :
122
- line_start = max (1 , range_spec [ " start" ] ) - 1
123
- end_value = range_spec .get ( " end" )
102
+ for range_spec in file_range . ranges :
103
+ line_start = max (1 , range_spec . start ) - 1
104
+ end_value = range_spec .end
124
105
line_end = (
125
106
min (total_lines , end_value )
126
107
if end_value is not None
@@ -209,8 +190,8 @@ async def edit_file_contents(
209
190
Args:
210
191
file_path (str): Path to the file to edit
211
192
expected_hash (str): Expected hash of the file before editing
212
- patches (List[Dict[str, Any]] ): List of patches to apply, each containing:
213
- - line_start (int): Starting line number (1-based)
193
+ patches (List[EditPatch] ): List of patches to apply
194
+ - line_start (int): Starting line number (1-based, optional, default: 1 )
214
195
- line_end (Optional[int]): Ending line number (inclusive)
215
196
- contents (str): New content to insert
216
197
Edit file contents with hash-based conflict detection and multiple patches (supporting new file creation).
@@ -291,12 +272,15 @@ async def edit_file_contents(
291
272
else :
292
273
lines = current_content .splitlines (keepends = True )
293
274
275
+ # Convert patches to EditPatch objects
276
+ patch_objects = [EditPatch .model_validate (p ) for p in patches ]
277
+
294
278
# Sort patches from bottom to top to avoid line number shifts
295
279
sorted_patches = sorted (
296
- patches ,
280
+ patch_objects ,
297
281
key = lambda x : (
298
- - (x .get ( " line_start" , 1 ) ),
299
- - (x .get ( " line_end" , x . get ( " line_start" , 1 )) or float ("inf" )),
282
+ - (x .line_start ),
283
+ - (x .line_end or x . line_start or float ("inf" )),
300
284
),
301
285
)
302
286
@@ -305,10 +289,10 @@ async def edit_file_contents(
305
289
for j in range (i + 1 , len (sorted_patches )):
306
290
patch1 = sorted_patches [i ]
307
291
patch2 = sorted_patches [j ]
308
- start1 = patch1 .get ( " line_start" , 1 )
309
- end1 = patch1 .get ( " line_end" , start1 )
310
- start2 = patch2 .get ( " line_start" , 1 )
311
- end2 = patch2 .get ( " line_end" , start2 )
292
+ start1 = patch1 .line_start
293
+ end1 = patch1 .line_end or start1
294
+ start2 = patch2 .line_start
295
+ end2 = patch2 .line_end or start2
312
296
313
297
if (start1 <= end2 and end1 >= start2 ) or (
314
298
start2 <= end1 and end2 >= start1
@@ -323,8 +307,14 @@ async def edit_file_contents(
323
307
# Apply patches
324
308
for patch in sorted_patches :
325
309
# Get line numbers (1-based)
326
- line_start = patch .get ("line_start" , 1 )
327
- line_end = patch .get ("line_end" , line_start )
310
+ line_start : int
311
+ line_end : Optional [int ]
312
+ if isinstance (patch , EditPatch ):
313
+ line_start = patch .line_start
314
+ line_end = patch .line_end
315
+ else :
316
+ line_start = patch ["line_start" ] if "line_start" in patch else 1
317
+ line_end = patch ["line_end" ] if "line_end" in patch else line_start
328
318
329
319
# Check for invalid line range
330
320
if line_end is not None and line_end < line_start :
@@ -348,100 +338,97 @@ async def edit_file_contents(
348
338
"content" : current_content ,
349
339
}
350
340
351
- # Get expected hash for validation
352
- expected_range_hash = patch .get ("range_hash" )
353
-
354
- # Determine if this is an insertion operation
355
- # Cases:
356
- # 1. New file
357
- # 2. Empty file
358
- # 3. Empty range_hash (explicit insertion)
359
- is_insertion = (
360
- not os .path .exists (file_path )
361
- or not current_content
362
- or expected_range_hash == ""
363
- or patch .get ("range_hash" ) == self .calculate_hash ("" )
341
+ # Calculate line ranges for zero-based indexing
342
+ line_start_zero = line_start - 1
343
+ line_end_zero = (
344
+ len (lines ) - 1
345
+ if line_end is None
346
+ else min (line_end - 1 , len (lines ) - 1 )
364
347
)
365
348
366
- # Skip range_hash check for insertions
367
- if is_insertion :
368
- expected_range_hash = ""
369
-
370
- # For existing, non-empty files and non-insertions, range_hash is required
371
- if is_insertion :
372
- expected_range_hash = ""
373
-
374
- # For existing, non-empty files and non-insertions, range_hash is required
375
- if not os .path .exists (file_path ) or not current_content or is_insertion :
376
- expected_range_hash = ""
377
-
378
- # For existing, non-empty files and non-insertions, range_hash is required
379
- elif not expected_range_hash :
380
- return {
381
- "result" : "error" ,
382
- "reason" : "range_hash is required for each patch (except for new files and insertions)" ,
383
- "file_hash" : None ,
384
- "content" : current_content ,
385
- }
349
+ # Get expected hash for validation
350
+ expected_range_hash = None
351
+ if isinstance (patch , dict ):
352
+ expected_range_hash = patch .get ("range_hash" )
353
+ else :
354
+ # For EditPatch objects, use model fields
355
+ expected_range_hash = patch .range_hash
386
356
387
- # Handle insertion or replacement
388
- if is_insertion :
389
- target_content = "" # For insertion, we verify empty content
390
- line_end = line_start # For insertion operations
357
+ # Determine operation type and validate hash requirements
358
+ if not os . path . exists ( file_path ) or not current_content :
359
+ # New file or empty file - treat as insertion
360
+ is_insertion = True
391
361
else :
392
- # Convert to 0-based indexing for existing content
393
- line_start_zero = line_start - 1
394
- line_end_zero = (
395
- len (lines )
396
- if line_end is None
397
- else min (line_end - 1 , len (lines ) - 1 )
398
- )
362
+ # For existing files:
363
+ # range_hash is required for all modifications
364
+ if not expected_range_hash :
365
+ return {
366
+ "result" : "error" ,
367
+ "reason" : "range_hash is required for file modifications" ,
368
+ "file_hash" : None ,
369
+ "content" : current_content ,
370
+ }
399
371
400
- # Calculate target content for hash verification
401
- if line_start_zero >= len (lines ):
402
- target_content = ""
403
- else :
404
- target_lines = lines [line_start_zero : line_end_zero + 1 ]
405
- target_content = "" .join (target_lines )
372
+ # Hash provided - verify content
373
+ target_lines = lines [line_start_zero : line_end_zero + 1 ]
374
+ target_content = "" .join (target_lines )
375
+ actual_range_hash = self .calculate_hash (target_content )
376
+
377
+ # Debug output for hash comparison
378
+ print (
379
+ f"Debug - Range hash comparison:"
380
+ f"\n Expected: { expected_range_hash } "
381
+ f"\n Actual: { actual_range_hash } "
382
+ f"\n Content: { target_content !r} "
383
+ f"\n Range: { line_start_zero } :{ line_end_zero + 1 } "
384
+ )
406
385
407
- # Calculate actual range hash and verify only for non-insertions
408
- actual_range_hash = self .calculate_hash (target_content )
409
- if not is_insertion and actual_range_hash != expected_range_hash :
410
- return {
411
- "result" : "error" ,
412
- "reason" : f"Range hash mismatch for lines { line_start } -{ line_end if line_end else len (lines )} ({ actual_range_hash } != { expected_range_hash } )" ,
413
- "file_hash" : None ,
414
- "content" : current_content ,
415
- }
386
+ # Compare hashes
387
+ # Empty range_hash means explicit insertion
388
+ is_insertion = (
389
+ not expected_range_hash
390
+ or expected_range_hash == self .calculate_hash ("" )
391
+ )
416
392
417
- # Convert to 0-based indexing
418
- line_start = line_start - 1
393
+ # For non-insertion operations, verify content hash
394
+ if not is_insertion :
395
+ if actual_range_hash != expected_range_hash :
396
+ print (
397
+ f"Debug - Hash mismatch:\n "
398
+ f"Expected hash: { expected_range_hash } \n "
399
+ f"Actual hash: { actual_range_hash } \n "
400
+ f"Content being replaced: { target_content !r} \n "
401
+ f"Range: { line_start_zero } :{ line_end_zero + 1 } "
402
+ )
403
+ return {
404
+ "result" : "error" ,
405
+ "reason" : "Content hash mismatch - file has been modified" ,
406
+ "file_hash" : None ,
407
+ "content" : current_content ,
408
+ }
419
409
420
- # Calculate effective end line for operations
421
- if is_insertion :
422
- effective_line_end = line_start
410
+ # Prepare new content
411
+ contents : str
412
+ if isinstance (patch , EditPatch ):
413
+ contents = patch .contents
423
414
else :
424
- effective_line_end = (
425
- len (lines ) if line_end is None else line_end - 1
426
- )
415
+ contents = patch ["contents" ]
427
416
428
- # Prepare new content
429
- new_content = patch ["contents" ]
430
- if not new_content .endswith ("\n " ):
431
- new_content += "\n "
417
+ new_content = contents if contents .endswith ("\n " ) else contents + "\n "
432
418
new_lines = new_content .splitlines (keepends = True )
433
419
434
- # Apply changes depending on operation type
420
+ # Apply changes - line ranges were calculated earlier
435
421
if is_insertion :
436
- lines [line_start :line_start ] = new_lines
422
+ # Insert at the specified line
423
+ lines [line_start_zero :line_start_zero ] = new_lines
437
424
else :
438
- # For replacement, we replace the range
439
- effective_line_end = len (lines ) if line_end is None else line_end
440
- lines [line_start :effective_line_end ] = new_lines
425
+ # Replace the specified range
426
+ lines [line_start_zero : line_end_zero + 1 ] = new_lines
441
427
442
- print (f"patch: { patch } " )
443
- print (f"is_insertion: { is_insertion } " )
444
- print (f"is_insertion: { is_insertion } " )
428
+ # Debug output - shows the operation type
429
+ print (
430
+ f"Applied patch: line_start={ line_start } line_end={ line_end } is_insertion={ is_insertion } contents={ patch .contents !r} "
431
+ )
445
432
446
433
# Write the final content back to file
447
434
final_content = "" .join (lines )
0 commit comments