1+ import xarray as xr
2+ import numpy as np
3+ import fsspec
4+ import logging
5+ import datetime
6+ import os
7+
# Module-level logger; basicConfig is called at import time so INFO-level
# messages are emitted even when this module is driven from a REPL/script.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# GCS filesystem authenticated via GOOGLE_APPLICATION_CREDENTIALS;
# this should give access to impactlab GCS.
FS = fsspec.filesystem("gs", token=os.getenv('GOOGLE_APPLICATION_CREDENTIALS'))
# Local filesystem handle (kept for debugging / local runs).
LOCAL_FS = fsspec.filesystem("file")
# Glob pattern locating the per-year ERA-5 daily NetCDF inputs for a variable.
FPS_IN_PATTERN = (
    "gs://impactlab-data/climate/source_data/ERA-5/{VARIABLE_ID}/daily/netcdf/v1.1/*.nc"
)
# Output zarr store holding all years concatenated along the time axis.
OUT_FP_PATTERN = "gs://impactlab-data/climate/source_data/ERA-5/{VARIABLE_ID}/daily/netcdf/v1.1/all_years_concatenated.zarr"
# Maps the CMIP-style variable id used in paths/outputs to the variable
# name found inside the source NetCDF files.
VAR_PARSER = {
    "pr": "precip_total",
    "tasmin": "tmin",
    "tasmax": "tmax",
}
22+
23+
def get_time(fps, fs=FS):
    """
    Collect the time coordinate of each input file.

    Parameters
    ----------
    fps : list of str
        Paths of the NetCDF files to inspect.
    fs : fsspec filesystem, optional
        Filesystem used to open the paths (defaults to the module GCS fs).

    Returns
    -------
    dict
        Maps each file path to the numpy array of its "time" values.
        Files that fail to parse are logged and omitted, so downstream
        loops over this dict silently skip them.
    """
    time_info = {}
    for fp in fps:
        # Lazy %-style args: the message is only formatted if the record
        # is actually emitted (avoids eager f-string work in the loop).
        logger.info(
            "%s : Retrieving time info for file %s ", datetime.datetime.now(), fp
        )
        try:
            with xr.open_dataset(fs.open(fp)) as ds:
                time_info[fp] = ds["time"].values
        except ValueError as ve:
            logger.error(
                "encountered an error reading file %s and getting time info, skipping. Error message : %s ",
                fp,
                ve,
            )
    logger.info("%s : Retrieved dictionary of time info", datetime.datetime.now())
    return time_info
43+
44+
def get_lat_lon(fp, fs=FS):
    """
    Read the latitude/longitude coordinate arrays from a single file.

    Returns a ``(lat, lon)`` pair of numpy arrays taken from the file's
    "latitude" and "longitude" variables.
    """
    logger.info(f"{str(datetime.datetime.now())} : Retrieving lat lon")
    ds = xr.open_dataset(fs.open(fp))
    try:
        coords = (ds["latitude"].values, ds["longitude"].values)
    finally:
        ds.close()
    logger.info(f"{str(datetime.datetime.now())} : Retrieved lat lon")
    return coords
54+
55+
def prime_zarr(
    fp_out,
    time_info,
    lat=(-1, 1),  # tuples, not lists: never use mutable default arguments
    lon=(-1, 1),
    variable="fake_variable",
    fs=FS,
):
    """
    Initialize a (time, lat, lon) zarr store filled with zeros.

    The time coordinate is the concatenation of all per-file time arrays
    in ``time_info``, in dict insertion order. With ``compute=False`` only
    the store layout / coordinates are written; the data regions are
    filled later by ``write_period``.

    Parameters
    ----------
    fp_out : str
        Destination zarr store path.
    time_info : dict
        Mapping of file path -> numpy array of time values (see get_time).
    lat, lon : sequence
        Latitude / longitude coordinate values.
    variable : str
        Name of the (single) data variable to create.
    fs : fsspec filesystem, optional
    """
    logger.info(f"{str(datetime.datetime.now())} : Priming zarr : {fp_out} ")

    time = np.concatenate(list(time_info.values()))
    # Zero placeholder with the final shape of the store.
    x = np.zeros((len(time), len(lat), len(lon)))
    da = xr.DataArray(
        data=x,
        dims=["time", "lat", "lon"],
        coords={"time": time, "lon": lon, "lat": lat},
    )
    out = xr.Dataset({variable: da})

    # compute=False lays out metadata only; safe_chunks=False permits the
    # later region writes to cross chunk boundaries.
    out.to_zarr(
        fs.get_mapper(fp_out),
        mode="w",
        compute=False,
        consolidated=True,
        safe_chunks=False,
    )

    logger.info(f"{str(datetime.datetime.now())} : Done priming zarr : {fp_out} ")
