Commit 6e91465

Merge pull request #49 from nsidc/support-fetch-across-datasets
Support fetching data across datasets and writing data to parquet
2 parents: 6e841ef + b0f2b42

6 files changed: +205 -34 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions

@@ -6,6 +6,8 @@
   `target_epoch`.
 - Add support for ILATM1B v2 and BLATM1B v1.
 - Add support for ILVIS2 v1 and v2.
+- Improve API, providing ability to search across datasets and save to
+  intermediate parquet file.

 # v0.2.0

pyproject.toml

Lines changed: 4 additions & 1 deletion

@@ -33,7 +33,8 @@ dependencies = [
   "shapely >=2.0.5",
   "pandera[mypy] >= 0.20.3",
   "pydantic >=2.8.2",
-  "loguru",
+  "loguru >=0.7.2",
+  "dask[dataframe] >=2024.10.0",
 ]

 [project.urls]
@@ -117,6 +118,8 @@ module = [
   "h5py.*",
   "numpy.*",
   "gps_timemachine.*",
+  "dask.*",
+  "dask.dataframe.*",
 ]
 disallow_incomplete_defs = true
 ignore_missing_imports = true

src/nsidc/iceflow/api.py

Lines changed: 124 additions & 13 deletions

@@ -1,40 +1,44 @@
 from __future__ import annotations

+import datetime as dt
+import shutil
 from pathlib import Path

+import dask.dataframe as dd
 import pandas as pd
+from loguru import logger

 from nsidc.iceflow.data.fetch import search_and_download
 from nsidc.iceflow.data.models import (
+    BoundingBox,
+    Dataset,
     DatasetSearchParameters,
     IceflowDataFrame,
 )
 from nsidc.iceflow.data.read import read_data
 from nsidc.iceflow.itrf.converter import transform_itrf


-def fetch_iceflow_df(
+def _df_for_one_dataset(
     *,
-    dataset_search_params: DatasetSearchParameters,
+    dataset: Dataset,
+    bounding_box: BoundingBox,
+    temporal: tuple[dt.datetime | dt.date, dt.datetime | dt.date],
     output_dir: Path,
-    output_itrf: str | None = None,
+    # TODO: also add option for target epoch!!
+    output_itrf: str | None,
 ) -> IceflowDataFrame:
-    """Search for data matching parameters and return an IceflowDataframe.
-
-    Optionally transform data to the given ITRF for consistency.
-    """
-
     results = search_and_download(
-        short_name=dataset_search_params.dataset.short_name,
-        version=dataset_search_params.dataset.version,
-        bounding_box=dataset_search_params.bounding_box,
-        temporal=dataset_search_params.temporal,
+        short_name=dataset.short_name,
+        version=dataset.version,
+        bounding_box=bounding_box,
+        temporal=temporal,
         output_dir=output_dir,
     )

     all_dfs = []
     for result in results:
-        data_df = read_data(dataset_search_params.dataset, result)
+        data_df = read_data(dataset, result)
         all_dfs.append(data_df)

     complete_df = IceflowDataFrame(pd.concat(all_dfs))
@@ -46,3 +50,110 @@ def fetch_iceflow_df(
     )

     return complete_df
+
+
+def fetch_iceflow_df(
+    *,
+    dataset_search_params: DatasetSearchParameters,
+    output_dir: Path,
+    # TODO: also add option for target epoch!!
+    output_itrf: str | None = None,
+) -> IceflowDataFrame:
+    """Search for data matching parameters and return an IceflowDataframe.
+
+    Optionally transform data to the given ITRF for consistency.
+
+    Note: a potentially large amount of data may be returned, especially if the
+    user requests a large spatial/temporal area across multiple datasets. The
+    result may not even fit in memory!
+
+    Consider using `create_iceflow_parquet` to fetch and store data in parquet
+    format.
+    """
+
+    dfs = []
+    for dataset in dataset_search_params.datasets:
+        result = _df_for_one_dataset(
+            dataset=dataset,
+            temporal=dataset_search_params.temporal,
+            bounding_box=dataset_search_params.bounding_box,
+            output_dir=output_dir,
+            output_itrf=output_itrf,
+        )
+        dfs.append(result)
+
+    complete_df = IceflowDataFrame(pd.concat(dfs))
+
+    return complete_df
+
+
+def create_iceflow_parquet(
+    *,
+    dataset_search_params: DatasetSearchParameters,
+    output_dir: Path,
+    target_itrf: str,
+    overwrite: bool = False,
+    target_epoch: str | None = None,
+) -> Path:
+    """Create a parquet dataset containing the lat/lon/elev data matching the dataset search params.
+
+    This function creates a parquet dataset that can be easily used alongside dask,
+    containing lat/lon/elev data.
+
+    Note: this function writes a single `iceflow.parquet` to the output
+    dir. This code does not currently support updates to the parquet after being
+    written. This is intended to help facilitate analysis of a specific area
+    over time. If an existing `iceflow.parquet` exists and the user wants to
+    create a new `iceflow.parquet` for a different area or timespan, they will
+    need to move/remove the existing `iceflow.parquet` first (e.g., with the
+    `overwrite=True` kwarg).
+    """
+    output_subdir = output_dir / "iceflow.parquet"
+    if output_subdir.exists():
+        if overwrite:
+            logger.info("Removing existing iceflow.parquet")
+            shutil.rmtree(output_subdir)
+        else:
+            raise RuntimeError(
+                "An iceflow parquet file already exists. Use `overwrite=True` to overwrite."
+            )
+
+    for dataset in dataset_search_params.datasets:
+        results = search_and_download(
+            short_name=dataset.short_name,
+            version=dataset.version,
+            temporal=dataset_search_params.temporal,
+            bounding_box=dataset_search_params.bounding_box,
+            output_dir=output_dir,
+        )
+
+        for result in results:
+            data_df = read_data(dataset, result)
+            df = IceflowDataFrame(data_df)
+
+            df = transform_itrf(
+                data=df,
+                target_itrf=target_itrf,
+                target_epoch=target_epoch,
+            )
+
+            # Add a string col w/ dataset name and version.
+            df["dataset"] = [f"{dataset.short_name}v{dataset.version}"] * len(
+                df.latitude
+            )
+            common_columns = ["latitude", "longitude", "elevation", "dataset"]
+            common_dask_df = dd.from_pandas(df[common_columns])  # type: ignore[attr-defined]
+            if output_subdir.exists():
+                dd.to_parquet(  # type: ignore[attr-defined]
+                    df=common_dask_df,
+                    path=output_subdir,
+                    append=True,
+                    ignore_divisions=True,
+                )
+            else:
+                dd.to_parquet(  # type: ignore[attr-defined]
+                    df=common_dask_df,
+                    path=output_subdir,
+                )
+
+    return output_subdir
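
A rough usage sketch (not part of this diff): with `DatasetSearchParameters.datasets` now taking a list, a single `fetch_iceflow_df` call can pull several datasets into one in-memory `IceflowDataFrame`. The bounding box and date range below are the illustrative values from the integration tests; the output directory is a hypothetical local path.

    import datetime as dt
    from pathlib import Path

    from nsidc.iceflow.api import fetch_iceflow_df
    from nsidc.iceflow.data.models import (
        BoundingBox,
        DatasetSearchParameters,
        ILATM1BDataset,
    )

    # Fetch two ILATM1B versions in one search; the result is a single dataframe.
    iceflow_df = fetch_iceflow_df(
        dataset_search_params=DatasetSearchParameters(
            datasets=[ILATM1BDataset(version="1"), ILATM1BDataset(version="2")],
            bounding_box=BoundingBox(
                lower_left_lon=-49.149,
                lower_left_lat=69.186,
                upper_right_lon=-48.949,
                upper_right_lat=69.238,
            ),
            temporal=(dt.date(2007, 1, 1), dt.date(2014, 10, 28)),
        ),
        output_dir=Path("./downloads"),  # hypothetical download directory
        output_itrf="ITRF2014",  # optional: transform all results to a common ITRF
    )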

src/nsidc/iceflow/data/fetch.py

Lines changed: 34 additions & 12 deletions

@@ -4,6 +4,7 @@
 from pathlib import Path

 import earthaccess
+from loguru import logger

 from nsidc.iceflow.data.models import BoundingBox

@@ -21,26 +22,47 @@ def search_and_download(
     Wraps EDL auth and CMR search using `earthaccess`.

     Data matching the given parameters are downloaded to a subfolder of the
-    given `output_dir` named after th e`short_name`.
+    given `output_dir` named after the `short_name`.
     """
     earthaccess.login()

-    results = earthaccess.search_data(
-        short_name=short_name,
-        version=version,
-        bounding_box=(
-            bounding_box.lower_left_lon,
-            bounding_box.lower_left_lat,
-            bounding_box.upper_right_lon,
-            bounding_box.upper_right_lat,
-        ),
-        temporal=temporal,
-    )
+    ctx_string = f"{short_name=} {version=} with {bounding_box=} {temporal=}"
+
+    try:
+        results = earthaccess.search_data(
+            short_name=short_name,
+            version=version,
+            bounding_box=(
+                bounding_box.lower_left_lon,
+                bounding_box.lower_left_lat,
+                bounding_box.upper_right_lon,
+                bounding_box.upper_right_lat,
+            ),
+            temporal=temporal,
+        )
+    except IndexError:
+        # There's no data matching the given parameters.
+        logger.error(f"Found no results for {ctx_string}")
+        return []
+
+    num_results = len(results)
+
+    if not num_results:
+        logger.error(f"Found no results for {ctx_string}")
+        return []

     # short_name based subdir for data.
     output_subdir = output_dir / short_name
+    logger.info(
+        f"Found {num_results} granules for {ctx_string}."
+        f" Downloading to {output_subdir}."
+    )
+
     output_subdir.mkdir(exist_ok=True)
     downloaded_files = earthaccess.download(results, str(output_subdir))
     downloaded_filepaths = [Path(filepath_str) for filepath_str in downloaded_files]
+    # There may be duplicate filepaths returned by earthaccess because of data
+    # existing both in the cloud and on ECS.
+    downloaded_filepaths = list(set(downloaded_filepaths))

     return downloaded_filepaths
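
A hedged sketch of what the new failure mode looks like to a direct caller of `search_and_download` (keyword names are inferred from the call sites in `api.py`; the search values are illustrative only): when nothing matches, the function now logs an error and returns an empty list instead of letting an `IndexError` from `earthaccess` propagate.

    import datetime as dt
    from pathlib import Path

    from nsidc.iceflow.data.fetch import search_and_download
    from nsidc.iceflow.data.models import BoundingBox

    downloaded = search_and_download(
        short_name="ILATM1B",
        version="1",
        bounding_box=BoundingBox(
            lower_left_lon=-49.149,
            lower_left_lat=69.186,
            upper_right_lon=-48.949,
            upper_right_lat=69.238,
        ),
        temporal=(dt.date(1990, 1, 1), dt.date(1990, 1, 2)),  # illustrative range with no data
        output_dir=Path("./downloads"),  # hypothetical download directory
    )

    if not downloaded:
        # No granules matched; the caller now sees [] rather than an unhandled IndexError.
        print("Nothing to process.")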

src/nsidc/iceflow/data/models.py

Lines changed: 1 addition & 1 deletion

@@ -232,6 +232,6 @@ class BoundingBox(pydantic.BaseModel):


 class DatasetSearchParameters(pydantic.BaseModel):
-    dataset: Dataset
+    datasets: list[Dataset]
     bounding_box: BoundingBox
     temporal: tuple[dt.datetime | dt.date, dt.datetime | dt.date]
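
This field rename is a breaking change for existing callers. A minimal before/after sketch, using dataset models and illustrative bounding box/temporal values taken from the tests below:

    import datetime as dt

    from nsidc.iceflow.data.models import (
        BoundingBox,
        DatasetSearchParameters,
        GLAH06Dataset,
        ILATM1BDataset,
    )

    bbox = BoundingBox(
        lower_left_lon=-49.149,
        lower_left_lat=69.186,
        upper_right_lon=-48.949,
        upper_right_lat=69.238,
    )
    temporal = (dt.date(2003, 1, 1), dt.date(2014, 12, 31))

    # Before this commit: a single dataset per search (now fails validation).
    # params = DatasetSearchParameters(dataset=GLAH06Dataset(), bounding_box=bbox, temporal=temporal)

    # After this commit: a list of datasets, searched together.
    params = DatasetSearchParameters(
        datasets=[GLAH06Dataset(), ILATM1BDataset(version="1")],
        bounding_box=bbox,
        temporal=temporal,
    )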

tests/integration/test_e2e.py

Lines changed: 40 additions & 7 deletions

@@ -12,9 +12,10 @@

 import datetime as dt

+import dask.dataframe as dd
 import pandas as pd

-from nsidc.iceflow.api import fetch_iceflow_df
+from nsidc.iceflow.api import create_iceflow_parquet, fetch_iceflow_df
 from nsidc.iceflow.data.models import (
     BLATM1BDataset,
     BoundingBox,
@@ -38,7 +39,7 @@ def test_atm1b_ilatm1b(tmp_path):
     # Native ITRF is ITRF2005
     results_ilatm1b_v1_2009 = fetch_iceflow_df(
         dataset_search_params=DatasetSearchParameters(
-            dataset=ILATM1BDataset(version="1"),
+            datasets=[ILATM1BDataset(version="1")],
             bounding_box=common_bounding_box,
             temporal=(dt.date(2009, 11, 1), dt.date(2009, 12, 1)),
         ),
@@ -49,7 +50,7 @@ def test_atm1b_ilatm1b(tmp_path):
     # Native ITRF is ITRF2008
     results_ilatm1b_v2_2014 = fetch_iceflow_df(
         dataset_search_params=DatasetSearchParameters(
-            dataset=ILATM1BDataset(version="2"),
+            datasets=[ILATM1BDataset(version="2")],
             bounding_box=common_bounding_box,
             temporal=(dt.date(2014, 11, 1), dt.date(2014, 12, 1)),
         ),
@@ -74,7 +75,7 @@ def test_atm1b_blatm1b(tmp_path):

     results_blamt1b_v2_2014 = fetch_iceflow_df(
         dataset_search_params=DatasetSearchParameters(
-            dataset=BLATM1BDataset(),
+            datasets=[BLATM1BDataset()],
             bounding_box=common_bounding_box,
             temporal=(dt.date(2002, 11, 27), dt.date(2002, 11, 28)),
         ),
@@ -87,7 +88,7 @@ def test_atm1b_blatm1b(tmp_path):
 def test_ivlis2(tmp_path):
     results_v1 = fetch_iceflow_df(
         dataset_search_params=DatasetSearchParameters(
-            dataset=ILVIS2Dataset(version="1"),
+            datasets=[ILVIS2Dataset(version="1")],
             bounding_box=BoundingBox(
                 lower_left_lon=-120.0,
                 lower_left_lat=-80.0,
@@ -103,7 +104,7 @@ def test_ivlis2(tmp_path):

     results_v2 = fetch_iceflow_df(
         dataset_search_params=DatasetSearchParameters(
-            dataset=ILVIS2Dataset(version="2"),
+            datasets=[ILVIS2Dataset(version="2")],
             bounding_box=BoundingBox(
                 lower_left_lon=-180,
                 lower_left_lat=60.0,
@@ -133,7 +134,7 @@ def test_glah06(tmp_path):

     results = fetch_iceflow_df(
         dataset_search_params=DatasetSearchParameters(
-            dataset=GLAH06Dataset(),
+            datasets=[GLAH06Dataset()],
             bounding_box=common_bounding_box,
             temporal=(
                 dt.datetime(2003, 2, 20, 22, 25),
@@ -144,3 +145,35 @@ def test_glah06(tmp_path):
     )

     assert (results.ITRF == "ITRF2008").all()
+
+
+def test_create_iceflow_parquet(tmp_path):
+    target_itrf = "ITRF2014"
+    common_bounding_box = BoundingBox(
+        lower_left_lon=-49.149,
+        lower_left_lat=69.186,
+        upper_right_lon=-48.949,
+        upper_right_lat=69.238,
+    )
+
+    # This should finds 4 results for ILATM1B v1 and 3 results for v2.
+    parquet_path = create_iceflow_parquet(
+        dataset_search_params=DatasetSearchParameters(
+            datasets=[ILATM1BDataset(version="1"), ILATM1BDataset(version="2")],
+            bounding_box=common_bounding_box,
+            temporal=((dt.date(2007, 1, 1), dt.date(2014, 10, 28))),
+        ),
+        output_dir=tmp_path,
+        target_itrf=target_itrf,
+    )
+
+    df = dd.read_parquet(parquet_path)  # type: ignore[attr-defined]
+
+    # Assert that the parquet data has the expected columns
+    expected_columns = sorted(["latitude", "longitude", "elevation", "dataset"])
+    assert expected_columns == sorted(df.columns)
+
+    # Assert that the two datasets we expect are present.
+    assert sorted(["ILATM1Bv1", "ILATM1Bv2"]) == sorted(
+        df.dataset.unique().compute().values
+    )
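
The parquet output is intended to be analyzed lazily with dask, as the new test shows. A minimal follow-on sketch; the aggregation here is only an example, and `parquet_path` is the value returned by `create_iceflow_parquet` above.

    import dask.dataframe as dd

    # Lazily open the parquet dataset written by create_iceflow_parquet.
    df = dd.read_parquet(parquet_path)

    # The "dataset" column tags rows as e.g. "ILATM1Bv1" or "ILATM1Bv2",
    # so per-dataset summaries are straightforward.
    mean_elevation_by_dataset = df.groupby("dataset").elevation.mean().compute()
    print(mean_elevation_by_dataset)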
