Skip to content

Commit 9f90bc7

Browse files
committed
Mimic parallel read in unstable
1 parent 13587ee commit 9f90bc7

File tree

1 file changed

+127
-117
lines changed

1 file changed

+127
-117
lines changed

src/pproc/thermal_indices.py

Lines changed: 127 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -34,125 +34,13 @@
3434
)
3535
from pproc.thermo import helpers
3636
from pproc.thermo.indices import ComputeIndices
37-
from pproc.thermo.wrappers import ArrayFieldList
37+
from pproc.thermo.wrappers import ArrayFieldList, GribMetadata
3838

3939
logging.getLogger("pproc").setLevel(os.environ.get("PPROC_LOGLEVEL", "INFO").upper())
4040
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
4141
logger = logging.getLogger(__name__)
4242

43-
44-
@metered("Process step", out=logger.info)
45-
def process_step(args, config, window_id, fields, accum: Accumulator, accum_metadata: list[GribFieldMetadata], recovery: Recovery,
46-
):
47-
fields = load_input("inst", config, step)
48-
if len(accum["step"].coords) > 1:
49-
# Set step range for de-accumulated fields
50-
step_range = "-".join(map(str, accum["step"].coords))
51-
accum_fields = earthkit.data.FieldList.from_array(
52-
accum.values,
53-
[x.override(stepType="diff", stepRange=step_range) for x in accum_metadata],
54-
)
55-
helpers.write(config.outputs.accum, accum_fields)
56-
fields += accum_fields
57-
58-
helpers.check_field_sizes(fields)
59-
basetime, validtime = helpers.get_datetime(fields)
60-
step = helpers.get_step(fields)
61-
time = basetime.hour
62-
63-
logger.info(
64-
f"Compute indices step {step}, validtime {validtime.isoformat()} - "
65-
+ f"basetime {basetime.date().isoformat()}, time {time}"
66-
)
67-
logger.debug(f"Inputs \n {fields.ls(namespace='mars')}")
68-
indices = ComputeIndices(config.out_keys)
69-
params = fields.indices()["param"]
70-
71-
# Windspeed - shortName ws
72-
if config.is_target_param(step, {"10si", 207}):
73-
ws = indices.calc_field("10si", indices.calc_ws, fields)
74-
config.write(step, ws)
75-
76-
# Cosine of Solar Zenith Angle - shortName cossza - ECMWF product
77-
# TODO: 214001 only exists for GRIB1 -- but here we use it for GRIB2 (waiting for WMO)
78-
if config.is_target_param(step, {"cossza", 214001}):
79-
cossza = indices.calc_field("cossza", indices.calc_cossza_int, fields)
80-
config.write(step, cossza)
81-
82-
# direct solar radiation - shortName dsrp - ECMWF product
83-
if config.is_target_param(step, {"dsrp", 47}) and "dsrp" not in params:
84-
dsrp = indices.calc_field("dsrp", indices.calc_dsrp, fields)
85-
config.write(step, dsrp)
86-
87-
# Mean Radiant Temperature - shortName mrt - ECMWF product
88-
if config.is_target_param(step, {"mrt", 261002}):
89-
mrt = indices.calc_field("mrt", indices.calc_mrt, fields)
90-
config.write(step, mrt)
91-
92-
# Univeral Thermal Climate Index - shortName utci - ECMWF product
93-
if config.is_target_param(step, {"utci", 261001}):
94-
utci = indices.calc_field(
95-
"utci",
96-
indices.calc_utci,
97-
fields,
98-
print_misses=args.utci_misses,
99-
validate=args.validateutci,
100-
)
101-
config.write(step, utci)
102-
103-
# Heat Index (adjusted) - shortName heatx - ECMWF product
104-
if config.is_target_param(step, {"heatx", 260004}):
105-
heatx = indices.calc_field("heatx", indices.calc_heatx, fields)
106-
config.write(step, heatx)
107-
108-
# Wind Chill factor - shortName wcf - ECMWF product
109-
if config.is_target_param(step, {"wcf", 260005}):
110-
wcf = indices.calc_field("wcf", indices.calc_wcf, fields)
111-
config.write(step, wcf)
112-
113-
# Apparent Temperature - shortName aptmp - ECMWF product
114-
if config.is_target_param(step, {"aptmp", 260255}):
115-
aptmp = indices.calc_field("aptmp", indices.calc_aptmp, fields)
116-
config.write(step, aptmp)
117-
118-
# Relative humidity percent at 2m - shortName 2r - ECMWF product
119-
if config.is_target_param(step, {"2r", 260242}):
120-
rhp = indices.calc_field("2r", indices.calc_rhp, fields)
121-
config.write(step, rhp)
122-
123-
# Humidex - shortName hmdx
124-
if config.is_target_param(step, {"hmdx", 261016}):
125-
hmdx = indices.calc_field("hmdx", indices.calc_hmdx, fields)
126-
config.write(step, hmdx)
127-
128-
# Normal Effective Temperature - shortName nefft
129-
if config.is_target_param(step, {"nefft", 261018}):
130-
nefft = indices.calc_field("nefft", indices.calc_nefft, fields)
131-
config.write(step, nefft)
132-
133-
# Globe Temperature - shortName gt
134-
if config.is_target_param(step, {"gt", 261015}):
135-
gt = indices.calc_field("gt", indices.calc_gt, fields)
136-
config.write(step, gt)
137-
138-
# Wet-bulb potential temperature - shortName wbpt
139-
if config.is_target_param(step, {"wbpt", 261022}):
140-
wbpt = indices.calc_field("wbpt", indices.calc_wbpt, fields)
141-
config.write(step, wbpt)
142-
143-
# Wet Bulb Globe Temperature - shortName wbgt
144-
if config.is_target_param(step, {"wbgt", 261014}): #
145-
wbgt = indices.calc_field("wbgt", indices.calc_wbgt, fields)
146-
config.write(step, wbgt)
147-
148-
# effective temperature 261017
149-
# standard effective temperature 261019
150-
151-
config.flush_targets()
152-
recovery.add_checkpoint(window_id)
153-
154-
if args.usage:
155-
print_usage()
43+
__version__ = "2.0.0"
15644

