-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmake_dark_file.py
More file actions
210 lines (191 loc) · 7.08 KB
/
make_dark_file.py
File metadata and controls
210 lines (191 loc) · 7.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""Makes dark reference files."""
import sys
from datetime import UTC, datetime
from os.path import exists
from os.path import split as pathsplit
import asdf
import numpy as np
import yaml
from astropy.io import fits
from astropy.stats import sigma_clip
### Command-line inputs to this script ###
pattern = sys.argv[1] # pattern name
target = sys.argv[2] # name of first noise file, should end with _001.fits
noise_out = sys.argv[3] # noise summary file generated by solid-waffle
sca = int(sys.argv[4]) # the SCA number
outfile = sys.argv[5] # output file
### End command-line inputs ###
nside = 4096 # dimension of H4RG
# get read pattern
with open("settings_" + pattern + ".yaml") as f:
settings = yaml.safe_load(f)
use_read_pattern = []
ng = len(settings["READS"]) // 2
for j in range(ng):
use_read_pattern.append(list(range(int(settings["READS"][2 * j]), int(settings["READS"][2 * j + 1]))))
print("Read pattern:", use_read_pattern)
# read the noise files
nf = 1
noisefiles = []
# set a maximum just in case
while nf <= 500:
noisefile = target[:-8] + f"{nf:03d}.fits"
if exists(noisefile):
noisefiles.append(noisefile)
nf += 1
else:
break
del nf
Nfile = len(noisefiles)
print(noisefiles)
# get group averages
for ig in range(ng):
for j in range(Nfile):
if j % 10 == 0:
print("loading groups", ig, j)
sys.stdout.flush()
with fits.open(noisefiles[j]) as f:
if ig == 0 and j == 0:
nx = f[1].header["NAXIS1"]
ny = f[1].header["NAXIS2"]
# tempave is going to be huge! about 7 GB for 100 darks
darkave = np.zeros((ng, ny, nx), dtype=np.float32)
tempave = np.zeros((Nfile, ny, nx), dtype=np.float32)
tempave[j, :, :] = np.mean(
f[1].data[0, use_read_pattern[ig][0] : use_read_pattern[ig][-1] + 1, :, :].astype(np.float32),
axis=0,
)
darkave[ig, :, :] = np.nanmean(sigma_clip(tempave, sigma=3, axis=0, masked=False), axis=0)
del tempave # free up memory
# pull in the dark current maps
with fits.open(sys.argv[3]) as f:
index_dark1 = f[1].header["DARK1"]
index_dark1_err = f[1].header["DARK1ERR"]
index_dark2 = f[1].header["DARK2"]
index_dark2_err = f[1].header["DARK2ERR"]
# above 200 DN/s, switch to dark1
use1 = f[1].data[index_dark2, :, :nside] > 200
dark_slope = np.where(use1, f[1].data[index_dark1, :, :nside], f[1].data[index_dark2, :, :nside]).astype(
np.float32
)
dark_slope_err = np.where(
use1, f[1].data[index_dark1_err, :, :nside], f[1].data[index_dark2_err, :, :nside]
).astype(np.float32)
header = f[1].header
# now get AMP33 if it is there
try:
m_pink = float(f["AMP33"].header["M_PINK"])
ru_pink = float(f["AMP33"].header["RU_PINK"])
amp33_med = np.copy(f["AMP33"].data[0, :, :]).astype(np.float32)
amp33_std = np.copy(f["AMP33"].data[1, :, :]).astype(np.float32)
amp33 = {"valid": True, "med": amp33_med, "std": amp33_std, "M_PINK": m_pink, "RU_PINK": ru_pink}
except (KeyError, ValueError, NameError):
# placeholders
amp33 = {
"valid": False,
"med": np.zeros((4096, 128), dtype=np.float32),
"std": np.zeros((4096, 128), dtype=np.float32),
"M_PINK": 0.0,
"RU_PINK": 0.0,
}
# construct images
tree = {
"roman": {
"meta": {
"author": "make_dark_file.py",
"description": "make_dark_file.py",
"exposure": {
"groupgap": 0,
"ma_table_name": pattern,
"ma_table_number": 1000000,
"nframes": 1,
"ngroups": ng,
"p_exptype": "WFI_IMAGE|",
"type": "WFI_IMAGE",
},
"instrument": {
"detector": f"WFI{sca:02d}",
"name": "WFI",
"optical_element": "F158", # actually doesn't matter for dark
},
"origin": "PIT - romanimpreprocess",
"date": datetime.now(UTC).isoformat(),
"pedigree": "DUMMY",
"reftype": "DARK",
"telescope": "ROMAN",
"useafter": "!time/time-1.2.0 2020-01-01T00:00:00.000",
},
"data": darkave[:, :, :nside].astype(np.float32),
"dq": np.zeros((nside, nside), np.uint32),
"dark_slope": dark_slope,
"dark_slope_err": dark_slope_err,
},
"notes": {"noise_header": header.tostring()},
}
# write to a file
asdf.AsdfFile(tree).write_to(outfile)
# this stuff is just so that we can also display the images in ds9
fits.HDUList(
[
fits.PrimaryHDU(),
fits.ImageHDU(tree["roman"]["data"]),
fits.ImageHDU(tree["roman"]["dq"]),
fits.ImageHDU(tree["roman"]["dark_slope"]),
fits.ImageHDU(tree["roman"]["dark_slope_err"]),
]
).writeto(outfile[:-5] + "_asdf_data.fits", overwrite=True)
### Read noise ###
with fits.open(sys.argv[3]) as f:
index_cds = f[1].header["CDS"]
read_noise = (f[1].data[index_cds, :, :nside] / np.sqrt(2)).astype(np.float32)
index_reset = f[1].header["RESET"]
reset_noise = f[1].data[index_reset, :, :nside]
# construct read noise image
tree = {
"roman": {
"meta": {
"author": "make_dark_file.py",
"description": "make_dark_file.py",
"exposure": {
"groupgap": 0,
"ma_table_name": pattern,
"ma_table_number": 1000000,
"nframes": 1,
"ngroups": ng,
"p_exptype": "WFI_IMAGE|",
"type": "WFI_IMAGE",
},
"instrument": {
"detector": f"WFI{sca:02d}",
"name": "WFI",
"optical_element": "F158", # actually doesn't matter for read noise
},
"origin": "PIT - romanimpreprocess",
"date": datetime.now(UTC).isoformat(),
"pedigree": "DUMMY",
"reftype": "READNOISE",
"telescope": "ROMAN",
"useafter": "!time/time-1.2.0 2020-01-01T00:00:00.000",
},
"data": read_noise[:, :nside].astype(np.float32),
"resetnoise": reset_noise[:, :nside].astype(np.float32),
"anc": {
# this information won't be read by romancal,
# but some simulators may want it
"ACN": header["ACN"],
"C_PINK": header["C_PINK"],
"U_PINK": header["U_PINK"],
"UNIT": "DN", # both read noise and correlated noise information is in DN
},
"amp33": amp33, # reference output noise information
},
"notes": {"noise_header": header.tostring()},
}
# write to a file
head, tail = pathsplit(outfile)
outfile2 = head + "/" + tail.replace("_dark_", "_read_")
asdf.AsdfFile(tree).write_to(outfile2)
# this stuff is just so that we can also display the images in ds9
fits.HDUList(
[fits.PrimaryHDU(), fits.ImageHDU(tree["roman"]["data"]), fits.ImageHDU(tree["roman"]["resetnoise"])]
).writeto(outfile2[:-5] + "_asdf_data.fits", overwrite=True)