|
| 1 | +import base64 |
| 2 | +import json |
| 3 | +import os |
| 4 | + |
| 5 | +import apache_beam as beam |
| 6 | +import requests |
| 7 | +from cmr import GranuleQuery |
| 8 | + |
| 9 | +from pangeo_forge_recipes.patterns import pattern_from_file_sequence |
| 10 | +from pangeo_forge_recipes.transforms import ( |
| 11 | + Indexed, |
| 12 | + OpenURLWithFSSpec, |
| 13 | + OpenWithXarray, |
| 14 | + StoreToZarr, |
| 15 | + T, |
| 16 | +) |
| 17 | + |
# CMR link relation types: direct HTTPS download vs. in-region S3 access.
HTTP_REL = 'http://esipfed.org/ns/fedsearch/1.1/data#'
S3_REL = 'http://esipfed.org/ns/fedsearch/1.1/s3#'
# NOTE(review): evaluated at import time — the module raises KeyError if
# EARTHDATA_TOKEN is unset, even when the S3 access path is selected below.
AUTH_HEADERS = {'headers': {'Authorization': f"Bearer {os.environ['EARTHDATA_TOKEN']}"}}
# PO.DAAC endpoint that mints temporary S3 credentials for Earthdata users.
CREDENTIALS_API = 'https://archive.podaac.earthdata.nasa.gov/s3credentials'
| 22 | + |
| 23 | + |
def earthdata_auth(username, password):
    """Authenticate against NASA Earthdata Login and fetch temporary S3 credentials.

    Walks the EDL redirect flow: hit the (unauthenticated) credentials
    endpoint to obtain the login URL, POST base64-encoded credentials,
    follow the redirect to obtain an ``accessToken`` cookie, then re-request
    the credentials endpoint with that cookie.

    :param username: Earthdata Login username.
    :param password: Earthdata Login password.
    :returns: dict with ``aws_access_key_id``, ``aws_secret_access_key`` and
        ``aws_session_token``, suitable for fsspec/s3fs ``client_kwargs``.
    :raises requests.HTTPError: if any step of the flow returns an error status.
    """
    login_resp = requests.get(CREDENTIALS_API, allow_redirects=False)
    login_resp.raise_for_status()

    # The login form expects text, so decode the base64 output (bytes) to str.
    encoded_auth = base64.b64encode(f'{username}:{password}'.encode('ascii')).decode('ascii')
    auth_redirect = requests.post(
        login_resp.headers['location'],
        data={'credentials': encoded_auth},
        headers={'Origin': CREDENTIALS_API},
        allow_redirects=False,
    )
    auth_redirect.raise_for_status()

    final = requests.get(auth_redirect.headers['location'], allow_redirects=False)
    # Bug fix: this response was previously unchecked — a failed login
    # surfaced as a confusing KeyError on the cookie jar instead of an
    # HTTPError with the server's actual status.
    final.raise_for_status()

    results = requests.get(CREDENTIALS_API, cookies={'accessToken': final.cookies['accessToken']})
    results.raise_for_status()

    creds = results.json()
    return {
        'aws_access_key_id': creds['accessKeyId'],
        'aws_secret_access_key': creds['secretAccessKey'],
        'aws_session_token': creds['sessionToken'],
    }
| 48 | + |
| 49 | + |
def filter_data_links(links, rel):
    """Lazily yield the links whose ``rel`` matches *rel* and that point at a netCDF file."""
    return (link for link in links if link['rel'] == rel and link['href'].endswith('.nc'))
| 52 | + |
| 53 | + |
def gen_data_links(rel):
    """Yield one data link of type *rel* for every MUR SST granule found in CMR."""
    query = GranuleQuery().short_name('MUR-JPL-L4-GLOB-v4.1').downloadable(True)
    for granule in query.get_all():
        matches = list(filter_data_links(granule['links'], rel))
        # Fail loudly unless CMR returns exactly one matching link per granule.
        if len(matches) != 1:
            raise ValueError(f"Expected 1 link of type {rel} on {granule['title']}")
        yield matches[0]['href']
| 63 | + |
| 64 | + |
class Preprocess(beam.PTransform):
    """Filters variables to only be the non-optional L4 variables."""

    @staticmethod
    def _preproc(item: Indexed[T]) -> Indexed[T]:
        """Drop every data variable not in ``SELECTED_VARS``, keeping the index untouched."""
        # The required (non-optional) L4 variables; extras such as
        # dt_1km_data / sst_anomaly are dropped to keep the store lean.
        SELECTED_VARS = {'analysed_sst', 'analysis_error', 'mask', 'sea_ice_fraction'}
        index, ds = item
        extra = [name for name in ds.data_vars if name not in SELECTED_VARS]
        # ``Dataset.drop`` is deprecated in xarray; ``drop_vars`` is the
        # supported API for removing variables by name.
        return index, ds.drop_vars(extra)

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._preproc)
| 76 | + |
| 77 | + |
# use HTTP_REL if S3 access is not possible. S3_REL is faster.
selected_rel = S3_REL

# One URL per granule; granules are concatenated along the time dimension.
pattern = pattern_from_file_sequence(list(gen_data_links(selected_rel)), 'time')

# HTTP access authenticates with a bearer token; S3 access needs temporary
# AWS credentials minted from the Earthdata username/password.
if selected_rel == HTTP_REL:
    open_kwargs = AUTH_HEADERS
else:
    open_kwargs = {
        'client_kwargs': earthdata_auth(
            os.environ['EARTHDATA_USERNAME'], os.environ['EARTHDATA_PASSWORD']
        )
    }

recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec(open_kwargs=open_kwargs)
    | OpenWithXarray(file_type=pattern.file_type)
    | Preprocess()
    | StoreToZarr(
        store_name='mursst.zarr',
        combine_dims=pattern.combine_dim_keys,
        target_chunks={'time': 1, 'lat': 1800, 'lon': 3600},
    )
)
0 commit comments