-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathupdate_priors.py
More file actions
409 lines (349 loc) · 15.9 KB
/
update_priors.py
File metadata and controls
409 lines (349 loc) · 15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
"""Module to generate geoBAM priors and gaged priors for constrained run type.
Module overwrites GRADES data with gaged data for constrained run type after
uploading all new prior data to the Confluence S3 bucket. The module uses
command line arguments to determine which priors to generate.
Classes
-------
Priors
Functions
---------
main()
main method to generate, retrieve, and overwrite priors
"""
# Standard imports
import argparse
import datetime
import json
import os
from pathlib import Path
import sys
import traceback
# Local imports
from priors.gbpriors.GBPriorsGenerate import GBPriorsGenerate
from priors.gbpriors.GBPriorsUpdate import GBPriorsUpdate
from priors.grdc.GRDC import GRDC
from priors.sos.Sos import Sos
from priors.usgs.USGSUpdate import USGSUpdate
from priors.usgs.USGSPull import USGSPull
from priors.Riggs.RiggsUpdate import RiggsUpdate
from priors.Riggs.RiggsPull import RiggsPull
from priors.HydroShare.HSPull import HSp
from priors.HydroShare.HydroShareUpdate import HydroShareUpdate
# from priors.height_width_fits.HWF_extract import HWF_extraqct
# from priors.height_width_fits.HWF_update import HWF_update
# Third-party imports
import botocore
import numpy as np
# Constants
INPUT_DIR = Path("/mnt/data")
class Priors:
    """Coordinate generation of priors and store them in the SoS.

    Downloads the previous SoS version, generates/retrieves the requested
    priors, overwrites GRADES data with gaged data for constrained runs,
    updates the SoS time coverage, and uploads the new version to AWS.

    Attributes
    ----------
    cont: str
        Continent abbreviation
    run_type: str
        'constrained' or 'unconstrained' data product type
    priors_list: list
        list of strings which determine which priors to retrieve
    input_dir: Path
        path to input data directory
    sos_dir: Path
        path to SoS directory on local storage
    time_dict: dict
        per-agency discharge time data used to compute SoS time coverage

    Methods
    -------
    update()
        Generate and update priors based on arguments.
    """

    # Sentinel datetime meaning "no time data located (yet)".
    NO_TIME = datetime.datetime(1965, 1, 1, 0, 0, 0)
    # geoBAM/SWOT observation times are seconds since this epoch.
    SWOT_EPOCH = datetime.datetime(2000, 1, 1, 0, 0, 0)

    def __init__(self, cont, run_type, priors_list, input_dir, sos_dir,
                 sos_version, metadata_json, historic_qt, add_geospatial,
                 podaac_update, podaac_bucket, sword_version, sos_bucket="confluence-sos"):
        """
        Parameters
        ----------
        cont: str
            Continent abbreviation
        run_type: str
            'constrained' or 'unconstrained' data product type
        priors_list: list
            list of strings which determine which priors to retrieve
        input_dir: Path
            path to input data directory
        sos_dir: Path
            path to SoS directory on local storage
        sos_version: str
            version string of the SoS to create
        metadata_json: dict
            global attribute values loaded from the metadata JSON file
        historic_qt: dict
            per-continent historic discharge timestamps keyed by continent
        add_geospatial: bool
            whether to store geospatial coverage from SWORD
        podaac_update: bool
            whether to upload to the PO.DAAC S3 bucket
        podaac_bucket: str
            name of the PO.DAAC S3 bucket ('local' skips the download step)
        sword_version: str
            SWORD version the SoS is built against
        sos_bucket: str
            name of the SoS S3 bucket to upload to
        """
        self.cont = cont
        self.run_type = run_type
        self.priors_list = priors_list
        self.input_dir = input_dir
        self.sos_dir = sos_dir
        self.sos_version = sos_version
        self.metadata_json = metadata_json
        # Seed time tracking with this continent's historic gage timestamps.
        self.time_dict = {
            "historic_qt": historic_qt[cont]
        }
        self.add_geospatial = add_geospatial
        self.podaac_update = podaac_update
        self.podaac_bucket = podaac_bucket
        self.sos_bucket = sos_bucket
        self.swordversion = sword_version

    def execute_gbpriors(self, sos_file):
        """Create and execute GBPriors operations.

        Parameters
        ----------
        sos_file: Path
            path to SOS file to update

        Returns
        -------
        SWOT observation times produced by the generator (seconds since
        the SWOT epoch — see SWOT_EPOCH).
        """
        gen = GBPriorsGenerate(sos_file, self.input_dir / "swot")
        gen.run_gb()
        app = GBPriorsUpdate(gen.gb_dict, sos_file, metadata_json=self.metadata_json)
        app.update_data()
        return gen.swot_time

    def execute_grdc(self, sos_file):
        """Create and execute GRDC operations.

        Parameters
        ----------
        sos_file: Path
            path to SOS file to update

        Returns
        -------
        GRDC discharge time data from the mapped results.
        """
        grdc_file = self.input_dir / "gage" / "GRDC2SWORDout.nc"
        grdc = GRDC(sos_file, grdc_file)
        grdc.read_sos()
        grdc.read_grdc()
        grdc.map_data()
        grdc.update_data()
        return grdc.map_dict["grdc_qt"]

    def execute_usgs(self, sos_file, start_date):
        """Create and execute USGS operations.

        Parameters
        ----------
        sos_file: Path
            path to SOS file to update
        start_date: str
            first date (YYYY-MM-DD) to pull USGS discharge data for

        Returns
        -------
        USGS discharge time data from the mapped results.
        """
        usgs_file = self.input_dir / "gage" / "USGStargetsV7_.nc"
        today = datetime.datetime.today().strftime('%Y-%m-%d')
        usgs_pull = USGSPull(usgs_targets=usgs_file, start_date=start_date, end_date=today, sos_file=sos_file)
        usgs_pull.pull()
        usgs_update = USGSUpdate(sos_file, usgs_pull.usgs_dict, metadata_json=self.metadata_json)
        usgs_update.read_sos()
        usgs_update.map_data()
        usgs_update.update_data()
        return usgs_update.map_dict["usgs_qt"]

    def execute_Riggs(self, sos_file, start_date):
        """Create and execute Riggs operations.

        Parameters
        ----------
        sos_file: Path
            path to SOS file to update
        start_date: str
            first date (YYYY-MM-DD) to pull Riggs discharge data for

        Returns
        -------
        dict
            discharge time data keyed by gage agency name.
        """
        Riggs_file = self.input_dir / "gage" / "Rtarget"
        today = datetime.datetime.today().strftime("%Y-%m-%d")
        Riggs_pull = RiggsPull(riggs_targets=Riggs_file, start_date=start_date, end_date=today, cont=self.cont, sos_file=sos_file)
        Riggs_pull.pull()
        Riggs_update = RiggsUpdate(sos_file, Riggs_pull.riggs_dict, metadata_json=self.metadata_json)
        Riggs_update.read_sos()
        Riggs_update.map_data()
        Riggs_update.update_data()
        # Retrieve time data per agency present in the pulled records
        time_dict = {}
        for agency in set(list(Riggs_update.Riggs_dict["Agency"])):
            time_dict[agency] = Riggs_update.map_dict[agency]["Riggs_qt"]
        return time_dict

    def execute_HydroShare(self, sos_file):
        """Pull HydroShare data and update the SoS with it.

        Parameters
        ----------
        sos_file: Path
            path to SOS file to update

        Returns
        -------
        HydroShare discharge time data from the mapped results.
        """
        # HSp is set up to take inputs but doesn't need any
        hp = HSp()
        hp.pull()  # populates a dict with all HydroShare data
        HydroShare_update = HydroShareUpdate(sos_file, hp.HydroShare_dict, metadata_json=self.metadata_json)
        HydroShare_update.read_sos()
        HydroShare_update.map_data()
        HydroShare_update.update_data()
        # Retrieve time data
        return HydroShare_update.map_dict["SWOT_SHAQ"]["HydroShare_qt"]

    def locate_min_max(self):
        """Locate min and max time values across all retrieved priors.

        Returns
        -------
        tuple
            (min, max) as datetime objects, or the string "NO TIME DATA"
            in either position when no time data was found.
        """
        min_qt = self.NO_TIME
        max_qt = self.NO_TIME

        def track(candidate_min, candidate_max):
            """Fold one (min, max) candidate pair into the running extrema."""
            nonlocal min_qt, max_qt
            if candidate_min < min_qt or min_qt == self.NO_TIME:
                min_qt = candidate_min
            if candidate_max > max_qt or max_qt == self.NO_TIME:
                max_qt = candidate_max

        for agency, time in self.time_dict.items():
            if agency == "historic_qt":
                # Historic gage time: dict of {"min": ordinal, "max": ordinal}
                for data in time.values():
                    track(datetime.datetime.fromordinal(data["min"]),
                          datetime.datetime.fromordinal(data["max"]))
            elif agency == "gbpriors":
                # geoBAM priors SWOT time: seconds since the SWOT epoch
                if len(time) > 0:
                    track(self.SWOT_EPOCH + datetime.timedelta(seconds=np.nanmin(time)),
                          self.SWOT_EPOCH + datetime.timedelta(seconds=np.nanmax(time)))
            elif not np.isnan(time).all():
                # All other gage agencies: ordinal dates; keep positive values only
                time = time[time > 0]
                track(datetime.datetime.fromordinal(int(np.nanmin(time))),
                      datetime.datetime.fromordinal(int(np.nanmax(time))))

        # Replace untouched sentinels with an explicit "no data" marker
        if min_qt == self.NO_TIME:
            min_qt = "NO TIME DATA"
        if max_qt == self.NO_TIME:
            max_qt = "NO TIME DATA"
        return min_qt, max_qt

    def update(self):
        """Generate and update priors based on arguments."""
        # Create SoS object to manage SoS operations
        print(f"Copy and create new version of the SoS from bucket: {self.sos_bucket}.")
        sos = Sos(self.cont, self.run_type, self.sos_dir, self.metadata_json,
                  self.priors_list, self.podaac_update, self.podaac_bucket,
                  self.sos_bucket, self.swordversion)
        if self.podaac_bucket != 'local':
            try:
                sos.download_previous_sos(self.sos_version)
            except Exception as e:
                print(e)
                traceback.print_exception(*sys.exc_info())
                # sys.exit rather than the site-supplied exit(): always defined
                sys.exit(1)
        sos.create_new_version()
        sos_file = sos.sos_file
        sos_last_run_time = sos.last_run_time

        # Retrieve geospatial coverage - pull if true flag
        if self.add_geospatial:
            sos.store_geospatial_data(INPUT_DIR / "sword" / f"{self.cont}_sword_v{self.swordversion}.nc")
            print('Set geospatial coverage for reaches and nodes including maximum and minimum coverage.')

        # Determine run type and add requested gage priors
        # removed constrained run logic check as both unconstrained and constrained now pull gauge data
        # in the future we should write the gauge data to a separate nc file for both
        if "grdc" in self.priors_list:
            print("Updating GRDC priors.")
            self.execute_grdc(sos_file)
        if "usgs" in self.priors_list and self.cont == "na":
            print("Updating USGS priors.")
            # self.time_dict["usgs"] = self.execute_usgs(sos_file, start_date = sos_last_run_time)
            self.time_dict["usgs"] = self.execute_usgs(sos_file, start_date='2022-12-2')
        # adding na to this list for now to avoid canada integration
        if 'riggs' in self.priors_list and self.cont not in ['as']:
            # riggs modules are having problems with downloading just the delta
            # change start date to sos_last_run_time to continue development
            self.time_dict.update(self.execute_Riggs(sos_file, start_date='1980-1-1'))
        # Add geoBAM priors if requested (for either data product)
        if "gbpriors" in self.priors_list:
            print("Updating geoBAM priors.")
            self.time_dict["gbpriors"] = self.execute_gbpriors(sos_file)
        if "hydroshare" in self.priors_list:
            print("Updating hydroshare.")
            self.time_dict["HydroShare"] = self.execute_HydroShare(sos_file)

        # only overwrite if doing a constrained run
        if self.run_type == "constrained":
            # Overwrite GRADES with gage priors
            print("Overwriting GRADES data with gaged priors.")
            sos.overwrite_grades()

        # Update time coverage in sos file global attributes
        print("Locating min and max time values.")
        min_qt, max_qt = self.locate_min_max()
        print("Updating time coverage based on min and max values.")
        sos.update_time_coverage(min_qt, max_qt)
        # locate_min_max may return the string "NO TIME DATA"; calling
        # strftime on it would raise AttributeError, so format conditionally
        fmt = "%Y-%m-%dT%H:%M:%S"
        min_str = min_qt.strftime(fmt) if isinstance(min_qt, datetime.datetime) else min_qt
        max_str = max_qt.strftime(fmt) if isinstance(max_qt, datetime.datetime) else max_qt
        print(f'Updated time coverage of the SoS: {min_str} to {max_str}')

        # Upload priors results to S3 bucket
        print("Uploading new SoS priors version.")
        sos.upload_file()
