@@ -123,158 +123,96 @@ def calculate_hash(content: str) -> str:
123
123
"""
124
124
return hashlib .sha256 (content .encode ()).hexdigest ()
125
125
126
+ async def _read_file (self , file_path : str ) -> Tuple [List [str ], str , int ]:
127
+ """Read file and return lines, content, and total lines."""
128
+ self ._validate_file_path (file_path )
129
+ encoding = self ._detect_encoding (file_path )
130
+ try :
131
+ with open (file_path , "r" , encoding = encoding ) as f :
132
+ lines = f .readlines ()
133
+ file_content = "" .join (lines )
134
+ return lines , file_content , len (lines )
135
+ except FileNotFoundError as err :
136
+ raise FileNotFoundError (f"File not found: { file_path } " ) from err
137
+
126
138
async def read_multiple_ranges (
127
139
self , ranges : List [FileRanges ]
128
140
) -> Dict [str , Dict [str , Any ]]:
129
- """
130
- Read multiple line ranges from multiple files.
131
-
132
- Args:
133
- ranges (List[FileRanges]): List of files and their line ranges to read
134
-
135
- Returns:
136
- Dict[str, Dict[str, Any]]: Dictionary with file paths as keys and
137
- file information as values. Each value includes:
138
- - file_hash: str (Hash of the entire file)
139
- - ranges: List of range information, each containing:
140
- - content: str
141
- - start_line: int
142
- - end_line: int
143
- - range_hash: str
144
- - total_lines: int
145
- - content_size: int
146
-
147
- Raises:
148
- ValueError: If file paths or line numbers are invalid
149
- FileNotFoundError: If any file does not exist
150
- IOError: If any file cannot be read
151
- """
152
141
result : Dict [str , Dict [str , Any ]] = {}
153
142
154
143
for file_range in ranges :
155
144
file_path = file_range ["file_path" ]
156
- self ._validate_file_path (file_path )
157
- result [file_path ] = {"ranges" : [], "file_hash" : "" }
158
-
159
- try :
160
- # Detect the file encoding before reading
161
- encoding = self ._detect_encoding (file_path )
162
-
163
- with open (file_path , "r" , encoding = encoding ) as f :
164
- lines = f .readlines ()
165
- total_lines = len (lines )
166
- file_content = "" .join (lines )
167
- file_hash = self .calculate_hash (file_content )
168
- result [file_path ]["file_hash" ] = file_hash
169
-
170
- for range_spec in file_range ["ranges" ]:
171
- # Adjust line numbers to 0-based index
172
- line_start = max (1 , range_spec ["start" ]) - 1
173
- end_value = range_spec .get ("end" )
174
- line_end = (
175
- min (total_lines , end_value )
176
- if end_value is not None
177
- else total_lines
178
- )
179
- line_end = (
180
- total_lines
181
- if end_value is None
182
- else min (end_value , total_lines )
183
- )
184
-
185
- if line_start >= total_lines :
186
- # Return empty content for out of bounds start line
187
- empty_content = ""
188
- result [file_path ]["ranges" ].append (
189
- {
190
- "content" : empty_content ,
191
- "start_line" : line_start + 1 ,
192
- "end_line" : line_start + 1 ,
193
- "range_hash" : self .calculate_hash (empty_content ),
194
- "total_lines" : total_lines ,
195
- "content_size" : 0 ,
196
- }
197
- )
198
- continue
199
-
200
- selected_lines = lines [line_start :line_end ]
201
- content = "" .join (selected_lines )
202
- range_hash = self .calculate_hash (content )
145
+ lines , file_content , total_lines = await self ._read_file (file_path )
146
+ file_hash = self .calculate_hash (file_content )
147
+ result [file_path ] = {"ranges" : [], "file_hash" : file_hash }
148
+
149
+ for range_spec in file_range ["ranges" ]:
150
+ line_start = max (1 , range_spec ["start" ]) - 1
151
+ end_value = range_spec .get ("end" )
152
+ line_end = (
153
+ min (total_lines , end_value )
154
+ if end_value is not None
155
+ else total_lines
156
+ )
203
157
158
+ if line_start >= total_lines :
159
+ empty_content = ""
204
160
result [file_path ]["ranges" ].append (
205
161
{
206
- "content" : content ,
162
+ "content" : empty_content ,
207
163
"start_line" : line_start + 1 ,
208
- "end_line" : line_end ,
209
- "range_hash" : range_hash ,
164
+ "end_line" : line_start + 1 ,
165
+ "range_hash" : self . calculate_hash ( empty_content ) ,
210
166
"total_lines" : total_lines ,
211
- "content_size" : len ( content ) ,
167
+ "content_size" : 0 ,
212
168
}
213
169
)
214
- except FileNotFoundError as e :
215
- raise FileNotFoundError (f"File not found: { file_path } " ) from e
216
- except IOError as e :
217
- raise IOError (f"Error reading file: { str (e )} " ) from e
170
+ continue
171
+
172
+ selected_lines = lines [line_start :line_end ]
173
+ content = "" .join (selected_lines )
174
+ range_hash = self .calculate_hash (content )
175
+
176
+ result [file_path ]["ranges" ].append (
177
+ {
178
+ "content" : content ,
179
+ "start_line" : line_start + 1 ,
180
+ "end_line" : line_end ,
181
+ "range_hash" : range_hash ,
182
+ "total_lines" : total_lines ,
183
+ "content_size" : len (content ),
184
+ }
185
+ )
218
186
219
187
return result
220
188
221
189
async def read_file_contents (
222
190
self , file_path : str , line_start : int = 1 , line_end : Optional [int ] = None
223
191
) -> Tuple [str , int , int , str , int , int ]:
224
- """
225
- Read file contents within specified line range.
226
-
227
- Args:
228
- file_path (str): Path to the file
229
- line_start (int): Starting line number (1-based)
230
- line_end (Optional[int]): Ending line number (inclusive)
231
-
232
- Returns:
233
- Tuple[str, int, int, str, int, int]: (contents, start_line, end_line, hash, file_lines, file_size)
234
-
235
- Raises:
236
- ValueError: If file path or line numbers are invalid
237
- FileNotFoundError: If file does not exist
238
- IOError: If file cannot be read
239
- """
240
- self ._validate_file_path (file_path )
241
-
242
- try :
243
- # Detect the file encoding before reading
244
- encoding = self ._detect_encoding (file_path )
245
-
246
- with open (file_path , "r" , encoding = encoding ) as f :
247
- lines = f .readlines ()
248
- # Adjust line numbers to 0-based index
249
- line_start = max (1 , line_start ) - 1
250
- line_end = len (lines ) if line_end is None else min (line_end , len (lines ))
251
-
252
- if line_start >= len (lines ):
253
- empty_content = ""
254
- empty_hash = self .calculate_hash (empty_content )
255
- return empty_content , line_start , line_start , empty_hash , len (lines ), 0
256
- if line_end < line_start :
257
- raise ValueError ("End line must be greater than or equal to start line" )
258
- selected_lines = lines [line_start :line_end ]
259
- content = "" .join (selected_lines )
260
-
261
- # Calculate content hash and size
262
- content_hash = self .calculate_hash (content )
263
- content_size = len (content .encode (encoding ))
264
-
265
- return (
266
- content ,
267
- line_start + 1 ,
268
- line_end ,
269
- content_hash ,
270
- len (lines ),
271
- content_size ,
272
- )
273
-
274
- except FileNotFoundError as e :
275
- raise FileNotFoundError (f"File not found: { file_path } " ) from e
276
- except (IOError , UnicodeDecodeError ) as e :
277
- raise IOError (f"Error reading file: { str (e )} " ) from e
192
+ lines , file_content , total_lines = await self ._read_file (file_path )
193
+ line_start = max (1 , line_start ) - 1
194
+ line_end = total_lines if line_end is None else min (line_end , total_lines )
195
+
196
+ if line_start >= total_lines :
197
+ empty_content = ""
198
+ empty_hash = self .calculate_hash (empty_content )
199
+ return empty_content , line_start , line_start , empty_hash , total_lines , 0
200
+ if line_end < line_start :
201
+ raise ValueError ("End line must be greater than or equal to start line" )
202
+
203
+ selected_lines = lines [line_start :line_end ]
204
+ content = "" .join (selected_lines )
205
+ content_hash = self .calculate_hash (content )
206
+ content_size = len (content .encode (self ._detect_encoding (file_path )))
207
+
208
+ return (
209
+ content ,
210
+ line_start + 1 ,
211
+ line_end ,
212
+ content_hash ,
213
+ total_lines ,
214
+ content_size ,
215
+ )
278
216
279
217
async def edit_file_contents (
280
218
self , file_path : str , expected_hash : str , patches : List [Dict [str , Any ]]
0 commit comments