@@ -208,6 +208,8 @@ def __init__(self, log_dir: str) -> None:
208
208
self .log_dir = log_dir
209
209
self .metrics_file_path = os .path .join (self .log_dir , self .NAME_METRICS_FILE )
210
210
211
+ self ._is_local_fs = isinstance (self ._fs , local .LocalFileSystem )
212
+
211
213
self ._check_log_dir_exists ()
212
214
self ._fs .makedirs (self .log_dir , exist_ok = True )
213
215
@@ -231,38 +233,53 @@ def save(self) -> None:
231
233
if not self .metrics :
232
234
return
233
235
236
+ # Update column list with any new metrics keys
234
237
new_keys = self ._record_new_keys ()
235
- file_exists = self ._fs .isfile (self .metrics_file_path )
236
- rewrite_file = not isinstance (self ._fs , local .LocalFileSystem ) or new_keys
237
-
238
- if rewrite_file and file_exists :
239
- self ._append_recorded_metrics ()
240
238
241
- with self ._fs .open (self .metrics_file_path , mode = ("a" if not rewrite_file else "w" ), newline = "" ) as file :
242
- writer = csv .DictWriter (file , fieldnames = self .metrics_keys )
243
- if rewrite_file :
244
- writer .writeheader ()
245
- writer .writerows (self .metrics )
239
+ file_exists = self ._fs .isfile (self .metrics_file_path )
246
240
247
- self .metrics = [] # reset
241
+ # Decision logic: when can we safely append?
242
+ # 1. Must be local filesystem (remote FS don't support append)
243
+ # 2. File must already exist
244
+ # 3. No new columns (otherwise CSV header would be wrong)
245
+ can_append = self ._is_local_fs and file_exists and not new_keys
246
+
247
+ if can_append :
248
+ # Safe to append: local FS + existing file + same columns
249
+ self ._write_metrics (self .metrics , mode = "a" , write_header = False )
250
+ else :
251
+ # Need to rewrite: new file OR remote FS OR new columns
252
+ all_metrics = self .metrics
253
+ if file_exists :
254
+ # Include existing data when rewriting
255
+ all_metrics = self ._read_existing_metrics () + self .metrics
256
+ self ._write_metrics (all_metrics , mode = "w" , write_header = True )
257
+
258
+ self .metrics = []
248
259
249
260
def _record_new_keys (self ) -> set [str ]:
250
- """Records new keys that have not been logged before ."""
261
+ """Identifies and records any new metric keys that have not been previously logged ."""
251
262
current_keys = set ().union (* self .metrics )
252
263
new_keys = current_keys - set (self .metrics_keys )
253
264
self .metrics_keys .extend (new_keys )
254
265
self .metrics_keys .sort ()
255
266
return new_keys
256
267
257
- def _append_recorded_metrics (self ) -> None :
258
- """Appends the previous recorded metrics to the current ``self.metrics``."""
259
- metrics = self ._fetch_recorded_metrics ()
260
- self .metrics = metrics + self .metrics
261
-
262
- def _fetch_recorded_metrics (self ) -> list [dict [str , Any ]]:
263
- """Fetches the previous recorded metrics."""
264
- with self ._fs .open (self .metrics_file_path , "r" , newline = "" ) as file :
265
- return list (csv .DictReader (file ))
268
+ def _read_existing_metrics (self ) -> list [dict [str , Any ]]:
269
+ """Read all existing metrics from the CSV file."""
270
+ try :
271
+ with self ._fs .open (self .metrics_file_path , "r" , newline = "" ) as file :
272
+ return list (csv .DictReader (file ))
273
+ except (FileNotFoundError , OSError ):
274
+ return []
275
+
276
+ def _write_metrics (self , metrics : list [dict [str , Any ]], mode : str , write_header : bool ) -> None :
277
+ """Write metrics to CSV file with the specified mode and header option."""
278
+ with self ._fs .open (self .metrics_file_path , mode = mode , newline = "" ) as file :
279
+ writer = csv .DictWriter (file , fieldnames = self .metrics_keys )
280
+ if write_header :
281
+ writer .writeheader ()
282
+ writer .writerows (metrics )
266
283
267
284
def _check_log_dir_exists (self ) -> None :
268
285
if self ._fs .exists (self .log_dir ) and self ._fs .listdir (self .log_dir ):
0 commit comments