def create_args():
    """Build and return the argument parser for the priors update job."""
    metadata_dir = Path(__file__).parent / "metadata"
    parser = argparse.ArgumentParser(description="Update Confluence SoS priors.")
    parser.add_argument("-i", "--index", type=int,
                        help="Index value to select continent to run on")
    parser.add_argument("-r", "--runtype", type=str, default="constrained",
                        choices=["constrained", "unconstrained"],
                        help="Indicates what type of run to generate priors for.")
    parser.add_argument("-p", "--priors", type=str, nargs="+", default=[],
                        help="List: usgs, grdc, riggs, gbpriors, hydroshare")
    parser.add_argument("-o", "--sosversion", type=str, default='0001',
                        help="Priors version to create SoS for")
    parser.add_argument("-m", "--metadatajson", type=Path,
                        default=metadata_dir / "metadata.json",
                        help="Path to JSON file that contains global attribute values")
    parser.add_argument("-qt", "--historicqt", type=Path,
                        default=metadata_dir / "historicQt.json",
                        help="Path to JSON file that contains historic timestamps for discharge from gage agencies")
    parser.add_argument("-g", "--addgeospatial", action="store_true",
                        help="Indicate requirement to add geospatial data")
    parser.add_argument("-u", "--podaacupload", action="store_true",
                        help="Indicate requirement to upload to PO.DAAC S3 Bucket")
    parser.add_argument("-b", "--podaacbucket", type=str,
                        help="Name of PO.DAAC S3 bucket to upload to")
    parser.add_argument("-s", "--sosbucket", type=str, default="confluence-sos",
                        help="Name of SoS S3 bucket to upload to")
    parser.add_argument("--swordversion", type=str, default="16",
                        help="Version of sword to run on")
    return parser
