solarforecastarbiter/io/fetch/nwp.py (82 changes: 75 additions & 7 deletions)
@@ -57,6 +57,12 @@
CHECK_URL = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/{}/prod'
BASE_URL = 'https://nomads.ncep.noaa.gov/cgi-bin/'

GEFS_BASE_URL = 'https://noaa-gefs-pds.s3.amazonaws.com'

# When querying AWS for directories, start-after is used to paginate.
Reviewer comment (Member): this would be helpful in the function documentation too

# 2021-01-01 is the first date on which the expected files and
# folder structure appear.
GEFS_STARTAFTER = 'gefs.20210101'

GFS_0P25_1HR = {'endpoint': 'filter_gfs_0p25_1hr.pl',
'file': 'gfs.t{init_hr:02d}z.pgrb2.0p25.f{valid_hr:03d}',
@@ -288,7 +294,10 @@ async def get_available_dirs(session, model):
"""Get the available date/date+init_hr directories"""
simple_model = _simple_model(model)
is_init_date = 'init_date' in model['dir']
model_url = BASE_URL + model['endpoint']
if simple_model == 'gefs':
return await get_available_gefs_dirs(session)
else:
model_url = BASE_URL + model['endpoint']

async def _get(model_url):
async with session.get(model_url, raise_for_status=True) as r:
@@ -304,6 +313,33 @@ async def _get(model_url):
return list_avail_days


@abort_all_on_exception
async def get_available_gefs_dirs(session, start_after=GEFS_STARTAFTER):
params = {
'list-type': 2,
'delimiter': '/',
}
if start_after is not None:
params['start-after'] = start_after

async def _get(session):
async with session.get(
GEFS_BASE_URL,
params=params,
raise_for_status=True
) as r:
return await r.text()
listing = await _get(session)
all_dirs = re.findall("gefs\\.([0-9]{8})", listing)
Reviewer comment (Member): same as r"gefs\.([0-9]{8})"? I think r-strings are much easier to read for regex

Reviewer comment (Member): comment like

# xml contains many entries formatted similar to
# <CommonPrefixes><Prefix>gefs.20210101/</Prefix>
# regex creates a list like ['20210101', '20210102'...]
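A quick illustration of what that regex pulls out of the listing XML (a sketch, not part of the diff; the sample XML string is made up):

import re
xml = '<Prefix>gefs.20210101/</Prefix><Prefix>gefs.20210102/</Prefix>'
re.findall(r'gefs\.([0-9]{8})', xml)  # -> ['20210101', '20210102']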

if len(all_dirs) < 1000:
Reviewer comment (Member): aws starts paginating at 1k items?

return all_dirs
else:
return all_dirs + await get_available_gefs_dirs(
session,
'gefs.'+all_dirs[-1]
Reviewer comment (Member): I'm a proponent of calling kwargs as kwargs, so start_after='gefs.'+all_dirs[-1]

)


def _process_params(model, init_time):
"""Generator to get the parameters for fetching forecasts for a given
model at a given init_time"""
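A note on the S3 listing that get_available_gefs_dirs relies on: 'list-type': 2 selects the ListObjectsV2 API, 'delimiter': '/' collapses the bucket's objects into one CommonPrefixes entry per gefs.YYYYMMDD directory, and a single response carries at most 1,000 entries, which is why a full page triggers another request with start-after set to the last directory seen. The snippet below mirrors that flow synchronously with requests; it is a sketch for illustration, not the PR's code (which uses aiohttp), and it matches only <Prefix> elements so the regex keys on the directory entries themselves.

import re
import requests

GEFS_BASE_URL = 'https://noaa-gefs-pds.s3.amazonaws.com'

def list_gefs_dirs(start_after='gefs.20210101'):
    dirs = []
    while True:
        params = {'list-type': 2, 'delimiter': '/', 'start-after': start_after}
        resp = requests.get(GEFS_BASE_URL, params=params)
        resp.raise_for_status()
        # CommonPrefixes entries look like <Prefix>gefs.20210101/</Prefix>
        page = re.findall(r'<Prefix>gefs\.([0-9]{8})/</Prefix>', resp.text)
        dirs += page
        # ListObjectsV2 returns at most 1,000 entries per response, so a
        # full page means there may be more directories to list.
        if len(page) < 1000:
            return dirs
        start_after = 'gefs.' + page[-1]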
@@ -371,9 +407,13 @@ async def files_to_retrieve(session, model, modelpath, init_time):
if filename.exists():
yield next_params
continue
next_model_url = (CHECK_URL.format(model.get('check_url_name',
simple_model))
+ next_params['dir'] + '/' + next_params['file'])
if simple_model == 'gefs':
next_model_url = (GEFS_BASE_URL + next_params['dir']
+ '/' + next_params['file'])
else:
next_model_url = (CHECK_URL.format(model.get('check_url_name',
simple_model))
+ next_params['dir'] + '/' + next_params['file'])
while True:
# is the next file ready?
try:
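For the GEFS branch above, the readiness check hits the S3 object directly rather than the NOMADS CHECK_URL. With hypothetical values for next_params (the real 'dir' and 'file' templates come from the model dicts defined earlier in nwp.py and are not shown in this hunk), the constructed URL looks like:

next_params = {'dir': '/gefs.20210101/00', 'file': 'gefs.t00z.pgrb2s.0p25.f003'}
url = GEFS_BASE_URL + next_params['dir'] + '/' + next_params['file']
# https://noaa-gefs-pds.s3.amazonaws.com/gefs.20210101/00/gefs.t00z.pgrb2s.0p25.f003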
@@ -465,8 +505,13 @@ async def fetch_grib_files(session, params, basepath, init_time, chunksize):
aiohttp.ClientResponseError
When the HTTP request fails/returns a status code >= 400
"""
endpoint = params.pop('endpoint')
url = BASE_URL + endpoint
slice_domain = False
if _simple_model(params) == 'gefs':
url = GEFS_BASE_URL + params['dir'] + '/' + params['file']
slice_domain = True
else:
endpoint = params.pop('endpoint')
url = BASE_URL + endpoint
filename = get_filename(basepath, init_time, params)
if filename.exists():
return filename
@@ -475,7 +520,22 @@ async def fetch_grib_files(session, params, basepath, init_time, chunksize):
logger.info('Getting file %s', filename)
tmpfile = filename.with_name('.tmp_' + filename.name)
await get_with_retries(_get_file, session, url, params, tmpfile, chunksize)
tmpfile.rename(filename)
if slice_domain:
logging.debug('Truncating grib file %s', tmpfile)
try:
# S3 files are the full domain, slice down to domain of
# interest before saving
subprocess.run(
f'wgrib2 {tmpfile} -small_grib {domain_args()} {filename}',
shell=True, check=True, capture_output=True)
except subprocess.CalledProcessError as e:
logger.error('Error applying domain to file %s\n%s',
filename, e.stderr)
raise OSError
else:
logging.debug('Successfully truncated to: %s', filename)
else:
tmpfile.rename(filename)
logging.debug('Successfully saved %s', filename)
return filename
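The wgrib2 step above relies on -small_grib taking a longitude range, a latitude range, and an output file. A minimal standalone sketch of the same call, with CONUS-like bounds assumed for illustration (the real values come from the module's DOMAIN constant) and an argument list instead of shell=True so no quoting is needed:

import subprocess

def truncate_grib(src, dest, lon_w=-126, lon_e=-66, lat_s=24, lat_n=50):
    # wgrib2 in.grib -small_grib lonW:lonE latS:latN out.grib
    cmd = ['wgrib2', str(src), '-small_grib',
           f'{lon_w}:{lon_e}', f'{lat_s}:{lat_n}', str(dest)]
    subprocess.run(cmd, check=True, capture_output=True)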

@@ -730,3 +790,11 @@ def check_wgrib2():
if shutil.which('wgrib2') is None:
logger.error('wgrib2 was not found in PATH and is required')
sys.exit(1)


def domain_args():
Reviewer comment (Member): private function

lonW = DOMAIN["leftlon"]
lonE = DOMAIN["rightlon"]
latS = DOMAIN["bottomlat"]
latN = DOMAIN["toplat"]
return f'{lonW}:{lonE} {latS}:{latN}'
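For example (the real DOMAIN bounds are defined elsewhere in nwp.py; these CONUS-like values are assumed for illustration):

DOMAIN = {'leftlon': -126, 'rightlon': -66, 'bottomlat': 24, 'toplat': 50}
domain_args()  # -> '-126:-66 24:50', the lonW:lonE latS:latN pair -small_grib expects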