Fetch GEFS from AWS #712
base: master
Changes from all commits
@@ -57,6 +57,12 @@
 CHECK_URL = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/{}/prod'
 BASE_URL = 'https://nomads.ncep.noaa.gov/cgi-bin/'

+GEFS_BASE_URL = 'https://noaa-gefs-pds.s3.amazonaws.com'
+
+# When querying aws for directories, start-after is used to paginate.
+# 2021-01-01 is the date the expected files and folder structure
+# appears.
+GEFS_STARTAFTER = 'gefs.20210101'

 GFS_0P25_1HR = {'endpoint': 'filter_gfs_0p25_1hr.pl',
                 'file': 'gfs.t{init_hr:02d}z.pgrb2.0p25.f{valid_hr:03d}',
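For reference (not part of the diff): list-type=2 selects S3's ListObjectsV2 API, which returns at most 1,000 keys per response, and start-after makes the listing begin after the given key, so GEFS_STARTAFTER anchors the first page of gefs.YYYYMMDD/ prefixes. A throwaway sketch of the first listing request these constants describe, with the urlencode call used purely for illustration:

from urllib.parse import urlencode

# Illustration only: the listing request implied by the constants above.
params = {'list-type': 2, 'delimiter': '/', 'start-after': 'gefs.20210101'}
listing_url = 'https://noaa-gefs-pds.s3.amazonaws.com' + '/?' + urlencode(params)
# -> https://noaa-gefs-pds.s3.amazonaws.com/?list-type=2&delimiter=%2F&start-after=gefs.20210101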
@@ -288,7 +294,10 @@ async def get_available_dirs(session, model):
     """Get the available date/date+init_hr directories"""
     simple_model = _simple_model(model)
     is_init_date = 'init_date' in model['dir']
-    model_url = BASE_URL + model['endpoint']
+    if simple_model == 'gefs':
+        return await get_available_gefs_dirs(session)
+    else:
+        model_url = BASE_URL + model['endpoint']

     async def _get(model_url):
         async with session.get(model_url, raise_for_status=True) as r:
@@ -304,6 +313,33 @@ async def _get(model_url):
     return list_avail_days


+@abort_all_on_exception
+async def get_available_gefs_dirs(session, start_after=GEFS_STARTAFTER):
+    params = {
+        'list-type': 2,
+        'delimiter': '/',
+    }
+    if start_after is not None:
+        params['start-after'] = start_after
+
+    async def _get(session):
+        async with session.get(
+            GEFS_BASE_URL,
+            params=params,
+            raise_for_status=True
+        ) as r:
+            return await r.text()
+    listing = await _get(session)
+    all_dirs = re.findall("gefs\\.([0-9]{8})", listing)

Member: same as

Member: comment like

+    if len(all_dirs) < 1000:

Member: aws starts paginating at 1k items?

+        return all_dirs
+    else:
+        return all_dirs + await get_available_gefs_dirs(
+            session,
+            'gefs.'+all_dirs[-1]

Member: I'm a proponent of calling kwargs as kwargs, so

+        )
+
+
 def _process_params(model, init_time):
     """Generator to get the parameters for fetching forecasts for a given
     model at a given init_time"""
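Not part of the diff: a minimal usage sketch of the new helper, assuming it is driven with a bare aiohttp session (in the package the session is created and managed elsewhere). The recursion above exists because ListObjectsV2 caps each response at 1,000 keys, so a listing that comes back full is followed up with start-after set to the last directory seen.

import asyncio
import aiohttp

async def list_gefs_dates():
    # Assumed standalone driver, for illustration only.
    async with aiohttp.ClientSession() as session:
        dates = await get_available_gefs_dirs(session)
        # dates is a list of 'YYYYMMDD' strings extracted from the bucket listing
        return dates

# asyncio.run(list_gefs_dates())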
@@ -329,15 +365,24 @@ async def check_next_inittime(session, init_time, model):
     """Check if data from the next model initializtion time is available"""
     next_inittime = init_time + pd.Timedelta(model['update_freq'])
     simple_model = _simple_model(model)
-    next_init_url = (CHECK_URL.format(model.get('check_url_name',
-                                                simple_model))
-                     + model['dir'].format(
-                         init_date=next_inittime.strftime('%Y%m%d'),
-                         init_dt=next_inittime.strftime('%Y%m%d%H'),
-                         init_hr=next_inittime.strftime('%H'))
-                     + '/' + model['file'].format(init_hr=next_inittime.hour,
-                                                  valid_hr=0))
+    if simple_model == 'gefs':
+        next_init_url = (
+            GEFS_BASE_URL
+            + model['dir'].format(
+                init_date=next_inittime.strftime('%Y%m%d'),
+                init_dt=next_inittime.strftime('%Y%m%d%H'),
+                init_hr=next_inittime.strftime('%H'))
+            + '/' + model['file'].format(init_hr=next_inittime.hour,
+                                         valid_hr=0))
+    else:
+        next_init_url = (CHECK_URL.format(model.get('check_url_name',
+                                                    simple_model))
+                         + model['dir'].format(
+                             init_date=next_inittime.strftime('%Y%m%d'),
+                             init_dt=next_inittime.strftime('%Y%m%d%H'),
+                             init_hr=next_inittime.strftime('%H'))
+                         + '/' + model['file'].format(init_hr=next_inittime.hour,
+                                                      valid_hr=0))
     try:
         async with session.head(next_init_url) as r:
             if r.status == 200:
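For a concrete sense of the template substitutions (illustration only; the '6h' update frequency is an assumed value, and the GEFS 'dir' and 'file' templates themselves are defined elsewhere in the module):

import pandas as pd

init_time = pd.Timestamp('2021-06-01T00:00Z')
next_inittime = init_time + pd.Timedelta('6h')
next_inittime.strftime('%Y%m%d')    # '20210601'   -> init_date
next_inittime.strftime('%Y%m%d%H')  # '2021060106' -> init_dt
next_inittime.strftime('%H')        # '06'         -> init_hr in the dir template
next_inittime.hour                  # 6            -> init_hr in the file template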
@@ -371,9 +416,13 @@ async def files_to_retrieve(session, model, modelpath, init_time):
         if filename.exists():
             yield next_params
             continue
-        next_model_url = (CHECK_URL.format(model.get('check_url_name',
-                                                     simple_model))
-                          + next_params['dir'] + '/' + next_params['file'])
+        if simple_model == 'gefs':
+            next_model_url = (GEFS_BASE_URL + next_params['dir']
+                              + '/' + next_params['file'])
+        else:
+            next_model_url = (CHECK_URL.format(model.get('check_url_name',
+                                                         simple_model))
+                              + next_params['dir'] + '/' + next_params['file'])
         while True:
             # is the next file ready?
             try:
@@ -465,8 +514,13 @@ async def fetch_grib_files(session, params, basepath, init_time, chunksize):
     aiohttp.ClientResponseError
         When the HTTP request fails/returns a status code >= 400
     """
-    endpoint = params.pop('endpoint')
-    url = BASE_URL + endpoint
+    slice_domain = False
+    if _simple_model(params) == 'gefs':
+        url = GEFS_BASE_URL + params['dir'] + '/' + params['file']
+        slice_domain = True
+    else:
+        endpoint = params.pop('endpoint')
+        url = BASE_URL + endpoint
     filename = get_filename(basepath, init_time, params)
     if filename.exists():
         return filename
@@ -475,7 +529,22 @@ async def fetch_grib_files(session, params, basepath, init_time, chunksize):
     logger.info('Getting file %s', filename)
     tmpfile = filename.with_name('.tmp_' + filename.name)
     await get_with_retries(_get_file, session, url, params, tmpfile, chunksize)
-    tmpfile.rename(filename)
+    if slice_domain:
+        logging.debug('Truncating grib file %s', tmpfile)
+        try:
+            # S3 files are the full domain, slice down to domain of
+            # interest before saving
+            subprocess.run(
+                f'wgrib2 {tmpfile} -small_grib {_domain_args()} {filename}',
+                shell=True, check=True, capture_output=True)
+        except subprocess.CalledProcessError as e:
+            logger.error('Error applying domain to file %s\n%s',
+                         filename, e.stderr)
+            raise OSError
+        else:
+            logging.debug('Sucessfully truncated to: %s', filename)
+    else:
+        tmpfile.rename(filename)
     logging.debug('Successfully saved %s', filename)
     return filename
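Not part of the diff: the subprocess call above shells out to wgrib2's -small_grib option, which takes a longitude range, a latitude range, and an output file. With hypothetical file names and domain values, the command it assembles would look roughly like this sketch:

# Hypothetical names and values, for illustration only.
tmpfile = '.tmp_gefs.grib2'
filename = 'gefs.grib2'
domain_args = '234:294 24:50'  # what _domain_args() might return: 'lonW:lonE latS:latN'
cmd = f'wgrib2 {tmpfile} -small_grib {domain_args} {filename}'
# -> 'wgrib2 .tmp_gefs.grib2 -small_grib 234:294 24:50 gefs.grib2'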
@@ -730,3 +799,11 @@ def check_wgrib2():
     if shutil.which('wgrib2') is None:
         logger.error('wgrib2 was not found in PATH and is required')
         sys.exit(1)
+
+
+def _domain_args():
+    lonW = DOMAIN["leftlon"]
+    lonE = DOMAIN["rightlon"]
+    latS = DOMAIN["bottomlat"]
+    latN = DOMAIN["toplat"]
+    return f'{lonW}:{lonE} {latS}:{latN}'
Member: this would be helpful in the function documentation too
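A minimal sketch of what that suggestion could look like (the DOMAIN values here are hypothetical; the real dict is defined elsewhere in the module): a docstring noting that the returned string is the 'lonW:lonE latS:latN' argument pair consumed by wgrib2 -small_grib.

DOMAIN = {'leftlon': 234, 'rightlon': 294, 'bottomlat': 24, 'toplat': 50}

def _domain_args():
    """Return the fetch domain as 'lonW:lonE latS:latN' for wgrib2 -small_grib."""
    lonW = DOMAIN["leftlon"]
    lonE = DOMAIN["rightlon"]
    latS = DOMAIN["bottomlat"]
    latN = DOMAIN["toplat"]
    return f'{lonW}:{lonE} {latS}:{latN}'

print(_domain_args())  # -> 234:294 24:50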