diff --git a/deepmd/pd/utils/dataloader.py b/deepmd/pd/utils/dataloader.py index 0cb8adbc63..2979ddb67c 100644 --- a/deepmd/pd/utils/dataloader.py +++ b/deepmd/pd/utils/dataloader.py @@ -13,7 +13,6 @@ Thread, ) -import h5py import numpy as np import paddle import paddle.distributed as dist @@ -90,9 +89,12 @@ def __init__( ): if seed is not None: setup_seed(seed) - if isinstance(systems, str): - with h5py.File(systems) as file: - systems = [os.path.join(systems, item) for item in file.keys()] + # Use process_systems to handle HDF5 expansion and other system processing + from deepmd.utils.data_system import ( + process_systems, + ) + + systems = process_systems(systems) self.systems: list[DeepmdDataSetForLoader] = [] if len(systems) >= 100: diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index c434341ab9..f766e12214 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: LGPL-3.0-or-later import logging -import os from multiprocessing.dummy import ( Pool, ) @@ -10,7 +9,6 @@ Union, ) -import h5py import numpy as np import torch import torch.distributed as dist @@ -88,9 +86,12 @@ def __init__( ) -> None: if seed is not None: setup_seed(seed) - if isinstance(systems, str): - with h5py.File(systems) as file: - systems = [os.path.join(systems, item) for item in file.keys()] + # Use process_systems to handle HDF5 expansion and other system processing + from deepmd.utils.data_system import ( + process_systems, + ) + + systems = process_systems(systems) def construct_dataset(system: str) -> DeepmdDataSetForLoader: return DeepmdDataSetForLoader( diff --git a/deepmd/utils/data_system.py b/deepmd/utils/data_system.py index cf6e81aad1..17fa1f6650 100644 --- a/deepmd/utils/data_system.py +++ b/deepmd/utils/data_system.py @@ -1,6 +1,8 @@ # SPDX-License-Identifier: LGPL-3.0-or-later import collections +import itertools import logging +import os import warnings from functools import ( cached_property, @@ -11,6 +13,7 @@ Union, ) +import h5py import numpy as np import deepmd.utils.random as dp_random @@ -784,12 +787,63 @@ def prob_sys_size_ext(keywords: str, nsystems: int, nbatch: int) -> list[float]: return sys_probs +def _process_single_system(system: str) -> list[str]: + """Process a single system string and return list of systems. + + Parameters + ---------- + system : str + A single system path + + Returns + ------- + list[str] + List of processed system paths + """ + # Check if this is an HDF5 file without explicit system specification + if _is_hdf5_file(system) and "#" not in system: + try: + with h5py.File(system, "r") as file: + # Check if this looks like a single system (has type.raw and set.* groups at root) + has_type_raw = "type.raw" in file + has_sets = any(key.startswith("set.") for key in file.keys()) + + if has_type_raw and has_sets: + # This is a single system HDF5 file, use standard HDF5 format + return [f"{system}#/"] + + # Look for system-like groups and expand them + expanded = [] + for key in file.keys(): + if isinstance(file[key], h5py.Group): + # Check if this group looks like a system + group = file[key] + group_has_type = "type.raw" in group + group_has_sets = any( + subkey.startswith("set.") for subkey in group.keys() + ) + if group_has_type and group_has_sets: + expanded.append(f"{system}#/{key}") + + # If we found system-like groups, return them; otherwise treat as regular system + return expanded if expanded else [system] + + except OSError as e: + log.warning(f"Could not read HDF5 file {system}: {e}") + # If we can't read as HDF5, treat as regular system + return [system] + else: + # Regular system or HDF5 with explicit system specification + return [system] + + def process_systems( systems: Union[str, list[str]], patterns: Optional[list[str]] = None ) -> list[str]: """Process the user-input systems. If it is a single directory, search for all the systems in the directory. + If it's a list, handle HDF5 files by expanding their internal systems. Check if the systems are valid. Parameters @@ -810,10 +864,98 @@ def process_systems( else: systems = rglob_sys_str(systems, patterns) elif isinstance(systems, list): - systems = systems.copy() + # Process each system individually and flatten results + systems = list( + itertools.chain.from_iterable( + _process_single_system(system) for system in systems + ) + ) return systems +def _is_hdf5_file(path: str) -> bool: + """Check if a path points to an HDF5 file. + + Parameters + ---------- + path : str + Path to check + + Returns + ------- + bool + True if the path is an HDF5 file + """ + # Extract the actual file path (before any # separator for HDF5 internal paths) + file_path = path.split("#")[0] + return os.path.isfile(file_path) and ( + file_path.endswith((".h5", ".hdf5")) or _is_hdf5_format(file_path) + ) + + +def _is_hdf5_multisystem(file_path: str) -> bool: + """Check if an HDF5 file contains multiple systems vs being a single system. + + Parameters + ---------- + file_path : str + Path to the HDF5 file + + Returns + ------- + bool + True if the file contains multiple systems, False if it's a single system + """ + try: + with h5py.File(file_path, "r") as f: + # Check if this looks like a single system (has type.raw and set.* groups) + has_type_raw = "type.raw" in f + has_sets = any(key.startswith("set.") for key in f.keys()) + + if has_type_raw and has_sets: + # This looks like a single system + return False + + # Check if it contains multiple groups that could be systems + system_groups = [] + for key in f.keys(): + if isinstance(f[key], h5py.Group): + group = f[key] + # Check if this group looks like a system (has type.raw and sets) + group_has_type = "type.raw" in group + group_has_sets = any( + subkey.startswith("set.") for subkey in group.keys() + ) + if group_has_type and group_has_sets: + system_groups.append(key) + + # If we found multiple system-like groups, it's a multisystem file + return len(system_groups) > 1 + + except OSError: + return False + + +def _is_hdf5_format(file_path: str) -> bool: + """Check if a file is in HDF5 format by trying to open it. + + Parameters + ---------- + file_path : str + Path to the file + + Returns + ------- + bool + True if the file is in HDF5 format + """ + try: + with h5py.File(file_path, "r"): + return True + except OSError: + return False + + def get_data( jdata: dict[str, Any], rcut: float, diff --git a/source/tests/common/test_process_systems.py b/source/tests/common/test_process_systems.py new file mode 100644 index 0000000000..8ae19e3cc5 --- /dev/null +++ b/source/tests/common/test_process_systems.py @@ -0,0 +1,338 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +import os +import tempfile +import unittest + +import h5py + +from deepmd.utils.data_system import ( + _is_hdf5_file, + _is_hdf5_format, + _is_hdf5_multisystem, + process_systems, +) + + +class TestProcessSystems(unittest.TestCase): + def setUp(self) -> None: + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + self.addCleanup(self._cleanup) + + def _cleanup(self) -> None: + """Clean up test fixtures.""" + import shutil + + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_hdf5_file(self, filename: str, systems: list[str]) -> str: + """Create an HDF5 file with multiple systems.""" + h5_path = os.path.join(self.temp_dir, filename) + + with h5py.File(h5_path, "w") as f: + for sys_name in systems: + sys_group = f.create_group(sys_name) + + # Add required type information + sys_group.create_dataset("type.raw", data=[0, 1]) + sys_group.create_dataset("type_map.raw", data=[b"H", b"O"]) + + # Create a data set + set_group = sys_group.create_group("set.000") + + # Add minimal required data + natoms = 2 + nframes = 1 + + coords = [[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]]] + set_group.create_dataset("coord.npy", data=coords) + + boxes = [[[10.0, 0.0, 0.0], [0.0, 10.0, 0.0], [0.0, 0.0, 10.0]]] + set_group.create_dataset("box.npy", data=boxes) + + energies = [1.0] + set_group.create_dataset("energy.npy", data=energies) + + forces = [[[0.1, 0.1, 0.1], [0.2, 0.2, 0.2]]] + set_group.create_dataset("force.npy", data=forces) + + return h5_path + + def _create_regular_system(self, dirname: str) -> str: + """Create a regular directory-based system.""" + sys_path = os.path.join(self.temp_dir, dirname) + os.makedirs(sys_path, exist_ok=True) + + # Create type.raw to make it a valid system + with open(os.path.join(sys_path, "type.raw"), "w") as f: + f.write("0\n1\n") + + return sys_path + + def test_hdf5_list_expansion(self) -> None: + """Test that HDF5 files in lists are properly expanded.""" + # Create two HDF5 files with multiple systems each + h5_file1 = self._create_hdf5_file("multi1.h5", ["sys_a", "sys_b"]) + h5_file2 = self._create_hdf5_file("multi2.h5", ["water", "ice"]) + + # Process the list of HDF5 files + input_systems = [h5_file1, h5_file2] + result = process_systems(input_systems) + + # Should expand to 4 systems total + expected_systems = [ + f"{h5_file1}#/sys_a", + f"{h5_file1}#/sys_b", + f"{h5_file2}#/water", + f"{h5_file2}#/ice", + ] + + self.assertEqual(len(result), 4) + for expected in expected_systems: + self.assertIn(expected, result) + + def test_mixed_systems_list(self) -> None: + """Test mixed list with HDF5 files and regular directories.""" + h5_file = self._create_hdf5_file("mixed.h5", ["system1", "system2"]) + regular_dir = self._create_regular_system("regular_system") + + input_systems = [h5_file, regular_dir] + result = process_systems(input_systems) + + # Should expand HDF5 but keep regular directory as-is + expected_systems = [f"{h5_file}#/system1", f"{h5_file}#/system2", regular_dir] + + self.assertEqual(len(result), 3) + for expected in expected_systems: + self.assertIn(expected, result) + + def test_explicit_hdf5_system_preserved(self) -> None: + """Test that explicitly specified HDF5 systems are not expanded.""" + h5_file = self._create_hdf5_file("explicit.h5", ["sys1", "sys2"]) + + # Use explicit system specification with # + input_systems = [f"{h5_file}#/sys1"] + result = process_systems(input_systems) + + # Should not expand, keep as explicit + self.assertEqual(result, [f"{h5_file}#/sys1"]) + + def test_regular_list_unchanged(self) -> None: + """Test that lists without HDF5 files are unchanged.""" + regular1 = self._create_regular_system("sys1") + regular2 = self._create_regular_system("sys2") + + input_systems = [regular1, regular2] + result = process_systems(input_systems) + + # Should remain unchanged + self.assertEqual(result, input_systems) + + def test_invalid_hdf5_file_handled(self) -> None: + """Test that invalid HDF5 files are handled gracefully.""" + # Create a text file with .h5 extension + fake_h5 = os.path.join(self.temp_dir, "fake.h5") + with open(fake_h5, "w") as f: + f.write("This is not an HDF5 file") + + input_systems = [fake_h5] + result = process_systems(input_systems) + + # Should treat as regular system since it can't be read as HDF5 + self.assertEqual(result, [fake_h5]) + + def test_single_system_hdf5_standard_format(self) -> None: + """Test that single-system HDF5 files use standard HDF5 format.""" + # Create an HDF5 file that looks like a single system (has type.raw and set.* at root) + h5_file = os.path.join(self.temp_dir, "single_system.h5") + + with h5py.File(h5_file, "w") as f: + # Add type information at root level (single system structure) + f.create_dataset("type.raw", data=[0, 1]) + f.create_dataset("type_map.raw", data=[b"H", b"O"]) + + # Create sets at root level + for set_idx in range(2): + set_name = f"set.{set_idx:03d}" + set_group = f.create_group(set_name) + + coords = [[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]]] + set_group.create_dataset("coord.npy", data=coords) + + boxes = [[[10.0, 0.0, 0.0], [0.0, 10.0, 0.0], [0.0, 0.0, 10.0]]] + set_group.create_dataset("box.npy", data=boxes) + + energies = [1.0] + set_group.create_dataset("energy.npy", data=energies) + + forces = [[[0.1, 0.1, 0.1], [0.2, 0.2, 0.2]]] + set_group.create_dataset("force.npy", data=forces) + + # Process as list (this is the main use case) + input_systems = [h5_file] + result = process_systems(input_systems) + + # Should use standard HDF5 format for single-system files + self.assertEqual(result, [f"{h5_file}#/"]) + + def test_backward_compatibility(self) -> None: + """Test that existing functionality is preserved.""" + # Test single string (existing behavior) + regular_dir = self._create_regular_system("test_system") + result = process_systems(regular_dir) + self.assertIn(regular_dir, result) + + # Test regular list (existing behavior) + regular_list = [regular_dir] + result = process_systems(regular_list) + self.assertEqual(result, regular_list) + + def test_is_hdf5_file_edge_cases(self) -> None: + """Test edge cases for _is_hdf5_file function.""" + # Test non-existent file + self.assertFalse(_is_hdf5_file("/non/existent/file.h5")) + + # Test file with .h5 extension but not HDF5 format + fake_h5 = os.path.join(self.temp_dir, "fake.h5") + with open(fake_h5, "w") as f: + f.write("This is not an HDF5 file") + self.assertTrue(_is_hdf5_file(fake_h5)) # True due to .h5 extension + + # Test file without extension but HDF5 format + real_h5_no_ext = os.path.join(self.temp_dir, "no_extension") + with h5py.File(real_h5_no_ext, "w") as f: + f.create_dataset("test", data=[1, 2, 3]) + self.assertTrue(_is_hdf5_file(real_h5_no_ext)) + + # Test with HDF5 path containing # + h5_file = self._create_hdf5_file("test.h5", ["sys1"]) + self.assertTrue(_is_hdf5_file(f"{h5_file}#/sys1")) + + def test_is_hdf5_format_edge_cases(self) -> None: + """Test edge cases for _is_hdf5_format function.""" + # Test non-existent file + self.assertFalse(_is_hdf5_format("/non/existent/file")) + + # Test non-HDF5 file + text_file = os.path.join(self.temp_dir, "text.txt") + with open(text_file, "w") as f: + f.write("plain text") + self.assertFalse(_is_hdf5_format(text_file)) + + # Test valid HDF5 file + h5_file = os.path.join(self.temp_dir, "valid.h5") + with h5py.File(h5_file, "w") as f: + f.create_dataset("data", data=[1, 2, 3]) + self.assertTrue(_is_hdf5_format(h5_file)) + + def test_is_hdf5_multisystem_edge_cases(self) -> None: + """Test edge cases for _is_hdf5_multisystem function.""" + # Test non-existent file + self.assertFalse(_is_hdf5_multisystem("/non/existent/file.h5")) + + # Test non-HDF5 file + fake_h5 = os.path.join(self.temp_dir, "fake.h5") + with open(fake_h5, "w") as f: + f.write("not hdf5") + self.assertFalse(_is_hdf5_multisystem(fake_h5)) + + # Test empty HDF5 file + empty_h5 = os.path.join(self.temp_dir, "empty.h5") + with h5py.File(empty_h5, "w") as f: + pass + self.assertFalse(_is_hdf5_multisystem(empty_h5)) + + # Test file with single system-like group (should be False) + single_group = os.path.join(self.temp_dir, "single_group.h5") + with h5py.File(single_group, "w") as f: + grp = f.create_group("system1") + grp.create_dataset("type.raw", data=[0, 1]) + grp.create_group("set.000") + self.assertFalse(_is_hdf5_multisystem(single_group)) + + # Test file with multiple system-like groups (should be True) + multi_group = os.path.join(self.temp_dir, "multi_group.h5") + with h5py.File(multi_group, "w") as f: + for i in range(2): + grp = f.create_group(f"system{i}") + grp.create_dataset("type.raw", data=[0, 1]) + grp.create_group("set.000") + self.assertTrue(_is_hdf5_multisystem(multi_group)) + + # Test file with groups that don't look like systems + non_system_groups = os.path.join(self.temp_dir, "non_system.h5") + with h5py.File(non_system_groups, "w") as f: + grp1 = f.create_group("group1") + grp1.create_dataset("random_data", data=[1, 2, 3]) + grp2 = f.create_group("group2") + grp2.create_dataset("other_data", data=[4, 5, 6]) + self.assertFalse(_is_hdf5_multisystem(non_system_groups)) + + def test_hdf5_file_read_error_handling(self) -> None: + """Test error handling when HDF5 files cannot be read.""" + # Create a corrupted file with .h5 extension + corrupted_h5 = os.path.join(self.temp_dir, "corrupted.h5") + with open(corrupted_h5, "wb") as f: + f.write(b"corrupted\x00\x01\x02data") + + # Should handle gracefully and treat as regular system + input_systems = [corrupted_h5] + result = process_systems(input_systems) + self.assertEqual(result, [corrupted_h5]) + + def test_empty_systems_list(self) -> None: + """Test with empty systems list.""" + result = process_systems([]) + self.assertEqual(result, []) + + def test_non_hdf5_files_with_hdf5_extensions(self) -> None: + """Test files with .hdf5 extension that aren't valid HDF5.""" + fake_hdf5 = os.path.join(self.temp_dir, "fake.hdf5") + with open(fake_hdf5, "w") as f: + f.write("not an hdf5 file") + + input_systems = [fake_hdf5] + result = process_systems(input_systems) + self.assertEqual(result, [fake_hdf5]) + + def test_hdf5_with_mixed_group_types(self) -> None: + """Test HDF5 file with mix of system and non-system groups.""" + mixed_h5 = os.path.join(self.temp_dir, "mixed.h5") + with h5py.File(mixed_h5, "w") as f: + # Valid system group + sys_grp = f.create_group("water_system") + sys_grp.create_dataset("type.raw", data=[0, 1]) + sys_grp.create_dataset("type_map.raw", data=[b"H", b"O"]) + sys_grp.create_group("set.000") + + # Invalid group (no type.raw) + invalid_grp = f.create_group("metadata") + invalid_grp.create_dataset("info", data=[1, 2, 3]) + + # Another valid system group + sys_grp2 = f.create_group("ice_system") + sys_grp2.create_dataset("type.raw", data=[0, 1]) + sys_grp2.create_dataset("type_map.raw", data=[b"H", b"O"]) + sys_grp2.create_group("set.000") + + input_systems = [mixed_h5] + result = process_systems(input_systems) + + # Should expand to include only valid systems + expected = [f"{mixed_h5}#/water_system", f"{mixed_h5}#/ice_system"] + self.assertEqual(len(result), 2) + for expected_sys in expected: + self.assertIn(expected_sys, result) + + def test_patterns_parameter(self) -> None: + """Test process_systems with patterns parameter.""" + # Test that patterns parameter still works for single string input + regular_dir = self._create_regular_system("pattern_test") + result = process_systems(regular_dir, patterns=["*"]) + # This should call rglob_sys_str instead of expand_sys_str + # The exact behavior depends on the directory structure, but it should not crash + self.assertIsInstance(result, list) + + +if __name__ == "__main__": + unittest.main() diff --git a/source/tests/pt/test_hdf5_dataloader.py b/source/tests/pt/test_hdf5_dataloader.py new file mode 100644 index 0000000000..a61d82741a --- /dev/null +++ b/source/tests/pt/test_hdf5_dataloader.py @@ -0,0 +1,267 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +import os +import tempfile +import unittest + +import h5py + +# Test PyTorch dataloader - may not be available in all environments +try: + from deepmd.pt.utils.dataloader import ( + DpLoaderSet, + ) + + PYTORCH_AVAILABLE = True +except ImportError: + PYTORCH_AVAILABLE = False + + +@unittest.skipUnless(PYTORCH_AVAILABLE, "PyTorch not available") +class TestHDF5DataloaderSupport(unittest.TestCase): + def setUp(self) -> None: + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + self.addCleanup(self._cleanup) + + def _cleanup(self) -> None: + """Clean up test fixtures.""" + import shutil + + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_single_system_hdf5(self, filename: str) -> str: + """Create an HDF5 file representing a single system.""" + h5_path = os.path.join(self.temp_dir, filename) + + with h5py.File(h5_path, "w") as f: + # Add type information at root level + f.create_dataset("type.raw", data=[0, 1]) + f.create_dataset("type_map.raw", data=[b"H", b"O"]) + + # Create data sets + for set_idx in range(2): + set_name = f"set.{set_idx:03d}" + set_group = f.create_group(set_name) + + # Add realistic data for 2 atoms, 3 frames + coords = [ + [[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]], + [[0.1, 0.0, 0.0], [1.1, 0.0, 0.0]], + [[0.2, 0.0, 0.0], [1.2, 0.0, 0.0]], + ] + set_group.create_dataset("coord.npy", data=coords) + + boxes = [[[10.0, 0.0, 0.0], [0.0, 10.0, 0.0], [0.0, 0.0, 10.0]]] * 3 + set_group.create_dataset("box.npy", data=boxes) + + energies = [1.0, 1.1, 1.2] + set_group.create_dataset("energy.npy", data=energies) + + forces = [ + [[0.1, 0.0, 0.0], [0.2, 0.0, 0.0]], + [[0.11, 0.0, 0.0], [0.21, 0.0, 0.0]], + [[0.12, 0.0, 0.0], [0.22, 0.0, 0.0]], + ] + set_group.create_dataset("force.npy", data=forces) + + return h5_path + + def _create_multisystem_hdf5(self, filename: str, systems: list[str]) -> str: + """Create an HDF5 file with multiple systems.""" + h5_path = os.path.join(self.temp_dir, filename) + + with h5py.File(h5_path, "w") as f: + for sys_name in systems: + sys_group = f.create_group(sys_name) + + # Add type information + sys_group.create_dataset("type.raw", data=[0, 1]) + sys_group.create_dataset("type_map.raw", data=[b"H", b"O"]) + + # Create data set + set_group = sys_group.create_group("set.000") + + coords = [[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]]] + set_group.create_dataset("coord.npy", data=coords) + + boxes = [[[10.0, 0.0, 0.0], [0.0, 10.0, 0.0], [0.0, 0.0, 10.0]]] + set_group.create_dataset("box.npy", data=boxes) + + energies = [1.0] + set_group.create_dataset("energy.npy", data=energies) + + forces = [[[0.1, 0.0, 0.0], [0.2, 0.0, 0.0]]] + set_group.create_dataset("force.npy", data=forces) + + return h5_path + + def test_single_system_hdf5_string_input(self) -> None: + """Test dataloader with single-system HDF5 file as string input.""" + h5_file = self._create_single_system_hdf5("single.h5") + + try: + loader = DpLoaderSet( + systems=h5_file, # String input + batch_size=1, + type_map=["H", "O"], + seed=42, + shuffle=False, + ) + + # Should create one system without expansion + self.assertEqual(len(loader.systems), 1) + # The system path should be the HDF5 file with #/ suffix (standard HDF5 format) + system_path = loader.systems[0].system + self.assertTrue(system_path.startswith(h5_file)) + self.assertTrue(system_path.endswith("#/")) + + except Exception as e: + # If there are issues with the actual data loading, that's ok + # We're mainly testing the path handling logic + self.assertIn("System", str(e)) + + def test_multisystem_hdf5_string_input(self) -> None: + """Test dataloader with multisystem HDF5 file as string input.""" + h5_file = self._create_multisystem_hdf5("multi.h5", ["sys1", "sys2"]) + + try: + loader = DpLoaderSet( + systems=h5_file, # String input + batch_size=1, + type_map=["H", "O"], + seed=42, + shuffle=False, + ) + + # Should expand to multiple systems + self.assertEqual(len(loader.systems), 2) + + # Check that the systems have correct expanded paths + system_paths = [sys.system for sys in loader.systems] + expected_paths = [f"{h5_file}#/sys1", f"{h5_file}#/sys2"] + + for expected in expected_paths: + self.assertIn(expected, system_paths) + + except Exception as e: + # Test the path handling even if data loading fails + self.assertIn("System", str(e)) + + def test_hdf5_file_list_input(self) -> None: + """Test dataloader with list of HDF5 files.""" + # This tests that when process_systems expands HDF5 files in lists, + # the dataloader can handle the resulting expanded paths + h5_file1 = self._create_multisystem_hdf5("multi1.h5", ["water", "ice"]) + h5_file2 = self._create_single_system_hdf5("single.h5") + + # Simulate what process_systems would do + from deepmd.utils.data_system import ( + process_systems, + ) + + processed_systems = process_systems([h5_file1, h5_file2]) + + # Should have 3 systems: 2 from multi1.h5 + 1 from single.h5 + self.assertEqual(len(processed_systems), 3) + + try: + loader = DpLoaderSet( + systems=processed_systems, # List input from process_systems + batch_size=1, + type_map=["H", "O"], + seed=42, + shuffle=False, + ) + + # Should handle the expanded system paths + self.assertEqual(len(loader.systems), 3) + + except Exception as e: + # Path handling should work even if data details fail + self.assertIn("System", str(e)) + + def test_invalid_hdf5_string_input(self) -> None: + """Test dataloader with invalid HDF5 file as string input.""" + fake_h5 = os.path.join(self.temp_dir, "fake.h5") + with open(fake_h5, "w") as f: + f.write("not an hdf5 file") + + try: + loader = DpLoaderSet( + systems=fake_h5, # Invalid HDF5 file + batch_size=1, + type_map=["H", "O"], + seed=42, + shuffle=False, + ) + + # Should treat as single system without expansion + self.assertEqual(len(loader.systems), 1) + self.assertEqual(loader.systems[0].system, fake_h5) + + except Exception as e: + # Should fail gracefully without crashing on path processing + self.assertIsInstance(e, (OSError, FileNotFoundError, ValueError)) + + def test_non_hdf5_string_input(self) -> None: + """Test dataloader with non-HDF5 file as string input.""" + text_file = os.path.join(self.temp_dir, "data.txt") + with open(text_file, "w") as f: + f.write("plain text file") + + try: + loader = DpLoaderSet( + systems=text_file, # Non-HDF5 file + batch_size=1, + type_map=["H", "O"], + seed=42, + shuffle=False, + ) + + # Should treat as single system + self.assertEqual(len(loader.systems), 1) + self.assertEqual(loader.systems[0].system, text_file) + + except Exception as e: + # Should handle gracefully + self.assertIsInstance(e, (OSError, FileNotFoundError, ValueError)) + + def test_empty_hdf5_file(self) -> None: + """Test dataloader with empty HDF5 file.""" + empty_h5 = os.path.join(self.temp_dir, "empty.h5") + with h5py.File(empty_h5, "w") as f: + pass # Create empty file + + try: + loader = DpLoaderSet( + systems=empty_h5, + batch_size=1, + type_map=["H", "O"], + seed=42, + shuffle=False, + ) + + # Empty HDF5 file might result in different behavior + # The key is that it doesn't crash during path processing + self.assertIsInstance(loader.systems, list) + + except Exception as e: + # Expected to fail on data validation, not path processing + self.assertIsInstance(e, (ValueError, FileNotFoundError, KeyError, OSError)) + + +# Test without PyTorch dependency +class TestPyTorchDataloaderFallback(unittest.TestCase): + def test_pytorch_import_graceful_failure(self) -> None: + """Test that missing PyTorch dependency is handled gracefully.""" + if not PYTORCH_AVAILABLE: + # Verify that the import fails gracefully + with self.assertRaises(ImportError): + from deepmd.pt.utils.dataloader import DpLoaderSet # noqa: F401 + else: + # If PyTorch is available, this test should pass + self.skipTest("PyTorch is available") + + +if __name__ == "__main__": + unittest.main()