Fix parquet io #1749
base: master
Changes from all commits
```diff
@@ -1,66 +1,106 @@
 import sys
 import os
 from datetime import datetime, timedelta
 import logging
 
-logging.captureWarnings(True)
-logger = logging.getLogger(__name__)
-import string
-from shutil import move
-
 import numpy as np
 import pandas as pd
+from opendrift.models.basemodel import Mode
 import xarray as xr
 
+logging.captureWarnings(True)
+logger = logging.getLogger(__name__)
+
+# KFD 4 Feb 2025
+# recarray self.history is now replaced with Xarray Dataset self.result
+# Thus the below code must be updated accordingly.
+# E.g. self.history['ID'] -> self.result.ID
```
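The dated note above is the crux of the port: element data moves from a masked recarray (`self.history`) to an xarray `Dataset` (`self.result`) with `trajectory` and `time` dimensions. A minimal sketch of the two access styles, using an invented two-trajectory dataset (variable names and values are illustrative only):

```python
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical miniature of self.result: two trajectories, three timesteps.
result = xr.Dataset(
    {
        "ID": (("trajectory", "time"), np.array([[1, 1, 1], [2, 2, 2]])),
        "lon": (("trajectory", "time"),
                np.array([[4.0, 4.1, 4.2], [np.nan, 4.5, 4.6]])),
    },
    coords={
        "trajectory": [0, 1],
        "time": pd.date_range("2025-02-04", periods=3, freq="h"),
    },
)

# Old recarray style was self.history['ID']; the xarray equivalents are:
ids = result.ID               # DataArray with dims ('trajectory', 'time')
ids_np = result["ID"].values  # plain numpy array, closest to the old behaviour
```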
```diff
 def init(self, filename):
     """Initialise parquet output."""
-    self.outfile = filename
-    dummy_data = {
-        k: pd.Series([], dtype=t) for k, (t, _) in self.history.dtype.fields.items()
-    }
-    dummy_data["time"] = pd.Series([], dtype="datetime64[ns]")
-    df = pd.DataFrame(dummy_data)
-    df.to_parquet(self.outfile, engine="fastparquet")
+    self.outfile_name = filename
+    if os.path.exists(self.outfile_name):
+        logger.warning(f'Deleting existing parquet file {self.outfile_name}')
+        os.remove(self.outfile_name)
+
+    self._parquet_initialized = False
 
 
-def write_buffer(self):
-    num_steps_to_export = self.steps_output - self.steps_exported
-
-    data = {
-        k: self.history[k][:, 0:num_steps_to_export][
-            ~self.history[k].mask[:, 0:num_steps_to_export]
-        ]  # automatically flattens array
-        for k in self.history.dtype.fields
-    }
-
-    times = [
-        self.start_time + n * self.time_step_output
-        for n in range(self.steps_exported, self.steps_output)
-    ]
-
-    _arr_template = self.history["ID"][:, 0:num_steps_to_export]
-    time_arr = np.repeat([times], _arr_template.shape[0], axis=0)
-    data["time"] = time_arr[~_arr_template.mask]  # automatically flattens array
-
-    df = pd.DataFrame(data)
-    df.to_parquet(self.outfile, engine="fastparquet", append=True)
-
-    logger.info("Wrote %s steps to file %s" % (num_steps_to_export, self.outfile))
-    self.history.mask = True  # Reset history array, for new data
-    self.steps_exported = self.steps_exported + num_steps_to_export
+def _result_chunk_to_dataframe(self):
+    """Flatten current self.result (xarray.Dataset) to tidy DataFrame."""
+    ds = self.result
+
+    if "time" not in ds.dims or "trajectory" not in ds.dims:
+        raise ValueError(
+            "Expected self.result to have 'time' and 'trajectory' dimensions "
+            f"but got dims: {list(ds.dims)}"
+        )
+
+    df = ds.to_dataframe().reset_index()
+
+    # Drop unseeded/invalid rows (simple proxy: lon is NaN)
+    if "lon" in df.columns:
+        df = df[~df["lon"].isna()]
+
+    df["time"] = pd.to_datetime(df["time"])
+
+    return df
```
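The new helper leans entirely on `Dataset.to_dataframe()`, which expands the `(trajectory, time)` grid into one row per element per timestep; `reset_index()` then turns the MultiIndex into ordinary columns so the frame can be written to parquet. A self-contained sketch with invented values:

```python
import numpy as np
import pandas as pd
import xarray as xr

ds = xr.Dataset(
    {"lon": (("trajectory", "time"), [[4.0, 4.1], [np.nan, 4.5]])},
    coords={
        "trajectory": [0, 1],
        "time": pd.date_range("2025-02-04", periods=2, freq="h"),
    },
)

df = ds.to_dataframe().reset_index()  # one row per (trajectory, time) cell
df = df[~df["lon"].isna()]            # drop the not-yet-seeded element
print(len(df))                        # 3 -- one of the four grid cells was NaN
print(sorted(df.columns))             # ['lon', 'time', 'trajectory']
```

Using `lon` as the seeded-yet proxy is the same heuristic as in the diff; any variable guaranteed to be NaN for unseeded elements would work equally well.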
```diff
+def write_buffer(self):
+    """Append current buffer (self.result) to parquet file."""
+    if self.result is None or self.result.sizes.get("time", 0) == 0:
+        logger.debug("No timesteps in result; nothing to write to parquet.")
+        return
+
+    df = _result_chunk_to_dataframe(self)
+    if df.empty:
+        logger.debug("Flattened DataFrame is empty; skipping parquet write.")
+        return
+
+    if not getattr(self, "_parquet_initialized", False):
+        df.to_parquet(self.outfile_name, engine="fastparquet")
+        self._parquet_initialized = True
+        logger.info(
+            "Initialized parquet file %s with %d rows, %d timesteps",
+            self.outfile_name,
+            len(df),
+            self.result.sizes.get("time", 0),
+        )
+    else:
+        df.to_parquet(self.outfile_name, engine="fastparquet", append=True)
+        logger.info(
+            "Appended %d rows (%d timesteps) to %s",
+            len(df),
+            self.result.sizes.get("time", 0),
+            self.outfile_name,
+        )
```
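The first `to_parquet` call creates the file and fixes its schema; later calls pass `append=True`, which pandas forwards to the fastparquet engine so each chunk lands as an additional row group. A standalone sketch of that pattern (the `write_chunks` helper and file name are invented):

```python
import pandas as pd

def write_chunks(path: str, chunks: list[pd.DataFrame]) -> None:
    """Write DataFrames to one parquet file, creating it on the first chunk."""
    initialized = False
    for df in chunks:
        if df.empty:
            continue  # nothing to write for this chunk
        if not initialized:
            df.to_parquet(path, engine="fastparquet")  # creates file, fixes schema
            initialized = True
        else:
            df.to_parquet(path, engine="fastparquet", append=True)  # new row group

write_chunks("drift_chunks.parquet", [
    pd.DataFrame({"trajectory": [0, 1], "lon": [4.0, 4.5]}),
    pd.DataFrame({"trajectory": [0, 1], "lon": [4.1, 4.6]}),
])
```

One caveat worth keeping in mind: every appended frame must match the first chunk's columns and dtypes, which is why deferring file creation to the first real chunk is safer than the old dummy-DataFrame approach in `init`.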
```diff
 def close(self):
-    logger.warning("`.close` not strictly needed...?")
+    """Finalize parquet output and reconstruct self.result as xarray.Dataset."""
+    if not getattr(self, "_parquet_initialized", False):
+        logger.debug("Parquet file was never written; nothing to close.")
+        return
+
+    logger.debug(f"Reopening parquet file {self.outfile_name} to build result")
+
+    df = pd.read_parquet(self.outfile_name, engine="fastparquet")
+
+    # Ensure we have trajectory and time as index
+    if not {"trajectory", "time"}.issubset(df.columns):
+        raise ValueError(
+            "Parquet file must contain 'trajectory' and 'time' columns "
+            f"but columns are: {list(df.columns)}"
+        )
+
+    df = df.set_index(["trajectory", "time"]).sort_index()
+
+    # Convert back to xarray.Dataset
+    ds = df.to_xarray()
+
+    # Make sure time coord is datetime64[ns]
+    ds = ds.assign_coords(time=("time", pd.to_datetime(ds.coords["time"].values)))
+
+    self.result = ds
+    logger.debug("Parquet close: self.result has been rebuilt from parquet.")
```
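Round-tripping through `set_index([...]).to_xarray()` re-inflates the tidy rows into a dense 2-D grid; any `(trajectory, time)` combination that was filtered out during export comes back as NaN rather than disappearing. A small sketch, values invented:

```python
import pandas as pd

df = pd.DataFrame({
    "trajectory": [0, 0, 1],
    "time": pd.to_datetime(["2025-02-04 00:00",
                            "2025-02-04 01:00",
                            "2025-02-04 01:00"]),
    "lon": [4.0, 4.1, 4.5],
})

ds = df.set_index(["trajectory", "time"]).sort_index().to_xarray()
print(ds.lon.shape)  # (2, 2): the full grid is restored
print(float(ds.lon.sel(trajectory=1, time="2025-02-04 00:00")))  # nan (padded)
```

This means the rebuilt Dataset is not byte-identical to the in-memory one: rows dropped by `_result_chunk_to_dataframe` return as missing values.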
```diff
 def import_file(self, filename, times=None, elements=None, load_history=True):
-    """Create OpenDrift object from imported file.
-    This reimport is potentially very costly anyway
-    """
-    logger.info("Skipping reimport")
+    """Parquet import placeholder."""
+    logger.info("Parquet import is not implemented; returning self unchanged.")
+    return self
```
```diff
@@ -2225,14 +2225,18 @@ def run(self,
         # Remove any elements scheduled for deactivation during last step
         self.remove_deactivated_elements()
 
-        if export_buffer_length is None:
-            pass  # TODO - do this for self.result
-        else:  # If output has been flushed to file during run, we
-            # need to reimport from file to get all data in memory
-            del self.environment
-            if hasattr(self, 'environment_profiles'):
-                del self.environment_profiles
-            self.result = xr.open_dataset(outfile)
+        if outfile is not None:
+            ext = os.path.splitext(outfile)[1].lower()
+            if ext in [".nc", ".nc4", ".netcdf"]:
+                # NetCDF: open with xarray as before
+                self.result = xr.open_dataset(outfile)
+            elif ext == ".parquet":
+                # Parquet: let the parquet exporter set self.result in its close()
+                # (we do nothing here)
+                pass
+            else:
+                # Fallback: try normal xarray open
+                self.result = xr.open_dataset(outfile)
 
         return self.result
```
Collaborator

The lines 2230-2239 seem a bit overly complicated. Could this not be simply:

Collaborator

Also I am not completely sure about replacing
Collaborator

The deletion of `self.environment` and `self.environment_profiles` has here been removed. Tests are passing, so might be ok, but I seem to recall that this deletion was necessary for some reason. Was this removed deliberately, or was it just ChatGPT not seeing the point?
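Review questions aside, the new dispatch in `run()` hinges only on `os.path.splitext`, so it is easy to exercise in isolation. A hypothetical helper mirroring the branch logic above (the `result_source` name is invented for illustration):

```python
import os

def result_source(outfile: str) -> str:
    """Classify how self.result should be obtained for a given output file."""
    ext = os.path.splitext(outfile)[1].lower()
    if ext in (".nc", ".nc4", ".netcdf"):
        return "open with xarray"
    if ext == ".parquet":
        return "already set by the parquet exporter's close()"
    return "fallback: try xarray"

assert result_source("run.nc") == "open with xarray"
assert result_source("RUN.PARQUET") == "already set by the parquet exporter's close()"
assert result_source("run.zarr") == "fallback: try xarray"
```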