diff --git a/common/config/attributes.py b/common/config/attributes.py
index b19a25e..84de2ed 100644
--- a/common/config/attributes.py
+++ b/common/config/attributes.py
@@ -1,6 +1,7 @@
 from pathlib import Path
 from typing import Annotated, Any
 
 import xarray as xr
+import yaml
 from pydantic import BaseModel, Field
 
@@ -25,6 +26,35 @@ class NcAttributes(BaseModel):
         Field(description="Variable-specific attributes", default_factory=dict),
     ]
 
+    additional_attributes_path: Annotated[
+        str | None,
+        Field(
+            description="Path to the file that defines additional attributes for source"
+        ),
+    ] = None
+
+    def add_attributes_from_yaml(self) -> None:
+        """Merge attributes from the optional YAML file into this model.
+
+        When ``additional_attributes_path`` is set, the file's
+        ``global_attributes`` and ``variable_attributes`` mappings are merged
+        into ``self.global_attributes`` and ``self.variables``. Keys already
+        set on this model win over keys from the file. The merge is shallow:
+        a variable present in both keeps only the attributes set on this model.
+        """
+        if self.additional_attributes_path is None:
+            return
+
+        # Path.open is an instance method, so wrap the string in a Path first.
+        with Path(self.additional_attributes_path).open(encoding="utf-8") as file:
+            # safe_load returns None for an empty document.
+            data = yaml.safe_load(file) or {}
+
+        if global_attrs := data.get("global_attributes"):
+            self.global_attributes = global_attrs | self.global_attributes
+        if variable_attrs := data.get("variable_attributes"):
+            self.variables = variable_attrs | self.variables
+
     def apply_to_dataset(self, ds: xr.Dataset):
         """Apply the configured attributes to an xarray Dataset"""
         for var_name, attrs in self.variables.items():
diff --git a/common/config/mappings.py b/common/config/mappings.py
index efbce82..51a7739 100644
--- a/common/config/mappings.py
+++ b/common/config/mappings.py
@@ -73,5 +73,69 @@ class OptionalDepthMappingMixin:
         list[DepthGroup] | None,
         Field(
             description="Depth mappings for variables with multiple depth levels",
         ),
     ] = None
+
+
+class SplitOperator(BaseModel):
+    """Split a source variable on ``sep`` and map the parts to new variables."""
+
+    sep: Annotated[
+        str,
+        Field(description="The separator"),
+    ]
+
+    output_variables: Annotated[
+        dict[int, str],
+        Field(description="Mapping of index number to output variable."),
+    ]
+
+    source_variable: Annotated[
+        str,
+        Field(description="The source variable to split into multiple columns"),
+    ]
+
+
+class SplitOperations(BaseModel):
+    """Container for the list of split operations to apply."""
+
+    split_operations: Annotated[
+        list[SplitOperator],
+        Field(description="List of variables to split into multiple variables"),
+    ]
+
+
+class VariableConverterMixIn:
+    """Mixin to add column conversion rules to a dataset"""
+
+    variable_converter: Annotated[
+        SplitOperations | None,
+        Field(description="Split variable converter"),
+    ] = None
+
+
+class ProfileDepthMappings(BaseModel):
+    """Input-to-output variable mappings for one depth level."""
+
+    depth: Annotated[
+        float | None,
+        Field(description="Optional- fixed depth for the mapping."),
+    ] = None
+
+    mappings: Annotated[
+        dict[str, str],
+        Field(
+            description="Maps input variables to output variables at the current depth ",
+        ),
+    ]
+
+
+class OptionalProfileDepthMixin:
+    """Mixin to add profile depth mappings configuration to a dataset"""
+
+    profile_data: Annotated[
+        list[ProfileDepthMappings] | None,
+        Field(
+            description="Mapping for variables with multiple depth levels",
+        ),
+    ] = None
diff --git a/common/readers/pandas_csv.py b/common/readers/pandas_csv.py
index ce3064a..e3e66f7 100644
--- a/common/readers/pandas_csv.py
+++ b/common/readers/pandas_csv.py
@@ -15,6 +15,10 @@ class PandasCSVReader(BaseModel):
     comment: Annotated[str | None, Field(description="CSV line comment character")] = (
         None
     )
+    na_values: Annotated[
+        str,
+        Field(description="Additional string to recognize as NA/NaN"),
+    ] = "None"
     # delim_whitespace
     # skiprows
     # sep
@@ -24,4 +28,5 @@ class PandasCSVReader(BaseModel):
     def read_df(self, file_path) -> pd.DataFrame:
         """Read a CSV file from S3 into a Pandas DataFrame"""
         reader_kwargs = self.model_dump()
+
         return pd.read_csv(file_path, **reader_kwargs)
