diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index 39b758f..e4477af 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -15,10 +15,12 @@ get_codes, get_continuous, get_daily, + get_date_range_stats, get_field_measurements, get_latest_continuous, get_latest_daily, get_monitoring_locations, + get_por_stats, get_samples, get_time_series_metadata, ) @@ -33,10 +35,12 @@ "get_codes", "get_continuous", "get_daily", + "get_date_range_stats", "get_field_measurements", "get_latest_continuous", "get_latest_daily", "get_monitoring_locations", + "get_por_stats", "get_samples", "get_time_series_metadata", "_check_profiles", diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 63f7b81..7f04f9a 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -20,7 +20,11 @@ PROFILES, SERVICES, ) -from dataretrieval.waterdata.utils import SAMPLES_URL, get_ogc_data +from dataretrieval.waterdata.utils import ( + SAMPLES_URL, + get_ogc_data, + get_stats_data +) # Set up logger for this module logger = logging.getLogger(__name__) @@ -1641,6 +1645,210 @@ def get_samples( return df, BaseMetadata(response) +def get_por_stats( + approval_status: Optional[str] = None, + computation_type: Optional[Union[str, list[str]]] = None, + country_code: Optional[Union[str, list[str]]] = None, + state_code: Optional[Union[str, list[str]]] = None, + county_code: Optional[Union[str, list[str]]] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + monitoring_location_id: Optional[Union[str, list[str]]] = None, + page_size: int = 1000, + parent_time_series_id: Optional[Union[str, list[str]]] = None, + site_type_code: Optional[Union[str, list[str]]] = None, + site_type_name: Optional[Union[str, list[str]]] = None, + parameter_code: Optional[Union[str, list[str]]] = None, + expand_percentiles: bool = True + ) -> Tuple[pd.DataFrame, BaseMetadata]: + 
"""Get water data statistics from the USGS Water Data API. + This service provides endpoints for access to computations on the + historical record regarding water conditions, including minimum, maximum, + mean, median, and percentiles for day of year, month, month-year, and + water/calendar years. For more information regarding the calculation of + statistics and other details, please visit the Statistics documentation + page: https://waterdata.usgs.gov/statistics-documentation/. + + Note: This API is under active beta development and subject to + change. Improved handling of significant figures will be + addressed in a future release. + + Parameters + ---------- + service: string, One of the following options: "observationNormals" + or "observationIntervals". "observationNormals" returns + day-of-year and month-of-year statistics matching your query, + while "observationIntervals" returns monthly and annual statistics + matching your query. + approval_status: string, optional + Whether to include approved and/or provisional observations. + At this time, only approved observations are returned. + computation_type: string, optional + Desired statistical computation method. Available values are: + arithmetic_mean, maximum, median, minimum, percentile. + country_code: string, optional + Country query parameter. API defaults to "US". + state_code: string, optional + State query parameter. Takes the format "US:XX", where XX is + the two-digit state code. API defaults to "US:42" (Pennsylvania). + county_code: string, optional + County query parameter. Takes the format "US:XX:YYY", where XX is + the two-digit state code and YYY is the three-digit county code. + API defaults to "US:42:103" (Pennsylvania, Pike County). + start_date: string or datetime, optional + Start day for the query in the month-day format (MM-DD). + end_date: string or datetime, optional + End day for the query in the month-day format (MM-DD). 
+ monitoring_location_id : string or list of strings, optional + A unique identifier representing a single monitoring location. This + corresponds to the id field in the monitoring-locations endpoint. + Monitoring location IDs are created by combining the agency code of the + agency responsible for the monitoring location (e.g. USGS) with the ID + number of the monitoring location (e.g. 02238500), separated by a hyphen + (e.g. USGS-02238500). + page_size : int, optional + The number of results to return per page, where one result represents a + monitoring location. The default is 1000. + parent_time_series_id: string, optional + The parent_time_series_id returns statistics tied to a particular database entry. + site_type_code: string, optional + Site type code query parameter. You can see a list of valid site type codes here: + https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items. + Example: "GW" (Groundwater site) + site_type_name: string, optional + Site type name query parameter. You can see a list of valid site type names here: + https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items. + Example: "Well" + parameter_code : string or list of strings, optional + Parameter codes are 5-digit codes used to identify the constituent + measured and the units of measure. A complete list of parameter codes + and associated groupings can be found at + https://help.waterdata.usgs.gov/codes-and-parameters/parameters. + expand_percentiles : boolean + Percentile data for a given day of year or month of year by default + are returned from the service as lists of string values and percentile + thresholds in the "values" and "percentiles" columns, respectively. + When `expand_percentiles` is set to True (default), each value and + percentile threshold specific to a computation id are returned as + individual rows in the dataframe. 
Missing percentile values expressed + as 'nan' in the list of string values are removed from the dataframe + to save space. + """ + params = { + k: v + for k, v in locals().items() + if k not in ["expand_percentiles"] and v is not None + } + + return get_stats_data( + args=params, + service="observationNormals", + expand_percentiles=expand_percentiles + ) + +def get_date_range_stats( + approval_status: Optional[str] = None, + computation_type: Optional[Union[str, list[str]]] = None, + country_code: Optional[Union[str, list[str]]] = None, + state_code: Optional[Union[str, list[str]]] = None, + county_code: Optional[Union[str, list[str]]] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + monitoring_location_id: Optional[Union[str, list[str]]] = None, + page_size: int = 1000, + parent_time_series_id: Optional[Union[str, list[str]]] = None, + site_type_code: Optional[Union[str, list[str]]] = None, + site_type_name: Optional[Union[str, list[str]]] = None, + parameter_code: Optional[Union[str, list[str]]] = None, + expand_percentiles: bool = True + ) -> Tuple[pd.DataFrame, BaseMetadata]: + """Get water data statistics from the USGS Water Data API. + This service provides endpoints for access to computations on the + historical record regarding water conditions, including minimum, maximum, + mean, median, and percentiles for day of year, month, month-year, and + water/calendar years. For more information regarding the calculation of + statistics and other details, please visit the Statistics documentation + page: https://waterdata.usgs.gov/statistics-documentation/. + + Note: This API is under active beta development and subject to + change. Improved handling of significant figures will be + addressed in a future release. + + Parameters + ---------- + service: string, One of the following options: "observationNormals" + or "observationIntervals". 
"observationNormals" returns + day-of-year and month-of-year statistics matching your query, + while "observationIntervals" returns monthly and annual statistics + matching your query. + approval_status: string, optional + Whether to include approved and/or provisional observations. + At this time, only approved observations are returned. + computation_type: string, optional + Desired statistical computation method. Available values are: + arithmetic_mean, maximum, median, minimum, percentile. + country_code: string, optional + Country query parameter. API defaults to "US". + state_code: string, optional + State query parameter. Takes the format "US:XX", where XX is + the two-digit state code. API defaults to "US:42" (Pennsylvania). + county_code: string, optional + County query parameter. Takes the format "US:XX:YYY", where XX is + the two-digit state code and YYY is the three-digit county code. + API defaults to "US:42:103" (Pennsylvania, Pike County). + start_date: string or datetime, optional + Start date for the query in the year-month-day format + (YYYY-MM-DD). + end_date: string or datetime, optional + End date for the query in the year-month-day format + (YYYY-MM-DD). + monitoring_location_id : string or list of strings, optional + A unique identifier representing a single monitoring location. This + corresponds to the id field in the monitoring-locations endpoint. + Monitoring location IDs are created by combining the agency code of the + agency responsible for the monitoring location (e.g. USGS) with the ID + number of the monitoring location (e.g. 02238500), separated by a hyphen + (e.g. USGS-02238500). + page_size : int, optional + The number of results to return per page, where one result represents a + monitoring location. The default is 1000. + parent_time_series_id: string, optional + The parent_time_series_id returns statistics tied to a particular datbase entry. + site_type_code: string, optional + Site type code query parameter. 
You can see a list of valid site type codes here: + https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items. + Example: "GW" (Groundwater site) + site_type_name: string, optional + Site type name query parameter. You can see a list of valid site type names here: + https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items. + Example: "Well" + parameter_code : string or list of strings, optional + Parameter codes are 5-digit codes used to identify the constituent + measured and the units of measure. A complete list of parameter codes + and associated groupings can be found at + https://help.waterdata.usgs.gov/codes-and-parameters/parameters. + expand_percentiles : boolean + Percentile data for a given day of year or month of year by default + are returned from the service as lists of string values and percentile + thresholds in the "values" and "percentiles" columns, respectively. + When `expand_percentiles` is set to True (default), each value and + percentile threshold specific to a computation id are returned as + individual rows in the dataframe. Missing percentile values expressed + as 'nan' in the list of string values are removed from the dataframe + to save space. 
+ """ + params = { + k: v + for k, v in locals().items() + if k not in ["expand_percentiles"] and v is not None + } + + return get_stats_data( + args=params, + service="observationIntervals", + expand_percentiles=expand_percentiles + ) + def _check_profiles( service: SERVICES, diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 46d58b6..0eb4a75 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -1,6 +1,5 @@ import json import logging -import warnings import os import re from datetime import datetime @@ -27,6 +26,8 @@ OGC_API_VERSION = "v0" OGC_API_URL = f"{BASE_URL}/ogcapi/{OGC_API_VERSION}" SAMPLES_URL = f"{BASE_URL}/samples-data" +STATISTICS_API_VERSION = "v0" +STATISTICS_API_URL = f"{BASE_URL}/statistics/{STATISTICS_API_VERSION}" def _switch_arg_id(ls: Dict[str, Any], id_name: str, service: str): @@ -542,7 +543,7 @@ def _walk_pages( Raises ------ Exception - If a request fails or returns a non-200 status code. + If a request fails/returns a non-200 status code. """ logger.info("Requesting: %s", req.url) @@ -579,14 +580,11 @@ def _walk_pages( headers=headers, data=content if method == "POST" else None, ) - if resp.status_code != 200: - error_text = _error_body(resp) - raise Exception(error_text) df1 = _get_resp_data(resp, geopd=geopd) dfs = pd.concat([dfs, df1], ignore_index=True) curr_url = _next_req_url(resp) except Exception: - warnings.warn(f"{error_text}. Data request incomplete.") + error_text = _error_body(resp) logger.error("Request incomplete. %s", error_text) logger.warning("Request failed for URL: %s. 
Data download interrupted.", curr_url) curr_url = None @@ -820,4 +818,193 @@ def get_ogc_data( metadata = BaseMetadata(response) return return_list, metadata +def _handle_stats_nesting( + body: Dict[str, Any], + geopd: bool = False, +) -> pd.DataFrame: + """ + Takes nested json from stats service and flattens into a dataframe with + one row per monitoring location, parameter, and statistic. + + Parameters + ---------- + body : Dict[str, Any] + The JSON response body from the statistics service containing nested data. + + Returns + ------- + pd.DataFrame + A DataFrame containing the flattened statistical data. + """ + if body is None: + return pd.DataFrame() + + if not geopd: + logger.info( + "Geopandas not installed. Geometries will be flattened into pandas DataFrames." + ) + + # If geopandas not installed, return a pandas dataframe + # otherwise return a geodataframe + if not geopd: + df = pd.json_normalize(body['features']).drop(columns=['type', 'properties.data']) + df.columns = df.columns.str.split('.').str[-1] + else: + df = gpd.GeoDataFrame.from_features(body["features"]).drop(columns=['data']) + + # Unnest json features, properties, data, and values while retaining necessary + # metadata to merge with main dataframe. 
+ dat = pd.json_normalize( + body, + record_path=["features", "properties", "data", "values"], + meta=[ + ["features", "properties", "monitoring_location_id"], + ["features", "properties", "data", "parameter_code"], + ["features", "properties", "data", "unit_of_measure"], + ["features", "properties", "data", "parent_time_series_id"], + #["features", "geometry", "coordinates"], + ], + meta_prefix="", + errors="ignore", + ) + dat.columns = dat.columns.str.split('.').str[-1] + + return df.merge(dat, on='monitoring_location_id', how='left') + + +def _expand_percentiles(df: pd.DataFrame) -> pd.DataFrame: + """ + Takes percentile value and thresholds columns containing lists + of values and turns each list element into its own row in the + original dataframe. 'nan's are removed from the dataframe. + + Parameters + ---------- + df : pd.DataFrame + The dataframe returned from using one of the statistics services. + + Returns + ------- + pd.DataFrame + A DataFrame containing the flattened percentile data. 
+ """ + if len(df) > 0 and "percentile" in df['computation'].unique(): + + # Explode percentile lists into rows called "value" and "percentile" + percentiles = df.loc[df['computation'] == "percentile"] + percentiles_explode = percentiles[['computation_id', 'values', 'percentiles']].explode(['values', 'percentiles'], ignore_index=True) + percentiles_explode = percentiles_explode.loc[percentiles_explode['values']!="nan"] + percentiles_explode['value'] = pd.to_numeric(percentiles_explode['values']) + percentiles_explode['percentile'] = pd.to_numeric(percentiles_explode['percentiles']) + percentiles_explode = percentiles_explode.drop(columns=['values', 'percentiles']) + + # Merge exploded values back to other metadata/geometry + percentiles = percentiles.drop(columns=['values', 'percentiles', 'value']).merge(percentiles_explode, on='computation_id', how='left') + + # Concatenate back to original + dfs = pd.concat([df.loc[df['computation'] != "percentile"], percentiles]).drop(columns=['values', 'percentiles']) + + # Move percentile column + cols = dfs.columns.tolist() + cols.remove("percentile") + col_index = cols.index("value") + 1 + cols.insert(col_index, "percentile") + + return dfs[cols] + + else: + return df + +def get_stats_data( + args: Dict[str, Any], + service: str, + expand_percentiles: bool, + client: Optional[requests.Session] = None, + ) -> Tuple[pd.DataFrame, BaseMetadata]: + """ + Retrieves statistical data from a specified water data endpoint and returns it as a pandas DataFrame with metadata. + + This function prepares request arguments, constructs API requests, handles pagination, processes the results, + and formats the output DataFrame according to the specified parameters. + + Parameters + ---------- + args : Dict[str, Any] + Dictionary of request arguments for the statistics service. + service : str + The statistics service type (e.g., "observationNormals", "observationIntervals"). 
+ + Returns + ------- + pd.DataFrame + A DataFrame containing the retrieved and processed statistical data. + BaseMetadata + A metadata object containing request information including URL and query time. + """ + + url = f"{STATISTICS_API_URL}/{service}" + + headers = _default_headers() + + request = requests.Request( + method="GET", + url=url, + headers=headers, + params=args, + ) + req = request.prepare() + logger.info("Request: %s", req.url) + + # create temp client if not provided + # and close it after the request is done + close_client = client is None + client = client or requests.Session() + + try: + resp = client.send(req) + if resp.status_code != 200: + raise Exception(_error_body(resp)) + + # Store the initial response for metadata + initial_response = resp + + # Grab some aspects of the original request: headers and the + # request type (GET or POST) + method = req.method.upper() + headers = dict(req.headers) + + body = resp.json() + dfs = _handle_stats_nesting(body, geopd=GEOPANDAS) + + # Look for a next code in the response body + next_token = body['next'] + + while next_token: + args['next_token'] = next_token + + try: + resp = client.request( + method, + url=url, + params=args, + headers=headers, + ) + body = resp.json() + df1 = _handle_stats_nesting(body, geopd=GEOPANDAS) + dfs = pd.concat([dfs, df1], ignore_index=True) + next_token = body['next'] + except Exception: + error_text = _error_body(resp) + logger.error("Request incomplete. %s", error_text) + logger.warning("Request failed for URL: %s. Data download interrupted.", resp.url) + next_token = None + + if expand_percentiles: + dfs = _expand_percentiles(dfs) + + return dfs, BaseMetadata(initial_response) + finally: + if close_client: + client.close() +