diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 968d6ddd..206e944b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,6 +13,7 @@ repos: rev: v0.24.2 hooks: - id: toml-sort-fix + exclude: ^examples/ - repo: https://github.com/pre-commit/mirrors-prettier rev: v4.0.0-alpha.8 hooks: diff --git a/examples/heracles.cfg b/examples/heracles.cfg deleted file mode 100644 index 75d321fa..00000000 --- a/examples/heracles.cfg +++ /dev/null @@ -1,72 +0,0 @@ -# example config file for Heracles -# values from [defaults] are applied to all sections - -[defaults] -lmin = 10 -bins = 32 log 2l+1 - -[spectra:clustering] -include = D, D -lmax = 2000 -l2max = 4000 - -[spectra:shear] -include = - G_E, G_E - G_B, G_B - G_E, G_B -lmax = 3000 -l2max = 5000 - -[spectra:ggl] -include = - D, G_E - D, G_B -lmax = 1000 -l2max = 2000 - -[fields:D] -type = positions -columns = - SHE_RA - SHE_DEC -mask = V -nside = 2048 -lmax = 2000 - -[fields:G] -type = shears -columns = - SHE_RA - SHE_DEC - SHE_E1_CAL - -SHE_E2_CAL - SHE_WEIGHT -mask = W -nside = 2048 -lmax = 3000 - -[fields:V] -type = visibility -nside = 4096 -lmax = 6000 - -[fields:W] -type = weights -columns = - SHE_RA - SHE_DEC - SHE_WEIGHT -nside = 8192 -lmax = 8000 - -[catalogs:fs2-dr1n-noia] -source = catalog.fits -selections = - 0 = TOM_BIN_ID==0 - 1 = TOM_BIN_ID==1 - 2 = TOM_BIN_ID==2 -visibility = - 0 = vmap.0.fits - 1 = vmap.1.fits - 2 = vmap.2.fits diff --git a/examples/heracles.toml b/examples/heracles.toml new file mode 100644 index 00000000..06ffbd52 --- /dev/null +++ b/examples/heracles.toml @@ -0,0 +1,77 @@ +# example config file for Heracles + +[[catalogs]] +source = "pos_catalog.fits" +label = "position catalogue" +fields = ["POS", "VIS"] +selections = [ + {key = 1, selection = "POS_TOM_BIN_ID==1", visibility = "visibility.1.fits"}, + {key = 2, selection = "POS_TOM_BIN_ID==2", visibility = "visibility.2.fits"}, + {key = 3, selection = "POS_TOM_BIN_ID==3", visibility = "visibility.3.fits"}, +] + +[[catalogs]] +source = "she_catalog.fits" +label = "shear catalogue" +fields = ["SHE", "WHT"] +visibility = "coverage.fits" # global visibility for this catalogue +selections = [ + {key = 1, selection = "TOM_BIN_ID==1"}, + {key = 2, selection = "TOM_BIN_ID==2"}, + {key = 3, selection = "TOM_BIN_ID==3"}, +] + +[[fields]] +key = "POS" +type = "positions" +columns = ["RIGHT_ASCENSION", "DECLINATION"] +mask = "VIS" +mapper = "healpix" # default, requires healpy +lmax = 2000 +nside = 2048 + +[[fields]] +key = "VIS" +type = "visibility" +lmax = 6000 # maximum lmax + l2max from spectra involving POS +nside = 4096 + +[[fields]] +key = "SHE" +type = "shears" +columns = ["SHE_RA", "SHE_DEC", "SHE_E1_CAL", "-SHE_E2_CAL", "SHE_WEIGHT"] +mask = "WHT" +mapper = "discrete" # requires ducc, no nside here +lmax = 3000 + +[[fields]] +key = "WHT" +type = "weights" +columns = ["SHE_RA", "SHE_DEC", "SHE_WEIGHT"] +lmax = 8000 # maximum lmax + l2max from spectra involving SHE +nside = 8192 + +[[spectra]] +# galaxy clustering +key = "POS-POS" # can use string syntax here +lmin = 10 # for angular binning +lmax = 2000 +l2max = 4000 # mixing matrix: number of columns +l3max = 6000 # lmax + l2max is the default +bins = {n = 32, spacing = "log", weights = "2l+1"} + +[[spectra]] +# cosmic shear +key = "SHE-SHE" +lmin = 10 +lmax = 3000 +l2max = 5000 +bins = {n = 32, spacing = "log", weights = "2l+1"} + +[[spectra]] +# galaxy-galaxy lensing +key = ["POS", "SHE"] # can also use sequence +lmin = 10 +lmax = 1000 +l2max = 2000 +bins = {n = 32, spacing = "log", weights = "2l+1"} diff --git a/heracles/__init__.py b/heracles/__init__.py index c48c4440..0720590d 100644 --- a/heracles/__init__.py +++ b/heracles/__init__.py @@ -37,6 +37,8 @@ "toc_filter", "toc_match", "update_metadata", + "key_to_str", + "key_from_str", # fields "ComplexField", "Field", @@ -86,12 +88,13 @@ FootprintFilter, InvalidValueFilter, ) - from .core import ( TocDict, toc_filter, toc_match, update_metadata, + key_to_str, + key_from_str, ) from .fields import ( diff --git a/heracles/cli.py b/heracles/cli.py index bc4d47ef..d9715a3e 100644 --- a/heracles/cli.py +++ b/heracles/cli.py @@ -21,726 +21,242 @@ from __future__ import annotations import argparse -import configparser -import logging -import os -from collections.abc import Iterable, Iterator, Mapping -from typing import TYPE_CHECKING, Any, Callable, Union +import collections +import gc +import sys +import textwrap +import traceback -import numpy as np +import heracles.config +import heracles.io -if TYPE_CHECKING: - from numpy.typing import NDArray - - from .fields import Field +try: + import tomllib +except ModuleNotFoundError: + from heracles.core import external_dependency_explainer -# valid option keys -FIELD_TYPES = { - "positions": "heracles.fields:Positions", - "shears": "heracles.fields:Shears", - "visibility": "heracles.fields:Visibility", - "weights": "heracles.fields:Weights", -} + with external_dependency_explainer: + import tomli as tomllib # type: ignore[no-redef] +TYPE_CHECKING = False +if TYPE_CHECKING: + from heracles.catalog import CatalogView + from heracles.config import Config -def getlist(value: str) -> list[str]: - """Convert to list.""" - return list(filter(None, map(str.strip, value.splitlines()))) +class MainFormatter(argparse.RawDescriptionHelpFormatter): + """Formatter that keeps order of arguments for usage.""" -def getdict(value: str) -> dict[str, str]: - """Convert to dictionary.""" - out = {} - for line in map(str.strip, value.splitlines()): - if not line: - continue - key, sep, val = line.partition("=") - if sep != "=": - msg = f"Invalid value: {line!r} (expected 'KEY = VALUE')" - raise ValueError(msg) - out[key.rstrip()] = val.lstrip() - return out + def add_usage(self, usage, actions, groups, prefix=None): + self.actions = actions + super().add_usage(usage, actions, groups, prefix) + def _format_actions_usage(self, actions, groups): + return super()._format_actions_usage(self.actions, groups) -def getchoice(value: str, choices: dict[str, Any]) -> Any: - """Get choice from a fixed set of option values.""" - try: - return choices[value] - except KeyError: - expected = ", ".join(map(repr, choices)) - msg = f"Invalid value: {value!r} (expected {expected})" - raise ValueError(msg) from None - - -def getpath(value: str) -> str: - "Convert to path, expanding environment variables." - return os.path.expanduser(os.path.expandvars(value)) - - -def getfilter(value: str) -> list[tuple[Any, ...]]: - """Convert to list of include or exclude filters.""" - filt = [] - for row in getlist(value): - item = [] - for part in map(str.strip, row.split(",")): - if part == "...": - item.append(...) - elif part.isdigit(): - item.append(int(part)) - else: - item.append(part) - filt.append(tuple(item)) - return filt - - -class ConfigParser(configparser.ConfigParser): - """ConfigParser with additional getters.""" - - _UNSET = configparser._UNSET - - def __init__(self) -> None: - # fully specify parent class - super().__init__( - defaults={ - "mapper": "healpix", - }, - dict_type=dict, - allow_no_value=False, - delimiters=("=",), - comment_prefixes=("#",), - inline_comment_prefixes=("#",), - strict=True, - empty_lines_in_values=False, - default_section="defaults", - interpolation=None, - converters={ - "list": getlist, - "dict": getdict, - "path": getpath, - "filter": getfilter, - }, - ) - def getchoice( - self, - section, - option, - choices, - *, - raw=False, - vars=None, # noqa: A002 - fallback=_UNSET, - ): - """Get choice from a fixed set of option values.""" - try: - value = self.get(section, option, raw=False, vars=None) - except (configparser.NoSectionError, configparser.NoOptionError): - if fallback is not self._UNSET: - return fallback - raise - return getchoice(value, choices) - - def sections(self, prefix: str | None = None) -> list[str]: - """ - Return all the configuration section names. If given, only - sections starting with *prefix* are returned. - """ - - sections = super().sections() - if prefix is not None: - sections = [s for s in sections if s.startswith(prefix)] - return sections - - def subsections(self, group: str) -> dict[str, str]: - """ - Return a mapping of subsections in *group*. - """ - sections = self.sections(f"{group}:") - return {s.rpartition(":")[-1].strip(): s for s in sections} - - -def mapper_from_config(config, section): - """Construct a mapper instance from config.""" - - choices = { - "none": "none", - "healpix": "healpix", - "discrete": "discrete", - } - - mapper = config.getchoice(section, "mapper", choices) - - if mapper == "healpix": - from .healpy import HealpixMapper - - nside = config.getint(section, "nside") - lmax = config.getint(section, "lmax", fallback=None) - deconvolve = config.getboolean(section, "deconvolve", fallback=None) - return HealpixMapper(nside, lmax, deconvolve=deconvolve) - - if mapper == "discrete": - from .ducc import DiscreteMapper - - lmax = config.getint(section, "lmax", fallback=None) - return DiscreteMapper(lmax) - - return None - - -def field_from_config(config, section): - """Construct a field instance from config.""" - - from pkgutil import resolve_name - - _type = config.getchoice(section, "type", FIELD_TYPES) - if isinstance(_type, str): - try: - cls = resolve_name(_type) - except (ValueError, ImportError, AttributeError) as exc: - value = config.get(section, "type") - msg = ( - f"Internal error: field type {value!r} maps to type {_type!r}, " - f"which raised the following error: {exc!s}" - ) - raise RuntimeError(msg) from None +def read_config(path: str | None = None) -> Config: + """ + Read a config file. + """ + if path is None: + path = "heracles.toml" + use_default = True else: - cls = _type - mapper = mapper_from_config(config, section) - columns = config.getlist(section, "columns", fallback=()) - mask = config.get(section, "mask", fallback=None) - return cls(mapper, *columns, mask=mask) - - -def fields_from_config(config): - """Construct all field instances from config.""" - sections = config.subsections("fields") - return { - name: field_from_config(config, section) for name, section in sections.items() - } - - -def catalog_from_config(config, section, label=None, *, out=None): - """Construct a catalogue instance from config.""" - - from .catalog import FitsCatalog - from .io import read_vmap - - # TODO support non-FITS catalogue sources - source = config.getpath(section, "source") - # check if visibility is per catalogue or per selection - visibility: str | Mapping[str, str] - visibility = config.get(section, "visibility", fallback=None) - visibility_transform = config.getboolean( - section, - "visibility-transform", - fallback=False, - ) - visibility_lmax = config.getint(section, "visibility-lmax", fallback=None) - # check if visibility is a mapping - if visibility and "\n" in visibility: - visibility = config.getdict(section, "visibility") - selections = config.getdict(section, "selections") - # build the base catalogue - base_catalog = FitsCatalog(source) - base_catalog.label = label - # set base catalogue's visibility if just one was given - if isinstance(visibility, str): - try: - vmap = read_vmap( - getpath(visibility), - transform=visibility_transform, - lmax=visibility_lmax, - ) - except (TypeError, ValueError, OSError) as exc: - msg = f"Cannot load visibility: {exc!s}" - raise ValueError(msg) - else: - base_catalog.visibility = vmap - del vmap - # create a view of the base catalogue for each selection - # since `out` can be given, also keep track of selections added here - if out is None: - out = {} - added = set() - for key, where in selections.items(): - # convert key to number and make sure it wasn't used before - num = int(key) - if out and num in out: - msg = f"Duplicate selection: {num}" - raise ValueError(msg) - # create view from selection string, if present - # otherwise, give the base catalog itself - if where: - catalog = base_catalog.where(where) - else: - catalog = base_catalog - # store the selection - out[num] = catalog - added.add(num) - # assign visibilities to individual selections if a mapping was given - # only allow visibilities for selections added here - if isinstance(visibility, Mapping): - for key, value in visibility.items(): - num = int(key) - if num not in added: - msg = f"Invalid value: unknown selection '{num}'" - raise ValueError(msg) - try: - vmap = read_vmap( - getpath(value), - transform=visibility_transform, - lmax=visibility_lmax, - ) - except (TypeError, ValueError, OSError) as exc: - msg = f"Cannot load visibility: {exc!s}" - raise ValueError(msg) - else: - out[num].visibility = vmap - del vmap - # all done, return `out` unconditionally - return out - - -def catalogs_from_config(config): - """Construct all catalog instances from config.""" - sections = config.subsections("catalogs") - catalogs = {} - for label, section in sections.items(): - catalog_from_config(config, section, label, out=catalogs) - return catalogs - - -def bins_from_config(config, section): - """Construct angular bins from config.""" - - # dictionary of {spacing: (op, invop)} - spacings = { - "linear": (lambda x: x, lambda x: x), - "log": (np.log10, lambda x: 10**x), - "sqrt": (np.sqrt, np.square), - "log1p": (np.log1p, np.expm1), - } - - # dictionary of known weights - weights = { - None, - "2l+1", - "l(l+1)", - } - - bins = config.get(section, "bins", fallback="none") - - if bins == "none": - return None, None - - binopts = bins.split() - - if not 2 <= len(binopts) <= 3: - msg = f"{section}: bins should be of the form ' []'" - raise ValueError(msg) - - n = int(binopts[0]) - s = binopts[1] - w = binopts[2] if len(binopts) > 2 else None - - if n < 2: - msg = f"Invalid bin size '{n}' in section {section}" - raise ValueError(msg) - if s not in spacings: - msg = f"Invalid bin spacing '{s}' in section {section}" - raise ValueError(msg) - if w is not None and w not in weights: - msg = f"Invalid bin weights '{w}' in section {section}" - raise ValueError(msg) - - lmin = config.getint(section, "lmin", fallback=1) - lmax = config.getint(section, "lmax") - - op, inv = spacings[s] - arr = inv(np.linspace(op(lmin), op(lmax + 1), n + 1)) - # fix first and last array element to be exact - arr[0], arr[-1] = lmin, lmax + 1 - - return arr, w - - -def spectrum_from_config(config, section): - """Construct info dict for angular power spectra from config.""" - - options = config[section] - - info = {} - if "lmax" in options: - info["lmax"] = options.getint("lmax") - if "l2max" in options: - info["l2max"] = options.getint("l2max") - if "l3max" in options: - info["l3max"] = options.getint("l3max") - if "include" in options: - info["include"] = options.getfilter("include") - if "exclude" in options: - info["exclude"] = options.getfilter("exclude") - if "debias" in options: - info["debias"] = options.getboolean("debias") - if "bins" in options: - info["bins"] = bins_from_config(config, section) - - return info - - -def spectra_from_config(config): - """Construct pairs of labels and *kwargs* for angular power spectra.""" - sections = config.subsections("spectra") - spectra = [] - for label, section in sections.items(): - spectra += [(label, spectrum_from_config(config, section))] - if not spectra: - spectra += [(None, {})] - return spectra - - -# the type of a single path -Path = Union[str, os.PathLike] - -# the type of one or multiple paths -Paths = Union[Path, Iterable[Path]] - -# the type of loader functions for load_xyz() -ConfigLoader = Callable[[Paths], ConfigParser] - + use_default = False -def configloader(path: Paths) -> ConfigParser: - """Load a config file using configparser.""" - - if isinstance(path, (str, os.PathLike)): - path = (path,) - - config = ConfigParser() - for p in path: - with open(p) as fp: - config.read_file(fp) - return config - - -# this constant sets the default loader -DEFAULT_LOADER = configloader - - -def map_all_selections( - fields: Mapping[str, Field], - config: ConfigParser, - logger: logging.Logger, - progress: bool, -) -> Iterator: - """Iteratively map the catalogues defined in config.""" - - from .mapping import map_catalogs + try: + with open(path, "rb") as fp: + config = tomllib.load(fp) + except FileNotFoundError as exc: + if use_default and hasattr(exc, "add_note"): + exc.add_note( + "(Hint: it looks like Heracles cannot load its default " + "configuration file. Did you forget to create one?)" + ) + raise - # load catalogues to process - catalogs = catalogs_from_config(config) + return heracles.config.load(config) - logger.info("fields %s", ", ".join(map(repr, fields))) - # process each catalogue separately into maps - for key, catalog in catalogs.items(): - logger.info( - "%s%s", - f"catalog {catalog.label!r}, " if catalog.label else "", - f"selection {key}", - ) +def _maps_alms_internal( + *, + maps_path: str | None = None, + alms_path: str | None = None, + config_path: str | None = None, + parallel: bool = False, +) -> None: + """ + Compute maps and/or alms from catalogues. + """ - # maps for single catalogue - yield map_catalogs( - fields, - {key: catalog}, - parallel=True, # process everything at this level in one go - progress=progress, - ) + config = read_config(config_path) + if maps_path is not None: + maps_out = heracles.io.MapFits(maps_path, clobber=True) + else: + maps_out = None -def load_all_maps(paths: Paths, logger: logging.Logger) -> Iterator: - """Iterate over MapFits from a path or list of paths.""" + if alms_path is not None: + alms_out = heracles.io.AlmFits(alms_path, clobber=True) + else: + alms_out = None + + for catalog_config in config.catalogs: + base_catalog = heracles.FitsCatalog(catalog_config.source) + base_catalog.label = catalog_config.label + + if catalog_config.visibility is not None: + base_catalog.visibility = heracles.read_vmap(catalog_config.visibility) + + catalogs: dict[int, CatalogView] = {} + visibilities: dict[int, str | None] = {} + for selection in catalog_config.selections: + catalogs[selection.key] = base_catalog[selection.selection] + visibilities[selection.key] = selection.visibility + + fields = {key: config.fields[key] for key in catalog_config.fields} + + for key, catalog in catalogs.items(): + if visibilities[key] is not None: + catalogs[key].visibility = heracles.read_vmap(visibilities[key]) + + # this split will no longer be neccessary when visibilities are + # lazy-loaded + if not parallel: + # process one catalogue + data = heracles.map_catalogs( + fields, + {key: catalog}, + parallel=False, + ) - from .io import MapFits + # write if asked to + if maps_out is not None: + maps_out.update(data) + + # this catalogue is done, clean up + catalogs[key].visibility = None + gc.collect() + + # compute alms if asked to + if alms_out is not None: + heracles.transform(fields, data, out=alms_out) + + # done with data + del data + gc.collect() + + if parallel: + # process all catalogues + data = heracles.map_catalogs( + fields, + catalogs, + out=maps_out, + parallel=True, + ) - # make iterable if single path is given - if isinstance(paths, (str, os.PathLike)): - paths = (paths,) + # compute alms if asked to + if alms_out is not None: + heracles.transform(fields, data, out=alms_out) - for path in paths: - logger.info("reading maps from %s", path) - yield MapFits(path, clobber=False) + # clean up before next catalogue is processed + del base_catalog, catalogs, visibilities + gc.collect() def maps( - path: Path, - *, - files: Paths, - logger: logging.Logger, - loader: ConfigLoader = DEFAULT_LOADER, - progress: bool, + path: str, + config_path: str | None = None, + parallel: bool = False, ) -> None: - """compute maps""" + """map catalogues - from .io import MapFits + Create maps from input catalogues. - # load the config file, this contains the maps definition - logger.info("reading configuration from %s", files) - config = loader(files) - - # construct fields for mapping - fields = fields_from_config(config) - - # iterator over the individual maps - # this generates maps on the fly - itermaps = map_all_selections(fields, config, logger, progress) - - # output goes into a FITS-backed tocdict so we don't fill memory up - out = MapFits(path, clobber=True) - - # iterate over maps, keeping only one in memory at a time - for maps in itermaps: - # write to disk - logger.info("writing maps to %s", path) - out.update(maps) - # forget maps before next turn to free some memory - del maps + """ + return _maps_alms_internal( + maps_path=path, + config_path=config_path, + parallel=parallel, + ) def alms( - path: Path, - *, - files: Paths | None, - maps: Paths | None, - healpix_datapath: Path | None = None, - logger: logging.Logger, - loader: ConfigLoader = DEFAULT_LOADER, - progress: bool, + path: str, + maps: list[str], + config_path: str | None = None, + parallel: bool = False, ) -> None: - """compute spherical harmonic coefficients + """compute alms from catalogues or maps - Compute spherical harmonic coefficients (alms) from catalogues or - maps. For catalogue input, the maps for each selection are created - in memory and discarded after its alms have been computed. + Compute alms from input catalogues or pre-computed maps. """ + # if no maps are given, process directly from catalogues + if not maps: + return _maps_alms_internal( + alms_path=path, + config_path=config_path, + parallel=parallel, + ) - from . import transform - from .healpy import HealpixMapper - from .io import AlmFits - - # load the config file, this contains alms setting and maps definition - logger.info("reading configuration from %s", files) - config = loader(files) - - # set the HEALPix datapath - if healpix_datapath is not None: - HealpixMapper.DATAPATH = healpix_datapath - - # construct fields to get mappers for transform - fields = fields_from_config(config) + # load configuration to get fields + config = read_config(config_path) - # process either catalogues or maps - # everything is loaded via iterators to keep memory use low - itermaps: Iterator - if maps: - itermaps = load_all_maps(maps, logger) - else: - itermaps = map_all_selections(fields, config, logger, progress) - - # output goes into a FITS-backed tocdict so we don't fill up memory - logger.info("writing alms to %s", path) - out = AlmFits(path, clobber=True) - - # iterate over maps and transform each - for maps in itermaps: - logger.info("transforming %d maps", len(maps)) - transform( - fields, - maps, - progress=progress, - out=out, - ) - del maps + # open output FITS + alms_out = heracles.io.AlmFits(path, clobber=True) + for maps_path in maps: + # quick check to see if file is readable + with open(maps_path) as _fp: + pass -def chained_alms(alms: Paths | None) -> Mapping[Any, NDArray] | None: - """Return a ChainMap of AlmFits from all input alms, or None.""" - from collections import ChainMap + # lazy-load the file + data = heracles.io.MapFits(maps_path) - from .io import AlmFits + # transform this fits + heracles.transform(config.fields, data, out=alms_out) - if alms is None: - return None - return ChainMap(*(AlmFits(alm) for alm in reversed(alms))) + # clean up before next iteration + del data def spectra( - path: Path, + path: str, + alms: list[str], *, - files: Paths, - alms: Paths, - alms2: Paths | None, - logger: logging.Logger, - loader: ConfigLoader = DEFAULT_LOADER, - progress: bool, + config_path: str | None = None, ) -> None: - """compute angular power spectra""" + """compute angular power spectra - from .io import ClsFits - from .twopoint import angular_power_spectra - - # load the config file, this contains angular binning settings - logger.info("reading configuration from %s", files) - config = loader(files) - - # collect angular power spectra settings from config - spectra = spectra_from_config(config) - - # link all alms together - all_alms, all_alms2 = chained_alms(alms), chained_alms(alms2) - - # create an empty cls file, then fill it iteratively with alm combinations - out = ClsFits(path, clobber=True) - - total = 0 - logger.info("using %d set(s) of alms", len(all_alms)) - if all_alms2 is not None: - logger.info("using %d set(s) of cross-alms", len(all_alms2)) - for label, info in spectra: - logger.info( - "computing %s spectra", - repr(label) if label is not None else "all", - ) - # angular binning - if info.get("bins") is not None: - bins, weights = info["bins"] - else: - bins, weights = None, None - # compute spectra - angular_power_spectra( - all_alms, - all_alms2, - lmax=info.get("lmax"), - debias=info.get("debias", True), - bins=bins, - weights=weights, - include=info.get("include"), - exclude=info.get("exclude"), - out=out, - ) - logger.info("-> added %d spectra, total is now %d", len(out) - total, len(out)) - total = len(out) - logger.info("finished computing %d spectra", total) + Compute angular power spectra from sets of alms. + """ -def mixmats( - path: Path, - *, - files: Paths, - alms: Paths, - alms2: Paths | None, - logger: logging.Logger, - loader: ConfigLoader = DEFAULT_LOADER, - progress: bool, -) -> None: - """compute mixing matrices""" - - from .fields import get_masks - from .io import MmsFits - from .twopoint import angular_power_spectra, mixing_matrices - - # load the config file, this contains angular binning settings - logger.info("reading configuration from %s", files) - config = loader(files) - - # collect the defined fields from config - fields = fields_from_config(config) - - # collect angular power spectra settings from config - spectra = spectra_from_config(config) - - # link all alms together - all_alms, all_alms2 = chained_alms(alms), chained_alms(alms2) - - # create an empty mms file, then fill it iteratively - out = MmsFits(path, clobber=True) - - total = 0 - logger.info("using %d set(s) of alms", len(all_alms)) - if all_alms2 is not None: - logger.info("using %d set(s) of cross-alms", len(all_alms2)) - for label, info in spectra: - # get mask combinations for fields included in these spectra - include, exclude = info.get("include"), info.get("exclude") - include_masks = get_masks( - fields, - comb=2, - include=include, - exclude=exclude, - append_eb=True, - ) - if not include_masks: - logger.info( - "missing masks for %s spectra, skipping...", - repr(label) if label is not None else "all", - ) - continue - logger.info( - "computing %s mask spectra for %s", - repr(label) if label is not None else "all", - ", ".join(map(str, include_masks)), - ) - # determine the various lmax values - lmax, l2max, l3max = info.get("lmax"), info.get("l2max"), info.get("l3max") - # angular binning, to be applied to rows of mixing matrices - if info.get("bins") is not None: - bins, weights = info["bins"] - else: - bins, weights = None, None - # compute spectra of masks - mask_cls = angular_power_spectra( - all_alms, - all_alms2, - lmax=l3max, - debias=info.get("debias", True), - include=include_masks, - ) - # now compute the mixing matrices from these spectra - logger.info( - "computing %s mixing matrices from %d spectra", - repr(label) if label is not None else "all", - len(mask_cls), - ) - mixing_matrices( - fields, - mask_cls, - l1max=lmax, - l2max=l2max, - l3max=l3max, - bins=bins, - weights=weights, - progress=progress, - out=out, - ) - logger.info("-> added %d mixmats, total is now %d", len(out) - total, len(out)) - total = len(out) - del mask_cls - logger.info("finished computing %d mixing matrices", total) + # load configuration to get requested spectra + config = read_config(config_path) + # make sure two-point statistics are defined in config + if not config.spectra: + raise ValueError(f"{config_path}: no 'spectra' in config") -class MainFormatter(argparse.RawDescriptionHelpFormatter): - """Formatter that keeps order of arguments for usage.""" + # lazy-load all input FITS into a combined mapping + all_alms = collections.ChainMap({}) + for alms_path in alms: + # quick check to see if file is readable + with open(alms_path) as _fp: + pass - def add_usage(self, usage, actions, groups, prefix=None): - self.actions = actions - super().add_usage(usage, actions, groups, prefix) + # add new lazy-loaded FITS to the ChainMap + # keys will be sorted in the order the files are passed + all_alms = all_alms.new_child(heracles.io.AlmFits(alms_path, clobber=False)) - def _format_actions_usage(self, actions, groups): - return super()._format_actions_usage(self.actions, groups) + # compute all spectra combinations + spectra = {} -def main(): +def main() -> int: """Main method of the `heracles` command. Parses arguments and calls the appropriate subcommand. @@ -757,7 +273,7 @@ def add_command(func): parser = commands.add_parser( name, help=help_, - description=description, + description=textwrap.dedent(description), parents=[cmd_parser], formatter_class=MainFormatter, ) @@ -765,27 +281,25 @@ def add_command(func): return parser # common parser for all subcommands - cmd_parser = argparse.ArgumentParser( - add_help=False, - ) + cmd_parser = argparse.ArgumentParser(add_help=False) cmd_parser.add_argument( "-c", "--config", - help="configuration file (can be repeated)", - metavar="", - action="append", - dest="files", - ) - cmd_parser.add_argument( - "--no-progress", - help="do not show progress bars", - action="store_false", - dest="progress", + help="configuration file", + dest="config_path", ) # main parser for CLI invokation main_parser = argparse.ArgumentParser( prog="heracles", + description=textwrap.dedent( + """ + This is Heracles — Harmonic-space statistics on the sphere. + + To run a command, use `heracles `. + To show help for a command, use `heracles --help`. + """ + ), epilog="Made in the Euclid Science Ground Segment", formatter_class=MainFormatter, ) @@ -793,97 +307,58 @@ def add_command(func): commands = main_parser.add_subparsers( title="commands", - metavar="", - help="the processing step to carry out", + metavar="", + help="command to run", ) ######## # maps # ######## - parser = add_command(maps) - group = parser.add_argument_group("output") - group.add_argument( + maps_parser = add_command(maps) + maps_parser.add_argument( + "--parallel", + action="store_true", + help="process all maps of a catalogue in parallel", + ) + maps_parser.add_argument( "path", help="output FITS file for maps", - metavar="", ) ######## # alms # ######## - parser = add_command(alms) - parser.add_argument( - "--healpix-datapath", - help="path to HEALPix data files", - metavar="", + alms_parser = add_command(alms) + alms_parser.add_argument( + "--parallel", + action="store_true", + help="process all maps of a catalogue in parallel", ) - group = parser.add_argument_group("output") - group.add_argument( + alms_parser.add_argument( "path", help="output FITS file for alms", - metavar="", ) - group = parser.add_argument_group("inputs") - group.add_argument( + alms_parser.add_argument( "maps", nargs="*", - default=None, - help="input FITS file(s) for maps", - metavar="", + help="transform pre-computed maps", ) ########### # spectra # ########### - parser = add_command(spectra) - group = parser.add_argument_group("output") - group.add_argument( + spectra_parser = add_command(spectra) + spectra_parser.add_argument( "path", help="output FITS file for spectra", - metavar="", - ) - group = parser.add_argument_group("inputs") - group.add_argument( - "alms", - nargs="+", - help="input FITS file(s) for alms", - metavar="", - ) - group.add_argument( - "-X", - nargs="+", - help="input FITS file(s) for cross-spectra", - metavar="", - dest="alms2", ) - - ########### - # mixmats # - ########### - - parser = add_command(mixmats) - group = parser.add_argument_group("output") - group.add_argument( - "path", - help="output FITS file for mixing matrices", - metavar="", - ) - group = parser.add_argument_group("inputs") - group.add_argument( + spectra_parser.add_argument( "alms", nargs="+", - help="input FITS file(s) for alms", - metavar="", - ) - group.add_argument( - "-X", - nargs="+", - help="input FITS file(s) for cross-spectra", - metavar="", - dest="alms2", + help="input FITS file(s) with sets of alms", ) ####### @@ -897,24 +372,33 @@ def add_command(func): main_parser.print_help() return 1 - # fix default config - if not args.files: - args.files = ["heracles.cfg"] - # get keyword args - kwargs = vars(args) + kwargs = dict(args.__dict__) cmd = kwargs.pop("cmd") - # set up logger for CLI output - logger = logging.getLogger(__name__) - logger.addHandler(logging.StreamHandler()) - logger.setLevel(logging.DEBUG) - try: - cmd(**kwargs, logger=logger) + cmd(**kwargs) except Exception as exc: # noqa: BLE001 - logger.debug("Exception", exc_info=exc) - logger.error(f"ERROR: {exc!s}") + print( + textwrap.dedent( + """ + === ERROR === + + Heracles crashed with an uncaught exception. + + If you suspect this is a bug, please open an issue at + https://github.com/heracles-ec/heracles/issues. + """ + ), + file=sys.stderr, + flush=True, + ) + tb = traceback.format_exception(type(exc), exc, exc.__traceback__) + print("\n".join(tb), file=sys.stderr, flush=True) return 1 else: return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/heracles/config.py b/heracles/config.py new file mode 100644 index 00000000..d5062f9e --- /dev/null +++ b/heracles/config.py @@ -0,0 +1,399 @@ +# Heracles: Euclid code for harmonic-space statistics on the sphere +# +# Copyright (C) 2023-2025 Euclid Science Ground Segment +# +# This file is part of Heracles. +# +# Heracles is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Heracles is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with Heracles. If not, see . +"""Module for processing of configuration.""" + +from __future__ import annotations + +import enum +from typing import TYPE_CHECKING, NamedTuple + +import numpy as np + +import heracles + +if TYPE_CHECKING: + from collections.abc import Mapping + from typing import Any, TypeGuard + from numpy.typing import NDArray + from heracles import Field, Mapper + + TwoPointKey = ( + tuple[()] + | tuple[str] + | tuple[str, str] + | tuple[str, str, int] + | tuple[str, str, int, int] + ) + + +def _is_list_of_str(obj: Any) -> TypeGuard[list[str]]: + """Type guard for list of strings.""" + return isinstance(obj, list) and all(isinstance(item, str) for item in obj) + + +class FieldType(enum.Enum): + positions: type[heracles.Field] = heracles.Positions + shears: type[heracles.Field] = heracles.Shears + visibility: type[heracles.Field] = heracles.Visibility + weights: type[heracles.Field] = heracles.Weights + + def __str__(self) -> str: + return self.name + + +class MapperType(enum.Enum): + none = enum.auto() + healpix = enum.auto() + discrete = enum.auto() + + def __str__(self) -> str: + return self.name + + +class BinSpacing(enum.Enum): + """Spacing for binning. + + Values are functions for the spacing and its inverse. + + """ + + linear = (lambda x: x, lambda x: x) + log = (np.log, np.exp) + log1p = (np.log1p, np.expm1) + sqrt = (np.sqrt, np.square) + + +class SelectionConfig(NamedTuple): + key: int + selection: str + visibility: str | None + + +class CatalogConfig(NamedTuple): + source: str + fields: list[str] + label: str | None + visibility: str | None + selections: list[SelectionConfig] + + +class TwoPointConfig(NamedTuple): + lmin: int | None + lmax: int | None + l2max: int | None + l3max: int | None + debias: bool + bins: NDArray[Any] | None + weights: str | None + + +class Config(NamedTuple): + catalogs: list[CatalogConfig] + fields: dict[str, Field] + spectra: dict[TwoPointKey, TwoPointConfig] + + +def _load_selection(config: Mapping[str, Any]) -> SelectionConfig: + """Load a selection configuration from a dictionary.""" + + if missing := {"key", "selection"} - config.keys(): + raise ValueError("missing option: " + ", ".join(missing)) + + if unknown := config.keys() - {"key", "selection", "visibility"}: + raise ValueError("unknown option: " + ", ".join(unknown)) + + key = config["key"] + if not isinstance(key, int): + raise ValueError(f"key: expected int, got {type(key)}") + + selection = config["selection"] + if not isinstance(selection, str): + raise ValueError(f"selection: expected str, got {type(selection)}") + + visibility = config.get("visibility") + if visibility is not None and not isinstance(visibility, str): + raise ValueError(f"visibility: expected str, got {type(visibility)}") + + return SelectionConfig( + key=key, + selection=selection, + visibility=visibility, + ) + + +def _load_catalog(config: Mapping[str, Any]) -> CatalogConfig: + """Load a catalog configuration from a dictionary.""" + # create a copy so we can pop() entries + config = dict(config) + + source = config.pop("source") + if not isinstance(source, str): + raise ValueError(f"source: expected str, got {type(source)}") + + fields = config.pop("fields") + if not _is_list_of_str(fields): + raise ValueError("fields: expected list of str") + + label = config.pop("label", None) + if label is not None and not isinstance(label, str): + raise ValueError(f"label: expected str, got {type(label)}") + + visibility = config.pop("visibility", None) + if visibility is not None and not isinstance(visibility, str): + raise ValueError(f"visibility: expected str, got {type(visibility)}") + + _selections = config.pop("selections", []) + if not isinstance(_selections, list): + raise ValueError(f"selections: expected list, got {type(_selections)}") + selections = list(map(_load_selection, _selections)) + + # check unknown options + if config: + raise ValueError("unknown options: " + ", ".join(map(repr, config))) + + return CatalogConfig( + fields=fields, + source=source, + label=label, + selections=selections, + visibility=visibility, + ) + + +def _load_mapper(config: dict[str, Any]) -> Mapper | None: + """Load a mapper from a dictionary. + + For simplicity, this consumes keys from the dictionary. + + """ + _mapper = config.pop("mapper", "healpix") + if not isinstance(_mapper, str): + raise ValueError(f"mapper: expected str, got {type(_mapper)}") + try: + mapper = MapperType[_mapper] + except KeyError: + raise ValueError(f"mapper: invalid value: {_mapper!r}") from None + + nside = config.pop("nside", None) + if nside is not None and not isinstance(nside, int): + raise ValueError(f"nside: expected int, got {type(nside)}") + + lmax = config.pop("lmax", None) + if lmax is not None and not isinstance(lmax, int): + raise ValueError(f"lmax: expected int, got {type(lmax)}") + + if mapper is MapperType.healpix: + from heracles.healpy import HealpixMapper + + if nside is None: + raise ValueError("missing option: nside") + + return HealpixMapper(nside, lmax) + + if mapper is MapperType.discrete: + from heracles.ducc import DiscreteMapper + + if lmax is None: + raise ValueError("missing option: nside") + + return DiscreteMapper(lmax) + + # mapper is "none" here + return None + + +def _load_field(config: Mapping[str, Any]) -> tuple[str, Field]: + """Load a field from a dictionary.""" + # make a copy so that we can pop items + config = dict(config) + + key = config.pop("key") + if not isinstance(key, str): + raise ValueError(f"key: expected str, got {type(key)}") + + _cls = config.pop("type") + if not isinstance(_cls, str): + raise ValueError(f"{key}: type: expected str, got {type(_cls)}") + try: + cls = FieldType[_cls].value + except KeyError: + raise ValueError(f"{key}: type: invalid value: {_cls!r}") from None + + columns = config.pop("columns", []) + if not _is_list_of_str(columns): + raise ValueError("{key}: columns: expected list of str") + + mask = config.pop("mask", None) + if mask is not None and not isinstance(mask, str): + raise ValueError(f"{key}: mask: expected str, got {type(mask)}") + + try: + mapper = _load_mapper(config) + except ValueError as exc: + raise ValueError(f"{key}: {exc!s}") from None + + # check unknown options + if config: + raise ValueError(f"{key}: unknown options: " + ", ".join(map(repr, config))) + + return key, cls(mapper, *columns, mask=mask) + + +def _load_bins( + config: dict[str, Any], + lmin: int, + lmax: int, +) -> tuple[NDArray[Any], str | None]: + """Construct angular bins from config.""" + config = dict(config) + + n = config.pop("n") + if not isinstance(n, int): + raise ValueError(f"n: expected int, got {type(n)}") + if n < 2: + raise ValueError(f"n: invalid number of bins: {n}") + + _spacing = config.pop("spacing", "linear") + if not isinstance(_spacing, str): + raise ValueError(f"spacing: expected str, got {type(_spacing)}") + try: + op, inv = BinSpacing[_spacing].value + except KeyError: + raise ValueError(f"spacing: invalid value: {_spacing!r}") from None + + weights = config.pop("weights", None) + if weights is not None and not isinstance(weights, str): + raise ValueError(f"weights: expected str, got {type(weights)}") + + bins = inv(np.linspace(op(lmin), op(lmax + 1), n + 1)) + # fix first and last array element to be exact + bins[0], bins[-1] = lmin, lmax + 1 + + return bins, weights + + +def _load_twopoint(config: Mapping[str, Any]) -> tuple[TwoPointKey, TwoPointConfig]: + """Load two-point config from a dictionary.""" + # make a copy so that we can pop items + config = dict(config) + + _key = config.pop("key") + if not isinstance(_key, (str, list)): + raise ValueError(f"key: expected str or list, got {type(_key)}") + if isinstance(_key, str): + key = heracles.key_from_str(_key) + else: + key = tuple(_key) + # stringified version for errors + _key = heracles.key_to_str(key) + + lmin = config.pop("lmin", None) + if lmin is not None and not isinstance(lmin, int): + raise ValueError(f"{_key}: lmin: expected int, got {type(lmin)}") + + lmax = config.pop("lmax", None) + if lmax is not None and not isinstance(lmax, int): + raise ValueError(f"{_key}: lmax: expected int, got {type(lmax)}") + + l2max = config.pop("l2max", None) + if l2max is not None and not isinstance(l2max, int): + raise ValueError(f"{_key}: l2max: expected int, got {type(l2max)}") + + l3max = config.pop("l3max", None) + if l3max is not None and not isinstance(l3max, int): + raise ValueError(f"{_key}: l3max: expected int, got {type(l3max)}") + + debias = config.pop("debias", True) + if not isinstance(debias, bool): + raise ValueError(f"{_key}: debias: expected bool, got {type(debias)}") + + _bins = config.pop("bins", None) + if _bins is not None: + if not isinstance(_bins, dict): + raise ValueError(f"{_key}: bins: expected dict, got {type(_bins)}") + if lmin is None or lmax is None: + raise ValueError(f"{_key}: bins: angular binning requires lmin and lmax") + try: + bins, weights = _load_bins(_bins, lmin, lmax) + except ValueError as exc: + raise ValueError(f"{_key}: bins: {exc!s}") + else: + bins, weights = None, None + + # check unknown options + if config: + raise ValueError(f"{_key}: unknown options: " + ", ".join(map(repr, config))) + + return key, TwoPointConfig( + lmin=lmin, + lmax=lmax, + l2max=l2max, + l3max=l3max, + debias=debias, + bins=bins, + weights=weights, + ) + + +def load(config: Mapping[str, Any]) -> Config: + """Load configuration from a dictionary.""" + # make a copy so that we can pop entries + config = dict(config) + + _catalogs = config.pop("catalogs", []) + if not isinstance(_catalogs, list): + raise ValueError(f"catalogs: expected list, got {type(_catalogs)}") + catalogs: list[CatalogConfig] = [] + for i, item in enumerate(_catalogs): + try: + catalog = _load_catalog(item) + except ValueError as exc: + raise ValueError(f"catalogs[{i + 1}]: {exc!s}") from None + else: + catalogs.append(catalog) + + _fields = config.pop("fields", []) + if not isinstance(_fields, list): + raise ValueError(f"fields: expected list, got {type(_fields)}") + fields: dict[str, Field] = {} + for i, item in enumerate(_fields): + try: + key, field = _load_field(item) + except ValueError as exc: + raise ValueError(f"fields[{i + 1}]: {exc!s}") from None + else: + fields[key] = field + + _spectra = config.pop("spectra", []) + if not isinstance(_spectra, list): + raise ValueError(f"spectra: expected list, got {type(_spectra)}") + spectra: dict[TwoPointKey, TwoPointConfig] = {} + for i, item in enumerate(_spectra): + try: + key2, twopoint = _load_twopoint(item) + except ValueError as exc: + raise ValueError(f"spectra[{i + 1}]: {exc!s}") from None + else: + spectra[key2] = twopoint + + return Config( + catalogs=catalogs, + fields=fields, + spectra=spectra, + ) diff --git a/heracles/core.py b/heracles/core.py index 3eec1bad..55e8d520 100644 --- a/heracles/core.py +++ b/heracles/core.py @@ -22,14 +22,55 @@ from __future__ import annotations +import re from collections import UserDict from collections.abc import Mapping, Sequence -from typing import TypeVar +from typing import TYPE_CHECKING, TypeVar import numpy as np T = TypeVar("T") +if TYPE_CHECKING: + # type for valid keys + Key = str | int | tuple["Key", ...] + + +def key_to_str(key: Key) -> str: + """ + Return string representation for a given key. + """ + # recursive expansion for sequences + if isinstance(key, Sequence) and not isinstance(key, str): + return "-".join(map(key_to_str, key)) + + # get string representation of key + s = str(key) + + # escape literal "\" + s = s.replace("\\", "\\\\") + + # escape literal "-" + s = s.replace("-", "\\-") + + # substitute non-FITS characters by tilde + s = re.sub(r"[^ -~]+", "~", s, flags=re.ASCII) + + return s + + +def key_from_str(s: str) -> Key: + """ + Return key for a given string representation. + """ + parts = re.split(r"(? 1: + return tuple(map(key_from_str, parts)) + key = parts[0] + key = key.replace("\\-", "-") + key = key.replace("\0", "\\") + return int(key) if key.removeprefix("-").isdigit() else key + def toc_match(key, include=None, exclude=None): """return whether a tocdict entry matches include/exclude criteria""" diff --git a/heracles/ducc.py b/heracles/ducc.py index 065397c0..1ce80cad 100644 --- a/heracles/ducc.py +++ b/heracles/ducc.py @@ -35,7 +35,7 @@ from collections.abc import Mapping from typing import Any - from numpy.typing import ArrayLike, DTypeLike, NDArray + from numpy.typing import DTypeLike, NDArray class DiscreteMapper: @@ -136,10 +136,7 @@ def map_values( data += alms - def transform( - self, - data: ArrayLike, - ) -> ArrayLike | tuple[ArrayLike, ArrayLike]: + def transform(self, data: NDArray[Any]) -> NDArray[Any]: """ Does nothing, since inputs are alms already. """ diff --git a/heracles/io.py b/heracles/io.py index 90044a8f..74cccf11 100644 --- a/heracles/io.py +++ b/heracles/io.py @@ -20,22 +20,17 @@ import logging import os -import re -from collections.abc import MutableMapping, Sequence +from collections.abc import MutableMapping from pathlib import Path -from typing import TYPE_CHECKING, Union from warnings import warn from weakref import WeakValueDictionary import fitsio import numpy as np -from .core import toc_match +from .core import toc_match, key_to_str, key_from_str from .result import Result -if TYPE_CHECKING: - from typing import TypeAlias - logger = logging.getLogger(__name__) @@ -67,45 +62,6 @@ "bias": "additive bias of spectrum", } -# type for valid keys -_DictKey: "TypeAlias" = Union[str, int, tuple["_DictKey", ...]] - - -def _string_from_key(key: _DictKey) -> str: - """ - Return string representation for a given key. - """ - # recursive expansion for sequences - if isinstance(key, Sequence) and not isinstance(key, str): - return "-".join(map(_string_from_key, key)) - - # get string representation of key - s = str(key) - - # escape literal "\" - s = s.replace("\\", "\\\\") - - # escape literal "-" - s = s.replace("-", "\\-") - - # substitute non-FITS characters by tilde - s = re.sub(r"[^ -~]+", "~", s, flags=re.ASCII) - - return s - - -def _key_from_string(s: str) -> _DictKey: - """ - Return key for a given string representation. - """ - parts = re.split(r"(? 1: - return tuple(map(_key_from_string, parts)) - key = parts[0] - key = key.replace("\\-", "-") - key = key.replace("\0", "\\") - return int(key) if key.removeprefix("-").isdigit() else key - def _write_metadata(hdu, metadata): """write array metadata to FITS HDU""" @@ -389,7 +345,7 @@ def write_maps(path, maps, *, clobber=False): logger.info("writing map %s", key) # extension name - ext = _string_from_key(key) + ext = key_to_str(key) # write the map in HEALPix FITS format _write_map(fits, ext, m) @@ -416,7 +372,7 @@ def read_maps(path, *, include=None, exclude=None): if not ext: continue # decode extension name into key, skip if empty - key = _key_from_string(ext) + key = key_from_str(ext) if not key: continue # match key against explicit include and exclude @@ -452,7 +408,7 @@ def write_alms(path, alms, *, clobber=False): logger.info("writing alm %s", key) # extension name - ext = _string_from_key(key) + ext = key_to_str(key) # write the alm as structured data with metadata _write_complex(fits, ext, alm) @@ -479,7 +435,7 @@ def read_alms(path, *, include=None, exclude=None): if not ext: continue # decode extension name into key, skip if empty - key = _key_from_string(ext) + key = key_from_str(ext) if not key: continue # match key against explicit include and exclude @@ -516,7 +472,7 @@ def write(path, results, *, clobber=False): logger.info("writing result %s", key) # extension name - ext = _string_from_key(key) + ext = key_to_str(key) # write the data in structured format _write_result(fits, ext, result) @@ -542,7 +498,7 @@ def read(path): ext = hdu.get_extname() if not ext: continue - key = _key_from_string(ext) + key = key_from_str(ext) if not key: continue logger.info("reading result %s", key) @@ -596,7 +552,7 @@ def __iter__(self): if not ext: continue # decode extension name into key, skip if empty - key = _key_from_string(ext) + key = key_from_str(ext) if not key: continue yield key @@ -608,13 +564,13 @@ def __len__(self): return n def __contains__(self, key): - ext = _string_from_key(key) + ext = key_to_str(key) with fitsio.FITS(self.path) as fits: return ext in fits def __getitem__(self, key): # a specific extension was requested, fetch data - ext = _string_from_key(key) + ext = key_to_str(key) data = self._cache.get(ext) if data is None: with self.fits as fits: @@ -623,7 +579,7 @@ def __getitem__(self, key): return data def __setitem__(self, key, value): - ext = _string_from_key(key) + ext = key_to_str(key) with self.fits as fits: self.writer(fits, ext, value) diff --git a/pyproject.toml b/pyproject.toml index 5e57a813..62f8fb20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ "healpy", "numba", "numpy", + "tomli; python_version<'3.11'", ] description = "Harmonic-space statistics on the sphere" dynamic = [ @@ -66,6 +67,14 @@ Documentation = "https://heracles.readthedocs.io/" Homepage = "https://github.com/heracles-ec/heracles" Issues = "https://github.com/heracles-ec/heracles/issues" +[tool.coverage] +report.exclude_also = [ + "if TYPE_CHECKING:", +] +run.omit = [ + "tests/*", +] + [tool.hatch] build.hooks.vcs.version-file = "heracles/_version.py" version.source = "vcs" diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 00000000..b3454b7c --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,372 @@ +import unittest.mock +import numpy as np +import pytest + +import heracles +import heracles.config + + +def test_field_type(): + assert heracles.config.FieldType["positions"].value is heracles.Positions + assert heracles.config.FieldType["shears"].value is heracles.Shears + assert heracles.config.FieldType["visibility"].value is heracles.Visibility + assert heracles.config.FieldType["weights"].value is heracles.Weights + assert str(heracles.config.FieldType.positions) == "positions" + + +def test_mapper_type(): + assert heracles.config.MapperType["none"] + assert heracles.config.MapperType["healpix"] + assert heracles.config.MapperType["discrete"] + assert str(heracles.config.MapperType.none) == "none" + + +def test_selection_config(): + config = heracles.config._load_selection({"key": 1, "selection": "abc"}) + + assert isinstance(config, heracles.config.SelectionConfig) + assert config.key == 1 + assert config.selection == "abc" + assert config.visibility is None + + config = heracles.config._load_selection( + { + "key": 1, + "selection": "abc", + "visibility": "def", + } + ) + + assert isinstance(config, heracles.config.SelectionConfig) + assert config.key == 1 + assert config.selection == "abc" + assert config.visibility == "def" + + with pytest.raises(ValueError, match="missing option"): + heracles.config._load_selection({"key": 1}) + + with pytest.raises(ValueError, match="unknown option"): + heracles.config._load_selection({"key": 1, "selection": "abc", "bad": 0}) + + with pytest.raises(ValueError, match="expected int"): + heracles.config._load_selection({"key": "1", "selection": "abc"}) + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_selection({"key": 1, "selection": 10}) + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_selection( + {"key": 1, "selection": "abc", "visibility": 10} + ) + + +def test_catalog_config(): + config = heracles.config._load_catalog( + { + "source": "catalog.fits", + "fields": ["A", "B"], + }, + ) + + assert isinstance(config, heracles.config.CatalogConfig) + assert config.source == "catalog.fits" + assert config.fields == ["A", "B"] + assert config.label is None + assert config.selections == [] + assert config.visibility is None + + # test with invalid source + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_catalog({"source": object()}) + + # test with invalid fields + + with pytest.raises(ValueError, match="expected list of str"): + heracles.config._load_catalog({"source": "s", "fields": [1, 2]}) + + # test with label + + config = heracles.config._load_catalog( + { + "source": "catalog.fits", + "fields": ["A", "B"], + "label": "my label", + }, + ) + + assert isinstance(config, heracles.config.CatalogConfig) + assert config.source == "catalog.fits" + assert config.fields == ["A", "B"] + assert config.label == "my label" + assert config.selections == [] + assert config.visibility is None + + # test with invalid label + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_catalog({"source": "s", "fields": [], "label": 0}) + + # test with selections + + selections = [object(), object()] + + with unittest.mock.patch("heracles.config._load_selection") as mock: + config = heracles.config._load_catalog( + { + "source": "catalog.fits", + "fields": ["A", "B"], + "selections": selections, + }, + ) + + assert isinstance(config, heracles.config.CatalogConfig) + assert config.source == "catalog.fits" + assert config.fields == ["A", "B"] + assert config.label is None + assert config.selections == [mock.return_value, mock.return_value] + assert config.visibility is None + + assert mock.call_args_list == [unittest.mock.call(_) for _ in selections] + + # test with invalid selections + + with pytest.raises(ValueError, match="expected list, got"): + heracles.config._load_catalog( + { + "source": "catalog.fits", + "fields": ["A", "B"], + "selections": object(), + }, + ) + + # test with visibility + + config = heracles.config._load_catalog( + { + "source": "catalog.fits", + "fields": ["A", "B"], + "visibility": "vmap.fits", + }, + ) + + assert isinstance(config, heracles.config.CatalogConfig) + assert config.source == "catalog.fits" + assert config.fields == ["A", "B"] + assert config.label is None + assert config.selections == [] + assert config.visibility == "vmap.fits" + + # test with invalid visibility + + with pytest.raises(ValueError, match="expected str, got"): + heracles.config._load_catalog( + { + "source": "catalog.fits", + "fields": ["A", "B"], + "visibility": object(), + }, + ) + + # test with invalid options + + with pytest.raises(ValueError, match="unknown options: 'a', 'b'"): + heracles.config._load_catalog( + { + "source": "catalog.fits", + "fields": ["A", "B"], + "a": object(), + "b": object(), + }, + ) + + +def test_load_mapper(): + with unittest.mock.patch("heracles.healpy.HealpixMapper") as mock: + heracles.config._load_mapper({"nside": 1}) + mock.assert_called_once_with(1, None) + + with unittest.mock.patch("heracles.healpy.HealpixMapper") as mock: + heracles.config._load_mapper({"mapper": "healpix", "nside": 1, "lmax": 2}) + mock.assert_called_once_with(1, 2) + + with pytest.raises(ValueError, match="missing option"): + heracles.config._load_mapper({"mapper": "healpix", "lmax": 2}) + + with unittest.mock.patch("heracles.ducc.DiscreteMapper") as mock: + heracles.config._load_mapper({"mapper": "discrete", "lmax": 1}) + mock.assert_called_once_with(1) + + with pytest.raises(ValueError, match="missing option"): + heracles.config._load_mapper({"mapper": "discrete"}) + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_mapper({"mapper": None}) + + with pytest.raises(ValueError, match="invalid value"): + heracles.config._load_mapper({"mapper": "invalid"}) + + with pytest.raises(ValueError, match="expected int"): + heracles.config._load_mapper({"nside": "1"}) + + with pytest.raises(ValueError, match="expected int"): + heracles.config._load_mapper({"lmax": "1"}) + + +def test_load_field(): + key, field = heracles.config._load_field( + { + "key": "POS", + "type": "positions", + "mapper": "none", + } + ) + assert key == "POS" + assert isinstance(field, heracles.Positions) + assert field.mapper is None + assert field.columns is None + assert field.mask is None + + key, field = heracles.config._load_field( + { + "key": "POS", + "type": "positions", + "mapper": "none", + "columns": ["RA", "Dec"], + "mask": "MASK", + } + ) + assert key == "POS" + assert isinstance(field, heracles.Positions) + assert field.mapper is None + assert field.columns == ("RA", "Dec", None) + assert field.mask == "MASK" + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_field({"key": 1, "type": "positions"}) + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_field({"key": "POS", "type": object()}) + + with pytest.raises(ValueError, match="invalid value"): + heracles.config._load_field({"key": "POS", "type": "invalid"}) + + with pytest.raises(ValueError, match="expected list of str"): + heracles.config._load_field({"key": "POS", "type": "positions", "columns": [1]}) + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_field( + {"key": "POS", "type": "positions", "mapper": "none", "mask": object()} + ) + + with pytest.raises(ValueError, match="POS: mapper: invalid value"): + heracles.config._load_field( + {"key": "POS", "type": "positions", "mapper": "invalid"} + ) + + with pytest.raises(ValueError, match="POS: unknown options"): + heracles.config._load_field( + {"key": "POS", "type": "positions", "mapper": "none", "invalid": 0} + ) + + +def test_load_bins(): + bins, weights = heracles.config._load_bins({"n": 10}, 0, 10) + np.testing.assert_array_equal(bins, np.linspace(0, 11, 11)) + assert weights is None + + bins, weights = heracles.config._load_bins({"n": 4, "spacing": "log"}, 1, 11) + np.testing.assert_array_almost_equal_nulp(bins, np.geomspace(1, 12, 5)) + assert weights is None + + bins, weights = heracles.config._load_bins({"n": 2, "weights": "2l+1"}, 0, 2) + np.testing.assert_array_equal(bins, np.linspace(0, 3, 3)) + assert weights == "2l+1" + + with pytest.raises(ValueError, match="expected int"): + heracles.config._load_bins({"n": "a"}, 0, 1) + + with pytest.raises(ValueError, match="invalid number of bins"): + heracles.config._load_bins({"n": 1}, 0, 1) + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_bins({"n": 2, "spacing": 1}, 0, 1) + + with pytest.raises(ValueError, match="invalid value"): + heracles.config._load_bins({"n": 2, "spacing": "invalid"}, 0, 1) + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_bins({"n": 2, "weights": 1}, 0, 1) + + +def test_load_twopoint(): + key, twopoint = heracles.config._load_twopoint({"key": "POS-POS"}) + assert key == ("POS", "POS") + assert twopoint.lmin is None + assert twopoint.lmax is None + assert twopoint.l2max is None + assert twopoint.l3max is None + assert twopoint.debias + assert twopoint.bins is None + + key, twopoint = heracles.config._load_twopoint({"key": ["POS", "POS"]}) + assert key == ("POS", "POS") + + key, twopoint = heracles.config._load_twopoint({"key": "x", "lmin": 0}) + assert twopoint.lmin == 0 + + key, twopoint = heracles.config._load_twopoint({"key": "x", "lmax": 0}) + assert twopoint.lmax == 0 + + key, twopoint = heracles.config._load_twopoint({"key": "x", "l2max": 0}) + assert twopoint.l2max == 0 + + key, twopoint = heracles.config._load_twopoint({"key": "x", "l3max": 0}) + assert twopoint.l3max == 0 + + key, twopoint = heracles.config._load_twopoint({"key": "x", "debias": False}) + assert not twopoint.debias + + key, twopoint = heracles.config._load_twopoint( + {"key": "x", "lmin": 0, "lmax": 1, "bins": {"n": 2}} + ) + np.testing.assert_array_equal(twopoint.bins, [0, 1, 2]) + + with pytest.raises(ValueError, match="expected str"): + heracles.config._load_twopoint({"key": 0}) + + with pytest.raises(ValueError, match="expected int"): + heracles.config._load_twopoint({"key": "x", "lmin": "0"}) + + with pytest.raises(ValueError, match="expected int"): + heracles.config._load_twopoint({"key": "x", "lmax": "0"}) + + with pytest.raises(ValueError, match="expected int"): + heracles.config._load_twopoint({"key": "x", "l2max": "0"}) + + with pytest.raises(ValueError, match="expected int"): + heracles.config._load_twopoint({"key": "x", "l3max": "0"}) + + with pytest.raises(ValueError, match="expected bool"): + heracles.config._load_twopoint({"key": "x", "debias": 0}) + + with pytest.raises(ValueError, match="expected dict"): + heracles.config._load_twopoint({"key": "x", "bins": 0}) + + with pytest.raises(ValueError, match="requires lmin and lmax"): + heracles.config._load_twopoint({"key": "x", "bins": {}}) + + with pytest.raises(ValueError, match="bins: n: expected int"): + heracles.config._load_twopoint( + {"key": "x", "lmin": 0, "lmax": 1, "bins": {"n": "0"}} + ) + + with pytest.raises(ValueError, match="unknown options"): + heracles.config._load_twopoint({"key": "x", "unknown": 0}) + + +def test_load(): + config = heracles.config.load({}) + assert config.catalogs == [] + assert config.fields == {} + assert config.spectra == {} diff --git a/tests/test_core.py b/tests/test_core.py index be7727be..e875d22d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -2,6 +2,41 @@ import pytest +def test_key_to_string(): + from heracles.core import key_to_str + + assert key_to_str("a") == "a" + assert key_to_str(1) == "1" + assert key_to_str(("a",)) == "a" + assert key_to_str((1,)) == "1" + assert key_to_str(("a", 1)) == "a-1" + assert key_to_str(("a", "b", 1, 2)) == "a-b-1-2" + + # flatten nested sequences + assert key_to_str([("a", 1), "b"]) == "a-1-b" + assert key_to_str([("a", 1), ("b", (2,))]) == "a-1-b-2" + + # test special chars + assert key_to_str("a-b-c") == r"a\-b\-c" + assert key_to_str(("a\\", 1)) == r"a\\-1" + assert key_to_str(("a\\-", 1)) == r"a\\\--1" + assert key_to_str(("a\\", -1)) == r"a\\-\-1" + assert key_to_str("a€£") == "a~" + + +def test_string_to_key(): + from heracles.io import key_from_str + + assert key_from_str("a") == "a" + assert key_from_str("1") == 1 + assert key_from_str("a-1") == ("a", 1) + assert key_from_str("a-b-1-2") == ("a", "b", 1, 2) + assert key_from_str(r"a\-b\-c") == "a-b-c" + assert key_from_str(r"a\\-1") == ("a\\", 1) + assert key_from_str(r"a\\\-1") == "a\\-1" + assert key_from_str(r"a\\-\-1") == ("a\\", -1) + + def test_toc_match(): from heracles.core import toc_match diff --git a/tests/test_io.py b/tests/test_io.py index 239ab56d..f7d4fa12 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -182,41 +182,6 @@ def mock_vmap(mock_vmap_fields, nside, datadir): return filename -def test_string_from_key(): - from heracles.io import _string_from_key - - assert _string_from_key("a") == "a" - assert _string_from_key(1) == "1" - assert _string_from_key(("a",)) == "a" - assert _string_from_key((1,)) == "1" - assert _string_from_key(("a", 1)) == "a-1" - assert _string_from_key(("a", "b", 1, 2)) == "a-b-1-2" - - # flatten nested sequences - assert _string_from_key([("a", 1), "b"]) == "a-1-b" - assert _string_from_key([("a", 1), ("b", (2,))]) == "a-1-b-2" - - # test special chars - assert _string_from_key("a-b-c") == r"a\-b\-c" - assert _string_from_key(("a\\", 1)) == r"a\\-1" - assert _string_from_key(("a\\-", 1)) == r"a\\\--1" - assert _string_from_key(("a\\", -1)) == r"a\\-\-1" - assert _string_from_key("a€£") == "a~" - - -def test_key_from_string(): - from heracles.io import _key_from_string - - assert _key_from_string("a") == "a" - assert _key_from_string("1") == 1 - assert _key_from_string("a-1") == ("a", 1) - assert _key_from_string("a-b-1-2") == ("a", "b", 1, 2) - assert _key_from_string(r"a\-b\-c") == "a-b-c" - assert _key_from_string(r"a\\-1") == ("a\\", 1) - assert _key_from_string(r"a\\\-1") == "a\\-1" - assert _key_from_string(r"a\\-\-1") == ("a\\", -1) - - def test_write_read_maps(rng, tmp_path): import healpy as hp import numpy as np