|
2 | 2 |
|
3 | 3 | import os |
4 | 4 |
|
5 | | -import astropy.units as u |
6 | | -import numpy as np |
7 | 5 | from astropy.io import fits |
8 | 6 | from astropy.nddata import StdDevUncertainty |
9 | 7 | from astropy.units import Unit |
10 | | -from astropy.wcs import WCS |
11 | 8 | from specutils import Spectrum |
12 | | -from specutils.io.parsing_utils import read_fileobj_or_hdulist |
13 | 9 | from specutils.io.registers import data_loader |
14 | 10 |
|
15 | 11 | # pylint: disable=no-member, unused-argument |
@@ -75,112 +71,6 @@ def spex_prism_loader(filename, **kwargs): |
75 | 71 | return Spectrum(flux=data, spectral_axis=wave, uncertainty=uncertainty, meta=meta) |
76 | 72 |
|
77 | 73 |
|
78 | | -def identify_wcs1d_multispec(origin, *args, **kwargs): |
79 | | - """ |
80 | | - Identifier for WCS1D multispec |
81 | | - """ |
82 | | - hdu = kwargs.get("hdu", 0) |
83 | | - |
84 | | - # Check if number of axes is one and dimension of WCS is greater than one |
85 | | - with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: |
86 | | - return ( |
87 | | - hdulist[hdu].header.get("WCSDIM", 1) > 1 |
88 | | - and hdulist[hdu].header["NAXIS"] > 1 |
89 | | - and "WAT0_001" in hdulist[hdu].header |
90 | | - and hdulist[hdu].header.get("WCSDIM", 1) == hdulist[hdu].header["NAXIS"] |
91 | | - and "LINEAR" in hdulist[hdu].header.get("CTYPE1", "") |
92 | | - ) |
93 | | - |
94 | | - |
95 | | -@data_loader("wcs1d-multispec", identifier=identify_wcs1d_multispec, extensions=["fits"], dtype=Spectrum, priority=10) |
96 | | -def wcs1d_multispec_loader(file_obj, flux_unit=None, hdu=0, verbose=False, **kwargs): |
97 | | - """ |
98 | | - Loader for multiextension spectra as wcs1d. Adapted from wcs1d_fits_loader |
99 | | -
|
100 | | - Parameters |
101 | | - ---------- |
102 | | - file_obj : str, file-like or HDUList |
103 | | - FITS file name, object (provided from name by Astropy I/O Registry), |
104 | | - or HDUList (as resulting from astropy.io.fits.open()). |
105 | | - flux_unit : :class:`~astropy.units.Unit` or str, optional |
106 | | - Units of the flux for this spectrum. If not given (or None), the unit |
107 | | - will be inferred from the BUNIT keyword in the header. Note that this |
108 | | - unit will attempt to convert from BUNIT if BUNIT is present. |
109 | | - hdu : int |
110 | | - The index of the HDU to load into this spectrum. |
111 | | - verbose : bool |
112 | | - Print extra info. |
113 | | - **kwargs |
114 | | - Extra keywords for :func:`~specutils.io.parsing_utils.read_fileobj_or_hdulist`. |
115 | | -
|
116 | | - Returns |
117 | | - ------- |
118 | | - :class:`~specutils.Spectrum` |
119 | | - """ |
120 | | - |
121 | | - with read_fileobj_or_hdulist(file_obj, **kwargs) as hdulist: |
122 | | - header = hdulist[hdu].header |
123 | | - wcs = WCS(header) |
124 | | - |
125 | | - # Load data, convert units if BUNIT and flux_unit is provided and not the same |
126 | | - if "BUNIT" in header: |
127 | | - data = u.Quantity(hdulist[hdu].data, unit=header["BUNIT"]) |
128 | | - if u.A in data.unit.bases: |
129 | | - data = data * u.A / u.AA # convert ampere to Angroms |
130 | | - if flux_unit is not None: |
131 | | - data = data.to(flux_unit) |
132 | | - else: |
133 | | - data = u.Quantity(hdulist[hdu].data, unit=flux_unit) |
134 | | - |
135 | | - if wcs.wcs.cunit[0] == "" and "WAT1_001" in header: |
136 | | - # Try to extract from IRAF-style card or use Angstrom as default. |
137 | | - wat_dict = dict((rec.split("=") for rec in header["WAT1_001"].split())) |
138 | | - unit = wat_dict.get("units", "Angstrom") |
139 | | - if hasattr(u, unit): |
140 | | - wcs.wcs.cunit[0] = unit |
141 | | - else: # try with unit name stripped of excess plural 's'... |
142 | | - wcs.wcs.cunit[0] = unit.rstrip("s") |
143 | | - if verbose: |
144 | | - print(f"Extracted spectral axis unit '{unit}' from 'WAT1_001'") |
145 | | - elif wcs.wcs.cunit[0] == "": |
146 | | - wcs.wcs.cunit[0] = "Angstrom" |
147 | | - |
148 | | - # Compatibility attribute for lookup_table (gwcs) WCS |
149 | | - wcs.unit = tuple(wcs.wcs.cunit) |
150 | | - |
151 | | - # Identify the correct parts of the data to store |
152 | | - if len(data.shape) > 1: |
153 | | - flux_data = data[0] |
154 | | - else: |
155 | | - flux_data = data |
156 | | - uncertainty = None |
157 | | - if "NAXIS3" in header: |
158 | | - for i in range(header["NAXIS3"]): |
159 | | - if "spectrum" in header.get(f"BANDID{i+1}", ""): |
160 | | - flux_data = data[i] |
161 | | - if "sigma" in header.get(f"BANDID{i+1}", ""): |
162 | | - uncertainty = data[i] |
163 | | - |
164 | | - # Reshape arrays if needed |
165 | | - if len(flux_data) == 1 and len(flux_data.shape) > 1: |
166 | | - flux_data = np.reshape(flux_data, -1) |
167 | | - if uncertainty is not None: |
168 | | - uncertainty = np.reshape(uncertainty, -1) |
169 | | - |
170 | | - # Convert uncertainty to StdDevUncertainty array |
171 | | - if uncertainty is not None: |
172 | | - uncertainty = StdDevUncertainty(uncertainty) |
173 | | - |
174 | | - # Manually generate spectral axis |
175 | | - pixels = [[i] + [0] * (wcs.naxis - 1) for i in range(wcs.pixel_shape[0])] |
176 | | - spectral_axis = [i[0] for i in wcs.all_pix2world(pixels, 0)] * wcs.wcs.cunit[0] |
177 | | - |
178 | | - # Store header as metadata information |
179 | | - meta = {"header": header} |
180 | | - |
181 | | - return Spectrum(flux=flux_data, spectral_axis=spectral_axis, uncertainty=uncertainty, meta=meta) |
182 | | - |
183 | | - |
184 | 74 | def load_spectrum(filename: str, spectra_format: str = None, raise_error: bool = False): |
185 | 75 | """Attempt to load the filename as a spectrum object |
186 | 76 |
|
|
0 commit comments