From 763c1a91ea474d23f74eb1f1faf4df1c802efd2e Mon Sep 17 00:00:00 2001 From: ioangatop Date: Wed, 25 Oct 2023 13:24:37 +0200 Subject: [PATCH 01/17] naive first fix --- src/lightning/fabric/loggers/csv_logs.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lightning/fabric/loggers/csv_logs.py b/src/lightning/fabric/loggers/csv_logs.py index 4e47707444ea4..d5da3696b0e9b 100644 --- a/src/lightning/fabric/loggers/csv_logs.py +++ b/src/lightning/fabric/loggers/csv_logs.py @@ -18,6 +18,7 @@ from argparse import Namespace from typing import Any, Dict, List, Optional, Set, Union +from fsspec.implementations import local from torch import Tensor from lightning.fabric.loggers.logger import Logger, rank_zero_experiment @@ -228,6 +229,11 @@ def save(self) -> None: # we need to re-write the file if the keys (header) change self._rewrite_with_new_header(self.metrics_keys) + if not isinstance(self._fs, local.LocalFileSystem): + file_exists = False + with self._fs.open(self.metrics_file_path, "r", newline="") as file: + self.metrics = list(csv.DictReader(file)) + self.metrics + with self._fs.open(self.metrics_file_path, mode=("a" if file_exists else "w"), newline="") as file: writer = csv.DictWriter(file, fieldnames=self.metrics_keys) if not file_exists: From d506b4382e0ec75390757eb0571fa5c3ae457454 Mon Sep 17 00:00:00 2001 From: ioangatop Date: Wed, 25 Oct 2023 14:17:41 +0200 Subject: [PATCH 02/17] refactor solution --- src/lightning/fabric/loggers/csv_logs.py | 28 ++++++++++-------------- tests/tests_fabric/loggers/test_csv.py | 18 --------------- 2 files changed, 11 insertions(+), 35 deletions(-) diff --git a/src/lightning/fabric/loggers/csv_logs.py b/src/lightning/fabric/loggers/csv_logs.py index d5da3696b0e9b..0a1045e18c415 100644 --- a/src/lightning/fabric/loggers/csv_logs.py +++ b/src/lightning/fabric/loggers/csv_logs.py @@ -224,20 +224,14 @@ def save(self) -> None: new_keys = self._record_new_keys() file_exists = self._fs.isfile(self.metrics_file_path) + 
rewrite_file = not isinstance(self._fs, local.LocalFileSystem) or new_keys - if new_keys and file_exists: - # we need to re-write the file if the keys (header) change - self._rewrite_with_new_header(self.metrics_keys) + if rewrite_file and file_exists: + self._append_recorded_metrics() - if not isinstance(self._fs, local.LocalFileSystem): - file_exists = False - with self._fs.open(self.metrics_file_path, "r", newline="") as file: - self.metrics = list(csv.DictReader(file)) + self.metrics - - with self._fs.open(self.metrics_file_path, mode=("a" if file_exists else "w"), newline="") as file: + with self._fs.open(self.metrics_file_path, "a" if not rewrite_file else "w", newline="") as file: writer = csv.DictWriter(file, fieldnames=self.metrics_keys) - if not file_exists: - # only write the header if we're writing a fresh file + if rewrite_file: writer.writeheader() writer.writerows(self.metrics) @@ -250,11 +244,11 @@ def _record_new_keys(self) -> Set[str]: self.metrics_keys.extend(new_keys) return new_keys - def _rewrite_with_new_header(self, fieldnames: List[str]) -> None: + def _append_recorded_metrics(self) -> None: + metrics = self._fetch_recorded_metrics() + self.metrics = metrics + self.metrics + + def _fetch_recorded_metrics(self) -> List[str]: with self._fs.open(self.metrics_file_path, "r", newline="") as file: metrics = list(csv.DictReader(file)) - - with self._fs.open(self.metrics_file_path, "w", newline="") as file: - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(metrics) + return metrics diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index 67842cdbd8dd6..975cf0519d8dd 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -162,21 +162,3 @@ def test_append_columns(tmp_path): with open(logger.experiment.metrics_file_path) as file: header = file.readline().strip() assert set(header.split(",")) == {"step", "a", "b", "c"} - 
- -def test_rewrite_with_new_header(tmp_path): - # write a csv file manually - with open(tmp_path / "metrics.csv", "w") as file: - file.write("step,metric1,metric2\n") - file.write("0,1,22\n") - - writer = _ExperimentWriter(log_dir=str(tmp_path)) - new_columns = ["step", "metric1", "metric2", "metric3"] - writer._rewrite_with_new_header(new_columns) - - # the rewritten file should have the new columns - with open(tmp_path / "metrics.csv") as file: - header = file.readline().strip().split(",") - assert header == new_columns - logs = file.readline().strip().split(",") - assert logs == ["0", "1", "22", ""] From 5ddb16ba0a5bf0296c7ff87913ed0d9d3f988fd8 Mon Sep 17 00:00:00 2001 From: ioangatop Date: Wed, 25 Oct 2023 14:24:43 +0200 Subject: [PATCH 03/17] add docs --- src/lightning/fabric/loggers/csv_logs.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lightning/fabric/loggers/csv_logs.py b/src/lightning/fabric/loggers/csv_logs.py index 0a1045e18c415..24353c4e4988f 100644 --- a/src/lightning/fabric/loggers/csv_logs.py +++ b/src/lightning/fabric/loggers/csv_logs.py @@ -245,10 +245,11 @@ def _record_new_keys(self) -> Set[str]: return new_keys def _append_recorded_metrics(self) -> None: + """Appends the previous recorded metrics to the current ``self.metrics``.""" metrics = self._fetch_recorded_metrics() self.metrics = metrics + self.metrics def _fetch_recorded_metrics(self) -> List[str]: + """Fetches the previous recorded metrics.""" with self._fs.open(self.metrics_file_path, "r", newline="") as file: - metrics = list(csv.DictReader(file)) - return metrics + return list(csv.DictReader(file)) From 48404318f50956f4795b77ea6db2292c28e01011 Mon Sep 17 00:00:00 2001 From: ioangatop Date: Wed, 25 Oct 2023 14:56:49 +0200 Subject: [PATCH 04/17] add more tests --- tests/tests_fabric/loggers/test_csv.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/tests/tests_fabric/loggers/test_csv.py 
b/tests/tests_fabric/loggers/test_csv.py index 975cf0519d8dd..61ce1d00fc5fe 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -11,11 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import csv import os +from typing import Dict, List, Set, Tuple from unittest.mock import MagicMock import pytest import torch + from lightning.fabric.loggers import CSVLogger from lightning.fabric.loggers.csv_logs import _ExperimentWriter @@ -152,13 +155,24 @@ def test_append_columns(tmp_path): logger.log_metrics({"a": 1, "b": 2}) # new key appears - logger.log_metrics({"a": 1, "b": 2, "c": 3}) - with open(logger.experiment.metrics_file_path) as file: - header = file.readline().strip() - assert set(header.split(",")) == {"step", "a", "b", "c"} + logger.log_metrics({"a": 11, "b": 22, "c": 33}) + + headers, content = _read_csv(logger.experiment.metrics_file_path) + assert headers == {"step", "a", "b", "c"} + assert content[0] == {"step": "0", "a": "1", "b": "2", "c": ""} + assert content[1] == {"step": "0", "a": "11", "b": "22", "c": "33"} # key disappears logger.log_metrics({"a": 1, "c": 3}) with open(logger.experiment.metrics_file_path) as file: header = file.readline().strip() assert set(header.split(",")) == {"step", "a", "b", "c"} + + +def _read_csv(path: str) -> Tuple[Set[str], List[Dict[str, str]]]: + """Reads a local csv file and returns the headers and content.""" + with open(path) as file: + reader = csv.DictReader(file) + headers = set(reader.fieldnames) + content = list(reader) + return headers, content From 0f46c826f55167acc22c3e10065bbe70497eba89 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 25 Oct 2023 13:00:28 +0000 Subject: [PATCH 05/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, 
see https://pre-commit.ci --- tests/tests_fabric/loggers/test_csv.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index 61ce1d00fc5fe..451021b254eb1 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -18,7 +18,6 @@ import pytest import torch - from lightning.fabric.loggers import CSVLogger from lightning.fabric.loggers.csv_logs import _ExperimentWriter From b12ea6c5b531c0b1d257cb5331724b5431c129c8 Mon Sep 17 00:00:00 2001 From: ioangatop Date: Wed, 25 Oct 2023 15:07:43 +0200 Subject: [PATCH 06/17] add more tests --- tests/tests_fabric/loggers/test_csv.py | 45 ++++++++++++++++++++------ 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index 61ce1d00fc5fe..cf224d95ee3a8 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import csv +import itertools import os from typing import Dict, List, Set, Tuple from unittest.mock import MagicMock @@ -153,20 +154,46 @@ def test_append_columns(tmp_path): # initial metrics logger.log_metrics({"a": 1, "b": 2}) + _assert_csv_content( + logger.experiment.metrics_file_path, + expected_headers={"step", "a", "b"}, + expected_content=[{"step": "0", "a": "1", "b": "2"}], + ) # new key appears logger.log_metrics({"a": 11, "b": 22, "c": 33}) - - headers, content = _read_csv(logger.experiment.metrics_file_path) - assert headers == {"step", "a", "b", "c"} - assert content[0] == {"step": "0", "a": "1", "b": "2", "c": ""} - assert content[1] == {"step": "0", "a": "11", "b": "22", "c": "33"} + _assert_csv_content( + logger.experiment.metrics_file_path, + expected_headers={"step", "a", "b", "c"}, + expected_content=[ + {"step": "0", "a": "1", "b": "2", "c": ""}, + {"step": "0", "a": "11", "b": "22", "c": "33"}, + ], + ) # key disappears - logger.log_metrics({"a": 1, "c": 3}) - with open(logger.experiment.metrics_file_path) as file: - header = file.readline().strip() - assert set(header.split(",")) == {"step", "a", "b", "c"} + logger.log_metrics({"a": 111, "c": 333}) + _assert_csv_content( + logger.experiment.metrics_file_path, + expected_headers={"step", "a", "b", "c"}, + expected_content=[ + {"step": "0", "a": "1", "b": "2", "c": ""}, + {"step": "0", "a": "11", "b": "22", "c": "33"}, + {"step": "0", "a": "111", "b": "", "c": "333"}, + ], + ) + + +def _assert_csv_content( + path: str, + expected_headers: Set[str], + expected_content: List[Dict[str, str]], +) -> None: + """Verifies the content of a local csv file with the expected ones.""" + headers, content = _read_csv(path) + assert headers == expected_headers + for actual, expected in itertools.zip_longest(content, expected_content): + assert actual == expected def _read_csv(path: str) -> Tuple[Set[str], List[Dict[str, str]]]: From 068a37f940e612e34a70c3ed84285d97db684f73 Mon Sep 17 00:00:00 2001 From: 
ioangatop Date: Wed, 25 Oct 2023 15:08:32 +0200 Subject: [PATCH 07/17] fmt fix --- tests/tests_fabric/loggers/test_csv.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index 33889477daf2a..cf224d95ee3a8 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -19,6 +19,7 @@ import pytest import torch + from lightning.fabric.loggers import CSVLogger from lightning.fabric.loggers.csv_logs import _ExperimentWriter From 435ec6a0fc9596924e5d0cef1720dccecd30790e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 25 Oct 2023 13:10:30 +0000 Subject: [PATCH 08/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/tests_fabric/loggers/test_csv.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index cf224d95ee3a8..33889477daf2a 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -19,7 +19,6 @@ import pytest import torch - from lightning.fabric.loggers import CSVLogger from lightning.fabric.loggers.csv_logs import _ExperimentWriter From 946dc8f0942c35f106fff6d6309a0bcef9f583e2 Mon Sep 17 00:00:00 2001 From: ioangatop Date: Wed, 25 Oct 2023 15:21:36 +0200 Subject: [PATCH 09/17] fix typing --- src/lightning/fabric/loggers/csv_logs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning/fabric/loggers/csv_logs.py b/src/lightning/fabric/loggers/csv_logs.py index 24353c4e4988f..2a422709b1b10 100644 --- a/src/lightning/fabric/loggers/csv_logs.py +++ b/src/lightning/fabric/loggers/csv_logs.py @@ -16,7 +16,7 @@ import logging import os from argparse import Namespace -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, Dict, List, Optional, Set, 
Union, Iterable from fsspec.implementations import local from torch import Tensor @@ -249,7 +249,7 @@ def _append_recorded_metrics(self) -> None: metrics = self._fetch_recorded_metrics() self.metrics = metrics + self.metrics - def _fetch_recorded_metrics(self) -> List[str]: + def _fetch_recorded_metrics(self) -> List[Dict[str, Any]]: """Fetches the previous recorded metrics.""" with self._fs.open(self.metrics_file_path, "r", newline="") as file: return list(csv.DictReader(file)) From 93079d86b322369daca41b677cd1ffa9c5631308 Mon Sep 17 00:00:00 2001 From: ioangatop Date: Wed, 25 Oct 2023 15:22:37 +0200 Subject: [PATCH 10/17] fix typing --- tests/tests_fabric/loggers/test_csv.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index 33889477daf2a..cf224d95ee3a8 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -19,6 +19,7 @@ import pytest import torch + from lightning.fabric.loggers import CSVLogger from lightning.fabric.loggers.csv_logs import _ExperimentWriter From 4dedd0071041ea6c0102d951fd9a35515f766984 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 25 Oct 2023 13:23:56 +0000 Subject: [PATCH 11/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning/fabric/loggers/csv_logs.py | 2 +- tests/tests_fabric/loggers/test_csv.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/lightning/fabric/loggers/csv_logs.py b/src/lightning/fabric/loggers/csv_logs.py index 2a422709b1b10..03e5a5f166156 100644 --- a/src/lightning/fabric/loggers/csv_logs.py +++ b/src/lightning/fabric/loggers/csv_logs.py @@ -16,7 +16,7 @@ import logging import os from argparse import Namespace -from typing import Any, Dict, List, Optional, Set, Union, Iterable +from typing import Any, Dict, List, Optional, Set, Union 
from fsspec.implementations import local from torch import Tensor diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index cf224d95ee3a8..33889477daf2a 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -19,7 +19,6 @@ import pytest import torch - from lightning.fabric.loggers import CSVLogger from lightning.fabric.loggers.csv_logs import _ExperimentWriter From 8f2fb4a3073e0043ee01671c7f181fe2acaf0f50 Mon Sep 17 00:00:00 2001 From: ioangatop Date: Thu, 26 Oct 2023 10:01:04 +0200 Subject: [PATCH 12/17] add named arg --- src/lightning/fabric/loggers/csv_logs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lightning/fabric/loggers/csv_logs.py b/src/lightning/fabric/loggers/csv_logs.py index 03e5a5f166156..6d3e950ded8cf 100644 --- a/src/lightning/fabric/loggers/csv_logs.py +++ b/src/lightning/fabric/loggers/csv_logs.py @@ -229,7 +229,7 @@ def save(self) -> None: if rewrite_file and file_exists: self._append_recorded_metrics() - with self._fs.open(self.metrics_file_path, "a" if not rewrite_file else "w", newline="") as file: + with self._fs.open(self.metrics_file_path, mode=("a" if not rewrite_file else "w"), newline="") as file: writer = csv.DictWriter(file, fieldnames=self.metrics_keys) if rewrite_file: writer.writeheader() From 7e0ae4e598934ff71bf52d7fffdc7e95b0aa3016 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 14 Apr 2025 08:49:30 +0000 Subject: [PATCH 13/17] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/tests_fabric/loggers/test_csv.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index e8fe8f2a22420..944b7c8340f11 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py 
@@ -14,7 +14,6 @@ import csv import itertools import os -from typing import Dict, List, Set, Tuple from unittest import mock from unittest.mock import MagicMock @@ -209,8 +208,8 @@ def test_append_columns(tmp_path): def _assert_csv_content( path: str, - expected_headers: Set[str], - expected_content: List[Dict[str, str]], + expected_headers: set[str], + expected_content: list[dict[str, str]], ) -> None: """Verifies the content of a local csv file with the expected ones.""" headers, content = _read_csv(path) @@ -219,7 +218,7 @@ def _assert_csv_content( assert actual == expected -def _read_csv(path: str) -> Tuple[Set[str], List[Dict[str, str]]]: +def _read_csv(path: str) -> tuple[set[str], list[dict[str, str]]]: """Reads a local csv file and returns the headers and content.""" with open(path) as file: reader = csv.DictReader(file) From 9588ef893b74a2ee6ce9441d159b8de81f092952 Mon Sep 17 00:00:00 2001 From: Jirka B Date: Mon, 14 Apr 2025 11:28:35 +0200 Subject: [PATCH 14/17] typing --- src/lightning/fabric/loggers/csv_logs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lightning/fabric/loggers/csv_logs.py b/src/lightning/fabric/loggers/csv_logs.py index 9d78405e855e5..b2e5912677f58 100644 --- a/src/lightning/fabric/loggers/csv_logs.py +++ b/src/lightning/fabric/loggers/csv_logs.py @@ -259,7 +259,7 @@ def _append_recorded_metrics(self) -> None: metrics = self._fetch_recorded_metrics() self.metrics = metrics + self.metrics - def _fetch_recorded_metrics(self) -> List[Dict[str, Any]]: + def _fetch_recorded_metrics(self) -> list[dict[str, Any]]: """Fetches the previous recorded metrics.""" with self._fs.open(self.metrics_file_path, "r", newline="") as file: return list(csv.DictReader(file)) From d85dbe413bb5af5cddc19457d5f4a6d8e51ad9ed Mon Sep 17 00:00:00 2001 From: Bhimraj Yadav Date: Thu, 4 Sep 2025 09:25:44 +0000 Subject: [PATCH 15/17] refactor: enhance CSVLogger's metric writing logic for local and remote filesystem support --- 
src/lightning/fabric/loggers/csv_logs.py | 59 +++++++++++++++--------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/src/lightning/fabric/loggers/csv_logs.py b/src/lightning/fabric/loggers/csv_logs.py index b2e5912677f58..efa0221158344 100644 --- a/src/lightning/fabric/loggers/csv_logs.py +++ b/src/lightning/fabric/loggers/csv_logs.py @@ -208,6 +208,8 @@ def __init__(self, log_dir: str) -> None: self.log_dir = log_dir self.metrics_file_path = os.path.join(self.log_dir, self.NAME_METRICS_FILE) + self._is_local_fs = isinstance(self._fs, local.LocalFileSystem) + self._check_log_dir_exists() self._fs.makedirs(self.log_dir, exist_ok=True) @@ -231,38 +233,53 @@ def save(self) -> None: if not self.metrics: return + # Update column list with any new metrics keys new_keys = self._record_new_keys() - file_exists = self._fs.isfile(self.metrics_file_path) - rewrite_file = not isinstance(self._fs, local.LocalFileSystem) or new_keys - - if rewrite_file and file_exists: - self._append_recorded_metrics() - with self._fs.open(self.metrics_file_path, mode=("a" if not rewrite_file else "w"), newline="") as file: - writer = csv.DictWriter(file, fieldnames=self.metrics_keys) - if rewrite_file: - writer.writeheader() - writer.writerows(self.metrics) + file_exists = self._fs.isfile(self.metrics_file_path) - self.metrics = [] # reset + # Decision logic: when can we safely append? + # 1. Must be local filesystem (remote FS don't support append) + # 2. File must already exist + # 3. 
No new columns (otherwise CSV header would be wrong) + can_append = self._is_local_fs and file_exists and not new_keys + + if can_append: + # Safe to append: local FS + existing file + same columns + self._write_metrics(self.metrics, mode="a", write_header=False) + else: + # Need to rewrite: new file OR remote FS OR new columns + all_metrics = self.metrics + if file_exists: + # Include existing data when rewriting + all_metrics = self._read_existing_metrics() + self.metrics + self._write_metrics(all_metrics, mode="w", write_header=True) + + self.metrics = [] def _record_new_keys(self) -> set[str]: - """Records new keys that have not been logged before.""" + """Identifies and records any new metric keys that have not been previously logged.""" current_keys = set().union(*self.metrics) new_keys = current_keys - set(self.metrics_keys) self.metrics_keys.extend(new_keys) self.metrics_keys.sort() return new_keys - def _append_recorded_metrics(self) -> None: - """Appends the previous recorded metrics to the current ``self.metrics``.""" - metrics = self._fetch_recorded_metrics() - self.metrics = metrics + self.metrics - - def _fetch_recorded_metrics(self) -> list[dict[str, Any]]: - """Fetches the previous recorded metrics.""" - with self._fs.open(self.metrics_file_path, "r", newline="") as file: - return list(csv.DictReader(file)) + def _read_existing_metrics(self) -> list[dict[str, Any]]: + """Read all existing metrics from the CSV file.""" + try: + with self._fs.open(self.metrics_file_path, "r", newline="") as file: + return list(csv.DictReader(file)) + except (FileNotFoundError, OSError): + return [] + + def _write_metrics(self, metrics: list[dict[str, Any]], mode: str, write_header: bool) -> None: + """Write metrics to CSV file with the specified mode and header option.""" + with self._fs.open(self.metrics_file_path, mode=mode, newline="") as file: + writer = csv.DictWriter(file, fieldnames=self.metrics_keys) + if write_header: + writer.writeheader() + 
writer.writerows(metrics) def _check_log_dir_exists(self) -> None: if self._fs.exists(self.log_dir) and self._fs.listdir(self.log_dir): From e5f784981fff16dc0b3d83a99831a3f1e2d22ef6 Mon Sep 17 00:00:00 2001 From: Bhimraj Yadav Date: Thu, 4 Sep 2025 09:27:04 +0000 Subject: [PATCH 16/17] test: enhance CSVLogger tests for column handling and remote filesystem behavior --- tests/tests_fabric/loggers/test_csv.py | 166 ++++++++++++++++++------- 1 file changed, 118 insertions(+), 48 deletions(-) diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index 944b7c8340f11..cfde95ba4e097 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import csv -import itertools import os from unittest import mock from unittest.mock import MagicMock @@ -172,56 +170,128 @@ def test_append_metrics_file(_, tmp_path): def test_append_columns(tmp_path): """Test that the CSV file gets rewritten with new headers if the columns change.""" - logger = CSVLogger(tmp_path, flush_logs_every_n_steps=1) + logger = CSVLogger(tmp_path, flush_logs_every_n_steps=2) # initial metrics logger.log_metrics({"a": 1, "b": 2}) - _assert_csv_content( - logger.experiment.metrics_file_path, - expected_headers={"step", "a", "b"}, - expected_content=[{"step": "0", "a": "1", "b": "2"}], - ) # new key appears - logger.log_metrics({"a": 11, "b": 22, "c": 33}) - _assert_csv_content( - logger.experiment.metrics_file_path, - expected_headers={"step", "a", "b", "c"}, - expected_content=[ - {"step": "0", "a": "1", "b": "2", "c": ""}, - {"step": "0", "a": "11", "b": "22", "c": "33"}, - ], - ) + logger.log_metrics({"a": 1, "b": 2, "c": 3}) + with open(logger.experiment.metrics_file_path) as file: + lines = file.readlines() + header = lines[0].strip() + 
assert header.split(",") == ["a", "b", "c", "step"] + assert len(lines) == 3 # header + 2 data rows # key disappears - logger.log_metrics({"a": 111, "c": 333}) - _assert_csv_content( - logger.experiment.metrics_file_path, - expected_headers={"step", "a", "b", "c"}, - expected_content=[ - {"step": "0", "a": "1", "b": "2", "c": ""}, - {"step": "0", "a": "11", "b": "22", "c": "33"}, - {"step": "0", "a": "111", "b": "", "c": "333"}, - ], - ) - - -def _assert_csv_content( - path: str, - expected_headers: set[str], - expected_content: list[dict[str, str]], -) -> None: - """Verifies the content of a local csv file with the expected ones.""" - headers, content = _read_csv(path) - assert headers == expected_headers - for actual, expected in itertools.zip_longest(content, expected_content): - assert actual == expected - - -def _read_csv(path: str) -> tuple[set[str], list[dict[str, str]]]: - """Reads a local csv file and returns the headers and content.""" - with open(path) as file: - reader = csv.DictReader(file) - headers = set(reader.fieldnames) - content = list(reader) - return headers, content + logger.log_metrics({"a": 1, "c": 3}) + logger.save() + with open(logger.experiment.metrics_file_path) as file: + lines = file.readlines() + header = lines[0].strip() + assert header.split(",") == ["a", "b", "c", "step"] + assert len(lines) == 4 # header + 3 data rows + + +@mock.patch( + # Mock the existence check, so we can simulate appending to the metrics file + "lightning.fabric.loggers.csv_logs._ExperimentWriter._check_log_dir_exists" +) +def test_rewrite_with_new_header(_, tmp_path): + """Test that existing files get rewritten correctly when new columns are added.""" + # write a csv file manually to simulate existing data + csv_path = tmp_path / "metrics.csv" + with open(csv_path, "w") as file: + file.write("a,b,step\n") + file.write("1,2,0\n") + + writer = _ExperimentWriter(log_dir=str(tmp_path)) + + # Add metrics with a new column + writer.log_metrics({"a": 2, "b": 3, "c": 
4}, step=1) + writer.save() + # The rewritten file should have the new columns and preserve old data + with open(csv_path) as file: + lines = file.readlines() + assert len(lines) == 3 # header + 2 data rows + header = lines[0].strip() + assert header.split(",") == ["a", "b", "c", "step"] + # verify old data is preserved + assert lines[1].strip().split(",") == ["1", "2", "", "0"] # old row with empty new column + assert lines[2].strip().split(",") == ["2", "3", "4", "1"] + + +def test_log_metrics_column_order_sorted(tmp_path): + """Test that the columns in the output metrics file are sorted by name.""" + logger = CSVLogger(tmp_path) + logger.log_metrics({"c": 0.1}) + logger.log_metrics({"c": 0.2}) + logger.log_metrics({"b": 0.3}) + logger.log_metrics({"a": 0.4}) + logger.save() + logger.log_metrics({"d": 0.5}) + logger.save() + + with open(logger.experiment.metrics_file_path) as fp: + lines = fp.readlines() + + assert lines[0].strip() == "a,b,c,d,step" + + +@mock.patch("lightning.fabric.loggers.csv_logs.get_filesystem") +@mock.patch("lightning.fabric.loggers.csv_logs._ExperimentWriter._read_existing_metrics") +def test_remote_filesystem_uses_write_mode(mock_read_existing, mock_get_fs, tmp_path): + """Test that remote filesystems use write mode.""" + mock_fs = MagicMock() + mock_fs.isfile.return_value = False # File doesn't exist + mock_fs.makedirs = MagicMock() + mock_get_fs.return_value = mock_fs + + logger = CSVLogger(tmp_path) + assert not logger.experiment._is_local_fs + + logger.log_metrics({"a": 0.3}, step=1) + logger.save() + + # Verify _read_existing_metrics was NOT called (file doesn't exist) + mock_read_existing.assert_not_called() + + # Verify write mode was used (remote FS should never use append) + mock_fs.open.assert_called() + call_args = mock_fs.open.call_args_list[-1] # Get the last call + + # Extract the mode parameter specifically + args, kwargs = call_args + mode = kwargs.get("mode", "r") # Default to 'r' if mode not specified + assert mode == 
"w", f"Expected write mode 'w', but got mode: '{mode}'" + + +@mock.patch("lightning.fabric.loggers.csv_logs.get_filesystem") +@mock.patch("lightning.fabric.loggers.csv_logs._ExperimentWriter._read_existing_metrics") +def test_remote_filesystem_preserves_existing_data(mock_read_existing, mock_get_fs, tmp_path): + """Test that remote filesystem reads existing data and preserves it when rewriting.""" + # Mock remote filesystem with existing file + mock_fs = MagicMock() + mock_fs.isfile.return_value = True + mock_fs.makedirs = MagicMock() + mock_get_fs.return_value = mock_fs + + # Mock existing data + mock_read_existing.return_value = [{"a": 0.1, "step": 0}, {"a": 0.2, "step": 1}] + + logger = CSVLogger(tmp_path) + assert not logger.experiment._is_local_fs + + # Add new metrics - should read existing and combine + logger.log_metrics({"a": 0.3}, step=2) + logger.save() + + # Verify that _read_existing_metrics was called (should read existing data) + mock_read_existing.assert_called_once() + + # Verify write mode was used + mock_fs.open.assert_called() + last_call = mock_fs.open.call_args_list[-1] + args, kwargs = last_call + mode = kwargs.get("mode", "r") + assert mode == "w", f"Expected write mode 'w', but got mode: '{mode}'" From 86f6a6e222a2e3f9ee1cc98be72bd1d71a2dfa66 Mon Sep 17 00:00:00 2001 From: Bhimraj Yadav Date: Thu, 4 Sep 2025 09:29:48 +0000 Subject: [PATCH 17/17] revert flush_logs_every_n_steps value --- tests/tests_fabric/loggers/test_csv.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_fabric/loggers/test_csv.py b/tests/tests_fabric/loggers/test_csv.py index cfde95ba4e097..832d0257d71f2 100644 --- a/tests/tests_fabric/loggers/test_csv.py +++ b/tests/tests_fabric/loggers/test_csv.py @@ -170,7 +170,7 @@ def test_append_metrics_file(_, tmp_path): def test_append_columns(tmp_path): """Test that the CSV file gets rewritten with new headers if the columns change.""" - logger = CSVLogger(tmp_path, flush_logs_every_n_steps=2) + 
logger = CSVLogger(tmp_path, flush_logs_every_n_steps=1) # initial metrics logger.log_metrics({"a": 1, "b": 2})