def main():
    """Main method to generate, retrieve, and overwrite priors."""
    # Parse and echo command line arguments
    args = create_args().parse_args()
    for name in vars(args):
        print(f"{name}: {getattr(args, name)}")

    # Resolve which continent to run on; -235 is a sentinel meaning
    # "take the index from the AWS Batch array job environment variable"
    if args.index != -235:
        index = int(args.index)
    else:
        index = int(os.environ.get("AWS_BATCH_JOB_ARRAY_INDEX"))
    with open(INPUT_DIR / "continent.json") as jsonfile:
        cont = list(json.load(jsonfile)[index].keys())[0]

    # Load metadata JSON (global attribute values)
    with open(args.metadatajson) as jf:
        variable_atts = json.load(jf)

    # Load historic discharge timestamps
    with open(args.historicqt) as jf:
        historicqt = json.load(jf)

    # Retrieve and update priors
    priors = Priors(cont=cont, run_type=args.runtype, priors_list=args.priors,
                    input_dir=INPUT_DIR, sos_dir=INPUT_DIR / "sos",
                    sos_version=args.sosversion, metadata_json=variable_atts,
                    historic_qt=historicqt, add_geospatial=args.addgeospatial,
                    podaac_update=args.podaacupload, podaac_bucket=args.podaacbucket,
                    sos_bucket=args.sosbucket, sword_version=args.swordversion)
    priors.update()
if __name__ == "__main__":
start = datetime.datetime.now()
main()
end = datetime.datetime.now()
print(f"Execution time: {end - start}")