86+
87+
def write_period(fp_in, fp_out, region, variable, fs=FS):
    """
    Write one source file's data into its time region of the target zarr.

    Parameters
    ----------
    fp_in : str
        Source NetCDF file path.
    fp_out : str
        Target zarr store, already primed by ``prime_zarr``.
    region : dict
        Region mapping for ``to_zarr`` (e.g. ``{"time": slice(a, b)}``).
    variable : str
        Output variable name; ``VAR_PARSER[variable]`` is the name of the
        same variable inside the source file.
    fs : fsspec filesystem, optional
    """
    logger.info(
        f"{str(datetime.datetime.now())} : Writing data from {fp_in} to {fp_out} "
    )

    with xr.open_dataset(fs.open(fp_in)) as ds_in:
        ds_in = ds_in.rename(
            {
                "longitude": "lon",
                "latitude": "lat",
                VAR_PARSER[variable]: variable,
            }
        )
        # Region writes require dropping every variable not sliced by the
        # selected zarr region (e.g. the lat/lon coordinate arrays).
        # Loop variable renamed from `variable` to `var`: the original
        # shadowed the `variable` parameter.
        variables_to_drop = []
        region_dims = list(region.keys())
        for var_name, var in ds_in.variables.items():
            if any(region_dim not in var.dims for region_dim in region_dims):
                variables_to_drop.append(var_name)

        to_write = ds_in.drop_vars(variables_to_drop)
        logger.info(
            f"{str(datetime.datetime.now())} : Done converting var names and dropping irrelevant variables"
        )
        logger.info(
            f"{str(datetime.datetime.now())} :Writing to zarr {fp_out} in regions {region} "
        )
        to_write.to_zarr(fs.get_mapper(fp_out), mode="a", region=region)
    logger.info(
        f"{str(datetime.datetime.now())} : Done writing data from {fp_in} to {fp_out} "
    )
127+
128+
def prime_and_write_periods(fp_out, variable, light, fs=FS):
    """
    Prime the output zarr, then write every input file into its time region.

    Parameters
    ----------
    fp_out : str
        Target zarr store path.
    variable : str
        Variable id (a key of ``VAR_PARSER``), also used in the input glob.
    light : bool
        When True, restrict the run to a small subset of files (debugging).
    fs : fsspec filesystem, optional

    Notes
    -----
    BUG FIX: the ``fs`` parameter is now forwarded to get_time /
    get_lat_lon / prime_zarr / write_period; previously it was accepted
    but silently ignored in favor of the module-level FS.
    """
    # priming
    fps_in = fs.glob(FPS_IN_PATTERN.format(VARIABLE_ID=variable))
    if light:
        fps_in = fps_in[1:6]
    time_arrays = get_time(fps_in, fs=fs)
    lat, lon = get_lat_lon(fps_in[0], fs=fs)
    prime_zarr(fp_out, time_arrays, lat, lon, variable, fs=fs)

    logger.info(
        f"{str(datetime.datetime.now())} : Starting to loop over files, length of loop is {len(fps_in)} "
    )

    # writing periods: each file occupies [i, i + n_days) on the time axis.
    # NOTE(review): this relies on sorted(time_arrays) matching the glob
    # (= prime_zarr concatenation) order; fsspec glob returns sorted paths.
    i = 0
    for fp_in in sorted(time_arrays):
        n_days = len(time_arrays[fp_in])
        time_slice_to_write = slice(i, i + n_days)
        logger.info(
            f"{str(datetime.datetime.now())} : Processing file : {fp_in} with slice : {str(time_slice_to_write)} "
        )
        try:
            write_period(
                fp_in,
                fp_out,
                region={"time": time_slice_to_write},
                variable=variable,
                fs=fs,
            )
            i = i + n_days
        except Exception:
            logger.error(
                f"{str(datetime.datetime.now())} : encountered an error writing period of file {fp_in} to {fp_out} "
            )
            raise

    logger.info(f"{str(datetime.datetime.now())} : End of files iteration")
166+
def controller(variable, out_fs=FS, overwrite=True, light=False):
    """
    Entry point: build the concatenated daily zarr for one variable.

    Removes any pre-existing store first, then primes and fills it.

    Parameters
    ----------
    variable : str
        One of the keys of ``VAR_PARSER`` ('pr', 'tasmin', 'tasmax').
    out_fs : fsspec filesystem, optional
        Filesystem used for the output store (and forwarded downstream).
    overwrite : bool, optional
        Should always be True: the writing code indexes into a freshly
        zeroed store, so a stale store would leave inconsistent data.
    light : bool, optional
        Process only a handful of files (debugging).
    """
    fp_out = OUT_FP_PATTERN.format(VARIABLE_ID=variable)
    if out_fs.exists(fp_out) and overwrite:
        logger.info(f"Removing already existing {fp_out} ")
        out_fs.rm(fp_out, recursive=True)
    # BUG FIX: forward out_fs so a non-default filesystem is honored.
    prime_and_write_periods(fp_out, variable, light, fs=out_fs)
173+
174+
# Usage: import this module and run, e.g.:
#   controller('pr')
#   controller('tasmin')
#   controller('tasmax')