diff --git a/pyproject.toml b/pyproject.toml index b5440dc..4407a18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,10 +1,10 @@ [build-system] -requires = ["setuptools>=61.0"] +requires = ["setuptools>=70.0"] build-backend = "setuptools.build_meta" [project] name = "srcsim" -version = "1.0.0" +version = "2.0.0" authors = [ { name="Ievgen Vovk", email="vovk@icrr.u-tokyo.ac.jp" }, { name="Marcel Strzys", email="strzys@icrr.u-tokyo.ac.jp" }, @@ -20,12 +20,13 @@ classifiers = [ "Operating System :: OS Independent", ] dependencies = [ - "astropy==5.1", + "astropy<6", + "gammapy>=1.1", "numpy>=1.21.0", - "pandas==1.4.1", - "scipy>=1.5.0", - "tables==3.7.0", - "PyYAML==5.3.1" + "pandas>=1.4.1", + "scipy<1.12", + "tables>=3.7.0", + "PyYAML>=5.3.1" ] [project.urls] @@ -35,3 +36,4 @@ dependencies = [ [project.scripts] getruns = "srcsim.scripts.get_runs:main" simrun = "srcsim.scripts.sim_run:main" +simdl3 = "srcsim.gpy.scripts.sim_dl3:main" diff --git a/src/srcsim/gpy/irf.py b/src/srcsim/gpy/irf.py new file mode 100644 index 0000000..23b35ab --- /dev/null +++ b/src/srcsim/gpy/irf.py @@ -0,0 +1,118 @@ +import re +import glob +import numpy as np +from dataclasses import dataclass + +from astropy.coordinates import SkyCoord +from gammapy.irf.effective_area import EffectiveAreaTable2D +from gammapy.irf.edisp.core import EnergyDispersion2D +from gammapy.irf.background import Background2D +from gammapy.irf.psf.table import PSF3D + +from gammapy.irf import load_irf_dict_from_file + + +@dataclass +class IRFSample: + altaz: SkyCoord = None + aeff: EffectiveAreaTable2D = None + edisp: EnergyDispersion2D = None + bkg: Background2D = None + psf: PSF3D = None + + def __str__(self): + desc = f"""{type(self).__name__} instance + {'AltAz':.<20s}: {self.altaz} +""" + return desc + + @classmethod + def from_fits(cls, file_name): + parsed = re.findall( + '.*node_corsika_theta_([\d\.]+)_az_([\d\.]+).*', + file_name + ) + if not len(parsed): + raise RuntimeError( + f'can not parse pointing alt/az from "{file_name}"' + ) + + try: + theta = float(parsed[0][0]) + az = float(parsed[0][1]) + altaz = SkyCoord(az, 90-theta, unit='deg', frame='altaz') + except Exception as exc: + raise RuntimeError( + f'can not convert pointing ({theta}; {az}) to SkyCoord' + ) + + irfs = load_irf_dict_from_file(file_name) + + return cls(altaz, irfs.get('aeff'), irfs.get('edisp'), irfs.get('bkg'), irfs.get('psf')) + + def to_dict(self): + data = dict( + aeff = self.aeff, + edisp = self.edisp, + bkg = self.bkg, + psf = self.psf + ) + return data + + +class IRFCollection: + def __init__(self, file_mask=None, samples=None): + self.file_mask = file_mask + + if samples is None: + self.samples = self.read_files(file_mask) + else: + self.samples = samples + + def __str__(self): + desc = f"""{type(self).__name__} instance + {'Sample alt/az':.<20s}: {SkyCoord([sample.altaz for sample in self.samples])} +""" + return desc + + @classmethod + def read_files(cls, file_mask): + file_list = glob.glob(file_mask) + + samples = tuple( + IRFSample.from_fits(file_name) + for file_name in file_list + ) + + return samples + + def get_closest(self, target_position): + altaz = SkyCoord([sample.altaz for sample in self.samples]) + separation = altaz.separation(target_position) + idx = separation.argmin() + + return IRFCollection(samples=(self.samples[idx],)) + + def get_nearby(self, target_position, search_radius): + samples = tuple( + filter( + lambda sample: sample.altaz.separation(target_position) <= search_radius, + self.samples + ) + ) + + return IRFCollection(samples=samples) + + def get_in_box(self, target_position, max_lon_offset, max_lat_offset): + centers = SkyCoord([sample.altaz for sample in self.samples]) + target_position = SkyCoord(target_position.altaz.az, target_position.altaz.alt, frame='altaz') + + lon_offset, lat_offset = centers.altaz.spherical_offsets_to(target_position.altaz) + inbox = (np.absolute(lon_offset) <= max_lon_offset) & (np.absolute(lat_offset) <= max_lat_offset) + + if sum(inbox): + samples = tuple(sample for sample, take_it in zip(self.samples, inbox) if take_it) + else: + samples = () + + return IRFCollection(samples=samples) diff --git a/src/srcsim/gpy/run/__init__.py b/src/srcsim/gpy/run/__init__.py new file mode 100644 index 0000000..dda540e --- /dev/null +++ b/src/srcsim/gpy/run/__init__.py @@ -0,0 +1,2 @@ +from .sky import SkyDataRun +from .fixed import FixedPointingDataRun \ No newline at end of file diff --git a/src/srcsim/gpy/run/base.py b/src/srcsim/gpy/run/base.py new file mode 100644 index 0000000..1fa9309 --- /dev/null +++ b/src/srcsim/gpy/run/base.py @@ -0,0 +1,286 @@ +import astropy.units as u +import numpy as np +import warnings +from astropy.time import Time +from astropy.coordinates import AltAz +from astropy.utils.metadata import MergeConflictWarning + +from gammapy.data import Observation +from gammapy.datasets import MapDataset +from gammapy.maps import MapAxis, WcsGeom +# from gammapy.datasets import MapDatasetEventSampler +# from gammapy.makers import MapDatasetMaker + +from .makers import MapDatasetMaker +from .simulate import MapDatasetEventSampler + + +class DataRun: + mode = None + + def __init__(self, tel_pos, tstart, tstop, obsloc, id=0): + """ + Creates a generic data run class instance. + + Parameters + ---------- + tel_pos: astropy.coordinates.SkyCoord + Telescope pointing position + tstart: astropy.time.Time + Run time start + tstop: astropy.time.Time + Run time stop + obsloc: astropy.coordinates.EarthLocation + Telescope location + id: int + Run ID + """ + self.id = id + self.tel_pos = tel_pos + self.obsloc = obsloc + self.tstart = tstart + self.tstop = tstop + + @classmethod + def from_config(cls, config): + """ + Create run from the specified configuration. + This method needs to be overloaded in the child classes. + + Parameters + ---------- + config: str or dict + Run configuration to use. If string, + configuration will be loaded from the YAML + file specified by "config". + + Returns + ------- + run: DataRun + Corresponding DataRun child instance + + Notes + ----- + See also self.to_dict() + """ + pass + + @property + def pointing(self): + """ + Observation pointing info. + This method needs to be overloaded in the child classes. + + Returns + ------- + info: gammapy.data.FixedPointingInfo + """ + return None + + @property + def slew_length_ra(self): + """ + Observation slew distance in RA. + Will be used to define the simulation WCS extension. + + Returns + ------- + u.Quantity + slew length in RA + """ + return 0 * u.deg + + @property + def slew_length_dec(self): + """ + Observation slew distance in Dec. + Will be used to define the simulation WCS extension. + + Returns + ------- + u.Quantity + slew length in Dec + """ + return 0 * u.deg + + @property + def tel_pos_center_icrs(self): + """ + Telescope pointing center in equatorial (ICRS) coordinates + + Returns + ------- + astropy.coordinates.SkyCoord + pointing center + """ + return None + + def to_dict(self): + """ + Converts the class definition to a configuration dict. + + Returns + ------- + dict: + Class configuration as dict + + Notes + ----- + See also self.from_config() + """ + pass + + def tel_pos_to_altaz(self, frame): + """ + Transform ICRS telescope poiting to the specified alt/az frame. + + Parameters + ---------- + frame: astropy.coordinates.AltAz + Frame to transform the telescope pointing to. + + Returns + ------- + astropy.coordinates.SkyCoord + Telescope pointing in alt/az frame + """ + pass + + def predict(self, irf_collection, model, tel_pos_tolerance=None): + """ + Creates an observation that would correspond to this run + given the specified IRFs and field of view model. + + Parameters + ---------- + irf_collection: .irf.IRFCollection + A collection to choose the appropriate IRF from + model: gammapy.modeling.models.Model + Field of view model + tel_pos_tolerance: None, list-like or u.Quantity + Position difference between IRFs and telescope pointing to observe + - None: use the closest IRF in collection + - u.Quantity: select IRFs within a circle of the corresponding radius + - list or tuple: select IRFs within the corresponding box on the sky + NOTE: only None option is presently implemented + """ + frame_tref = AltAz( + obstime=Time( + (self.tstart.mjd + self.tstop.mjd) / 2, + format='mjd' + ), + location=self.obsloc + ) + + tel_pos = self.tel_pos_to_altaz(frame_tref) + + if tel_pos_tolerance is None: + irfs = irf_collection.get_closest(tel_pos.altaz) + elif isinstance(tel_pos_tolerance, u.Quantity): + irfs = irf_collection.get_nearby(tel_pos, tel_pos_tolerance) + elif isinstance(tel_pos_tolerance, list) or isinstance(tel_pos_tolerance, tuple): + irfs = irf_collection.get_in_box( + tel_pos, + max_lon_offset=tel_pos_tolerance[0], + max_lat_offset=tel_pos_tolerance[1], + ) + else: + raise ValueError(f"Data type '{type(tel_pos_tolerance)}' for argument 'tel_pos_tolerance' is not supported") + + observation = Observation.create( + pointing=self.pointing, + location=self.obsloc, + obs_id=self.id, + tstart=self.tstart, + tstop=self.tstop, + irfs=irfs.samples[0].to_dict() + ) + observation.aeff.meta["TELESCOP"] = 'cta_north' + + energy_axis = MapAxis.from_energy_bounds( + "0.03 TeV", + "100 TeV", + nbin=10, + per_decade=True + ) + energy_axis_true = MapAxis.from_energy_bounds( + "0.01 TeV", + "300 TeV", + nbin=10, + per_decade=True, + name="energy_true" + ) + migra_axis = MapAxis.from_bounds( + 0.5, + 2, + nbin=20, + node_type="edges", + name="migra" + ) + rad_axis = MapAxis.from_bounds( + 0, + 0.4, + nbin=40, + node_type="edges", + name="rad", + unit="deg" + ) + + geom = WcsGeom.create( + skydir=self.tel_pos_center_icrs, + width=( + self.slew_length_ra.to('deg').value + 5, + self.slew_length_dec.to('deg').value + 5) + , + binsz=0.1, + frame="icrs", + axes=[energy_axis], + ) + + empty = MapDataset.create( + geom, + energy_axis_true=energy_axis_true, + migra_axis=migra_axis, + rad_axis=rad_axis, + binsz_irf=0.5*u.deg, + name="sim-dataset", + ) + + maker = MapDatasetMaker( + selection=["exposure", "background", "psf", "edisp"] + ) + dataset = maker.run(empty, observation) + + dataset.models = model + + time_step = 2*u.minute + unix_edges = np.arange(self.tstart.unix, self.tstop.unix, step=time_step.to('s').value) + if unix_edges[-1] < self.tstop.unix: + unix_edges = np.concatenate((unix_edges, [self.tstop.unix])) + tedges = Time(unix_edges, format='unix') + + for tstart, tstop in zip(tedges[:-1], tedges[1:]): + sampler = MapDatasetEventSampler(random_state=np.random.randint(1e5)) + obs = Observation.create( + pointing=self.pointing, + location=self.obsloc, + obs_id=self.id, + tstart=tstart, + tstop=tstop, + irfs=irfs.samples[0].to_dict() + ) + obs.aeff.meta["TELESCOP"] = 'cta_north' + ds = maker.run(empty, obs) + ds.models = model + events = sampler.run(ds, obs) + + if observation.events: + with warnings.catch_warnings(): + # Warnings related to mergers of TSTART, TSTOP etc + # keys are expected here + warnings.simplefilter("ignore", MergeConflictWarning) + observation.events.stack(events) + else: + observation._events = events + + return observation diff --git a/src/srcsim/gpy/run/fixed.py b/src/srcsim/gpy/run/fixed.py new file mode 100644 index 0000000..f352214 --- /dev/null +++ b/src/srcsim/gpy/run/fixed.py @@ -0,0 +1,201 @@ +import yaml +import numpy as np +import astropy.units as u +from astropy.time import Time +from astropy.coordinates import SkyCoord, EarthLocation, AltAz + +from gammapy.data import PointingMode, FixedPointingInfo + +from .base import DataRun + + +class FixedPointingDataRun(DataRun): + mode = PointingMode.DRIFT + + def __str__(self): + frame_start = AltAz(obstime=self.tstart, location=self.obsloc) + frame_stop = AltAz(obstime=self.tstop, location=self.obsloc) + tel_pos_start = SkyCoord( + self.tel_pos.az, + self.tel_pos.alt, + frame=frame_start + ) + tel_pos_stop = SkyCoord( + self.tel_pos.az, + self.tel_pos.alt, + frame=frame_stop + ) + desc = f"""{type(self).__name__} instance + {'ID':.<20s}: {self.id} + {'Tel. alt/az':.<20s}: {self.tel_pos} + {'Tstart':.<20s}: {self.tstart.isot} + {'Tstop':.<20s}: {self.tstop.isot} + {'Tel. RA':.<20s}: [{tel_pos_start.icrs.ra.to('deg'):.2f} - {tel_pos_stop.icrs.ra.to('deg'):.2f}] + {'Tel. Dec':.<20s}: [{tel_pos_start.icrs.dec.to('deg'):.2f} - {tel_pos_stop.icrs.dec.to('deg'):.2f}] +""" + return desc + + @classmethod + def from_config(cls, config): + """ + Create run from the specified configuration. + This method needs to be overloaded in the child classes. + + Parameters + ---------- + config: str or dict + Run configuration to use. If string, + configuration will be loaded from the YAML + file specified by "config". + + Returns + ------- + run: DataRun + Corresponding DataRun child instance + + Notes + ----- + See also self.to_dict() + """ + + if isinstance(config, str): + cfg = yaml.load(open(config, "r"), Loader=yaml.FullLoader) + else: + cfg = config + + dummy_obstime = Time('2000-01-01T00:00:00') + location = EarthLocation( + lat=u.Quantity(cfg['location']['lat']), + lon=u.Quantity(cfg['location']['lon']), + height=u.Quantity(cfg['location']['height']), + ) + + data_run = cls( + tel_pos = SkyCoord( + u.Quantity(cfg['pointing']['az']), + u.Quantity(cfg['pointing']['alt']), + location=location, + obstime=dummy_obstime + ), + tstart = Time(cfg['time']['start']), + tstop = Time(cfg['time']['stop']), + obsloc = location, + id = cfg['id'] if 'id' in cfg else 0, + mode = PointingMode.DRIFT + ) + + return data_run + + @property + def pointing(self): + """ + Observation pointing info. + + Returns + ------- + info: gammapy.data.FixedPointingInfo + """ + + pointing = FixedPointingInfo( + fixed_altaz=self.tel_pos, + location=self.obsloc + ) + return pointing + + @property + def slew_length_ra(self): + """ + Observation slew distance in RA corresponding to + this runs alt/az and duration. + Will be used to define the simulation WCS extension. + + Returns + ------- + u.Quantity + slew length in RA + """ + + frame_start = AltAz(obstime=self.tstart, location=self.obsloc) + frame_stop = AltAz(obstime=self.tstop, location=self.obsloc) + + tel_pos_start = SkyCoord(self.tel_pos.az, self.tel_pos.alt, frame=frame_start) + tel_pos_stop = SkyCoord(self.tel_pos.az, self.tel_pos.alt, frame=frame_stop) + + return tel_pos_stop.icrs.ra - tel_pos_start.icrs.ra + + @property + def tel_pos_center_icrs(self): + """ + Telescope pointing center in equatorial (ICRS) coordinates + + Returns + ------- + astropy.coordinates.SkyCoord + pointing center + """ + + frame_tref = AltAz( + obstime=Time( + (self.tstart.mjd + self.tstop.mjd) / 2, + format='mjd' + ), + location=self.obsloc + ) + return SkyCoord(self.tel_pos.az, self.tel_pos.alt, frame=frame_tref) + + def to_dict(self): + """ + Converts the class definition to a configuration dict. + + Returns + ------- + dict: + Class configuration as dict + + Notes + ----- + See also self.from_config() + """ + + data = {'id': self.id, 'pointing': {}, 'time': {}, 'location': {}} + + data['pointing']['alt'] = str(self.tel_pos.alt.to('deg').value) + ' deg' + data['pointing']['az'] = str(self.tel_pos.az.to('deg').value) + ' deg' + + data['time']['start'] = self.tstart.isot + data['time']['stop'] = self.tstop.isot + + data['location']['lon'] = str(self.obsloc.lon.to('deg').value) + ' deg' + data['location']['lat'] = str(self.obsloc.lat.to('deg').value) + ' deg' + data['location']['height'] = str(self.obsloc.height.to('m').to_string()) + + return data + + def tel_pos_to_altaz(self, frame): + """ + Transform ICRS telescope poiting to the specified alt/az frame. + + Parameters + ---------- + frame: astropy.coordinates.AltAz + Frame to transform the telescope pointing to. + + Returns + ------- + astropy.coordinates.SkyCoord + Telescope pointing in alt/az frame + """ + + if frame.obstime.size > 1: + tel_pos = SkyCoord( + np.repeat(self.tel_pos.az, frame.obstime.size), + np.repeat(self.tel_pos.alt, frame.obstime.size), + frame=frame + ) + else: + tel_pos = SkyCoord( + self.tel_pos.az, + self.tel_pos.alt, + frame=frame + ) + return tel_pos diff --git a/src/srcsim/gpy/run/makers.py b/src/srcsim/gpy/run/makers.py new file mode 100644 index 0000000..92bea35 --- /dev/null +++ b/src/srcsim/gpy/run/makers.py @@ -0,0 +1,177 @@ +import numpy as np +import astropy.units as u + +from astropy.time import Time + +from gammapy.makers import MapDatasetMaker +from gammapy.maps import Map, MapAxis +from gammapy.makers.utils import ( + make_map_background_irf, + make_map_exposure_true_energy, +) +from gammapy.data import PointingMode + + +class MapDatasetMaker(MapDatasetMaker): + """ + Drop-in replacement of the gammapy.makers.MapDatasetMaker class + with the support of DRIFT observations. Meant to be temporary till + the corresponding changes are not introduced into gammapy. + + Based on https://docs.gammapy.org/1.1/_modules/gammapy/makers/map.html#MapDatasetMaker + """ + + @staticmethod + def make_exposure(geom, observation, use_region_center=True): + """Make exposure map. + + Parameters + ---------- + geom : `~gammapy.maps.Geom` + Reference map geom. + observation : `~gammapy.data.Observation` + Observation container. + + Returns + ------- + exposure : `~gammapy.maps.Map` + Exposure map. + """ + if observation.pointing.mode == PointingMode.POINTING: + if isinstance(observation.aeff, Map): + return observation.aeff.interp_to_geom( + geom=geom, + ) + return make_map_exposure_true_energy( + pointing=observation.get_pointing_icrs(observation.tmid), + livetime=observation.observation_live_time_duration, + aeff=observation.aeff, + geom=geom, + use_region_center=use_region_center, + ) + + elif observation.pointing.mode == PointingMode.DRIFT: + time_step = 1*u.minute + nsteps = int( + 1 + np.ceil((observation.observation_time_duration / time_step).decompose()) + ) + + mjd_edges = np.linspace(observation.tstart.mjd, observation.tstop.mjd, num=nsteps) + mjd = Time( + (mjd_edges[1:] + mjd_edges[:-1]) / 2, + format='mjd' + ) + dmjd = (mjd_edges[1:] - mjd_edges[:-1]) + + mjd = Time(mjd, format='mjd') + dmjd = dmjd * u.d + + maps = [ + make_map_exposure_true_energy( + pointing=observation.get_pointing_icrs(tref), + livetime=livetime, + aeff=observation.aeff, + geom=geom, + use_region_center=use_region_center, + ) + for tref, livetime in zip(mjd, dmjd) + ] + + time_axis = MapAxis.from_bounds( + 0, + len(maps), + nbin=len(maps), + node_type="center", + name="time" + ) + + return Map.from_stack(maps, axis=time_axis).reduce('time', np.add) + + else: + raise RuntimeError( + f'exposure calculation for pointing mode "{observation.pointing.mode}" not implemeted yet' + ) + + def make_background(self, geom, observation): + """Make background map. + + Parameters + ---------- + geom : `~gammapy.maps.Geom` + Reference geom. + observation : `~gammapy.data.Observation` + Observation container. + + Returns + ------- + background : `~gammapy.maps.Map` + Background map. + """ + if observation.pointing.mode == PointingMode.POINTING: + bkg = observation.bkg + + if isinstance(bkg, Map): + return bkg.interp_to_geom(geom=geom, preserve_counts=True) + + use_region_center = getattr(self, "use_region_center", True) + + if self.background_interp_missing_data: + bkg.interp_missing_data(axis_name="energy") + + if self.background_pad_offset and bkg.has_offset_axis: + bkg = bkg.pad(1, mode="edge", axis_name="offset") + + return make_map_background_irf( + pointing=observation.pointing, + ontime=observation.observation_time_duration, + bkg=bkg, + geom=geom, + oversampling=self.background_oversampling, + use_region_center=use_region_center, + obstime=observation.tmid, + ) + + elif observation.pointing.mode == PointingMode.DRIFT: + use_region_center = getattr(self, "use_region_center", True) + + time_step = 1*u.minute + nsteps = int( + 1 + np.ceil((observation.observation_time_duration / time_step).decompose()) + ) + + mjd_edges = np.linspace(observation.tstart.mjd, observation.tstop.mjd, num=nsteps) + mjd = Time( + (mjd_edges[1:] + mjd_edges[:-1]) / 2, + format='mjd' + ) + dmjd = (mjd_edges[1:] - mjd_edges[:-1]) + + mjd = Time(mjd, format='mjd') + dmjd = dmjd * u.d + + maps = [ + make_map_background_irf( + pointing=observation.get_pointing_icrs(tref), + ontime=livetime, + bkg=observation.bkg, + geom=geom, + oversampling=self.background_oversampling, + use_region_center=use_region_center, + obstime=tref, + ) + for tref, livetime in zip(mjd, dmjd) + ] + + time_axis = MapAxis.from_bounds( + 0, + len(maps), + nbin=len(maps), + node_type="center", + name="time" + ) + + return Map.from_stack(maps, axis=time_axis).reduce('time', np.add) + else: + raise RuntimeError( + f'exposure calculation for pointing mode "{observation.pointing.mode}" not implemeted yet' + ) diff --git a/src/srcsim/gpy/run/simulate.py b/src/srcsim/gpy/run/simulate.py new file mode 100644 index 0000000..3a4d85c --- /dev/null +++ b/src/srcsim/gpy/run/simulate.py @@ -0,0 +1,222 @@ +import numpy as np +import gammapy + +from astropy.time import Time +from gammapy.data import EventList, observatory_locations, PointingMode +from gammapy.modeling.models import ( + ConstantTemporalModel, +) + +from gammapy.datasets import MapDatasetEventSampler + + +class MapDatasetEventSampler(MapDatasetEventSampler): + """ + Drop-in replacement of the gammapy.datasets.MapDatasetEventSampler class + with the support of DRIFT observations. Meant to be temporary till + the corresponding changes are not introduced into gammapy. + + Based on https://docs.gammapy.org/1.1/_modules/gammapy/datasets/simulate.html#MapDatasetEventSampler + """ + + def sample_sources(self, dataset): + """Sample source model components. + + Parameters + ---------- + dataset : `~gammapy.datasets.MapDataset` + Map dataset + + Returns + ------- + events : `~gammapy.data.EventList` + Event list + """ + + events_all = [] + for idx, evaluator in enumerate(dataset.evaluators.values()): + if evaluator.needs_update: + evaluator.update( + dataset.exposure, + dataset.psf, + dataset.edisp, + dataset._geom, + dataset.mask, + ) + + if evaluator.model.temporal_model is None: + temporal_model = ConstantTemporalModel() + else: + temporal_model = evaluator.model.temporal_model + + try: + if temporal_model.is_energy_dependent: + table = self._sample_coord_time_energy(dataset, evaluator.model) + else: + flux = evaluator.compute_flux() + npred = evaluator.apply_exposure(flux) + table = self._sample_coord_time(npred, temporal_model, dataset.gti) + + if len(table) == 0: + mcid = table.Column(name="MC_ID", length=0, dtype=int) + table.add_column(mcid) + + table["MC_ID"] = idx + 1 + table.meta["MID{:05d}".format(idx + 1)] = idx + 1 + table.meta["MMN{:05d}".format(idx + 1)] = evaluator.model.name + + events_all.append(EventList(table)) + except: + print(f'WARN: can not sample events for evaluator: {list(dataset.evaluators.keys())[idx]}') + + return EventList.from_stack(events_all) + + @staticmethod + def event_list_meta(dataset, observation, keep_mc_id=True): + """Event list meta info. + + Parameters + ---------- + dataset : `~gammapy.datasets.MapDataset` + Map dataset + observation : `~gammapy.data.Observation` + In memory observation + keep_mc_id : bool + Flag to tag sampled events from a given model with a Montecarlo identifier. + Default is True. If set to False, no identifier will be assigned. + + Returns + ------- + meta : dict + Meta dictionary + """ + # See: https://gamma-astro-data-formats.readthedocs.io/en/latest/events/events.html#mandatory-header-keywords # noqa: E501 + meta = {} + + meta["HDUCLAS1"] = "EVENTS" + meta["EXTNAME"] = "EVENTS" + meta[ + "HDUDOC" + ] = "https://github.com/open-gamma-ray-astro/gamma-astro-data-formats" + meta["HDUVERS"] = "0.2" + meta["HDUCLASS"] = "GADF" + + meta["OBS_ID"] = observation.obs_id + + meta["TSTART"] = (observation.tstart - dataset.gti.time_ref).to_value("s") + meta["TSTOP"] = (observation.tstop - dataset.gti.time_ref).to_value("s") + + meta["ONTIME"] = observation.observation_time_duration.to("s").value + meta["LIVETIME"] = observation.observation_live_time_duration.to("s").value + meta["DEADC"] = 1 - observation.observation_dead_time_fraction + + if observation.pointing.mode == PointingMode.POINTING: + fixed_icrs = observation.pointing.fixed_icrs + meta["RA_PNT"] = fixed_icrs.ra.deg + meta["DEC_PNT"] = fixed_icrs.dec.deg + elif observation.pointing.mode == PointingMode.DRIFT: + tref = Time( + (observation.tstart.mjd + observation.tstop.mjd) / 2, + format='mjd' + ) + fixed_icrs = observation.pointing.get_icrs(tref) + meta["RA_PNT"] = fixed_icrs.ra.deg + meta["DEC_PNT"] = fixed_icrs.dec.deg + else: + raise ValueError( + f'pointing mode {observation.pointing.mode} not supported, choices are "DRIFT" or "POINTING"' + ) + + meta["EQUINOX"] = "J2000" + meta["RADECSYS"] = "icrs" + + meta["CREATOR"] = "Gammapy {}".format(gammapy.__version__) + meta["EUNIT"] = "TeV" + meta["EVTVER"] = "" + + meta["OBSERVER"] = "Gammapy user" + meta["DSTYP1"] = "TIME" + meta["DSUNI1"] = "s" + meta["DSVAL1"] = "TABLE" + meta["DSREF1"] = ":GTI" + meta["DSTYP2"] = "ENERGY" + meta["DSUNI2"] = "TeV" + meta[ + "DSVAL2" + ] = f'{dataset._geom.axes["energy"].edges.min().value}:{dataset._geom.axes["energy"].edges.max().value}' # noqa: E501 + meta["DSTYP3"] = "POS(RA,DEC) " + + offset_max = np.max(dataset._geom.width).to_value("deg") + meta[ + "DSVAL3" + ] = f"CIRCLE({fixed_icrs.ra.deg},{fixed_icrs.dec.deg},{offset_max})" # noqa: E501 + meta["DSUNI3"] = "deg " + meta["NDSKEYS"] = " 3 " + + # get first non background model component + for model in dataset.models: + if model is not dataset.background_model: + break + else: + model = None + + if model: + meta["OBJECT"] = model.name + meta["RA_OBJ"] = model.position.icrs.ra.deg + meta["DEC_OBJ"] = model.position.icrs.dec.deg + + meta["TELAPSE"] = dataset.gti.time_sum.to("s").value + meta["MJDREFI"] = int(dataset.gti.time_ref.mjd) + meta["MJDREFF"] = dataset.gti.time_ref.mjd % 1 + meta["TIMEUNIT"] = "s" + meta["TIMESYS"] = dataset.gti.time_ref.scale + meta["TIMEREF"] = "LOCAL" + meta["DATE-OBS"] = dataset.gti.time_start.isot[0][0:10] + meta["DATE-END"] = dataset.gti.time_stop.isot[0][0:10] + meta["CONV_DEP"] = 0 + meta["CONV_RA"] = 0 + meta["CONV_DEC"] = 0 + + if keep_mc_id: + meta["NMCIDS"] = len(dataset.models) + + # Necessary for DataStore, but they should be ALT and AZ instead! + telescope = observation.aeff.meta["TELESCOP"] + instrument = observation.aeff.meta["INSTRUME"] + if telescope == "CTA": + if instrument == "Southern Array": + loc = observatory_locations["cta_south"] + elif instrument == "Northern Array": + loc = observatory_locations["cta_north"] + else: + loc = observatory_locations["cta_south"] + + else: + loc = observatory_locations[telescope.lower()] + + if observation.pointing.mode == PointingMode.POINTING: + # this is not really correct but maybe OK for now + coord_altaz = observation.pointing.get_altaz(dataset.gti.time_start, loc) + + meta["ALT_PNT"] = str(coord_altaz.alt.deg[0]) + meta["AZ_PNT"] = str(coord_altaz.az.deg[0]) + elif observation.pointing.mode == PointingMode.DRIFT: + meta["ALT_PNT"] = observation.pointing.fixed_altaz.alt.deg + meta["AZ_PNT"] = observation.pointing.fixed_altaz.az.deg + else: + raise ValueError( + f'pointing mode {observation.pointing.mode} not supported, choices are "DRIFT" or "POINTING"' + ) + + # TO DO: these keywords should be taken from the IRF of the dataset + meta["ORIGIN"] = "Gammapy" + meta["TELESCOP"] = observation.aeff.meta["TELESCOP"] + meta["INSTRUME"] = observation.aeff.meta["INSTRUME"] + meta["N_TELS"] = "" + meta["TELLIST"] = "" + + meta["CREATED"] = "" + meta["OBS_MODE"] = observation.pointing.mode.name + meta["EV_CLASS"] = "" + + return meta diff --git a/src/srcsim/gpy/run/sky.py b/src/srcsim/gpy/run/sky.py new file mode 100644 index 0000000..01f4c1c --- /dev/null +++ b/src/srcsim/gpy/run/sky.py @@ -0,0 +1,141 @@ +import yaml +import astropy.units as u +from astropy.time import Time +from astropy.coordinates import SkyCoord, EarthLocation, AltAz + +from gammapy.data import PointingMode, FixedPointingInfo + +from .base import DataRun + + +class SkyDataRun(DataRun): + mode = PointingMode.POINTING + + def __str__(self): + frame_start = AltAz(obstime=self.tstart, location=self.obsloc) + frame_stop = AltAz(obstime=self.tstop, location=self.obsloc) + desc = f"""{type(self).__name__} instance + {'ID':.<20s}: {self.id} + {'Tel. RA/Dec':.<20s}: {self.tel_pos} + {'Tstart':.<20s}: {self.tstart.isot} + {'Tstop':.<20s}: {self.tstop.isot} + {'Tel. azimuth':.<20s}: [{self.tel_pos.transform_to(frame_start).az.to('deg'):.2f} - {self.tel_pos.transform_to(frame_stop).az.to('deg'):.2f}] + {'Tel. alt':.<20s}: [{self.tel_pos.transform_to(frame_start).alt.to('deg'):.2f} - {self.tel_pos.transform_to(frame_stop).alt.to('deg'):.2f}] +""" + return desc + + @classmethod + def from_config(cls, config): + """ + Create run from the specified configuration. + This method needs to be overloaded in the child classes. + + Parameters + ---------- + config: str or dict + Run configuration to use. If string, + configuration will be loaded from the YAML + file specified by "config". + + Returns + ------- + run: DataRun + Corresponding DataRun child instance + + Notes + ----- + See also self.to_dict() + """ + + if isinstance(config, str): + cfg = yaml.load(open(config, "r"), Loader=yaml.FullLoader) + else: + cfg = config + + data_run = cls( + SkyCoord( + u.Quantity(cfg['pointing']['ra']), + u.Quantity(cfg['pointing']['dec']), + frame='icrs' + ), + Time(cfg['time']['start']), + Time(cfg['time']['stop']), + EarthLocation( + lat=u.Quantity(cfg['location']['lat']), + lon=u.Quantity(cfg['location']['lon']), + height=u.Quantity(cfg['location']['height']), + ), + cfg['id'] if 'id' in cfg else 0 + ) + + return data_run + + @property + def pointing(self): + """ + Observation pointing info. + + Returns + ------- + info: gammapy.data.FixedPointingInfo + """ + + return FixedPointingInfo(fixed_icrs=self.tel_pos) + + @property + def tel_pos_center_icrs(self): + """ + Telescope pointing center in equatorial (ICRS) coordinates + + Returns + ------- + astropy.coordinates.SkyCoord + pointing center + """ + + return self.tel_pos.icrs + + def to_dict(self): + """ + Converts the class definition to a configuration dict. + + Returns + ------- + dict: + Class configuration as dict + + Notes + ----- + See also self.from_config() + """ + + data = {'id': self.id, 'pointing': {}, 'time': {}, 'location': {}} + + data['pointing']['ra'] = str(self.tel_pos.icrs.ra.to('deg').value) + ' deg' + data['pointing']['dec'] = str(self.tel_pos.icrs.dec.to('deg').value) + ' deg' + + data['time']['start'] = self.tstart.isot + data['time']['stop'] = self.tstop.isot + + data['location']['lon'] = str(self.obsloc.lon.to('deg').value) + ' deg' + data['location']['lat'] = str(self.obsloc.lat.to('deg').value) + ' deg' + data['location']['height'] = str(self.obsloc.height.to('m').to_string()) + + return data + + def tel_pos_to_altaz(self, frame): + """ + Transform ICRS telescope poiting to the specified alt/az frame. + + Parameters + ---------- + frame: astropy.coordinates.AltAz + Frame to transform the telescope pointing to. + + Returns + ------- + astropy.coordinates.SkyCoord + Telescope pointing in alt/az frame + """ + + return self.tel_pos.transform_to(frame) diff --git a/src/srcsim/gpy/rungen/__init__.py b/src/srcsim/gpy/rungen/__init__.py new file mode 100644 index 0000000..2845ee4 --- /dev/null +++ b/src/srcsim/gpy/rungen/__init__.py @@ -0,0 +1,3 @@ +from .generator import generator +from .sky import AltAzBoxGenerator, DataMatchingGenerator +from .fixed import FixedObsGenerator \ No newline at end of file diff --git a/src/srcsim/gpy/rungen/fixed.py b/src/srcsim/gpy/rungen/fixed.py new file mode 100644 index 0000000..f9e6e06 --- /dev/null +++ b/src/srcsim/gpy/rungen/fixed.py @@ -0,0 +1,233 @@ +import yaml +import numpy as np +import astropy.units as u +from scipy.interpolate import CubicSpline +from astropy.time import Time +from astropy.coordinates import SkyCoord, AltAz, EarthLocation + +from ..run import FixedPointingDataRun +from .helpers import get_trajectory, enforce_max_interval_length + + +class FixedObsGenerator: + """ + Generator of the DRIFT sky runs with a fixed alt/az position, + scanning a certain stripe in RA axis. + """ + + @classmethod + def get_runs_from_config(cls, config): + """ + Returns runs corresponding to the specified config. + + Parameters + ---------- + config: dict + Configuration dictionary + + Returns + ------- + runs: tuple + Generated runs + """ + + if isinstance(config, str): + cfg = yaml.load(open(config, "r"), Loader=yaml.FullLoader) + else: + cfg = config + + ra_width = u.Quantity(cfg['pointing']['width']) + alt = u.Quantity(cfg['pointing']['center']['alt']) + tel_pos = SkyCoord( + u.Quantity(cfg['pointing']['center']['ra']), + u.Quantity(cfg['pointing']['center']['dec']), + frame='icrs' + ) + + tstart = Time(cfg['time']['start']) + duration = u.Quantity(cfg['time']['duration']) + accuracy = u.Quantity(cfg['time']['accuracy']) + max_run_duration = u.Quantity(cfg['time']['max_run_duration']) if cfg['time']['max_run_duration'] is not None else None + + obsloc = EarthLocation( + lat=u.Quantity(cfg['location']['lat']), + lon=u.Quantity(cfg['location']['lon']), + height=u.Quantity(cfg['location']['height']), + ) + + return cls.get_runs(obsloc, tel_pos, duration, alt, ra_width, tstart, accuracy, max_run_duration) + + @classmethod + def get_runs(cls, obsloc, tel_pos, tobs, alt, ra_width, tstart=None, accuracy=1*u.minute, max_run_duration=None): + """ + Generates runs corresponding to specified sky constraints. + + Parameters + ---------- + obsloc: EarthLocation + telescope location + tel_pos: SkyCoord + equatorial telscope pointing coordinates + tobs: u.Quantity + total observation duration + alt: u.Quantity + target telescope altitude + ra_width: u.Quantity + Observation sky patch width in RA direction. + tstart: Time + start time for generated runs + accuracy: u.Quantity + trajectory time step to use in calculations + max_run_duration: u.Quantity + maximal length of a single run to allow + + Returns + ------- + runs: tuple + Generated runs + """ + + tel_pos_trail = SkyCoord( + tel_pos.icrs.ra + ra_width / 2, + tel_pos.icrs.dec, + frame='icrs' + ) + tel_pos_lead = SkyCoord( + tel_pos.icrs.ra - ra_width / 2, + tel_pos.icrs.dec, + frame='icrs' + ) + + # First pass - first day + track_trail = get_trajectory( + tel_pos_trail.icrs, + tstart, + tstop=tstart + 1 * u.d, + time_step=accuracy, + obsloc=obsloc + ) + track_lead = get_trajectory( + tel_pos_lead.icrs, + tstart, + tstop=tstart + 1 * u.d, + time_step=accuracy, + obsloc=obsloc + ) + cs_lead = CubicSpline( + track_lead.obstime.mjd, + track_lead.alt - alt + ) + cs_trail = CubicSpline( + track_trail.obstime.mjd, + track_trail.alt - alt + ) + + tstarts = Time(cs_lead.roots(extrapolate=False), format='mjd') + tstops = Time(cs_trail.roots(extrapolate=False), format='mjd') + + # Total number of the required observation days + run_durations = tstops - tstarts + nsequences = (tobs / np.sum(run_durations)).decompose() + ndays_full = np.floor(nsequences) + ndays_total = np.ceil(nsequences) + + if ndays_total > 1: + # Next pass - to cover the simulation time interval with fully completed runs + track_lead = get_trajectory( + tel_pos_lead.icrs, + tstart, + tstop=tstart + ndays_full * u.d, + time_step=accuracy, + obsloc=obsloc + ) + track_trail = get_trajectory( + tel_pos_trail.icrs, + tstart, + tstop=tstart + ndays_full * u.d, + time_step=accuracy, + obsloc=obsloc + ) + + cs_lead = CubicSpline( + track_lead.obstime.mjd, + track_lead.alt - alt + ) + cs_trail = CubicSpline( + track_trail.obstime.mjd, + track_trail.alt - alt + ) + tstarts = Time(cs_lead.roots(extrapolate=False), format='mjd') + tstops = Time(cs_trail.roots(extrapolate=False), format='mjd') + remaining_tobs = tobs - np.sum(tstops - tstarts) + + # Next pass - additional incomplete runs + track_lead = get_trajectory( + tel_pos_lead.icrs, + tstart=tstart + ndays_full * u.d, + tstop=tstart + ndays_total * u.d, + time_step=accuracy, + obsloc=obsloc + ) + cs_lead = CubicSpline( + track_lead.obstime.mjd, + track_lead.alt - alt + ) + _tstarts = Time(cs_lead.roots(extrapolate=False), format='mjd') + _tstops = Time(_tstarts + remaining_tobs / (len(_tstarts))) + + # Likely an astropy bug (tested with v5.3.4): + # the latter does not work if len(tstarts) == len(_tstarts) + # tstarts = Time([tstarts, _tstarts]) + # tstops = Time([tstops, _tstops]) + tstarts = Time(np.concatenate([tstarts.mjd, _tstarts.mjd]), format='mjd') + tstops = Time(np.concatenate([tstops.mjd, _tstops.mjd]), format='mjd') + + else: + track_lead = get_trajectory( + tel_pos_lead.icrs, + tstart, + tstop=tstart + ndays_total * u.d, + time_step=accuracy, + obsloc=obsloc + ) + track_trail = get_trajectory( + tel_pos_trail.icrs, + tstart, + tstop=tstart + ndays_total * u.d, + time_step=accuracy, + obsloc=obsloc + ) + + cs_lead = CubicSpline( + track_lead.obstime.mjd, + track_lead.alt - alt + ) + cs_trail = CubicSpline( + track_trail.obstime.mjd, + track_trail.alt - alt + ) + tstarts = Time(cs_lead.roots(extrapolate=False), format='mjd') + tstops = Time(tstarts + tobs / (len(tstarts))) + + # Telescope positions before the time interevals will be sliced + # These should contain only two different values - those before and after culmination + _frame = AltAz( + obstime=tstarts, + location=obsloc + ) + _tel_pos_altaz = tel_pos_lead.transform_to(_frame) + + tstarts, tstops = enforce_max_interval_length(tstarts, tstops, max_run_duration) + + # Choosing the closest Alt/Az telescope position from those before interval slicing + tel_pos_altaz = [ + _tel_pos_altaz[abs(_tel_pos_altaz.obstime - tstart).argmin()] + for tstart in tstarts + ] + + runs = tuple( + FixedPointingDataRun(target_altaz, tstart, tstop, obsloc, run_id) + for run_id, (tstart, tstop, target_altaz) in enumerate(zip(tstarts, tstops, tel_pos_altaz)) + ) + + return runs \ No newline at end of file diff --git a/src/srcsim/gpy/rungen/generator.py b/src/srcsim/gpy/rungen/generator.py new file mode 100644 index 0000000..209b197 --- /dev/null +++ b/src/srcsim/gpy/rungen/generator.py @@ -0,0 +1,39 @@ +import yaml + +from .sky import AltAzBoxGenerator, DataMatchingGenerator +from .fixed import FixedObsGenerator + + +def generator(config): + """ + Returns runs automatically selecting + the generator appropriate to config. + + Parameters + ---------- + config: dict + Configuration dictionary + + Returns + ------- + runs: tuple + Generated runs + """ + + if isinstance(config, str): + cfg = yaml.load(open(config, "r"), Loader=yaml.FullLoader) + else: + cfg = config + + runs = [] + + if cfg['type'] == "altaz_box": + runs = AltAzBoxGenerator.get_runs_from_config(cfg) + elif cfg['type'] == "fixed_altaz": + runs = FixedObsGenerator.get_runs_from_config(cfg) + elif cfg['type'] == "data_matching": + runs = DataMatchingGenerator.get_runs_from_config(cfg) + else: + raise ValueError(f"Unknown run generator type '{cfg['type']}'") + + return runs \ No newline at end of file diff --git a/src/srcsim/gpy/rungen/helpers.py b/src/srcsim/gpy/rungen/helpers.py new file mode 100644 index 0000000..569a7f8 --- /dev/null +++ b/src/srcsim/gpy/rungen/helpers.py @@ -0,0 +1,194 @@ +import numpy as np +from astropy.time import Time +from astropy.coordinates import AltAz + +from gammapy.irf import load_irf_dict_from_file +from gammapy.data import Observation +from gammapy.data.event_list import EventList +from gammapy.data.gti import GTI +from gammapy.data.pointing import FixedPointingInfo + + +def get_trajectory(tel_pos, tstart, tstop, time_step, obsloc): + """ + Compute telescope alt/az trajctory. + + Parameters + ---------- + tel_pos: SkyCoord + Equatorial telscope pointing coordinates + tstart: Time + Start moment of the trajectory + tstop: Time + Stop moment of the trajectory + time_step: u.Quantity + Trajectory time step to use + obsloc: EarthLocation + Telescope location + + Returns + ------- + tel_altaz: SkyCoord + Computed alt/az trajectory + """ + + times = Time( + np.arange(tstart.unix, tstop.unix, step=time_step.to('s').value), + format='unix' + ) + frame = AltAz(obstime=times, location=obsloc) + tel_altaz = tel_pos.transform_to(frame) + + return tel_altaz + + +def enforce_max_interval_length(tstarts, tstops, max_length): + """ + Enforcing the input time intervals do not exceed + max_length duration and splits them otherwise. + + Parameters + ---------- + tstarts: Time + Start moments of the input time intervals + tstops: Time + Stop moments of the input time intervals + max_length: u.Quantity + Maximal length of a single time interval to allow + + Returns + ------- + tstarts: Time + Start moments of the enforced intervals + tstops: Time + Stop moments of the enforced intervals + """ + + tstarts_new = [] + tstops_new = [] + + for tstart, tstop in zip(tstarts, tstops): + interval_duration = tstop - tstart + + if interval_duration > max_length: + time_edges = Time( + np.arange(tstart.unix, tstop.unix, step=max_length.to('s').value), + format='unix' + ) + if tstop not in time_edges: + time_edges = Time([time_edges, tstop]) + + for tmin, tmax in zip(time_edges[:-1], time_edges[1:]): + tstarts_new.append(tmin) + tstops_new.append(tmax) + + else: + tstarts_new.append(tstart) + tstops_new.append(tstop) + + return Time(tstarts_new), Time(tstops_new) + + +def get_time_intervals(tel_altaz, altmin, altmax, azmin, azmax, time_step, max_duration=None): + """ + Compute time intervals when a given alt/az trajectory is contained within + the specified alt/az box. + + Parameters + ---------- + tel_altaz: SkyCoord + Array-like alt/az coordinates of the trajectory to use. + Should contain both coordinates and obstime. + altmin: u.Quantity + Minimal trajectory altitude to use. + altmax: u.Quantity + Maximal trajectory altitude to use. + azmin: u.Quantity + Minimal trajectory azimuth to use. + azmax: u.Quantity + Maximal trajectory azimuth to use. + time_step: u.Quantity + Minimal time interval to assume between the subsequent time intervals + max_duration: u.Quantity + Maximal duration of a single time interval to allow + + Returns + ------- + tstarts: Time + Start moments of the identified intervals + tstops: Time + Stop moments of the identified intervals + """ + + in_box = (tel_altaz.az > azmin) & (tel_altaz.az <= azmax) & (tel_altaz.alt > altmin) & (tel_altaz.alt < altmax) + nodes = np.where(np.diff(tel_altaz.obstime[in_box].unix) > time_step.to('s').value)[0] + + starts = np.concatenate(([0], nodes+1)) + + if (len(tel_altaz.obstime[in_box]) - 1) not in nodes: + stops = np.concatenate((nodes, [len(tel_altaz.obstime[in_box]) - 1])) + else: + stops = nodes + + intervals = tuple((start, stop) for start, stop in zip(starts, stops)) + + # TODO: check if sum(in_box) > 0 and len(intervals) > 0 + + tstarts = Time([tel_altaz.obstime[in_box][interval[0]] for interval in intervals]) + tstops = Time([tel_altaz.obstime[in_box][interval[1]] for interval in intervals]) + + if max_duration is not None: + tstarts, tstops = enforce_max_interval_length(tstarts, tstops, max_duration) + + return tstarts, tstops + + +def read_obs(event_file, irf_file=None): + """Create an Observation from a Event List and an (optional) IRF file. + + This is a modified version of gammapy.data.observations.Observation.read() + In gammapy 1.1, `FixedPointingInfo.from_fits_header()` [1] mistakenly + keeps ALT_PNT and AZ_PNT as `str` without converting to floats / quantities; + their further multiplication with `u.deg` leads to `astropy.units.core.Unit` type + instead of `astropy.units.quantity.Quantity` - incompatible with `AltAz()` constructor. + + Parameters + ---------- + event_file : str, Path + path to the .fits file containing the event list and the GTI + irf_file : str, Path + (optional) path to the .fits file containing the IRF components, + if not provided the IRF will be read from the event file + + Returns + ------- + observation : `~gammapy.data.Observation` + observation with the events and the irf read from the file + + References + ---------- + [1] https://docs.gammapy.org/1.1/_modules/gammapy/data/pointing.html#FixedPointingInfo + """ + from gammapy.irf.io import load_irf_dict_from_file + + events = EventList.read(event_file) + + gti = GTI.read(event_file) + + irf_file = irf_file if irf_file is not None else event_file + irf_dict = load_irf_dict_from_file(irf_file) + + obs_info = events.table.meta + + # Removing problematic keys + del obs_info['AZ_PNT'] + del obs_info['ALT_PNT'] + + return Observation( + events=events, + gti=gti, + obs_info=obs_info, + obs_id=obs_info.get("OBS_ID"), + pointing=FixedPointingInfo.from_fits_header(obs_info), + **irf_dict, + ) diff --git a/src/srcsim/gpy/rungen/sky.py b/src/srcsim/gpy/rungen/sky.py new file mode 100644 index 0000000..0fd0183 --- /dev/null +++ b/src/srcsim/gpy/rungen/sky.py @@ -0,0 +1,306 @@ +import glob +import yaml +import functools +import numpy as np +import pandas as pd +import astropy.units as u +from astropy.time import Time +from astropy.coordinates import SkyCoord, AltAz, EarthLocation + +from ..run import SkyDataRun +from .helpers import get_trajectory, get_time_intervals, read_obs, enforce_max_interval_length + + +class AltAzBoxGenerator: + """ + Generator of the sky runs within a given alt/az box + """ + + @classmethod + def get_runs_from_config(cls, config): + """ + Returns runs corresponding to the specified config. + + Parameters + ---------- + config: dict + Configuration dictionary + + Returns + ------- + runs: tuple + Generated runs + """ + + if isinstance(config, str): + cfg = yaml.load(open(config, "r"), Loader=yaml.FullLoader) + else: + cfg = config + + tel_pos = SkyCoord( + u.Quantity(cfg['pointing']['ra']), + u.Quantity(cfg['pointing']['dec']), + frame='icrs' + ) + + if cfg['pointing']['wobble'] is not None: + wobble_offset = u.Quantity(cfg['pointing']['wobble']['offset']) + wobble_start_angle = u.Quantity(cfg['pointing']['wobble']['start_angle']) + wobble_count = cfg['pointing']['wobble']['count'] + else: + wobble_offset = None + wobble_start_angle = None + wobble_count = None + + azmin = u.Quantity(cfg['box']['az']['min']) + azmax = u.Quantity(cfg['box']['az']['max']) + altmin = u.Quantity(cfg['box']['alt']['min']) + altmax = u.Quantity(cfg['box']['alt']['max']) + + tstart = Time(cfg['time']['start']) + duration = u.Quantity(cfg['time']['duration']) + accuracy = u.Quantity(cfg['time']['accuracy']) + max_run_duration = u.Quantity(cfg['time']['max_run_duration']) if cfg['time']['max_run_duration'] is not None else None + obsloc = EarthLocation( + lat=u.Quantity(cfg['location']['lat']), + lon=u.Quantity(cfg['location']['lon']), + height=u.Quantity(cfg['location']['height']), + ) + + return cls.get_runs(obsloc, tel_pos, duration, altmin, altmax, azmin, azmax, tstart, accuracy, max_run_duration, wobble_offset, wobble_start_angle, wobble_count) + + @classmethod + def get_runs(cls, obsloc, tel_pos, tobs, altmin, altmax, azmin, azmax, tstart=None, accuracy=1*u.minute, max_run_duration=None, wobble_offset=None, wobble_start_angle=None, wobble_count=None): + """ + Generates runs corresponding to specified sky constraints. + + Parameters + ---------- + obsloc: EarthLocation + telescope location + tel_pos: SkyCoord + equatorial telscope pointing coordinates + tobs: u.Quantity + total observation duration + altmin: u.Quantity + minimal telescope trajectory altitude to use. + altmax: u.Quantity + maximal telescope trajectory altitude to use. + azmin: u.Quantity + minimal telescope trajectory azimuth to use. + azmax: u.Quantity + maximal telescope trajectory azimuth to use. + tstart: Time + start time for generated runs + accuracy: u.Quantity + trajectory time step to use in calculations + max_run_duration: u.Quantity + maximal length of a single run to allow + wobble_offset: u.Quantity + wobble offset to apply + wobble_start_angle: u.Quantity + positional angle of the first wobble + wobble_count: int + number of wobbles to assume. If none, no wobbles + will be generated and runs will center at tel_pos + + Returns + ------- + runs: tuple + Generated runs + """ + + wobble_params = (wobble_offset, wobble_start_angle, wobble_count) + n_params_set = sum([par is not None for par in wobble_params]) + use_wobble = n_params_set == len(wobble_params) + + if not use_wobble and n_params_set > 0: + raise ValueError(f"must specify all or none of the ['wobble_offset', 'wobble_start_angle', 'wobble_count'] parameters") + + if tstart is None: + tstart = Time('1970-01-01') + + tel_altaz = get_trajectory( + tel_pos, + tstart, + tstop=tstart + 1 * u.d, + time_step=accuracy, + obsloc=obsloc + ) + + # Second pass with the start on the source anti-culmination + tstart = tel_altaz.obstime[tel_altaz.alt.argmin()] + tel_altaz = get_trajectory( + tel_pos, + tstart, + tstop=tstart + 1 * u.d, + time_step=accuracy, + obsloc=obsloc + ) + + tstarts, tstops = get_time_intervals(tel_altaz, altmin, altmax, azmin, azmax, accuracy) + run_durations = tstops - tstarts + + nsequences = (tobs / np.sum(run_durations)).decompose() + ndays_full = np.floor(nsequences) + ndays_total = np.ceil(nsequences) + + if ndays_full > 0: + # Third pass - to cover the simulation time interval with fully completed runs + tel_altaz = get_trajectory( + tel_pos, + tstart, + tstop=tstart + ndays_full * u.d, + time_step=accuracy, + obsloc=obsloc + ) + tstarts, tstops = get_time_intervals(tel_altaz, altmin, altmax, azmin, azmax, accuracy) + remaining_tobs = tobs - np.sum(tstops - tstarts) + + # Fourth pass - additional incomplete runs + _tel_altaz = get_trajectory( + tel_pos, + tstart=tstart + ndays_full * u.d, + tstop=tstart + ndays_total * u.d, + time_step=accuracy, + obsloc=obsloc + ) + _tstarts, _tstops = get_time_intervals(_tel_altaz, altmin, altmax, azmin, azmax, accuracy) + _tstops = Time(_tstarts + remaining_tobs / (len(_tstarts))) + + tstarts = Time([tstarts, _tstarts]) + tstops = Time([tstops, _tstops]) + + else: + tel_altaz = get_trajectory( + tel_pos, + tstart, + tstop=tstart + ndays_total * u.d, + time_step=accuracy, + obsloc=obsloc + ) + tstarts, tstops = get_time_intervals(tel_altaz, altmin, altmax, azmin, azmax, accuracy) + tstops = Time(tstarts + tobs / (len(tstarts))) + + tstarts, tstops = enforce_max_interval_length(tstarts, tstops, max_run_duration) + + if use_wobble: + pos_angles = wobble_start_angle + np.linspace(0, 2* np.pi, num=wobble_count+1)[:-1] * u.rad + + runs = tuple( + SkyDataRun( + tel_pos.directional_offset_by(pos_angles[run_id % wobble_count], wobble_offset), + tstart, + tstop, + obsloc, + run_id + ) + for run_id, (tstart, tstop) in enumerate(zip(tstarts, tstops)) + ) + + else: + runs = tuple( + SkyDataRun(tel_pos, tstart, tstop, obsloc, run_id) + for run_id, (tstart, tstop) in enumerate(zip(tstarts, tstops)) + ) + + return runs + + +class DataMatchingGenerator: + """ + Generator of the sky runs within a given alt/az box + """ + @classmethod + def get_runs_from_config(cls, config): + """ + Returns runs corresponding to the specified config. + + Parameters + ---------- + config: dict + Configuration dictionary + + Returns + ------- + runs: tuple + Generated runs + """ + + if isinstance(config, str): + cfg = yaml.load(open(config, "r"), Loader=yaml.FullLoader) + else: + cfg = config + + obsloc = EarthLocation( + lat=u.Quantity(cfg['location']['lat']), + lon=u.Quantity(cfg['location']['lon']), + height=u.Quantity(cfg['location']['height']), + ) + + return cls.get_runs(cfg['file_mask'], obsloc) + + @classmethod + def get_runs(cls, file_mask, obsloc): + """ + Generates runs corresponding to specified observations. + + Parameters + ---------- + file_mask: str + observations file mask + obsloc: EarthLocation + telescope location + + Returns + ------- + runs: tuple + Generated runs + """ + + file_list = glob.glob(file_mask) + + runs_info = functools.reduce( + lambda container, file_name: container + [cls.runs_info(file_name)], + file_list, + [] + ) + + runs = [ + SkyDataRun( + run_info['tel_pos'], + run_info['tstart'], + run_info['tstop'], + obsloc, + run_info['obs_id'], + ) + for run_info in runs_info + ] + + return runs + + @classmethod + def runs_info(cls, file_name): + """ + Read the data runs info. + + Parameters + ---------- + file_name: str + DL3 observations data run file name + + Returns + ------- + run_info: dict + Dictionary with the run ID, ICRS pointing coordinates and start/stop times. + """ + obs = read_obs(file_name) + + run_info = dict( + obs_id = obs.obs_id, + tel_pos = obs.pointing.fixed_icrs, + tstart = obs.tstart, + tstop = obs.tstop, + ) + + return run_info \ No newline at end of file diff --git a/src/srcsim/gpy/scripts/sim_dl3.py b/src/srcsim/gpy/scripts/sim_dl3.py new file mode 100644 index 0000000..0b1fec1 --- /dev/null +++ b/src/srcsim/gpy/scripts/sim_dl3.py @@ -0,0 +1,96 @@ +import os +import gc +import yaml +import datetime +import argparse +import random +import pandas as pd +import astropy.units as u + +from progressbar import ProgressBar +from gammapy.modeling.models import Models + +from srcsim.gpy.irf import IRFCollection +from srcsim.gpy.rungen import generator + + +def info_message(text): + """ + This function prints the specified text with the prefix of the current date + + Parameters + ---------- + text: str + + Returns + ------- + None + + """ + + date_str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + print("{date:s}: {message:s}".format(date=date_str, message=text)) + + +def main(): + arg_parser = argparse.ArgumentParser( + description=""" + LST event simulator. + """ + ) + + arg_parser.add_argument( + "--config", + default="config.yaml", + help='Configuration file to steer the code execution.' + ) + arg_parser.add_argument( + "--id", + default=-1, + type=int, + help='Obs ID to simulate' + ) + args = arg_parser.parse_args() + + cfg = yaml.load(open(args.config, "r"), Loader=yaml.FullLoader) + + info_message('Loading IRFs') + irfs = IRFCollection( + cfg['irf']['files'] + ) + print(irfs) + + info_message('Preparing sources') + source_models = Models.from_dict(cfg['model']) + print(source_models) + + info_message('Preparing the data runs') + runs = generator(cfg['rungen']) + info_message(f'{len(runs)} runs generated') + + if args.id >= 0: + info_message(f'Will proccess only run {args.id}') + runs = runs[args.id:args.id+1] + + info_message('Starting simulation') + + with ProgressBar(max_value=len(runs), prefix="simulation: ") as progress: + for ri, run in enumerate(runs): + obs = run.predict( + irfs, + source_models, + cfg['irf']['search_radius'] + ) + obs.write( + os.path.join(cfg['io']['out'], f'run{obs.obs_id}.fits'), + overwrite=True + ) + del obs + gc.collect() + progress.update(ri) + + info_message('Simulation complete') + + +if __name__ == '__main__': + main()