From 08c67a56b39a86b7cd1c18b565f2d729b420fc69 Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Wed, 7 May 2025 11:18:15 +0100
Subject: [PATCH 1/5] Probably a good idea: keep factory refs as potentially
 lazy.

---
 lib/iris/fileformats/rules.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/iris/fileformats/rules.py b/lib/iris/fileformats/rules.py
index d24f38d3f3..f6835e49b7 100644
--- a/lib/iris/fileformats/rules.py
+++ b/lib/iris/fileformats/rules.py
@@ -159,7 +159,7 @@ def _dereference_args(factory, reference_targets, regrid_cache, cube):
                 src, cube = _ensure_aligned(regrid_cache, src, cube)
                 if src is not None:
                     new_coord = iris.coords.AuxCoord(
-                        src.data,
+                        src.core_data(),
                         src.standard_name,
                         src.long_name,
                         src.var_name,
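A minimal standalone sketch of the distinction this patch relies on, using plain dask and a hypothetical `FakeCube` stand-in rather than the real iris `Cube` API: a `.data`-style accessor realises the whole array into memory, while a `core_data()`-style accessor hands back the lazy array untouched.

    # Hedged sketch: FakeCube is illustrative only, not iris.cube.Cube.
    import dask.array as da

    class FakeCube:
        def __init__(self, lazy):
            self._lazy = lazy
            self._real = None

        @property
        def data(self):
            # Realising accessor: computes and caches the full numpy array.
            if self._real is None:
                self._real = self._lazy.compute()
            return self._real

        def core_data(self):
            # Lazy accessor: returns the dask array with no memory cost.
            return self._lazy

    src = FakeCube(da.zeros((4000, 4000), chunks=(1000, 1000)))
    print(type(src.core_data()))  # dask Array, nothing computed yet
    print(type(src.data))         # numpy ndarray, ~122 MiB realised

Passing `src.core_data()` into the new AuxCoord therefore defers any realisation to whoever eventually touches the coordinate's values.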
From 9f1083db9e5b3ddc453871c008b4fdd69dcedbe2 Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Wed, 7 May 2025 11:20:05 +0100
Subject: [PATCH 2/5] WIP: testing concat memory problems.

---
 lib/iris/_concatenate.py  | 23 ++++++++++++++++++++---
 lib/iris/_data_manager.py | 38 +++++++++++++++++++++++++-------------
 2 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py
index 6caee79c4f..7b3b18e574 100644
--- a/lib/iris/_concatenate.py
+++ b/lib/iris/_concatenate.py
@@ -459,7 +459,8 @@ def _array_id(
     bound: bool,
 ) -> str:
     """Get a unique key for looking up arrays associated with coordinates."""
-    return f"{id(coord)}{bound}"
+    bound = "bounds" if bound else "points"
+    return f"{coord.name()}_{id(coord)}_{bound}"


 def _compute_hashes(
@@ -586,18 +587,34 @@ def concatenate(
     # Compute hashes for parallel array comparison.
     arrays = {}

+    def prarrrep(arr):
+        if isinstance(arr, da.Array):
+            result = str(arr)
+        else:
+            result = f"ndarr{arr.shape}"
+        return result
+
     def add_coords(cube_signature: _CubeSignature, coord_type: str) -> None:
         for coord_and_dims in getattr(cube_signature, coord_type):
             coord = coord_and_dims.coord
             array_id = _array_id(coord, bound=False)
             if isinstance(coord, (DimCoord, AuxCoord)):
-                arrays[array_id] = coord.core_points()
+                pts = coord.core_points()
+                arrays[array_id] = pts
+                bds = None
                 if coord.has_bounds():
                     bound_array_id = _array_id(coord, bound=True)
-                    arrays[bound_array_id] = coord.core_bounds()
+                    bds = coord.core_bounds()
+                    arrays[bound_array_id] = bds
+                # if coord_type == "aux_coords_and_dims":
+                msg = f"Concat debug {coord_type}: {array_id} ==> pts={prarrrep(pts)}"
+                if bds is not None:
+                    msg += f" bds={prarrrep(bds)}"
+                print(msg)
             else:
                 arrays[array_id] = coord.core_data()

+    print(f"\nDebug concat {len(cubes)}:\n{cubes}\n:")
     for cube_signature in cube_signatures:
         if check_aux_coords:
             add_coords(cube_signature, "aux_coords_and_dims")
diff --git a/lib/iris/_data_manager.py b/lib/iris/_data_manager.py
index 00de2b5ae6..ba0ce1cee8 100644
--- a/lib/iris/_data_manager.py
+++ b/lib/iris/_data_manager.py
@@ -13,6 +13,8 @@
 import iris.exceptions
 import iris.warnings

+# Control iris "realisation" behaviour: True gives the standard behaviour; set False for testing.
+_TOUCH_REALISES = False

 class DataManager:
     """Provides a well defined API for management of real or lazy data."""
@@ -143,16 +145,17 @@ def __repr__(self):
     def _assert_axioms(self):
         """Definition of the manager state, that should never be violated."""
         # Ensure there is a valid data state.
-        is_lazy = self._lazy_array is not None
-        is_real = self._real_array is not None
-
-        if not (is_lazy ^ is_real):
-            if is_lazy and is_real:
-                msg = "Unexpected data state, got both lazy and real data."
-                raise ValueError(msg)
-            elif self._shape is None:
-                msg = "Unexpected data state, got no lazy or real data, and no shape."
-                raise ValueError(msg)
+        if _TOUCH_REALISES:
+            is_lazy = self._lazy_array is not None
+            is_real = self._real_array is not None
+
+            if not (is_lazy ^ is_real):
+                if is_lazy and is_real:
+                    msg = "Unexpected data state, got both lazy and real data."
+                    raise ValueError(msg)
+                elif self._shape is None:
+                    msg = "Unexpected data state, got no lazy or real data, and no shape."
+                    raise ValueError(msg)

     def _deepcopy(self, memo, data=None):
         """Perform a deepcopy of the :class:`~iris._data_manager.DataManager` instance.
@@ -205,14 +208,23 @@ def data(self):
             :class:`~numpy.ndarray` or :class:`numpy.ma.core.MaskedArray` or
             ``None``.

         """
-        if self.has_lazy_data():
+        if (
+            (_TOUCH_REALISES and self._lazy_array is not None)
+            or
+            (
+                not _TOUCH_REALISES
+                and self._real_array is None
+                and self._lazy_array is not None
+            )
+        ):
             try:
                 # Realise the lazy data.
                 result = as_concrete_data(self._lazy_array)
                 # Assign the realised result.
                 self._real_array = result
-                # Reset the lazy data and the realised dtype.
-                self._lazy_array = None
+                if _TOUCH_REALISES:
+                    # Reset the lazy data and the realised dtype.
+                    self._lazy_array = None
             except MemoryError:
                 emsg = (
                     "Failed to realise the lazy data as there was not "
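A minimal sketch of this patch's "realise but keep the lazy array" behaviour, standalone and hypothetical (the real `DataManager` also tracks shape and dtype, and guards against `MemoryError`). Note that the patch's compound test reduces by boolean algebra to `lazy is not None and (_TOUCH_REALISES or real is None)`, which is the form used here.

    # Hedged sketch: MiniDataManager is illustrative, not iris._data_manager.
    import dask.array as da

    TOUCH_REALISES = False  # mirrors _TOUCH_REALISES in the patch

    class MiniDataManager:
        def __init__(self, lazy):
            self._lazy_array = lazy
            self._real_array = None

        @property
        def data(self):
            # Equivalent to the patch's condition, simplified.
            if self._lazy_array is not None and (
                TOUCH_REALISES or self._real_array is None
            ):
                self._real_array = self._lazy_array.compute()
                if TOUCH_REALISES:
                    # Standard iris behaviour: realising discards the lazy form.
                    self._lazy_array = None
            return self._real_array

    dm = MiniDataManager(da.arange(10))
    dm.data                            # first touch realises the data
    print(dm._lazy_array is not None)  # True: lazy form kept when not "touching"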
From 4afecf34ad4e8daa206153bcbdcfa6b1d1c37bbe Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Thu, 8 May 2025 15:04:59 +0100
Subject: [PATCH 3/5] More WIP debug.

---
 lib/iris/_concatenate.py | 4 +++-
 lib/iris/aux_factory.py  | 5 +++++
 lib/iris/cube.py         | 4 ++--
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py
index 7b3b18e574..12ad1d7079 100644
--- a/lib/iris/_concatenate.py
+++ b/lib/iris/_concatenate.py
@@ -40,7 +40,6 @@
 _DECREASING = -1
 _INCREASING = 1

-
 class _CoordAndDims(namedtuple("CoordAndDims", ["coord", "dims"])):
     """Container for a coordinate and the associated data dimension(s).

@@ -755,7 +754,10 @@ def name_key_func(factory):
             return factory.name()

         for factory in sorted(cube.aux_factories, key=name_key_func):
+            import iris.aux_factory as iaf
+            iaf._CATCH_COORD_BUILD = True
             coord = factory.make_coord(cube.coord_dims)
+            iaf._CATCH_COORD_BUILD = False
             dims = factory.derived_dims(cube.coord_dims)
             self.derived_metadata.append(_CoordMetaData(coord, dims))
             self.derived_coords_and_dims.append(
diff --git a/lib/iris/aux_factory.py b/lib/iris/aux_factory.py
index ae341c0976..6f87fe5296 100644
--- a/lib/iris/aux_factory.py
+++ b/lib/iris/aux_factory.py
@@ -16,6 +16,8 @@
 import iris.coords
 from iris.warnings import IrisIgnoringBoundsWarning

+_CATCH_COORD_BUILD = False
+
 class AuxCoordFactory(CFVariableMixin, metaclass=ABCMeta):
     """Represents a "factory" which can manufacture additional auxiliary coordinate.

@@ -624,6 +626,9 @@ def make_coord(self, coord_dims_func):

         """
         # Which dimensions are relevant?
+        if _CATCH_COORD_BUILD:
+            self._t_dbg = True
+
         derived_dims = self.derived_dims(coord_dims_func)
         dependency_dims = self._dependency_dims(coord_dims_func)

diff --git a/lib/iris/cube.py b/lib/iris/cube.py
index 77191c3a9a..96c18b47bd 100644
--- a/lib/iris/cube.py
+++ b/lib/iris/cube.py
@@ -2916,12 +2916,12 @@ def aux_coords(self) -> tuple[AuxCoord | DimCoord, ...]:
     @property
     def derived_coords(self) -> tuple[AuxCoord, ...]:
         """Return a tuple of all the coordinates generated by the coordinate factories."""
-        return tuple(
+        return tuple([
             factory.make_coord(self.coord_dims)
             for factory in sorted(
                 self.aux_factories, key=lambda factory: factory.name()
             )
-        )
+        ])

     @property
     def aux_factories(self) -> tuple[AuxCoordFactory, ...]:
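The `_CATCH_COORD_BUILD` flag is toggled by hand around `make_coord()`, so an exception during the build would leave it set. The same idea as a context manager cannot leak the flag; `catch_coord_build` is a hypothetical helper sketched here, not part of the patch:

    # Hedged sketch: catch_coord_build is illustrative, not iris API.
    from contextlib import contextmanager

    import iris.aux_factory as iaf  # the module patched above

    @contextmanager
    def catch_coord_build():
        iaf._CATCH_COORD_BUILD = True
        try:
            yield
        finally:
            iaf._CATCH_COORD_BUILD = False

    # Usage, mirroring the instrumented loop in _concatenate.py:
    #   with catch_coord_build():
    #       coord = factory.make_coord(cube.coord_dims)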
From 6b3730ac6163c9bb1607f5d3011d9231ad0c1fca Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Sat, 10 May 2025 00:30:23 +0100
Subject: [PATCH 4/5] Latest ideas: working WIP, to be generalised.

---
 lib/iris/_concatenate.py | 16 ++++----
 lib/iris/aux_factory.py  | 81 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py
index 12ad1d7079..7db5da19d2 100644
--- a/lib/iris/_concatenate.py
+++ b/lib/iris/_concatenate.py
@@ -605,15 +605,15 @@ def add_coords(cube_signature: _CubeSignature, coord_type: str) -> None:
                     bound_array_id = _array_id(coord, bound=True)
                     bds = coord.core_bounds()
                     arrays[bound_array_id] = bds
-                # if coord_type == "aux_coords_and_dims":
-                msg = f"Concat debug {coord_type}: {array_id} ==> pts={prarrrep(pts)}"
-                if bds is not None:
-                    msg += f" bds={prarrrep(bds)}"
-                print(msg)
+                # # if coord_type == "aux_coords_and_dims":
+                # msg = f"Concat debug {coord_type}: {array_id} ==> pts={prarrrep(pts)}"
+                # if bds is not None:
+                #     msg += f" bds={prarrrep(bds)}"
+                # print(msg)
             else:
                 arrays[array_id] = coord.core_data()

-    print(f"\nDebug concat {len(cubes)}:\n{cubes}\n:")
+    # print(f"\nDebug concat {len(cubes)}:\n{cubes}\n:")
     for cube_signature in cube_signatures:
         if check_aux_coords:
             add_coords(cube_signature, "aux_coords_and_dims")
@@ -755,9 +755,9 @@ def name_key_func(factory):

         for factory in sorted(cube.aux_factories, key=name_key_func):
             import iris.aux_factory as iaf
-            iaf._CATCH_COORD_BUILD = True
+            # iaf._CATCH_COORD_BUILD = True
             coord = factory.make_coord(cube.coord_dims)
-            iaf._CATCH_COORD_BUILD = False
+            # iaf._CATCH_COORD_BUILD = False
             dims = factory.derived_dims(cube.coord_dims)
             self.derived_metadata.append(_CoordMetaData(coord, dims))
             self.derived_coords_and_dims.append(
diff --git a/lib/iris/aux_factory.py b/lib/iris/aux_factory.py
index 6f87fe5296..4df7c06db5 100644
--- a/lib/iris/aux_factory.py
+++ b/lib/iris/aux_factory.py
@@ -92,6 +92,37 @@ def make_coord(self, coord_dims_func):

         """

+    def calculation(self, *dependencies):
+        """Construct the actual values array from the dependencies.
+
+        Parameters
+        ----------
+        dependencies : list of array-like
+            Arrays of values, matching the dependencies in order, but all
+            with the same aligned dimensions.
+
+        Returns
+        -------
+        array : dask.Array
+            A lazy array, of the same dimensions as the inputs.
+
+        Notes
+        -----
+        Called for both points and bounds values.  The result may be
+        adjusted, e.g. re-chunked, before it is returned.
+        """
+        pass
+
+    # @abstractmethod
+    def _inner_calc(self, *dependencies):
+        """Calculate result values from dependencies.
+
+        Must be implemented by the inheritor, to perform the essential
+        calculation characteristic of the particular hybrid coordinate type.
+
+        This is called by "calculation", which may adjust the chunking of
+        the result.
+        """
+
     def update(self, old_coord, new_coord=None):
         """Notify the factory of the removal/replacement of a coordinate.

@@ -610,9 +641,55 @@ def dependencies(self):
             "orography": self.orography,
         }

-    def _derive(self, delta, sigma, orography):
+    def _inner_calc(self, delta, sigma, orography):
         return delta + sigma * orography

+    def _derive(self, *dependency_arrays):
+        result = self._inner_calc(*dependency_arrays)
+
+        # The dims of all the given dependencies should be the same and,
+        # presumably, the same as those of the result.
+        for i_dep, (dep, name) in enumerate(zip(dependency_arrays, self.dependencies.keys())):
+            if dep.ndim != result.ndim:
+                msg = (
+                    f"Dependency #{i_dep}, '{name}' has ndims={dep.ndim}, "
+                    f"not matching result {result.ndim!r}"
+                    f" (shapes {dep.shape}/{result.shape})."
+                )
+                raise ValueError(msg)
+
+        # See if we need to improve on the chunking of the result.
+        from iris._lazy_data import _optimum_chunksize
+        adjusted_chunks = _optimum_chunksize(
+            chunks=result.chunksize,
+            shape=result.shape,
+            dtype=result.dtype,
+            limit=60.0e6,  # arbitrary for testing -- just for now
+        )
+
+        if adjusted_chunks != result.chunksize:
+            # Re-do the result calculation, re-chunking the inputs along the
+            # dimensions which it is suggested to reduce.
+            # Build a new list of the inputs, re-chunked where indicated.
+            new_deps = []
+            for dep in dependency_arrays:
+                # Reduce each dependency chunksize to the adjusted chunksize, where smaller.
+                dep_chunks = dep.chunksize
+                new_chunks = tuple(
+                    min(dep_chunk, adj_chunk)
+                    for dep_chunk, adj_chunk in zip(dep_chunks, adjusted_chunks)
+                )
+                # If the dep chunksize was reduced, replace with a rechunked version.
+                if new_chunks != dep_chunks:
+                    dep = dep.rechunk(new_chunks)
+                new_deps.append(dep)
+
+            # Finally, re-do the calculation, which should give a better
+            # overall chunking for the result.
+            result = self._inner_calc(*new_deps)
+
+        return result
+
     def make_coord(self, coord_dims_func):
         """Return a new :class:`iris.coords.AuxCoord` as defined by this factory.

@@ -637,7 +714,7 @@ def make_coord(self, coord_dims_func):
         points = self._derive(
             nd_points_by_key["delta"],
             nd_points_by_key["sigma"],
-            nd_points_by_key["orography"],
+            nd_points_by_key["orography"]
         )

         bounds = None
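The core idea of the new `_derive` as a standalone dask demonstration. `cap_chunks`, `derive_hybrid_height`, and the `(1, 360, 720)` chunk limit are all illustrative stand-ins; the patch instead derives its target from `iris._lazy_data._optimum_chunksize` with a byte limit.

    # Hedged sketch: plain dask, not the iris implementation.
    import dask.array as da
    import numpy as np

    def cap_chunks(arr, target):
        """Rechunk so that no dimension's chunk size exceeds the target's."""
        new_chunks = tuple(
            min(have, want) for have, want in zip(arr.chunksize, target)
        )
        return arr.rechunk(new_chunks) if new_chunks != arr.chunksize else arr

    def derive_hybrid_height(delta, sigma, orography, chunk_limit=(1, 360, 720)):
        # First pass: the naive broadcast can produce enormous chunks.
        result = delta + sigma * orography
        if any(c > t for c, t in zip(result.chunksize, chunk_limit)):
            # Second pass: re-do the calculation from re-chunked inputs, so
            # the oversized intermediate blocks never appear in the graph.
            deps = [cap_chunks(d, chunk_limit) for d in (delta, sigma, orography)]
            result = deps[0] + deps[1] * deps[2]
        return result

    # 70 model levels against a 720x1440 orography field:
    delta = da.from_array(np.linspace(0, 1e3, 70).reshape(70, 1, 1))
    sigma = da.from_array(np.linspace(1, 0, 70).reshape(70, 1, 1))
    orog = da.random.random((1, 720, 1440))
    naive = delta + sigma * orog
    print(naive.chunksize)  # (70, 720, 1440): one ~0.5 GiB float64 chunk
    print(derive_hybrid_height(delta, sigma, orog).chunksize)  # (1, 360, 720)

Recomputing from re-chunked inputs, rather than calling `result.rechunk(...)` afterwards, means the oversized blocks are never materialised at all, which is the point of the second pass in the patch.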
- ) - raise ValueError(msg) + if self._RECHUNK_DERIVED: + # The dims of all the given components should be the same and, **presumably**, + # the same as the result ?? + for i_dep, (dep, name) in enumerate(zip(dependency_arrays, self.dependencies.keys())): + if dep.ndim != result.ndim: + msg = ( + f"Dependency #{i_dep}, '{name}' has ndims={dep.ndim}, " + "not matching result {result.ndim!r}" + " (shapes {dep.shape}/{result.shape})." + ) + raise ValueError(msg) - # See if we need to improve on the chunking of the result - from iris._lazy_data import _optimum_chunksize - adjusted_chunks = _optimum_chunksize( - chunks=result.chunksize, - shape=result.shape, - dtype=result.dtype, - limit=60.e6, # arbitrary for testing -- just for now - ) + # See if we need to improve on the chunking of the result + from iris._lazy_data import _optimum_chunksize + adjusted_chunks = _optimum_chunksize( + chunks=result.chunksize, + shape=result.shape, + dtype=result.dtype, + limit=60.e6, # arbitrary for testing -- just for now + ) - if adjusted_chunks != result.chunksize: - # Re-do the result calculation, re-chunking the inputs along dimensions - # which it is suggested to reduce. - # First make a (writable) copy of the inputs..... - new_deps = [] - for i_dep, dep in enumerate(dependency_arrays): - # Reduce each dependency chunksize to the result chunksize if smaller. - dep_chunks = dep.chunksize - new_chunks = tuple([ - min(dep_chunk, adj_chunk) - for dep_chunk, adj_chunk in zip(dep_chunks, adjusted_chunks) - ]) - # If the dep chunksize was reduced, replace with a rechunked version. - if new_chunks != dep_chunks: - dep = dep.rechunk(new_chunks) - new_deps.append(dep) - - # Finally, re-do the calculation, which hopefully results in a better - # overall chunking for the result - result = self._inner_calc(*new_deps) + if adjusted_chunks != result.chunksize: + # Re-do the result calculation, re-chunking the inputs along dimensions + # which it is suggested to reduce. + # First make a (writable) copy of the inputs..... + new_deps = [] + for i_dep, dep in enumerate(dependency_arrays): + # Reduce each dependency chunksize to the result chunksize if smaller. + dep_chunks = dep.chunksize + new_chunks = tuple([ + min(dep_chunk, adj_chunk) + for dep_chunk, adj_chunk in zip(dep_chunks, adjusted_chunks) + ]) + # If the dep chunksize was reduced, replace with a rechunked version. + if new_chunks != dep_chunks: + dep = dep.rechunk(new_chunks) + new_deps.append(dep) + + # Finally, re-do the calculation, which hopefully results in a better + # overall chunking for the result + result = self._inner_calc(*new_deps) return result