15745

15846
class ThermoTarget:
@@ -254,13 +142,13 @@ def flush_targets(self):
254142

255143

256144
def load_input(source: str, config: ThermoConfig, step: int):
257-
logger.debug(f"Retrieve step {step}: source {source}")
258145
req = config.sources.get(source, {}).copy()
259146
if len(req) == 0:
260147
return None
261148
req.update(config.override_input)
262149
req["step"] = [step]
263150

151+
logger.debug(f"Retrieve step {step}: source {source}")
264152
src = req.pop("source")
265153
if ":" in src:
266154
src, loc = src.split(":")
@@ -281,6 +169,127 @@ def load_input(source: str, config: ThermoConfig, step: int):
281169
return earthkit.data.FieldList.from_array(ds.values, ds.metadata())
282170

283171

172+
@metered("Process step", out=logger.info)
173+
def process_step(
174+
args,
175+
config,
176+
window_id,
177+
step,
178+
accum,
179+
accum_metadata,
180+
recovery,
181+
):
182+
fields = load_input("inst", config, step)
183+
if len(accum["step"].coords) > 1:
184+
# Set step range for de-accumulated fields
185+
step_range = "-".join(map(str, accum["step"].coords))
186+
accum_fields = earthkit.data.FieldList.from_array(
187+
accum.values,
188+
[x.override(stepType="diff", stepRange=step_range) for x in accum_metadata],
189+
)
190+
config.write(step, accum_fields)
191+
fields += accum_fields
192+
193+
helpers.check_field_sizes(fields)
194+
basetime, validtime = helpers.get_datetime(fields)
195+
step = helpers.get_step(fields)
196+
time = basetime.hour
197+
198+
logger.info(
199+
f"Compute indices step {step}, validtime {validtime.isoformat()} - "
200+
+ f"basetime {basetime.date().isoformat()}, time {time}"
201+
)
202+
logger.debug(f"Inputs \n {fields.ls(namespace='mars')}")
203+
indices = ComputeIndices(config.out_keys)
204+
params = fields.indices()["param"]
205+
206+
# Windspeed - shortName ws
207+
if config.is_target_param(step, {"10si", 207}):
208+
ws = indices.calc_field("10si", indices.calc_ws, fields)
209+
config.write(step, ws)
210+
211+
# Cosine of Solar Zenith Angle - shortName cossza - ECMWF product
212+
# TODO: 214001 only exists for GRIB1 -- but here we use it for GRIB2 (waiting for WMO)
213+
if config.is_target_param(step, {"cossza", 214001}):
214+
cossza = indices.calc_field("cossza", indices.calc_cossza_int, fields)
215+
config.write(step, cossza)
216+
217+
# direct solar radiation - shortName dsrp - ECMWF product
218+
if config.is_target_param(step, {"dsrp", 47}) and "dsrp" not in params:
219+
dsrp = indices.calc_field("dsrp", indices.calc_dsrp, fields)
220+
config.write(step, dsrp)
221+
222+
# Mean Radiant Temperature - shortName mrt - ECMWF product
223+
if config.is_target_param(step, {"mrt", 261002}):
224+
mrt = indices.calc_field("mrt", indices.calc_mrt, fields)
225+
config.write(step, mrt)
226+
227+
# Univeral Thermal Climate Index - shortName utci - ECMWF product
228+
if config.is_target_param(step, {"utci", 261001}):
229+
utci = indices.calc_field(
230+
"utci",
231+
indices.calc_utci,
232+
fields,
233+
print_misses=args.utci_misses,
234+
validate=args.validateutci,
235+
)
236+
config.write(step, utci)
237+
238+
# Heat Index (adjusted) - shortName heatx - ECMWF product
239+
if config.is_target_param(step, {"heatx", 260004}):
240+
heatx = indices.calc_field("heatx", indices.calc_heatx, fields)
241+
config.write(step, heatx)
242+
243+
# Wind Chill factor - shortName wcf - ECMWF product
244+
if config.is_target_param(step, {"wcf", 260005}):
245+
wcf = indices.calc_field("wcf", indices.calc_wcf, fields)
246+
config.write(step, wcf)
247+
248+
# Apparent Temperature - shortName aptmp - ECMWF product
249+
if config.is_target_param(step, {"aptmp", 260255}):
250+
aptmp = indices.calc_field("aptmp", indices.calc_aptmp, fields)
251+
config.write(step, aptmp)
252+
253+
# Relative humidity percent at 2m - shortName 2r - ECMWF product
254+
if config.is_target_param(step, {"2r", 260242}):
255+
rhp = indices.calc_field("2r", indices.calc_rhp, fields)
256+
config.write(step, rhp)
257+
258+
# Humidex - shortName hmdx
259+
if config.is_target_param(step, {"hmdx", 261016}):
260+
hmdx = indices.calc_field("hmdx", indices.calc_hmdx, fields)
261+
config.write(step, hmdx)
262+
263+
# Normal Effective Temperature - shortName nefft
264+
if config.is_target_param(step, {"nefft", 261018}):
265+
nefft = indices.calc_field("nefft", indices.calc_nefft, fields)
266+
config.write(step, nefft)
267+
268+
# Globe Temperature - shortName gt
269+
if config.is_target_param(step, {"gt", 261015}):
270+
gt = indices.calc_field("gt", indices.calc_gt, fields)
271+
config.write(step, gt)
272+
273+
# Wet-bulb potential temperature - shortName wbpt
274+
if config.is_target_param(step, {"wbpt", 261022}):
275+
wbpt = indices.calc_field("wbpt", indices.calc_wbpt, fields)
276+
config.write(step, wbpt)
277+
278+
# Wet Bulb Globe Temperature - shortName wbgt
279+
if config.is_target_param(step, {"wbgt", 261014}): #
280+
wbgt = indices.calc_field("wbgt", indices.calc_wbgt, fields)
281+
config.write(step, wbgt)
282+
283+
# effective temperature 261017
284+
# standard effective temperature 261019
285+
286+
config.flush_targets()
287+
recovery.add_checkpoint(window_id)
288+
289+
if args.usage:
290+
print_usage()
291+
292+
284293
def get_parser():
285294
parser = default_parser("Compute thermal indices")
286295

@@ -385,8 +394,9 @@ def main(args: List[str] = sys.argv[1:]):
385394
executor.submit(
386395
thermo_partial,
387396
window_id,
388-
accum,
389-
accum_data.metadata(),
397+
step,
398+
accum,
399+
[GribMetadata(x._handle) for x in accum_data.metadata()],
390400
)
391401
executor.wait()
392402
recovery.clean_file()

0 commit comments

Comments
 (0)