diff --git a/pipeline/s3_timeseries/datasets_config/cvow.json b/pipeline/s3_timeseries/datasets_config/cvow.json new file mode 100644 index 0000000..8d1981e --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/cvow.json @@ -0,0 +1,88 @@ +{ + "reader": { + "sep": ",", + "comment": 
"#", + "na_values": "No data" + }, + "station": "CVOW Wave Rider 1", + "drop_vars": [ + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV DATAWELL DIRECTIONAL SPECTRUM () (last)", + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV PEAK SPECTRAL DENSITY POWER (m2/Hz) (nodatamode_repeat)", + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WATER CURRENT VERTICAL MEAN SPEED (m/s) (nodatamode_repeat)", + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE M01 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE M13 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE M24 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE MM10 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE MM20 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE PEAK PERIOD (s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV LOCATION () (last)", + "CVOW WRB01_DWR4_HQ PRV PEAK SPECTRAL DENSITY POWER (m2/Hz) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WATER CURRENT MEAN DIRECTION (Deg) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WATER CURRENT MEAN SPEED (m/s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WATER TEMPERATURE (Cel) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE M01 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE M13 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE M24 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE MEAN DIRECTION (Deg) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE MM10 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE MM20 PERIOD (s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE PEAK PERIOD (s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE PERIOD (s) (nodatamode_repeat)", + "CVOW WRB01_DWR4_HQ PRV WAVE SIGNIFICANT HEIGHT (m) (nodatamode_repeat)" + ], + "s3_source": { + "bucket": "ott-cvow-temp", + "prefix": "/" + }, + "attributes": { + "additional_attributes_path": "datasets_config/cvow.yaml" + }, + 
"start_date": "2025-05-02", + "dataset_type": "timeseries", + "file_pattern": { + "day_pattern": "CVOW_{partition_date:%Y-%m-%d}*.csv" + }, + "source_time_var": "Time range", + "variable_mappings": [ + { + "output": "time", + "source": "Time range" + }, + { + "output": "WATER_CURRENT_MEAN_DIRECTION", + "source": "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WATER CURRENT MEAN DIRECTION (Deg) (nodatamode_repeat)" + }, + { + "output": "WATER_CURRENT_MEAN_SPEED", + "source": "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WATER CURRENT MEAN SPEED (m/s) (nodatamode_repeat)" + }, + { + "output": "WATER_TEMPERATURE", + "source": "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WATER TEMPERATURE (Cel) (nodatamode_repeat)" + }, + { + "output": "WAVE_MEAN_DIRECTION", + "source": "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE MEAN DIRECTION (Deg) (nodatamode_repeat)" + }, + { + "output": "WAVE_PERIOD", + "source": "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE PERIOD (s) (nodatamode_repeat)" + }, + { + "output": "WAVE_SIGNIFICANT_HEIGHT", + "source": "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV WAVE SIGNIFICANT HEIGHT (m) (nodatamode_repeat)" + } + ], + "variable_converter": { + "split_operations": [ + { + "sep": ":", + "source_variable": "CVOW WRB03_DWR4_OSS PRV_HRD_150 PRV LOCATION () (last)", + "output_variables": { + "0": "latitude", + "2": "longitude" + } + } + ] + } +} diff --git a/pipeline/s3_timeseries/datasets_config/cvow.yaml b/pipeline/s3_timeseries/datasets_config/cvow.yaml new file mode 100644 index 0000000..1022102 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/cvow.yaml @@ -0,0 +1,84 @@ +global_attributes: + title: CVOW Wave Buoy, Timeseries Data + acknowledgement: This dataset is part of a MetOcean Cyberinfrastructure supported by the Integrated Ocean Observing System (IOOS) to streamline integration of data from offshore operations. 
+ comment: CVOW Wave Rider Buoy deployed by DEME Offshore US + contributor_name: DEME Offshore US + contributor_address: 150 Boush St, Norfolk, Suite 1000, VA 23510 + contributor_url: www.deme-group.com + contributor_email: de.beuf.hendrik at deme-group.com + creator_country: USA + creator_name: Coastal Virginia Offshore Wind + creator_address: 500 Orapax Street, Norfolk, VA 23507 + creator_state: Virginia + creator_URL: https://coastalvawind.com/ + creator_email: cvowops@dominionenergy.com + institution: Virginia Electric and Power Company, d/b/a Dominion Energy Virginia + creator_sector: industry + geospatial_lat_max: 36.93842989 + geospatial_lat_min: 36.93842989 + geospatial_lon_max: -75.4423258 + geospatial_lon_min: -75.4423258 + id: CVOW_WR1 + infoUrl: https://coastalvawind.com/ + license: The data may be used and redistributed for free but is not intended for legal use, since it may contain inaccuracies. Neither the data Contributor, Dominion, NOAA, nor the United States Government, nor any of their employees or contractors, makes any warranty, express or implied, including warranties of merchantability and fitness for a particular purpose, or assumes any legal liability for the accuracy, completeness, or usefulness, of this information. + platform_name: CVOW_WR1 + processing_level: raw dataset, no qc provided + project: Coastal Virginia Offshore Wind + publisher_country: USA + publisher_email: devops at rpsgroup.com, info at neracoos.org + publisher_institution: MARACOOS, NERACOOS + publisher_name: MARACOOS, NERACOOS + publisher_phone: (401) 789-6224, (603) 319-1785 + publisher_type: institution + publisher_url: https://www.maracoos.org, https://www.neracoos.org + summary: This dataset contains observations to support further understanding of metocean conditions off the coast of Virginia. 
Data is collected from a wave rider buoy and the dataset includes measurements of wave peak height, wave significant height, wave peak direction, and wave significant period. + Instrument: Datawell DWR4 (https://datawell.nl/products/directional-waverider-4/) + cdm_data_type: TimeSeries + featureType: timeSeries + cdm_timeseries_variables: 'station,longitude,latitude' +variable_attributes: + time: + ioos_category: Time + long_name: Time + short_name: time + standard_name: time + station: + ioos_category: Identifier + long_name: Station CVOW Wave Buoy + cf_role: timeseries_id + WATER_CURRENT_MEAN_DIRECTION: + standard_name: sea_water_velocity_to_direction + long_name: Current direction at water surface + ioos_category: currents + units: degree + WATER_CURRENT_MEAN_SPEED: + standard_name: sea_water_speed + long_name: Current speed + ioos_category: currents + units: m s-1 + WATER_TEMPERATURE: + ioos_category: Water Property + standard_name: sea_water_temperature + long_name: Sea Water Temperature + units: degree_Celsius + WAVE_MEAN_DIRECTION: + long_name: mean wave direction + standard_name: sea_surface_wave_mean_from_direction + units: degrees + ioos_category: Surface Waves + WAVE_PEAK_PERIOD: + long_name: peak period + standard_name: sea_surface_wave_period_at_variance_spectral_density_maximum + units: seconds + ioos_category: Surface Waves + WAVE_PERIOD: + long_name: mean wave period + standard_name: sea_surface_wave_mean_period + units: seconds + ioos_category: Surface Waves + WAVE_SIGNIFICANT_HEIGHT: + long_name: significant wave height + standard_name: sea_surface_wave_significant_height + units: m + ioos_category: Surface Waves + diff --git a/pipeline/s3_timeseries/datasets_config/empire_adcp_currents.json b/pipeline/s3_timeseries/datasets_config/empire_adcp_currents.json new file mode 100644 index 0000000..f736ec3 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/empire_adcp_currents.json @@ -0,0 +1,200 @@ +{ + "reader": { + "sep": ";", + "comment": 
"#" + }, + "station": "Empire Wind 1", + "latitude": 40.319, + "drop_vars": [ + "ZCel_WTmp" + ], + "longitude": -73.4461, + "s3_source": { + "bucket": "ott-empire", + "prefix": "/" + }, + "attributes": { + "variables": { + "depth": { + "axis": "Z", + "units": "m", + "positive": "down", + "long_name": "Depth", + "ioos_category": "Location", + "standard_name": "depth" + }, + "CurrSpd": { + "units": "m s-1", + "long_name": "Current Speed", + "ncei_name": "CURRENT METER - SPEED", + "ioos_category": "Currents", + "standard_name": "sea_water_speed", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0334", + "coverage_content_type": "physicalMeasurement" + }, + "CurrDirTo": { + "units": "degree", + "long_name": "Current Direction", + "ncei_name": "CURRENT METER - DIRECTION", + "ioos_category": "Currents", + "standard_name": "sea_water_velocity_to_direction", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/5D4D7YMF", + "coverage_content_type": "physicalMeasurement" + } + }, + "additional_attributes_path": "datasets_config/empire_attributes.yaml" + }, + "start_date": "2025-07-02", + "dataset_type": "profile", + "file_pattern": { + "day_pattern": "EW01_ADCP_{partition_date:%Y%m%d}_*.txt" + }, + "profile_data": [ + { + "mappings": { + "CurrSpd1": "CurrSpd", + "ZCelDpth1": "depth", + "CurrDirTo1": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd2": "CurrSpd", + "ZCelDpth2": "depth", + "CurrDirTo2": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd3": "CurrSpd", + "ZCelDpth3": "depth", + "CurrDirTo3": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd4": "CurrSpd", + "ZCelDpth4": "depth", + "CurrDirTo4": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd5": "CurrSpd", + "ZCelDpth5": "depth", + "CurrDirTo5": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd6": "CurrSpd", + "ZCelDpth6": "depth", + "CurrDirTo6": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd7": "CurrSpd", + "ZCelDpth7": "depth", + "CurrDirTo7": 
"CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd8": "CurrSpd", + "ZCelDpth8": "depth", + "CurrDirTo8": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd9": "CurrSpd", + "ZCelDpth9": "depth", + "CurrDirTo9": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd10": "CurrSpd", + "ZCelDpth10": "depth", + "CurrDirTo10": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd11": "CurrSpd", + "ZCelDpth11": "depth", + "CurrDirTo11": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd12": "CurrSpd", + "ZCelDpth12": "depth", + "CurrDirTo12": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd13": "CurrSpd", + "ZCelDpth13": "depth", + "CurrDirTo13": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd14": "CurrSpd", + "ZCelDpth14": "depth", + "CurrDirTo14": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd15": "CurrSpd", + "ZCelDpth15": "depth", + "CurrDirTo15": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd16": "CurrSpd", + "ZCelDpth16": "depth", + "CurrDirTo16": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd17": "CurrSpd", + "ZCelDpth17": "depth", + "CurrDirTo17": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd18": "CurrSpd", + "ZCelDpth18": "depth", + "CurrDirTo18": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd19": "CurrSpd", + "ZCelDpth19": "depth", + "CurrDirTo19": "CurrDirTo" + } + }, + { + "mappings": { + "CurrSpd20": "CurrSpd", + "ZCelDpth20": "depth", + "CurrDirTo20": "CurrDirTo" + } + } + ], + "variable_mappings": [ + { + "output": "time", + "source": "datetime" + } + ] +} diff --git a/pipeline/s3_timeseries/datasets_config/empire_adcp_water.json b/pipeline/s3_timeseries/datasets_config/empire_adcp_water.json new file mode 100644 index 0000000..dd6f0e9 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/empire_adcp_water.json @@ -0,0 +1,100 @@ +{ + "reader": { + "sep": ";", + "comment": "#" + }, + "station": "Empire Wind 1", + "latitude": 40.319, + "drop_vars": [ + "CurrSpd1", + "CurrDirTo1", + "CurrSpd2", + "CurrDirTo2", + "CurrSpd3", + 
"CurrDirTo3", + "CurrSpd4", + "CurrDirTo4", + "CurrSpd5", + "CurrDirTo5", + "CurrSpd6", + "CurrDirTo6", + "CurrSpd7", + "CurrDirTo7", + "CurrSpd8", + "CurrDirTo8", + "CurrSpd9", + "CurrDirTo9", + "CurrSpd10", + "CurrDirTo10", + "CurrSpd11", + "CurrDirTo11", + "CurrSpd12", + "CurrDirTo12", + "CurrSpd13", + "CurrDirTo13", + "CurrSpd14", + "CurrDirTo14", + "CurrSpd15", + "CurrDirTo15", + "CurrSpd16", + "CurrDirTo16", + "CurrSpd17", + "CurrDirTo17", + "CurrSpd18", + "CurrDirTo18", + "CurrSpd19", + "CurrDirTo19", + "CurrSpd20", + "CurrDirTo20", + "ZCelDpth1", + "ZCelDpth2", + "ZCelDpth3", + "ZCelDpth4", + "ZCelDpth5", + "ZCelDpth6", + "ZCelDpth7", + "ZCelDpth8", + "ZCelDpth9", + "ZCelDpth10", + "ZCelDpth11", + "ZCelDpth12", + "ZCelDpth13", + "ZCelDpth14", + "ZCelDpth15", + "ZCelDpth16", + "ZCelDpth17", + "ZCelDpth18", + "ZCelDpth19", + "ZCelDpth20" + ], + "longitude": -73.4461, + "s3_source": { + "bucket": "ott-empire", + "prefix": "/" + }, + "attributes": { + "variables": { + "ZCel_WTmp": { + "units": "C", + "long_name": "Water Temperature", + "ncei_name": "WATER TEMPERATURE", + "ioos_category": "Temperature", + "standard_name": "sea_water_temperature", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0335", + "coverage_content_type": "physicalMeasurement" + } + }, + "additional_attributes_path": "datasets_config/empire_attributes.yaml" + }, + "start_date": "2025-07-02", + "dataset_type": "timeseries", + "file_pattern": { + "day_pattern": "EW01_ADCP_{partition_date:%Y%m%d}_*.txt" + }, + "variable_mappings": [ + { + "output": "time", + "source": "datetime" + } + ] +} diff --git a/pipeline/s3_timeseries/datasets_config/empire_attributes.yaml b/pipeline/s3_timeseries/datasets_config/empire_attributes.yaml new file mode 100644 index 0000000..1c92387 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/empire_attributes.yaml @@ -0,0 +1,59 @@ +global_attributes: + title: Empire Metocean Buoy, Timeseries Data + acknowledgement: This dataset is 
part of a MetOcean Cyberinfrastructure supported by the Integrated Ocean Observing System (IOOS) to streamline integration of data from offshore operations. + comment: Empire Metocean Buoy developed and deployed by Woods Hole Group + contributor_name: Woods Hole Group + contributor_address: 107 Waterhouse Rd, Bourne, MA 02532 + contributor_url: https://www.woodsholegroup.com/ + contributor_email: rnewell at woodsholegroup + creator_country: USA + creator_name: Empire Wind + creator_address: 34 35th Street Suite A415, Brooklyn, NY 11232 + creator_state: New York + creator_URL: https://www.empirewind.com/ + creator_email: empirewind at equinor.com + institution: Empire Wind + creator_sector: industry + geospatial_lat_max: 40.3192 + geospatial_lat_min: 40.3192 + geospatial_lon_max: -73.4459 + geospatial_lon_min: -73.4459 + id: EW1 + infoURL: https://www.empirewind.com/ + license: The data may be used and redistributed for free but is not intended for legal use, since it may contain inaccuracies. Neither the data Contributor, Empire Wind, Equinor, NOAA, nor the United States Government, nor any of their employees or contractors, makes any warranty, express or implied, including warranties of merchantability and fitness for a particular purpose, or assumes any legal liability for the accuracy, completeness, or usefulness, of this information. + platform_name: Empire Wind Metocean Buoy + processing_level: raw dataset, no qc provided + project: Empire Wind + publisher_country: USA + publisher_email: devops at rpsgroup.com, info at neracoos.org + publisher_institution: MARACOOS, NERACOOS + publisher_name: MARACOOS, NERACOOS + publisher_phone: (401) 789-6224, (603) 319-1785 + publisher_type: institution + publisher_url: https://www.maracoos.org, https://www.neracoos.org + summary: This dataset contains observations to support further understanding of metocean conditions off the coast of New York. 
Data is collected from a floating wave buoy and the dataset includes measurements of sea temperature, conductivity, current speed, current direction, wave height, wave direction, and wave period, air temperature, atmospheric pressure, relative humidity, visibility, surface wind speed, surface wind direction, and surface wind gust. + + infoUrl: https://www.empirewind.com/ + cdm_data_type: TimeSeries + featureType: timeSeries + cdm_timeseries_variables: 'station,longitude,latitude' + standard_name_vocabulary: CF Standard Name Table v90 +variable_attributes: + time: + ioos_category: Time + standard_name: time + axis: T + station: + cf_role: timeseries_id + standard_name: platform_name + ioos_category: Identifier + latitude: + standard_name: latitude + units: degrees_north + long_name: Latitude + ioos_category: Location + longitude: + standard_name: longitude + units: degrees_east + long_name: Longitude + ioos_category: Location \ No newline at end of file diff --git a/pipeline/s3_timeseries/datasets_config/empire_ctd.json b/pipeline/s3_timeseries/datasets_config/empire_ctd.json new file mode 100644 index 0000000..b4b81e7 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/empire_ctd.json @@ -0,0 +1,62 @@ +{ + "reader": { + "sep": ";", + "comment": "#" + }, + "station": "Empire Wind 1", + "latitude": 40.319, + "longitude": -73.4461, + "s3_source": { + "bucket": "ott-empire", + "prefix": "/" + }, + "attributes": { + "variables": { + "Cond": { + "units": "S m-1", + "long_name": "Conductivity", + "ncei_name": "WATER CONDUCTIVITY", + "ioos_category": "Physical Oceanography", + "standard_name": "sea_water_electrical_conductivity", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0394", + "coverage_content_type": "physicalMeasurement" + }, + "Pres": { + "units": "dbar", + "long_name": "Water Pressure", + "standard_name": "sea_water_pressure" + }, + "Sali": { + "units": "1", + "long_name": "Salinity", + "ncei_name": "SALINITY", + "nodc_name": 
"SALINITY", + "ioos_category": "Salinity", + "standard_name": "sea_water_practical_salinity", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/IADIHDIJ/", + "coverage_content_type": "physicalMeasurement" + }, + "CTD_WTmp": { + "unit": "C", + "long_name": "Water Temperature", + "ncei_name": "WATER TEMPERATURE", + "ioos_category": "Temperature", + "standard_name": "sea_water_temperature", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0335", + "coverage_content_type": "physicalMeasurement" + } + }, + "additional_attributes_path": "datasets_config/empire_attributes.yaml" + }, + "start_date": "2025-07-02", + "dataset_type": "timeseries", + "file_pattern": { + "day_pattern": "EW01_ctd_{partition_date:%Y%m%d}_*.txt" + }, + "variable_mappings": [ + { + "output": "time", + "source": "datetime" + } + ] +} diff --git a/pipeline/s3_timeseries/datasets_config/empire_met.json b/pipeline/s3_timeseries/datasets_config/empire_met.json new file mode 100644 index 0000000..1feab88 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/empire_met.json @@ -0,0 +1,104 @@ +{ + "reader": { + "sep": ";", + "comment": "#" + }, + "station": "Empire Wind 1", + "latitude": 40.319, + "longitude": -73.4461, + "s3_source": { + "bucket": "ott-empire", + "prefix": "/" + }, + "attributes": { + "variables": { + "MetSENS_RH_Avg": { + "units": "1", + "long_name": "Relative Humidity", + "ioos_category": "Meteorology", + "standard_name": "relative_humidity", + "coverage_content_type": "physicalMeasurement" + }, + "MetS3SecGst_Max": { + "long_name": "Wind Speed Gust", + "ncei_name": "WIND GUST", + "ioos_category": "Wind", + "standard_name": "wind_speed_of_gust", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0039", + "coverage_content_type": "physicalMeasurement" + }, + "MetSENS_WSpd_Avg": { + "units": "m/s", + "long_name": "Wind Speed", + "ncei_name": "WIND SPEED", + "ioos_category": "Wind", + "standard_name": 
"wind_speed", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0038", + "coverage_content_type": "physicalMeasurement" + }, + "MetSENS_WSpd_Std": { + "units": "m/s", + "long_name": "Std Dev Wind Speed" + }, + "MetSENS_ATemp_Avg": { + "unit": "C", + "long_name": "Air Temperature", + "ncei_name": "AIR TEMPERATURE", + "ioos_category": " Meteorology", + "standard_name": "air_temperature", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0023", + "coverage_content_type": "physicalMeasurement" + }, + "MetSENS_BPress_Avg": { + "units": "Pa", + "long_name": "Barometric Pressure", + "ncei_name": "PRESSURE - BAROMETRIC", + "ioos_category": "Meteorology", + "standard_name": "air_pressure", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0015", + "coverage_content_type": "physicalMeasurement" + }, + "MetSENS_Dwpoint_Avg": { + "unit": "C", + "long_name": "Dew Point Temperature", + "ioos_category": "Meteorology", + "standard_name": "dew_point_temperature", + "coverage_content_type": "physicalMeasurement" + }, + "MetSENS_WDir_From_Avg": { + "units": "degree", + "comment": "Wind direction is the direction from which wind is blowing. It is a meteorological convention (degrees clockwise from North). 
(IOOS)", + "long_name": "Wind From Direction", + "ncei_name": "WIND DIRECTION", + "ioos_category": "Wind", + "standard_name": "wind_from_direction", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0036", + "coverage_content_type": "physicalMeasurement" + }, + "MetSENS_WDir_From_Std": { + "units": "Degrees", + "long_name": "Std Dev Wind Direction" + } + }, + "additional_attributes_path": "datasets_config/empire_attributes.yaml" + }, + "start_date": "2025-07-02", + "dataset_type": "timeseries", + "file_pattern": { + "day_pattern": "EW01_met_{partition_date:%Y%m%d}_*.txt" + }, + "variable_mappings": [ + { + "output": "time", + "source": "datetime" + }, + { + "output": "latitude", + "source": "Latitude_Avg" + }, + { + "output": "longitude", + "source": "Longitude_Avg" + } + ] +} diff --git a/pipeline/s3_timeseries/datasets_config/empire_waves.json b/pipeline/s3_timeseries/datasets_config/empire_waves.json new file mode 100644 index 0000000..d5855b3 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/empire_waves.json @@ -0,0 +1,109 @@ +{ + "reader": { + "sep": ";", + "comment": "#" + }, + "station": "Empire Wind 1", + "latitude": 40.319, + "longitude": -73.4461, + "s3_source": { + "bucket": "ott-empire", + "prefix": "/" + }, + "attributes": { + "variables": { + "Hmax": { + "unit": "m", + "long_name": "Maximum Wave Height", + "ncei_name": "WAVE HEIGHT - MAXIMUM", + "ioos_category": "Surface Waves", + "standard_name": "sea_surface_wave_maximum_height", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/NQS0CMX", + "coverage_content_type": "physicalMeasurement" + }, + "Wdir": { + "units": "degrees", + "long_name": "Wave Direction Swell 2", + "ioos_category": "Surface Waves", + "standard_name": "sea_surface_swell_wave_from_direction" + }, + "HpSea": { + "unit": "m", + "long_name": "Significant Wave Height", + "ncei_name": "WAVE HEIGHT - SIGNIFICANT", + "ioos_category": "Surface Waves", + "standard_name": 
"sea_surface_wave_significant_height", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSN0385", + "coverage_content_type": "physicalMeasurement" + }, + "TpSea": { + "unit": "s", + "long_name": "Dominant Wave Period", + "ncei_name": "WAVE PERIOD - DOMINANT", + "ioos_category": "Surface Waves", + "standard_name": "sea_surface_wave_period_at_variance_spectral_density_maximum", + "standard_name_url": "vocab.nerc.ac.uk/collection/P07/current/CFV13N31", + "coverage_content_type": "physicalMeasurement" + }, + "TzSea": { + "unit": "s", + "long_name": "Mean Wave Period", + "ioos_category": "Surface Waves", + "standard_name": "sea_surface_wave_mean_period" + }, + "WdirSea": { + "units": "degree", + "long_name": "Mean Wave Direction", + "ncei_name": "WAVE DIRECTION - Average", + "ioos_category": "Surface Waves", + "standard_name": "sea_surface_wave_from_direction", + "standard_name_url": "https://vocab.nerc.ac.uk/collection/P07/current/CFSM0384", + "coverage_content_type": "physicalMeasurement" + }, + "HsSwell1": { + "unit": "m", + "long_name": "Sig Wave Height Swell 1" + }, + "HsSwell2": { + "unit": "m", + "long_name": "Sig Wave Height Swell 2" + }, + "TpSwell1": { + "unit": "s", + "long_name": "Peak Wave Period Swell 1" + }, + "TpSwell2": { + "unit": "s", + "long_name": "Peak Wave Period Swell 2" + }, + "TzSwell1": { + "unit": "s", + "long_name": "Mean Wave Period Swell 1" + }, + "TzSwell2": { + "unit": "s", + "long_name": "Mean Wave Period Swell 2" + }, + "WdirSwell1": { + "unit": "degrees", + "long_name": "Wave Direction Swell 1" + }, + "WdirSwell2": { + "unit": "degrees", + "long_name": "Wave Direction Swell 2" + } + }, + "additional_attributes_path": "datasets_config/empire_attributes.yaml" + }, + "start_date": "2025-07-02", + "dataset_type": "timeseries", + "file_pattern": { + "day_pattern": "EW01_wave_ioos_{partition_date:%Y%m%d}_*.txt" + }, + "variable_mappings": [ + { + "output": "time", + "source": "datetime" + } + ] +} diff --git 
a/pipeline/s3_timeseries/datasets_config/south_fork_attributes.yaml b/pipeline/s3_timeseries/datasets_config/south_fork_attributes.yaml new file mode 100644 index 0000000..8908f0f --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/south_fork_attributes.yaml @@ -0,0 +1,42 @@ +global_attributes: + standard_name_vocabulary: CF Standard Name Table v90 + processing_level: raw dataset, no qc provided + license: > + The data may be used and redistributed for free but is not intended + for legal use, since it may contain inaccuracies. Neither the data + Contributor, Orsted, NOAA, nor the United States Government, nor any + of their employees or contractors, makes any warranty, express or + implied, including warranties of merchantability and fitness for a + particular purpose, or assumes any legal liability for the accuracy, + completeness, or usefulness, of this information + project: South Fork Wind + infoUrl: https://southforkwind.com + publisher_country: USA + publisher_institution: MARACOOS,NERACOOS + publisher_name: MARACOOS,NERACOOS + publisher_phone: (401) 789-6224, (603) 319-1785 + publisher_type: institution + summary: > + This dataset contains observations to support further understanding of metocean + conditions off the coast of New York, Rhode Island and Southeastern Massachusetts. + Data is collected from a floating lidar buoy and the dataset includes measurements + of sea temperature (surface and sea floor), current speed, current direction, + wave height, wave direction, and wave period, air temperature, atmospheric pressure, + relative humidity, visibility, surface wind speed, surface wind direction, and surface wind gust. 
+ instrument: Campbell Scientific MetSENS 500, Campbell Scientific CS120 Visibility Sensor, Garmin GPS16X-HVS, Seaview SVS-603b, Nortek Aquadopp Profiler Z-Cell 600kHz + acknowledgement: > + This dataset is part of a MetOcean Cyberinfrastructure supported by the Integrated + Ocean Observing System (IOOS) to streamline integration of data from offshore operations. + contributor_name: Woods Hole Group + contributor_address: 107 Waterhouse Rd, Bourne, MA 02532 + contributor_url: https://www.woodsholegroup.com/ + contributor_email: dwalsh at woodsholegroup.com + creator_country: USA + creator_name: South Fork Wind + creator_address: 107 Waterhouse Rd, Bourne, MA 02532 + creator_state: New York + creator_URL: https://southforkwind.com + creator_email: info at southforkwind.com + institution: South Fork Wind - a partnership between Orsted and Global Infrastructure Partners + creator_sector: industry + platform_name: South Fork Wind \ No newline at end of file diff --git a/pipeline/s3_timeseries/datasets_config/south_fork_currents.json b/pipeline/s3_timeseries/datasets_config/south_fork_currents.json new file mode 100644 index 0000000..8c5e264 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/south_fork_currents.json @@ -0,0 +1,209 @@ +{ + "reader": { + "sep": ";", + "comment": "#" + }, + "station": "SFW01", + "latitude": 41.0775, + "drop_vars": [ + "ZCel_WTmp" + ], + "longitude": -71.1855, + "s3_source": { + "bucket": "ott-south-fork-wind", + "prefix": "/" + }, + "attributes": { + "variables": { + "time": { + "standard_name": "time" + }, + "depth": { + "units": "meters", + "positive": "down", + "standard_name": "depth" + }, + "CurrSpd": { + "name": "current speed", + "units": "m/s", + "long_name": "current_speed", + "instrument": "Nortek Aquadopp Profiler Z-Cell 600kHz", + "standard_name": "sea_water_speed" + }, + "station": { + "cf_role": "timeseries_id", + "long_name": "Station SFW01-current", + "standard_name": "platform_name" + }, + "latitude": { + "units": 
"degrees_north", + "standard_name": "latitude" + }, + "CurrDirTo": { + "name": "current_direction", + "units": "degree", + "long_name": "current direction", + "instrument": "Nortek Aquadopp Profiler Z-Cell 600kHz", + "standard_name": "sea_water_velocity_to_direction" + }, + "longitude": { + "units": "degrees_east", + "standard_name": "longitude" + } + }, + "additional_attributes_path": "datasets_config/south_fork_attributes.yaml" + }, + "start_date": "2025-07-03", + "dataset_type": "profile", + "file_pattern": { + "day_pattern": "SFW01_WB_02_current_{partition_date:%Y%m%d}_*.txt" + }, + "profile_data": [ + { + "depth": 1, + "mappings": { + "CurrSpd1": "CurrSpd", + "CurrDirTo1": "CurrDirTo" + } + }, + { + "depth": 4, + "mappings": { + "CurrSpd2": "CurrSpd", + "CurrDirTo2": "CurrDirTo" + } + }, + { + "depth": 6, + "mappings": { + "CurrSpd3": "CurrSpd", + "CurrDirTo3": "CurrDirTo" + } + }, + { + "depth": 8, + "mappings": { + "CurrSpd4": "CurrSpd", + "CurrDirTo4": "CurrDirTo" + } + }, + { + "depth": 10, + "mappings": { + "CurrSpd5": "CurrSpd", + "CurrDirTo5": "CurrDirTo" + } + }, + { + "depth": 12, + "mappings": { + "CurrSpd6": "CurrSpd", + "CurrDirTo6": "CurrDirTo" + } + }, + { + "depth": 14, + "mappings": { + "CurrSpd7": "CurrSpd", + "CurrDirTo7": "CurrDirTo" + } + }, + { + "depth": 16, + "mappings": { + "CurrSpd8": "CurrSpd", + "CurrDirTo8": "CurrDirTo" + } + }, + { + "depth": 18, + "mappings": { + "CurrSpd9": "CurrSpd", + "CurrDirTo9": "CurrDirTo" + } + }, + { + "depth": 20, + "mappings": { + "CurrSpd10": "CurrSpd", + "CurrDirTo10": "CurrDirTo" + } + }, + { + "depth": 22, + "mappings": { + "CurrSpd11": "CurrSpd", + "CurrDirTo11": "CurrDirTo" + } + }, + { + "depth": 24, + "mappings": { + "CurrSpd12": "CurrSpd", + "CurrDirTo12": "CurrDirTo" + } + }, + { + "depth": 26, + "mappings": { + "CurrSpd13": "CurrSpd", + "CurrDirTo13": "CurrDirTo" + } + }, + { + "depth": 28, + "mappings": { + "CurrSpd14": "CurrSpd", + "CurrDirTo14": "CurrDirTo" + } + }, + { + "depth": 30, + 
"mappings": { + "CurrSpd15": "CurrSpd", + "CurrDirTo15": "CurrDirTo" + } + }, + { + "depth": 32, + "mappings": { + "CurrSpd16": "CurrSpd", + "CurrDirTo16": "CurrDirTo" + } + }, + { + "depth": 34, + "mappings": { + "CurrSpd17": "CurrSpd", + "CurrDirTo17": "CurrDirTo" + } + }, + { + "depth": 36, + "mappings": { + "CurrSpd18": "CurrSpd", + "CurrDirTo18": "CurrDirTo" + } + }, + { + "depth": 38, + "mappings": { + "CurrSpd19": "CurrSpd", + "CurrDirTo19": "CurrDirTo" + } + }, + { + "depth": 40, + "mappings": { + "CurrSpd20": "CurrSpd", + "CurrDirTo20": "CurrDirTo" + } + } + ], + "variable_mappings": [ + { + "output": "time", + "source": "datetime" + } + ] +} diff --git a/pipeline/s3_timeseries/datasets_config/south_fork_water.json b/pipeline/s3_timeseries/datasets_config/south_fork_water.json new file mode 100644 index 0000000..5c75963 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/south_fork_water.json @@ -0,0 +1,94 @@ +{ + "reader": { + "sep": ";", + "comment": "#" + }, + "station": "SFW01", + "latitude": 41.0775, + "drop_vars": [ + "CurrSpd1", + "CurrSpd2", + "CurrSpd3", + "CurrSpd4", + "CurrSpd5", + "CurrSpd6", + "CurrSpd7", + "CurrSpd8", + "CurrSpd9", + "CurrSpd10", + "CurrSpd11", + "CurrSpd12", + "CurrSpd13", + "CurrSpd14", + "CurrSpd15", + "CurrSpd16", + "CurrSpd17", + "CurrSpd18", + "CurrSpd19", + "CurrSpd20", + "CurrDirTo1", + "CurrDirTo2", + "CurrDirTo3", + "CurrDirTo4", + "CurrDirTo5", + "CurrDirTo6", + "CurrDirTo7", + "CurrDirTo8", + "CurrDirTo9", + "CurrDirTo10", + "CurrDirTo11", + "CurrDirTo12", + "CurrDirTo13", + "CurrDirTo14", + "CurrDirTo15", + "CurrDirTo16", + "CurrDirTo17", + "CurrDirTo18", + "CurrDirTo19", + "CurrDirTo20" + ], + "longitude": -71.1855, + "s3_source": { + "bucket": "ott-south-fork-wind", + "prefix": "/" + }, + "attributes": { + "variables": { + "time": { + "standard_name": "time" + }, + "station": { + "cf_role": "timeseries_id", + "long_name": "Station SFW01-current", + "standard_name": "platform_name" + }, + "latitude": { + 
"units": "degrees_north", + "standard_name": "latitude" + }, + "ZCel_WTmp": { + "name": "water_temperature", + "units": "celsius", + "long_name": "ZCel WTmp", + "instrument": "Nortek Aquadopp Profiler Z-Cell 600kHz", + "standard_name": "sea_water_temperature" + }, + "longitude": { + "units": "degrees_east", + "standard_name": "longitude" + } + }, + "additional_attributes_path": "datasets_config/south_fork_attributes.yaml" + }, + "start_date": "2025-07-03", + "dataset_type": "timeseries", + "file_pattern": { + "day_pattern": "SFW01_WB_02_current_{partition_date:%Y%m%d}_*.txt" + }, + "variable_mappings": [ + { + "output": "time", + "source": "datetime" + } + ] +} diff --git a/pipeline/s3_timeseries/datasets_config/south_fork_waves.json b/pipeline/s3_timeseries/datasets_config/south_fork_waves.json new file mode 100644 index 0000000..b5c5a14 --- /dev/null +++ b/pipeline/s3_timeseries/datasets_config/south_fork_waves.json @@ -0,0 +1,164 @@ +{ + "reader": { + "sep": ";", + "comment": "#" + }, + "station": "SFW01", + "latitude": 41.0775, + "longitude": -71.1855, + "s3_source": { + "bucket": "ott-south-fork-wind", + "prefix": "/" + }, + "attributes": { + "variables": { + "Hs": { + "name": "sig_wave_height", + "units": "m", + "long_name": "significant wave height", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wave_significant_height" + }, + "Tp": { + "name": "peak_period", + "units": "seconds", + "long_name": "peak period", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wave_period_at_variance_spectral_density_maximum" + }, + "Tz": { + "name": "mean_wave_period", + "units": "seconds", + "long_name": "mean wave period", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wave_mean_period" + }, + "Hmax": { + "name": "max_wave_height", + "units": "m", + "long_name": "maximum wave height", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wave_maximum_height" + }, + "Wdir": { + "name": 
"mean_wave_direction", + "units": "degrees", + "long_name": "mean wave direction", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wave_mean_from_direction" + }, + "time": { + "standard_name": "time" + }, + "HsSea": { + "name": "sig_wave_height_sea", + "units": "m", + "long_name": "significant wave height wind sea component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wind_wave_significant_height" + }, + "TpSea": { + "name": "peak_wave_period_sea", + "units": "seconds", + "long_name": "peak period wind sea component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wind_wave_period_at_variance_spectral_density_maximum" + }, + "TzSea": { + "name": "mean_wave_period_sea", + "units": "seconds", + "long_name": "mean period wind sea component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wind_wave_mean_period" + }, + "WdirSea": { + "name": "wave_direction_sea", + "units": "degrees", + "long_name": "mean wave direction wind sea component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_wind_wave_mean_direction" + }, + "station": { + "cf_role": "timeseries_id", + "long_name": "Station SFW01-waves", + "standard_name": "platform_name" + }, + "HsSwell1": { + "name": "sig_wave_height_swell_1", + "units": "m", + "long_name": "significant wave height 1st swell component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_primary_swell_wave_significant_height" + }, + "HsSwell2": { + "name": "sig_wave_height_swell_2", + "units": "m", + "long_name": "significant wave height 2nd swell component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_secondary_swell_wave_significant_height" + }, + "TpSwell1": { + "name": "peak_wave_period_swell_1", + "units": "seconds", + "long_name": "peak period 1st swell component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_primary_swell_wave_period_at_variance_spectral_density_maximum"
+ }, + "TpSwell2": { + "name": "peak_wave_period_swell_2", + "units": "seconds", + "long_name": "peak period 2nd swell component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_secondary_swell_wave_period_at_variance_spectral_density_maximum" + }, + "TzSwell1": { + "name": "mean_wave_period_swell_1", + "units": "seconds", + "long_name": "mean period 1st swell component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_primary_swell_wave_mean_period" + }, + "TzSwell2": { + "name": "mean_wave_period_swell_2", + "units": "seconds", + "long_name": "mean period 2nd swell component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_secondary_swell_wave_mean_period" + }, + "latitude": { + "units": "degrees_north", + "standard_name": "latitude" + }, + "longitude": { + "units": "degrees_east", + "standard_name": "longitude" + }, + "WdirSwell1": { + "name": "wave_direction_swell_1", + "units": "degrees", + "long_name": "mean wave direction 1st swell component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_primary_swell_wave_from_direction" + }, + "WdirSwell2": { + "name": "wave_direction_swell_2", + "units": "degrees", + "long_name": "wave direction 2nd swell component", + "instrument": "Seaview SVS-603b", + "standard_name": "sea_surface_secondary_swell_wave_from_direction" + } + }, + "additional_attributes_path": "datasets_config/south_fork_attributes.yaml" + }, + "start_date": "2025-07-03", + "dataset_type": "timeseries", + "file_pattern": { + "day_pattern": "SFW01_WB_02_wave_{partition_date:%Y%m%d}_*.txt" + }, + "variable_mappings": [ + { + "output": "time", + "source": "datetime" + } + ] +} diff --git a/pipeline/s3_timeseries/pipeline.py b/pipeline/s3_timeseries/pipeline.py index 43225b2..6285b58 100644 --- a/pipeline/s3_timeseries/pipeline.py +++ b/pipeline/s3_timeseries/pipeline.py @@ -10,11 +10,11 @@ from common import assets, config, io from common.backend_api import BackendAPIClient
-from common.config import mappings, s3_source +from common.config import mappings, s3_source, attributes from common.readers.pandas_csv import PandasCSVReader from common.resource.s3fs_resource import S3Credentials, S3FSResource from common.sentry import SentryConfig - +import yaml sentry = SentryConfig(pipeline_name="s3_timeseries") @@ -40,8 +40,10 @@ class S3TimeseriesConfig( config.DatasetConfigBase, # config.AttributeConfigMixin, mappings.VariableMappingMixin, - # mappings.OptionalDepthMappingMixin, + mappings.OptionalProfileDepthMixin, s3_source.S3SourceMixin, + attributes.AttributeConfigMixin, + mappings.VariableConverterMixIn ): """Configuration for S3 Timeseries Dataset.""" @@ -49,8 +51,17 @@ class S3TimeseriesConfig( reader: PandasCSVReader - file_pattern: Annotated[DayGlob, Field(description="Source file name pattern")] + dataset_type: Annotated[str, Field(description="Dataset type (timeseries or profile)")] + + source_time_var : str = "datetime" + + file_pattern: Annotated[DayGlob, Field(description="Source file name pattern")] + drop_vars : Annotated[list[str], + Field( + description="Variables to drop from the dataset", + default_factory=list, + ),] = None latitude: Annotated[ float | None, Field(description="Fixed latitude of the station"), @@ -116,12 +127,19 @@ def daily_df(context: dg.AssetExecutionContext, s3fs: S3FSResource) -> pd.DataFr """Download daily dataframe from S3.""" partition_date_string = context.asset_partition_key_for_output() partition_date = date.fromisoformat(partition_date_string) - day_glob = f"{dataset.config.s3_source.bucket}{dataset.config.s3_source.prefix}{dataset.config.file_pattern.day_pattern.format(partition_date=partition_date)}" + + day_glob = ( + dataset.config.s3_source.bucket + + dataset.config.s3_source.prefix + + dataset.config.file_pattern.day_pattern.format( + partition_date=partition_date, + ) + ) context.log.info( f"Reading daily data for {partition_date_string} from S3 with glob: {day_glob}", ) - + s3_keys =
s3fs.fs.glob(day_glob) s3_keys.sort() @@ -129,14 +147,59 @@ def daily_df(context: dg.AssetExecutionContext, s3fs: S3FSResource) -> pd.DataFr context.add_output_metadata({"Source S3 keys": dg.MetadataValue.json(s3_keys)}) daily_dfs = [] + for day_f in s3_keys: context.log.debug(f"Reading {day_f}") with s3fs.fs.open(day_f, "rb") as f: + + df = dataset.config.reader.read_df(f) - daily_dfs.append(df) + + if dataset.config.variable_converter is not None: + for split_conv in dataset.config.variable_converter.split_operations: + splt_col =df[split_conv.source_variable].str.split(split_conv.sep, expand=True) + for n_var in split_conv.output_variables: + + df[split_conv.output_variables[n_var]] = splt_col[n_var] + + + df = df.drop(split_conv.source_variable,axis=1) + if dataset.config.drop_vars is not None: + df = df.drop(columns=dataset.config.drop_vars) + + if dataset.config.dataset_type =='profile': + # Reshape the profile data from one column per variable per depth (CurrSpd1, CurrSpd2, ..., CurrSpdN) to + # one column per variable (CurrSpd) plus a depth column + + all_profile_vars = [var for depth_conf in dataset.config.profile_data for var in depth_conf.mappings.keys()] + + non_profile_vars = df.columns.difference(all_profile_vars).tolist() + + for depth in dataset.config.profile_data: + + keep = non_profile_vars + list(depth.mappings.keys()) + + + df_depth = df[keep].copy() + df_depth = df_depth.rename(columns=depth.mappings) + + if depth.depth is not None: + df_depth['depth'] = float(depth.depth) + + daily_dfs.append(df_depth) + indx_var = [dataset.config.source_time_var,"depth"] + + else: + daily_dfs.append(df) + indx_var = dataset.config.source_time_var df = pd.concat(daily_dfs) + df[dataset.config.source_time_var] = pd.to_datetime(df[dataset.config.source_time_var]) + df = df.sort_values(indx_var) + df = df.reset_index() + + return df @dg.asset( @@ -183,10 +246,16 @@ def monthly_ds( daily_dfs.append(df) df = pd.concat(daily_dfs, ignore_index=True) - df = df.sort_values("time") - df =
df.drop_duplicates(subset=["time"]) + if dataset.config.dataset_type =='profile': + indx_var = ["time","depth"] + else: + indx_var = "time" + + + df = df.sort_values(indx_var) + df = df.drop_duplicates(subset=indx_var) df["time"] = pd.to_datetime(df["time"]) - df = df.set_index("time") + df = df.set_index(indx_var) ds = df.to_xarray() @@ -197,13 +266,17 @@ def monthly_ds( ds["longitude"] = dataset.config.longitude ds = ds.set_coords(["station", "latitude", "longitude"]) - + ds = ds.drop_vars("index") # apply attributes ds["time"].encoding.update( - {"units": "seconds since 1970-01-01T00:00:00Z", "calendar": "gregorian"}, + {"units": "seconds since 1970-01-01T00:00:00Z", "calendar": "gregorian", "standard_name":"time"}, ) + dataset.config.attributes.add_attributes_from_yaml() + + + dataset.config.attributes.apply_to_dataset(ds) return ds return dg.Definitions(assets=[daily_df, monthly_ds])