|
26 | 26 | from geecs_data_utils.type_defs import ScanTag |
27 | 27 | from geecs_data_utils.geecs_paths_config import GeecsPathsConfig |
28 | 28 |
|
| 29 | +# Acceptable extensions (lowercase, no dot) |
| 30 | +_ACCEPTABLE_EXTS = {"png", "tif", "tiff", "h5", "dat", "tdms", "jpg", "jpeg", "bmp"} |
| 31 | + |
29 | 32 | # from geecs_data_utils.types import ScanConfig, ScanMode |
30 | 33 |
|
31 | 34 | # module‐level logger |
@@ -709,6 +712,69 @@ def get_common_shot_dataframe( |
709 | 712 |
|
710 | 713 | return pd.DataFrame(rows) |
711 | 714 |
|
| 715 | + # ---- Minimal naming + discovery primitives (no shot loops) ------------------- |
| 716 | + |
| 717 | + @staticmethod |
| 718 | + def _shot_str(shot: int) -> str: |
| 719 | + """Zero-pad to at least 3 digits; expand naturally beyond 999.""" |
| 720 | + return f"{shot:03d}" if shot < 1000 else str(shot) |
| 721 | + |
| 722 | + def list_device_folders(self) -> list[str]: |
| 723 | + """Return device subfolder names from this scan folder.""" |
| 724 | + try: |
| 725 | + return self.get_folders_and_files().get("devices", []) |
| 726 | + except Exception: |
| 727 | + root = self.get_folder() |
| 728 | + return ( |
| 729 | + [p.name for p in root.iterdir() if p.is_dir()] |
| 730 | + if root and root.exists() |
| 731 | + else [] |
| 732 | + ) |
| 733 | + |
| 734 | + def device_folder(self, device: str) -> Path: |
| 735 | + """Resolve '<scan>/<device>'.""" |
| 736 | + return self.get_folder() / device |
| 737 | + |
| 738 | + @staticmethod |
| 739 | + def build_asset_filename( |
| 740 | + *, scan: int, shot: int, device: str, ext: str, variant: Optional[str] = None |
| 741 | + ) -> str: |
| 742 | + """Build canonical expected file naming.""" |
| 743 | + ext = ext.lstrip(".").lower() |
| 744 | + shot_str = ScanPaths._shot_str(shot) |
| 745 | + variant_seg = "" if not variant else f"{variant}" |
| 746 | + return f"Scan{scan:03d}_{device}_{shot_str}{variant_seg}.{ext}" |
| 747 | + |
| 748 | + def build_asset_path( |
| 749 | + self, *, shot: int, device: str, ext: str, variant: Optional[str] = None |
| 750 | + ) -> Path: |
| 751 | + """Full expected path for one asset.""" |
| 752 | + tag = self.get_tag() |
| 753 | + fname = self.build_asset_filename( |
| 754 | + scan=tag.number, shot=shot, device=device, ext=ext, variant=variant |
| 755 | + ) |
| 756 | + return self.device_folder(device) / fname |
| 757 | + |
| 758 | + def infer_device_ext(self, device: str, *, max_files: int = 5) -> str: |
| 759 | + """Peek at up to `max_files` files to find proper file extension.""" |
| 760 | + from collections import Counter |
| 761 | + |
| 762 | + dpath = self.device_folder(device) |
| 763 | + if not dpath.exists(): |
| 764 | + return "png" |
| 765 | + |
| 766 | + counts = Counter() |
| 767 | + seen = 0 |
| 768 | + for f in dpath.iterdir(): |
| 769 | + if f.is_file(): |
| 770 | + ext = f.suffix.lower().lstrip(".") |
| 771 | + if ext in _ACCEPTABLE_EXTS: |
| 772 | + counts[ext] += 1 |
| 773 | + seen += 1 |
| 774 | + if seen >= max_files: |
| 775 | + break |
| 776 | + return counts.most_common(1)[0][0] if counts else "png" |
| 777 | + |
712 | 778 |
|
713 | 779 | ScanPaths.reload_paths_config() |
714 | 780 |
|
|
0 commit comments