diff --git a/solarforecastarbiter/io/fetch/nwp.py b/solarforecastarbiter/io/fetch/nwp.py
index 7668eb6d..54084b13 100644
--- a/solarforecastarbiter/io/fetch/nwp.py
+++ b/solarforecastarbiter/io/fetch/nwp.py
@@ -57,6 +57,12 @@ CHECK_URL = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/{}/prod'
 BASE_URL = 'https://nomads.ncep.noaa.gov/cgi-bin/'
 
 
+GEFS_BASE_URL = 'https://noaa-gefs-pds.s3.amazonaws.com'
+
+# When querying aws for directories, start-after is used to paginate.
+# 2021-01-01 is the date the expected files and folder structure
+# appears.
+GEFS_STARTAFTER = 'gefs.20210101'
 
 GFS_0P25_1HR = {'endpoint': 'filter_gfs_0p25_1hr.pl',
                 'file': 'gfs.t{init_hr:02d}z.pgrb2.0p25.f{valid_hr:03d}',
@@ -288,7 +294,10 @@ async def get_available_dirs(session, model):
     """Get the available date/date+init_hr directories"""
     simple_model = _simple_model(model)
     is_init_date = 'init_date' in model['dir']
-    model_url = BASE_URL + model['endpoint']
+    if simple_model == 'gefs':
+        return await get_available_gefs_dirs(session)
+    else:
+        model_url = BASE_URL + model['endpoint']
 
     async def _get(model_url):
         async with session.get(model_url, raise_for_status=True) as r:
@@ -304,6 +313,33 @@ async def _get(model_url):
     return list_avail_days
 
 
+@abort_all_on_exception
+async def get_available_gefs_dirs(session, start_after=GEFS_STARTAFTER):
+    params = {
+        'list-type': 2,
+        'delimiter': '/',
+    }
+    if start_after is not None:
+        params['start-after'] = start_after
+
+    async def _get(session):
+        async with session.get(
+                GEFS_BASE_URL,
+                params=params,
+                raise_for_status=True
+        ) as r:
+            return await r.text()
+    listing = await _get(session)
+    all_dirs = re.findall("gefs\\.([0-9]{8})", listing)
+    if len(all_dirs) < 1000:
+        return all_dirs
+    else:
+        return all_dirs + await get_available_gefs_dirs(
+            session,
+            'gefs.'+all_dirs[-1]
+        )
+
+
 def _process_params(model, init_time):
     """Generator to get the parameters for fetching forecasts for a given
     model at a given init_time"""
@@ -329,15 +365,24 @@ async def check_next_inittime(session, init_time, model):
     """Check if data from the next model initializtion time is available"""
     next_inittime = init_time + pd.Timedelta(model['update_freq'])
     simple_model = _simple_model(model)
-    next_init_url = (CHECK_URL.format(model.get('check_url_name',
-                                                simple_model))
-                     + model['dir'].format(
-                         init_date=next_inittime.strftime('%Y%m%d'),
-                         init_dt=next_inittime.strftime('%Y%m%d%H'),
-                         init_hr=next_inittime.strftime('%H'))
-                     + '/' + model['file'].format(init_hr=next_inittime.hour,
-                                                  valid_hr=0))
-
+    if simple_model == 'gefs':
+        next_init_url = (
+            GEFS_BASE_URL
+            + model['dir'].format(
+                init_date=next_inittime.strftime('%Y%m%d'),
+                init_dt=next_inittime.strftime('%Y%m%d%H'),
+                init_hr=next_inittime.strftime('%H'))
+            + '/' + model['file'].format(init_hr=next_inittime.hour,
+                                         valid_hr=0))
+    else:
+        next_init_url = (CHECK_URL.format(model.get('check_url_name',
+                                                    simple_model))
+                         + model['dir'].format(
+                             init_date=next_inittime.strftime('%Y%m%d'),
+                             init_dt=next_inittime.strftime('%Y%m%d%H'),
+                             init_hr=next_inittime.strftime('%H'))
+                         + '/' + model['file'].format(init_hr=next_inittime.hour,
+                                                      valid_hr=0))
     try:
         async with session.head(next_init_url) as r:
             if r.status == 200:
@@ -371,9 +416,13 @@ async def files_to_retrieve(session, model, modelpath, init_time):
         if filename.exists():
             yield next_params
             continue
-        next_model_url = (CHECK_URL.format(model.get('check_url_name',
-                                                      simple_model))
-                          + next_params['dir'] + '/' + next_params['file'])
+        if simple_model == 'gefs':
+            next_model_url = (GEFS_BASE_URL + next_params['dir']
+                              + '/' + next_params['file'])
+        else:
+            next_model_url = (CHECK_URL.format(model.get('check_url_name',
+                                                          simple_model))
+                              + next_params['dir'] + '/' + next_params['file'])
         while True:
             # is the next file ready?
             try:
@@ -465,8 +514,13 @@ async def fetch_grib_files(session, params, basepath, init_time, chunksize):
     aiohttp.ClientResponseError
         When the HTTP request fails/returns a status code >= 400
     """
-    endpoint = params.pop('endpoint')
-    url = BASE_URL + endpoint
+    slice_domain = False
+    if _simple_model(params) == 'gefs':
+        url = GEFS_BASE_URL + params['dir'] + '/' + params['file']
+        slice_domain = True
+    else:
+        endpoint = params.pop('endpoint')
+        url = BASE_URL + endpoint
     filename = get_filename(basepath, init_time, params)
     if filename.exists():
         return filename
@@ -475,7 +529,22 @@ async def fetch_grib_files(session, params, basepath, init_time, chunksize):
     logger.info('Getting file %s', filename)
     tmpfile = filename.with_name('.tmp_' + filename.name)
     await get_with_retries(_get_file, session, url, params, tmpfile, chunksize)
-    tmpfile.rename(filename)
+    if slice_domain:
+        logging.debug('Truncating grib file %s', tmpfile)
+        try:
+            # S3 files are the full domain, slice down to domain of
+            # interest before saving
+            subprocess.run(
+                f'wgrib2 {tmpfile} -small_grib {_domain_args()} {filename}',
+                shell=True, check=True, capture_output=True)
+        except subprocess.CalledProcessError as e:
+            logger.error('Error applying domain to file %s\n%s',
+                         filename, e.stderr)
+            raise OSError
+        else:
+            logging.debug('Successfully truncated to: %s', filename)
+    else:
+        tmpfile.rename(filename)
     logging.debug('Successfully saved %s', filename)
     return filename
 
@@ -730,3 +799,11 @@ def check_wgrib2():
     if shutil.which('wgrib2') is None:
         logger.error('wgrib2 was not found in PATH and is required')
         sys.exit(1)
+
+
+def _domain_args():
+    lonW = DOMAIN["leftlon"]
+    lonE = DOMAIN["rightlon"]
+    latS = DOMAIN["bottomlat"]
+    latN = DOMAIN["toplat"]
+    return f'{lonW}:{lonE} {latS}:{latN}'
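
For reviewers who want to exercise the bucket listing that get_available_gefs_dirs paginates over, here is a minimal, synchronous sketch. It assumes only the public noaa-gefs-pds bucket and the default 1000-key page size of the S3 ListObjectsV2 API; the list_gefs_dates helper name is illustrative and not part of the patch.

    # Standalone sketch (not part of the patch): walk the gefs.<date>/ prefixes,
    # paging with start-after the same way the coroutine in the diff does.
    import re
    import urllib.parse
    import urllib.request

    GEFS_BASE_URL = 'https://noaa-gefs-pds.s3.amazonaws.com'

    def list_gefs_dates(start_after='gefs.20210101'):
        """Yield YYYYMMDD strings for each gefs.<date>/ prefix in the bucket."""
        while True:
            params = {'list-type': 2, 'delimiter': '/',
                      'start-after': start_after}
            url = GEFS_BASE_URL + '/?' + urllib.parse.urlencode(params)
            with urllib.request.urlopen(url) as resp:
                listing = resp.read().decode()
            dates = re.findall(r'gefs\.([0-9]{8})', listing)
            yield from dates
            if len(dates) < 1000:  # a short page means there are no more pages
                return
            start_after = 'gefs.' + dates[-1]

The wgrib2 call added in fetch_grib_files uses the -small_grib form, wgrib2 IN -small_grib lonW:lonE latS:latN OUT, which is exactly the string that _domain_args() assembles from DOMAIN.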