-
Notifications
You must be signed in to change notification settings - Fork 60
MUR SST Kerchunk Recipe #259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 16 commits
8e77f78
37000c6
1f0218b
fe4f047
ca7c666
7897490
b82e52b
e90707b
32b7c48
40e2885
26af6c7
df6ec0e
00f5992
31d6b9e
01f9ced
ece4650
b4e2fbb
02d80c2
d270f9b
b56af98
293c453
78bb384
7204f67
30cad0a
bc075dc
88394b4
f881b30
bd4a3d6
b97b227
271d320
2c411b0
f62d830
36f2e71
bc3f2df
b5e1b18
46faa71
a4da5b6
644c753
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| title: 'GHRSST Level 4 MUR Global Foundation Sea Surface Temperature Analysis (v4.1)' | ||
| description: 'A Group for High Resolution Sea Surface Temperature (GHRSST) Level 4 sea surface temperature analysis produced as a retrospective dataset (four day latency) and near-real-time dataset (one day latency) at the JPL Physical Oceanography DAAC using wavelets as basis functions in an optimal interpolation approach on a global 0.01 degree grid' | ||
| pangeo_forge_version: '0.9.2' | ||
| recipes: | ||
| - id: MUR-JPL-L4-GLOB-v4.1 | ||
| object: 'recipe:recipe' | ||
| provenance: | ||
| providers: | ||
| - name: 'NASA JPL PO.DAAC' | ||
| description: 'Physical Oceanography Distributed Active Archive Center' | ||
| roles: | ||
| - producer | ||
| - licensor | ||
| url: https://podaac.jpl.nasa.gov/dataset/MUR-JPL-L4-GLOB-v4.1 | ||
| license: 'Open Data' | ||
| maintainers: | ||
| - name: 'Development Seed' | ||
| github: developmentseed | ||
| bakery: | ||
| id: 'pangeo-ldeo-nsf-earthcube' |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| import base64 | ||
| import json | ||
| import os | ||
| from dataclasses import dataclass, field | ||
| from typing import Dict, Set | ||
|
|
||
| import apache_beam as beam | ||
| import pandas as pd | ||
| import requests | ||
| import xarray as xr | ||
| import zarr | ||
| from requests.auth import HTTPBasicAuth | ||
|
|
||
| from pangeo_forge_recipes.patterns import ConcatDim, FilePattern | ||
| from pangeo_forge_recipes.transforms import OpenWithKerchunk, WriteCombinedReference, ConsolidateMetadata | ||
|
|
||
| HTTP_REL = 'http://esipfed.org/ns/fedsearch/1.1/data#' | ||
| S3_REL = 'http://esipfed.org/ns/fedsearch/1.1/s3#' | ||
|
|
||
| ED_USERNAME = os.environ['EARTHDATA_USERNAME'] | ||
| ED_PASSWORD = os.environ['EARTHDATA_PASSWORD'] | ||
| earthdata_protocol = os.environ['PROTOCOL'] or 'https' | ||
|
|
||
| if earthdata_protocol not in ('https', 's3'): | ||
| raise ValueError(f'Unknown ED_PROTOCOL: {earthdata_protocol}') | ||
|
|
||
| CREDENTIALS_API = 'https://archive.podaac.earthdata.nasa.gov/s3credentials' | ||
| SHORT_NAME = 'MUR-JPL-L4-GLOB-v4.1' | ||
| CONCAT_DIMS = ['time'] | ||
| IDENTICAL_DIMS = ['lat', 'lon'] | ||
| SELECTED_VARS = ['analysed_sst', 'analysis_error', 'mask', 'sea_ice_fraction'] | ||
|
|
||
| dates = [ | ||
| d.to_pydatetime().strftime('%Y%m%d') | ||
| for d in pd.date_range('2002-06-01', '2002-07-31', freq='D') | ||
| ] | ||
|
|
||
|
|
||
| def make_filename(time): | ||
| if earthdata_protocol == 'https': | ||
| base_url = ( | ||
| f'https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/{SHORT_NAME}/' | ||
| ) | ||
| else: | ||
| base_url = f's3://podaac-ops-cumulus-protected/{SHORT_NAME}/' | ||
| # example file: "/20020601090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc" | ||
| return f'{base_url}{time}090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc' | ||
|
|
||
|
|
||
| concat_dim = ConcatDim('time', dates, nitems_per_file=1) | ||
| pattern = FilePattern(make_filename, concat_dim) | ||
|
|
||
|
|
||
| def get_earthdata_token(username, password): | ||
| # URL for the Earthdata login endpoint | ||
| login_url = 'https://urs.earthdata.nasa.gov/api/users/token' | ||
| auth = HTTPBasicAuth(username, password) | ||
|
|
||
| # Request a new token | ||
| response = requests.get(f'{login_url}s', auth=auth) | ||
|
|
||
| # Check if the request was successful | ||
| if response.status_code == 200: | ||
| if len(response.json()) == 0: | ||
| # create new token | ||
| response = requests.post(login_url, auth=auth) | ||
| if response.status_code == 200: | ||
| token = response.json()['access_token'] | ||
| else: | ||
| raise Exception('Error: Unable to generate Earthdata token.') | ||
| else: | ||
| # Token is usually in the response's JSON data | ||
| token = response.json()[0]['access_token'] | ||
| return token | ||
| else: | ||
| raise Exception('Error: Unable to retrieve Earthdata token.') | ||
|
|
||
|
|
||
| def get_s3_creds(username, password, credentials_api=CREDENTIALS_API): | ||
| login_resp = requests.get(CREDENTIALS_API, allow_redirects=False) | ||
| login_resp.raise_for_status() | ||
|
|
||
| encoded_auth = base64.b64encode(f'{username}:{password}'.encode('ascii')) | ||
| auth_redirect = requests.post( | ||
| login_resp.headers['location'], | ||
| data={'credentials': encoded_auth}, | ||
| headers={'Origin': credentials_api}, | ||
| allow_redirects=False, | ||
| ) | ||
| auth_redirect.raise_for_status() | ||
|
|
||
| final = requests.get(auth_redirect.headers['location'], allow_redirects=False) | ||
|
|
||
| results = requests.get(CREDENTIALS_API, cookies={'accessToken': final.cookies['accessToken']}) | ||
| results.raise_for_status() | ||
|
|
||
| creds = json.loads(results.content) | ||
| return { | ||
| 'key': creds['accessKeyId'], | ||
| 'secret': creds['secretAccessKey'], | ||
| 'token': creds['sessionToken'], | ||
| 'anon': False, | ||
| } | ||
|
|
||
|
|
||
| def earthdata_auth(username: str, password: str): | ||
| if earthdata_protocol == 's3': | ||
| return get_s3_creds(username, password) | ||
| else: | ||
| token = get_earthdata_token(username, password) | ||
| return {'headers': {'Authorization': f'Bearer {token}'}} | ||
|
|
||
|
|
||
| fsspec_open_kwargs = earthdata_auth(ED_USERNAME, ED_PASSWORD) | ||
|
|
||
|
|
||
| @dataclass | ||
| class FilterVars(beam.PTransform): | ||
| """Filter kerchunk variables by name.""" | ||
|
|
||
| keep: Set[str] = field(default_factory=set) | ||
|
|
||
| @staticmethod | ||
| def _filter(item: list[dict], keep: Set[str]) -> list[dict]: | ||
| filtered = [] | ||
| for translated in item: | ||
| # see Kerchunk docs for dropping a variable example | ||
| translated['refs'] = { | ||
| k: v | ||
| for k, v in translated['refs'].items() | ||
| if '/' not in k or k.split('/')[0] in keep | ||
| } | ||
| filtered.append(translated) | ||
| return filtered | ||
|
|
||
| def expand(self, pcoll: beam.PCollection) -> beam.PCollection: | ||
| return pcoll | beam.Map(self._filter, keep=self.keep) | ||
|
|
||
|
|
||
| @dataclass | ||
| class ValidateDatasetDimensions(beam.PTransform): | ||
| """Open the reference.json in xarray and validate dimensions.""" | ||
|
|
||
| expected_dims: Dict = field(default_factory=dict) | ||
|
|
||
| @staticmethod | ||
| def _validate(zarr_store: zarr.storage.FSStore, expected_dims: Dict) -> None: | ||
| ds = xr.open_dataset(zarr_store, engine='zarr') | ||
| if set(ds.dims) != expected_dims.keys(): | ||
| raise ValueError(f'Expected dimensions {expected_dims.keys()}, got {ds.dims}') | ||
| for dim, bounds in expected_dims.items(): | ||
| if bounds is None: | ||
| continue | ||
| lo, hi = bounds | ||
| actual_lo, actual_hi = round(ds[dim].data.min()), round(ds[dim].data.max()) | ||
| if actual_lo != lo or actual_hi != hi: | ||
| raise ValueError(f'Expected {dim} range [{lo}, {hi}], got {actual_lo, actual_hi}') | ||
| return ds | ||
|
|
||
| def expand( | ||
| self, | ||
| pcoll: beam.PCollection, | ||
| ) -> beam.PCollection: | ||
| return pcoll | beam.Map(self._validate, expected_dims=self.expected_dims) | ||
|
|
||
|
|
||
| recipe = ( | ||
| beam.Create(pattern.items()) | ||
| | OpenWithKerchunk( | ||
| remote_protocol=earthdata_protocol, | ||
| file_type=pattern.file_type, | ||
| # lat/lon are around 5k, this is the best option for forcing kerchunk to inline them | ||
| inline_threshold=6000, | ||
| storage_options=fsspec_open_kwargs, | ||
| ) | ||
| | FilterVars(keep={*pattern.concat_dims, *IDENTICAL_DIMS, *SELECTED_VARS}) | ||
| | WriteCombinedReference( | ||
| concat_dims=CONCAT_DIMS, | ||
| identical_dims=IDENTICAL_DIMS, | ||
| store_name=SHORT_NAME | ||
| ) | ||
| | ConsolidateMetadata() | ||
| # | ValidateDatasetDimensions(expected_dims={'time': None, 'lat': (-90, 90), 'lon': (-180, 180)}) | ||
|
||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| s3fs==2023.10.0 | ||
| boto3==1.28.71 | ||
| python-cmr==0.9.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This depends on the main branch of pangeo-forge-recipes