
Commit dc2b4ba

samlamont and mgdenno authored

424 parsing xml fews timeseries is slow (#425)

* updated xml parser; hefs loading tests WIP
* update null field checks, revert previous temp changes
* remove previous xml parsing func
* remove unneeded funcs
* include timezone offset
* parallelize timeseries loading from directory; xml parsing update
* add max_workers argument
* Update unique column set fields
* Update null column checks
* update test, add member to field check
* update null column check
* update max cpu count
* Adds persist_dataframe argument for loading timeseries
* Aggregate timeseries in cache according to file size
* Add drop_duplicates test
* change function name
* fix function references
* Update concurrent convert_timeseries logic
* set default persist_dataframe to false
* Validate dynamic overwrite
* adds spark config.
* notebook update
* fix generate_weights_file() doc string
* add drop_duplicates argument; remove null column checks
* update write mode
* remove unneeded if statement
* start changelog updates
* changelog updates
* few more notes

---------

Co-authored-by: Matt Denno <mgdenno@gmail.com>

1 parent 77430b3 commit dc2b4ba

24 files changed: +613 −274 lines

docs/sphinx/changelog/index.rst

Lines changed: 12 additions & 0 deletions
@@ -1,6 +1,18 @@
 Release Notes
 =============
 
+0.4.11 - 2025-05-19
+------------------
+
+Changed
+^^^^^^^
+- Fixes bug in _write_spark_df() method in the BaseTable class that caused writing larger dataframes to fail.
+- Parallelizes `convert_single_timeseries()` when a directory is passed to the `in_path` argument.
+- Fixes docstring in `generate_weights_file()`.
+- Switched to the built-in dropDuplicates() method in the `BaseTable` class to drop duplicates instead of using a custom implementation.
+- Added option to specify the number of partitions when writing dataframes in the BaseTable class.
+- Added the option to skip the dropDuplicates() method when writing dataframes in the BaseTable class.
+
 0.4.10 - 2025-04-14
 ------------------
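For context on the dropDuplicates() entries above, here is a minimal PySpark sketch of keyed de-duplication with an opt-out flag. This is not TEEHR's actual BaseTable code; the column names and the flag wiring are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("usgs-A", "2023-01-01 00:00", 1.0),
     ("usgs-A", "2023-01-01 00:00", 1.0)],
    ["location_id", "value_time", "value"],
)

drop_duplicates = True  # mirrors the new opt-out described above
if drop_duplicates:
    # Built-in Spark de-duplication, keyed to the unique column set.
    df = df.dropDuplicates(subset=["location_id", "value_time"])
df.show()  # one row remains
```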

docs/sphinx/user_guide/notebooks/03_introduction_class.ipynb

Lines changed: 3 additions & 3 deletions
@@ -253,7 +253,7 @@
     "outputs": [],
     "source": [
     "# Filter using a raw SQL string\n",
-    "ev.locations.filter(\"id = 'gage-A'\").to_geopandas()"
+    "ev.locations.filter(\"id = 'usgs-A'\").to_geopandas()"
     ]
    },
    {
@@ -266,7 +266,7 @@
     "ev.locations.filter({\n",
     " \"column\": \"id\",\n",
     " \"operator\": \"=\",\n",
-    " \"value\": \"gage-A\"\n",
+    " \"value\": \"usgs-A\"\n",
     "}).to_geopandas()"
     ]
    },
@@ -286,7 +286,7 @@
     "lf = LocationFilter(\n",
     " column=fields.id,\n",
     " operator=Operators.eq,\n",
-    " value=\"gage-A\"\n",
+    " value=\"usgs-A\"\n",
     ")\n",
     "ev.locations.filter(lf).to_geopandas()"
     ]

poetry.lock

Lines changed: 155 additions & 1 deletion
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ bokeh = "^3.5.0"
 scoringrules = "^0.7.1"
 hvplot = "^0.11.1"
 geoviews = "^1.14.0"
+lxml = "^5.3.2"
 
 [tool.poetry.group.test.dependencies]
 pytest = "^7.4.3"
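The new lxml dependency supports the faster XML parsing this PR targets. Below is a minimal sketch of streaming a FEWS PI-XML file with lxml.etree.iterparse, which avoids building the full document tree; the namespace is the standard FEWS PI one, but the element layout is an assumption, not taken from TEEHR's parser.

```python
from lxml import etree

PI_NS = "{http://www.wldelft.nl/fews/PI}"  # assumed FEWS PI namespace

def iter_events(xml_path: str):
    """Yield (date, time, value) from each <event> element, streaming."""
    for _, elem in etree.iterparse(xml_path, tag=PI_NS + "event"):
        yield elem.get("date"), elem.get("time"), float(elem.get("value"))
        elem.clear()  # release parsed elements so memory stays flat
```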

src/teehr/const.py

Lines changed: 3 additions & 1 deletion
@@ -1,6 +1,6 @@
 """This module contains constants
 used throughout the package."""
-
+import os
 
 # Primary evaluation directories
 DATASET_DIR = "dataset"
@@ -24,3 +24,5 @@
 LOADING_CACHE_DIR = "loading"
 
 S3_EVALUATIONS_PATH = "s3://ciroh-rti-public-data/teehr-data-warehouse/v0_4_evaluations/evaluations.yaml"
+
+MAX_CPUS = max(os.cpu_count() - 1, 1)
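A sketch of how a MAX_CPUS-sized pool might drive the parallel directory loading mentioned in the commit message; parse_one and parse_directory are hypothetical stand-ins, not TEEHR's actual functions.

```python
import os
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

# As defined in const.py above: leave one core free, never below one worker.
MAX_CPUS = max(os.cpu_count() - 1, 1)

def parse_one(xml_file: Path) -> int:
    """Hypothetical per-file parser; returns file size as a placeholder."""
    return xml_file.stat().st_size

def parse_directory(in_path: Path, max_workers: int = MAX_CPUS) -> list:
    """Parse every XML file in a directory across a process pool."""
    files = sorted(in_path.glob("*.xml"))
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(parse_one, files))
```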

src/teehr/evaluation/evaluation.py

Lines changed: 1 addition & 0 deletions
@@ -93,6 +93,7 @@ def __init__(
             .set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
             .set("spark.sql.execution.arrow.pyspark.enabled", "true")
             .set("spark.sql.session.timeZone", "UTC")
+            .set("spark.driver.host", "localhost")
         )
         self.spark = SparkSession.builder.config(conf=conf).getOrCreate()
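Pinning spark.driver.host can sidestep driver hostname-resolution stalls some local setups hit. A compressed sketch of the surrounding configuration; only the options visible in this diff are shown.

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (
    SparkConf()
    .set("spark.sql.execution.arrow.pyspark.enabled", "true")
    .set("spark.sql.session.timeZone", "UTC")
    .set("spark.driver.host", "localhost")  # the line added in this commit
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
```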

src/teehr/evaluation/fetch.py

Lines changed: 28 additions & 8 deletions
@@ -131,6 +131,7 @@ def usgs_streamflow(
         overwrite_output: Optional[bool] = False,
         timeseries_type: TimeseriesTypeEnum = "primary",
         write_mode: TableWriteEnum = "append",
+        drop_duplicates: bool = True,
     ):
         """Fetch USGS gage data and load into the TEEHR dataset.
 
@@ -175,6 +176,8 @@
             that does not already exist.
             If "upsert", existing data will be replaced and new data that
             does not exist will be appended.
+        drop_duplicates : bool
+            Whether to drop duplicates in the data. Default is True.
 
 
         .. note::
 
@@ -268,7 +271,8 @@
                 self.usgs_cache_dir
             ),
             timeseries_type=timeseries_type,
-            write_mode=write_mode
+            write_mode=write_mode,
+            drop_duplicates=drop_duplicates
         )
 
@@ -281,7 +285,8 @@ def nwm_retrospective_points(
         overwrite_output: Optional[bool] = False,
         domain: Optional[SupportedNWMRetroDomainsEnum] = "CONUS",
         timeseries_type: TimeseriesTypeEnum = "secondary",
-        write_mode: TableWriteEnum = "append"
+        write_mode: TableWriteEnum = "append",
+        drop_duplicates: bool = True,
     ):
         """Fetch NWM retrospective point data and load into the TEEHR dataset.
 
@@ -334,6 +339,8 @@
             that does not already exist.
             If "upsert", existing data will be replaced and new data that
             does not exist will be appended.
+        drop_duplicates : bool
+            Whether to drop duplicates in the data. Default is True.
 
 
         .. note::
 
@@ -427,7 +434,8 @@
                 self.nwm_cache_dir
             ),
             timeseries_type=timeseries_type,
-            write_mode=write_mode
+            write_mode=write_mode,
+            drop_duplicates=drop_duplicates
         )
 
@@ -443,7 +451,8 @@ def nwm_retrospective_grids(
         location_id_prefix: Optional[str] = None,
         timeseries_type: TimeseriesTypeEnum = "primary",
         write_mode: TableWriteEnum = "append",
-        zonal_weights_filepath: Optional[Union[Path, str]] = None
+        zonal_weights_filepath: Optional[Union[Path, str]] = None,
+        drop_duplicates: bool = True,
     ):
         """
         Fetch NWM retrospective gridded data, calculate zonal statistics (currently only
 
@@ -507,6 +516,8 @@
             The path to the zonal weights file. If None and calculate_zonal_weights
             is False, the weights file must exist in the cache for the configuration.
             Default is None.
+        drop_duplicates : bool
+            Whether to drop duplicates in the data. Default is True.
 
         Examples
         --------
 
@@ -622,7 +633,8 @@
                 self.nwm_cache_dir
             ),
             timeseries_type=timeseries_type,
-            write_mode=write_mode
+            write_mode=write_mode,
+            drop_duplicates=drop_duplicates
         )
 
@@ -644,7 +656,8 @@ def nwm_operational_points(
         timeseries_type: TimeseriesTypeEnum = "secondary",
         starting_z_hour: Optional[int] = None,
         ending_z_hour: Optional[int] = None,
-        write_mode: TableWriteEnum = "append"
+        write_mode: TableWriteEnum = "append",
+        drop_duplicates: bool = True,
     ):
         """Fetch operational NWM point data and load into the TEEHR dataset.
 
@@ -732,6 +745,8 @@
             that does not already exist.
             If "upsert", existing data will be replaced and new data that
             does not exist will be appended.
+        drop_duplicates : bool
+            Whether to drop duplicates in the data. Default is True.
 
 
         .. note::
 
@@ -863,7 +878,8 @@
                 self.nwm_cache_dir
             ),
             timeseries_type=timeseries_type,
-            write_mode=write_mode
+            write_mode=write_mode,
+            drop_duplicates=drop_duplicates
         )
 
@@ -887,6 +903,7 @@ def nwm_operational_grids(
         ending_z_hour: Optional[int] = None,
         write_mode: TableWriteEnum = "append",
         zonal_weights_filepath: Optional[Union[Path, str]] = None,
+        drop_duplicates: bool = True,
     ):
         """
         Fetch NWM operational gridded data, calculate zonal statistics (currently only
 
@@ -980,6 +997,8 @@
             The path to the zonal weights file. If None and calculate_zonal_weights
             is False, the weights file must exist in the cache for the configuration.
             Default is None.
+        drop_duplicates : bool
+            Whether to drop duplicates in the data. Default is True.
 
 
         .. note::
 
@@ -1135,5 +1154,6 @@
             ev=self.ev,
             in_path=Path(self.nwm_cache_dir),
             timeseries_type=timeseries_type,
-            write_mode=write_mode
+            write_mode=write_mode,
+            drop_duplicates=drop_duplicates
         )
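A hypothetical call showing the new flag end to end. Only write_mode and drop_duplicates come from this diff; the Evaluation constructor and the date arguments are assumptions for illustration.

```python
import teehr

ev = teehr.Evaluation(dir_path="/tmp/example_eval")  # assumed setup
ev.fetch.usgs_streamflow(
    start_date="2023-01-01",  # assumed fetch-window arguments
    end_date="2023-01-31",
    write_mode="append",      # from this diff: append/overwrite/upsert
    drop_duplicates=False,    # new: skip dropDuplicates() for known-clean data
)
```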
