@@ -48,68 +48,6 @@ def _validate_file_path(self, file_path: str) -> None:
48
48
if ".." in file_path :
49
49
raise ValueError ("Path traversal not allowed" )
50
50
51
def _detect_encoding(self, file_path: str) -> str:
    """
    Detect file encoding with Shift-JIS prioritized.

    The file is read once as raw bytes and candidate encodings are tried
    in priority order: ``shift_jis``, then ``utf-8``, then (only when the
    optional ``chardet`` package can be imported) whatever ``chardet``
    reports for the data.

    Args:
        file_path (str): Path to the file

    Returns:
        str: Detected encoding. Falls back to "utf-8" when the file cannot
            be read or no candidate decodes the data.
    """

    def try_decode(data: bytes, encoding: str) -> bool:
        """Return True if data decodes cleanly with the given encoding."""
        try:
            data.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            # LookupError: the codec name is unknown to Python. chardet can
            # report such names (e.g. "EUC-TW"); treat them as "not usable"
            # instead of letting the error escape the detector.
            return False
        return True

    # Read file content for encoding detection
    try:
        with open(file_path, "rb") as f:
            raw_data = f.read()
    except OSError:
        # Unreadable or missing file: keep the documented utf-8 fallback.
        return "utf-8"

    # Try encodings in order of priority
    for candidate in ("shift_jis", "utf-8"):
        if try_decode(raw_data, candidate):
            return candidate

    # As a last resort, use chardet (optional dependency)
    try:
        import chardet
    except ImportError:
        return "utf-8"

    detected = (chardet.detect(raw_data).get("encoding") or "").lower()
    if not detected:
        return "utf-8"

    # Map encoding aliases onto the names used by the rest of the class
    if detected in ("shift_jis", "shift-jis", "shiftjis", "sjis", "csshiftjis"):
        return "shift_jis"
    if detected == "ascii":
        return "utf-8"

    # Use the detected encoding only if it really decodes the data
    if try_decode(raw_data, detected):
        return detected

    # Fall back to UTF-8
    return "utf-8"
112
-
113
51
@staticmethod
114
52
def calculate_hash (content : str ) -> str :
115
53
"""
@@ -123,26 +61,49 @@ def calculate_hash(content: str) -> str:
123
61
"""
124
62
return hashlib .sha256 (content .encode ()).hexdigest ()
125
63
126
async def _read_file(
    self, file_path: str, encoding: str = "utf-8"
) -> Tuple[List[str], str, int]:
    """Read file and return lines, content, and total lines.

    The path is validated first, then the file is opened in text mode with
    the caller-supplied encoding. No encoding detection is performed.

    Args:
        file_path (str): Path to the file to read
        encoding (str, optional): File encoding. Defaults to "utf-8"

    Returns:
        Tuple[List[str], str, int]: Lines, content, and total line count

    Raises:
        ValueError: If the path is rejected by _validate_file_path
        FileNotFoundError: If file not found
        UnicodeDecodeError: If file cannot be decoded with specified encoding
        LookupError: If ``encoding`` is not a codec name known to Python
    """
    self._validate_file_path(file_path)
    try:
        with open(file_path, "r", encoding=encoding) as f:
            lines = f.readlines()
    except FileNotFoundError as err:
        raise FileNotFoundError(f"File not found: {file_path}") from err
    except UnicodeDecodeError as err:
        # Re-raise with the file path and requested encoding in the reason
        # so callers can tell which file failed to decode.
        raise UnicodeDecodeError(
            encoding,
            err.object,
            err.start,
            err.end,
            f"Failed to decode file '{file_path}' with {encoding} encoding",
        ) from err

    # Content is rebuilt from the lines so it matches exactly what was read.
    return lines, "".join(lines), len(lines)
137
96
138
97
async def read_multiple_ranges (
139
- self , ranges : List [FileRanges ]
98
+ self , ranges : List [FileRanges ], encoding : str = "utf-8"
140
99
) -> Dict [str , Dict [str , Any ]]:
141
100
result : Dict [str , Dict [str , Any ]] = {}
142
101
143
102
for file_range in ranges :
144
103
file_path = file_range ["file_path" ]
145
- lines , file_content , total_lines = await self ._read_file (file_path )
104
+ lines , file_content , total_lines = await self ._read_file (
105
+ file_path , encoding = encoding
106
+ )
146
107
file_hash = self .calculate_hash (file_content )
147
108
result [file_path ] = {"ranges" : [], "file_hash" : file_hash }
148
109
@@ -187,9 +148,15 @@ async def read_multiple_ranges(
187
148
return result
188
149
189
150
async def read_file_contents (
190
- self , file_path : str , line_start : int = 1 , line_end : Optional [int ] = None
151
+ self ,
152
+ file_path : str ,
153
+ line_start : int = 1 ,
154
+ line_end : Optional [int ] = None ,
155
+ encoding : str = "utf-8" ,
191
156
) -> Tuple [str , int , int , str , int , int ]:
192
- lines , file_content , total_lines = await self ._read_file (file_path )
157
+ lines , file_content , total_lines = await self ._read_file (
158
+ file_path , encoding = encoding
159
+ )
193
160
line_start = max (1 , line_start ) - 1
194
161
line_end = total_lines if line_end is None else min (line_end , total_lines )
195
162
@@ -203,7 +170,7 @@ async def read_file_contents(
203
170
selected_lines = lines [line_start :line_end ]
204
171
content = "" .join (selected_lines )
205
172
content_hash = self .calculate_hash (content )
206
- content_size = len (content .encode (self . _detect_encoding ( file_path ) ))
173
+ content_size = len (content .encode (encoding ))
207
174
208
175
return (
209
176
content ,
@@ -215,7 +182,11 @@ async def read_file_contents(
215
182
)
216
183
217
184
async def edit_file_contents (
218
- self , file_path : str , expected_hash : str , patches : List [Dict [str , Any ]]
185
+ self ,
186
+ file_path : str ,
187
+ expected_hash : str ,
188
+ patches : List [Dict [str , Any ]],
189
+ encoding : str = "utf-8" ,
219
190
) -> Dict [str , Any ]:
220
191
"""
221
192
Edit file contents with hash-based conflict detection and multiple patches.
@@ -294,7 +265,7 @@ async def edit_file_contents(
294
265
else :
295
266
# Read current file content and verify hash
296
267
current_content , _ , _ , current_hash , total_lines , _ = (
297
- await self .read_file_contents (file_path )
268
+ await self .read_file_contents (file_path , encoding = encoding )
298
269
)
299
270
300
271
if current_hash != expected_hash :
@@ -400,11 +371,6 @@ async def edit_file_contents(
400
371
401
372
# Write the final content back to file
402
373
final_content = "" .join (lines )
403
- encoding = (
404
- "utf-8"
405
- if not os .path .exists (file_path )
406
- else self ._detect_encoding (file_path )
407
- )
408
374
with open (file_path , "w" , encoding = encoding ) as f :
409
375
f .write (final_content )
410
376
0 commit comments