 from __future__ import annotations
 
+import datetime as dt
+import shutil
 from pathlib import Path
 
+import dask.dataframe as dd
 import pandas as pd
+from loguru import logger
 
 from nsidc.iceflow.data.fetch import search_and_download
 from nsidc.iceflow.data.models import (
+    BoundingBox,
+    Dataset,
     DatasetSearchParameters,
     IceflowDataFrame,
 )
 from nsidc.iceflow.data.read import read_data
 from nsidc.iceflow.itrf.converter import transform_itrf
 
 
-def fetch_iceflow_df(
+def _df_for_one_dataset(
     *,
-    dataset_search_params: DatasetSearchParameters,
+    dataset: Dataset,
+    bounding_box: BoundingBox,
+    temporal: tuple[dt.datetime | dt.date, dt.datetime | dt.date],
     output_dir: Path,
-    output_itrf: str | None = None,
+    # TODO: also add option for target epoch!!
+    output_itrf: str | None,
 ) -> IceflowDataFrame:
-    """Search for data matching parameters and return an IceflowDataframe.
-
-    Optionally transform data to the given ITRF for consistency.
-    """
-
     results = search_and_download(
-        short_name=dataset_search_params.dataset.short_name,
-        version=dataset_search_params.dataset.version,
-        bounding_box=dataset_search_params.bounding_box,
-        temporal=dataset_search_params.temporal,
+        short_name=dataset.short_name,
+        version=dataset.version,
+        bounding_box=bounding_box,
+        temporal=temporal,
         output_dir=output_dir,
     )
 
     all_dfs = []
     for result in results:
-        data_df = read_data(dataset_search_params.dataset, result)
+        data_df = read_data(dataset, result)
         all_dfs.append(data_df)
 
     complete_df = IceflowDataFrame(pd.concat(all_dfs))
@@ -46,3 +50,110 @@ def fetch_iceflow_df(
     )
 
     return complete_df
+
+
+def fetch_iceflow_df(
+    *,
+    dataset_search_params: DatasetSearchParameters,
+    output_dir: Path,
+    # TODO: also add option for target epoch!!
+    output_itrf: str | None = None,
+) -> IceflowDataFrame:
62+ """Search for data matching parameters and return an IceflowDataframe.
63+
64+ Optionally transform data to the given ITRF for consistency.
65+
66+ Note: a potentially large amount of data may be returned, especially if the
67+ user requests a large spatial/temporal area across multiple datasets. The
68+ result may not even fit in memory!
69+
70+ Consider using `create_iceflow_parquet` to fetch and store data in parquet
71+ format.
72+ """
+
+    dfs = []
+    for dataset in dataset_search_params.datasets:
+        result = _df_for_one_dataset(
+            dataset=dataset,
+            temporal=dataset_search_params.temporal,
+            bounding_box=dataset_search_params.bounding_box,
+            output_dir=output_dir,
+            output_itrf=output_itrf,
+        )
+        dfs.append(result)
+
+    complete_df = IceflowDataFrame(pd.concat(dfs))
+
+    return complete_df
+
+
+def create_iceflow_parquet(
+    *,
+    dataset_search_params: DatasetSearchParameters,
+    output_dir: Path,
+    target_itrf: str,
+    overwrite: bool = False,
+    target_epoch: str | None = None,
+) -> Path:
98+ """Create a parquet file containing the lat/lon/elev data matching the dataset search params.
99+
100+ This function creates a parquet file that can be easily used alongside dask,
101+ containing lat/lon/elev data compatible with a comparison to icesat 2 data.
102+
103+ Note: this function writes a single `iceflow.parquet` to the output
104+ dir. This code does not currently support updates to the parquet after being
105+ written. This is intended to help facilitate analysis of a specific area
106+ over time. If an existing `iceflow.parquet` exists and the user wants to
107+ create a new `iceflow.parquet` for a different area or timespan, they will
108+ need to move/remove the existing `iceflow.parquet` first (e.g., with the
109+ `overwrite=True` kwarg).
110+ """
+    output_subdir = output_dir / "iceflow.parquet"
+    if output_subdir.exists():
+        if overwrite:
+            logger.info("Removing existing iceflow.parquet")
+            shutil.rmtree(output_subdir)
+        else:
+            raise RuntimeError(
+                "An iceflow parquet file already exists. Use `overwrite=True` to overwrite."
+            )
+
+    for dataset in dataset_search_params.datasets:
+        results = search_and_download(
+            short_name=dataset.short_name,
+            version=dataset.version,
+            temporal=dataset_search_params.temporal,
+            bounding_box=dataset_search_params.bounding_box,
+            output_dir=output_dir,
+        )
+
+        for result in results:
+            data_df = read_data(dataset, result)
+            df = IceflowDataFrame(data_df)
+
+            df = transform_itrf(
+                data=df,
+                target_itrf=target_itrf,
+                target_epoch=target_epoch,
+            )
+
+            # Add a string col w/ dataset name and version.
+            df["dataset"] = [f"{dataset.short_name}v{dataset.version}"] * len(
+                df.latitude
+            )
+            common_columns = ["latitude", "longitude", "elevation", "dataset"]
+            common_dask_df = dd.from_pandas(df[common_columns])  # type: ignore[attr-defined]
+            if output_subdir.exists():
+                dd.to_parquet(  # type: ignore[attr-defined]
+                    df=common_dask_df,
+                    path=output_subdir,
+                    append=True,
+                    ignore_divisions=True,
+                )
+            else:
+                dd.to_parquet(  # type: ignore[attr-defined]
+                    df=common_dask_df,
+                    path=output_subdir,
+                )
+
+    return output_subdir
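
For illustration, here is a minimal usage sketch of the new `fetch_iceflow_df` entry point. Only the attribute names (`datasets`, `temporal`, `bounding_box`, `short_name`, `version`) are taken from this diff; the constructor keyword arguments for `BoundingBox` and `Dataset`, and all concrete values, are hypothetical placeholders:

    import datetime as dt
    from pathlib import Path

    from nsidc.iceflow.data.models import (
        BoundingBox,
        Dataset,
        DatasetSearchParameters,
    )

    # Hypothetical constructor kwargs; the diff only shows attribute access.
    search_params = DatasetSearchParameters(
        datasets=[Dataset(short_name="ILATM1B", version="1")],  # placeholder dataset
        bounding_box=BoundingBox(
            lower_left_lon=-103.13,
            lower_left_lat=-75.19,
            upper_right_lon=-102.20,
            upper_right_lat=-74.89,
        ),
        temporal=(dt.date(2009, 11, 1), dt.date(2009, 12, 1)),
    )

    # Keep the area/timespan small for this path: every granule is
    # concatenated into a single in-memory pandas-backed frame.
    iceflow_df = fetch_iceflow_df(
        dataset_search_params=search_params,
        output_dir=Path("./downloads"),
        output_itrf="ITRF2014",  # illustrative target frame; None skips the transform
    )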
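
For larger requests, a sketch of the parquet path, reusing the same hypothetical `search_params` from the sketch above; `dd.read_parquet` is standard dask, and the column names come from `common_columns` in the diff:

    import dask.dataframe as dd

    parquet_path = create_iceflow_parquet(
        dataset_search_params=search_params,
        output_dir=Path("./downloads"),
        target_itrf="ITRF2014",  # illustrative target frame
        overwrite=True,
    )

    # Read lazily; only latitude/longitude/elevation/dataset are stored, so
    # aggregations can run out-of-core.
    ddf = dd.read_parquet(parquet_path)
    print(ddf.groupby("dataset")["elevation"].mean().compute())

Writing each granule with `append=True` and `ignore_divisions=True` lets the parquet grow incrementally without requiring consistent index divisions across granules.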