|
| 1 | +import unittest |
| 2 | +import tempfile |
| 3 | +import shutil |
| 4 | +import subprocess |
| 5 | +import os |
| 6 | +from pathlib import Path |
| 7 | + |
| 8 | +from pyhealth.datasets import MIMIC3Dataset |
| 9 | + |
| 10 | + |
| 11 | +class TestMIMIC3Demo(unittest.TestCase): |
| 12 | + """Test MIMIC3 dataset with demo data downloaded from PhysioNet.""" |
| 13 | + |
| 14 | + def setUp(self): |
| 15 | + """Download and set up demo dataset for each test.""" |
| 16 | + self.temp_dir = tempfile.mkdtemp() |
| 17 | + self._download_demo_dataset() |
| 18 | + self._load_dataset() |
| 19 | + |
| 20 | + def tearDown(self): |
| 21 | + """Clean up downloaded dataset after each test.""" |
| 22 | + if self.temp_dir and os.path.exists(self.temp_dir): |
| 23 | + shutil.rmtree(self.temp_dir) |
| 24 | + |
| 25 | + def _download_demo_dataset(self): |
| 26 | + """Download MIMIC-III demo dataset using wget.""" |
| 27 | + download_url = "https://physionet.org/files/mimiciii-demo/1.4/" |
| 28 | + |
| 29 | + # Use wget to download the demo dataset recursively |
| 30 | + cmd = [ |
| 31 | + "wget", |
| 32 | + "-r", |
| 33 | + "-N", |
| 34 | + "-c", |
| 35 | + "-np", |
| 36 | + "--directory-prefix", |
| 37 | + self.temp_dir, |
| 38 | + download_url, |
| 39 | + ] |
| 40 | + |
| 41 | + try: |
| 42 | + subprocess.run(cmd, check=True, capture_output=True, text=True) |
| 43 | + except subprocess.CalledProcessError as e: |
| 44 | + raise unittest.SkipTest(f"Failed to download MIMIC-III demo dataset: {e}") |
| 45 | + except FileNotFoundError: |
| 46 | + raise unittest.SkipTest("wget not available - skipping download test") |
| 47 | + |
| 48 | + # Find the downloaded dataset path |
| 49 | + physionet_dir = ( |
| 50 | + Path(self.temp_dir) / "physionet.org" / "files" / "mimiciii-demo" / "1.4" |
| 51 | + ) |
| 52 | + if physionet_dir.exists(): |
| 53 | + self.demo_dataset_path = str(physionet_dir) |
| 54 | + else: |
| 55 | + raise unittest.SkipTest("Downloaded dataset not found in expected location") |
| 56 | + |
| 57 | + def _load_dataset(self): |
| 58 | + """Load the dataset for testing.""" |
| 59 | + tables = ["diagnoses_icd", "procedures_icd", "prescriptions", "noteevents"] |
| 60 | + self.dataset = MIMIC3Dataset(root=self.demo_dataset_path, tables=tables) |
| 61 | + |
| 62 | + def test_stats(self): |
| 63 | + """Test .stats() method execution.""" |
| 64 | + try: |
| 65 | + self.dataset.stats() |
| 66 | + except Exception as e: |
| 67 | + self.fail(f"dataset.stats() failed: {e}") |
| 68 | + |
| 69 | + def test_get_events(self): |
| 70 | + """Test get_patient and get_events methods with patient 10006.""" |
| 71 | + # Test get_patient method |
| 72 | + patient = self.dataset.get_patient("10006") |
| 73 | + self.assertIsNotNone(patient, msg="Patient 10006 should exist in demo dataset") |
| 74 | + |
| 75 | + # Test get_events method |
| 76 | + events = patient.get_events() |
| 77 | + self.assertIsNotNone(events, msg="get_events() should not return None") |
| 78 | + self.assertIsInstance(events, list, msg="get_events() should return a list") |
| 79 | + self.assertGreater( |
| 80 | + len(events), 0, msg="get_events() should not return an empty list" |
| 81 | + ) |
| 82 | + |
| 83 | + |
| 84 | +if __name__ == "__main__": |
| 85 | + unittest.main() |
0 commit comments