Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 72 additions & 33 deletions src/CSET/operators/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@

"""Operators to aggregate across either 1 or 2 dimensions."""

import logging

import iris
import iris.analysis
import iris.coord_categorisation
import iris.cube
import iris.exceptions
import iris.util
import isodate

from CSET._common import iter_maybe
Expand Down Expand Up @@ -88,32 +92,29 @@ def time_aggregate(


def ensure_aggregatable_across_cases(
cube: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.Cube:
cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.CubeList:
"""Ensure a Cube or CubeList can be aggregated across multiple cases.

The cubes are grouped into buckets of compatible cubes, then each bucket is
converted into a single aggregatable cube with ``forecast_period`` and
``forecast_reference_time`` dimension coordinates.

Arguments
---------
cube: iris.cube.Cube | iris.cube.CubeList
If a Cube is provided it is checked to determine if it has the
the necessary dimensional coordinates to be aggregatable.
These necessary coordinates are 'forecast_period' and
'forecast_reference_time'.If a CubeList is provided a Cube is created
by slicing over all time coordinates and the resulting list is merged
to create an aggregatable cube.
cubes: iris.cube.Cube | iris.cube.CubeList
Each cube is checked to determine if it has the the necessary
dimensional coordinates to be aggregatable, being processed if needed.

Returns
-------
cube: iris.cube.Cube
A time aggregatable cube with dimension coordinates including
'forecast_period' and 'forecast_reference_time'.
cubes: iris.cube.CubeList
A CubeList of time aggregatable cubes.

Raises
------
ValueError
If a Cube is provided and it is not aggregatable a ValueError is
raised. The user should then provide a CubeList to be turned into an
aggregatable cube to allow aggregation across multiple cases to occur.
If any of the provided cubes cannot be made aggregatable.

Notes
-----
Expand All @@ -123,27 +124,65 @@ def ensure_aggregatable_across_cases(
to ensure that the full dataset can be loaded as a single cube. This
functionality is particularly useful for percentiles, Q-Q plots, and
histograms.

The necessary dimension coordinates for a cube to be aggregatable are
``forecast_period`` and ``forecast_reference_time``.
"""
# Check to see if a cube is input and if that cube is iterable.
if isinstance(cube, iris.cube.Cube):
if is_time_aggregatable(cube):
return cube
else:

# Group compatible cubes.
class Buckets:
def __init__(self):
self.buckets = []

def add(self, cube: iris.cube.Cube):
"""Add a cube into a bucket.

If the cube is compatible with an existing bucket it is added there.
Otherwise it gets its own bucket.
"""
for bucket in self.buckets:
if bucket[0].is_compatible(cube):
bucket.append(cube)
return
self.buckets.append(iris.cube.CubeList([cube]))

def get_buckets(self) -> list[iris.cube.CubeList]:
return self.buckets

b = Buckets()
for cube in iter_maybe(cubes):
b.add(cube)
buckets = b.get_buckets()

logging.debug("Buckets:\n%s", "\n---\n".join(str(b) for b in buckets))

# Ensure each bucket is a single aggregatable cube.
aggregatable_cubes = iris.cube.CubeList()
for bucket in buckets:
# Single cubes that are already aggregatable won't need processing.
if len(bucket) == 1 and is_time_aggregatable(bucket[0]):
aggregatable_cube = bucket[0]
aggregatable_cubes.append(aggregatable_cube)
continue

# Create an aggregatable cube from the provided CubeList.
to_merge = iris.cube.CubeList()
for cube in bucket:
to_merge.extend(
cube.slices_over(["forecast_period", "forecast_reference_time"])
)
logging.debug("Cubes to merge:\n%s", to_merge)
aggregatable_cube = to_merge.merge_cube()

# Verify cube is now aggregatable.
if not is_time_aggregatable(aggregatable_cube):
raise ValueError(
"Single Cube should have 'forecast_period' and"
"'forecast_reference_time' dimensional coordinates. "
"To make a time aggregatable Cube input a CubeList."
"Cube should have 'forecast_period' and 'forecast_reference_time' dimension coordinates.",
aggregatable_cube,
)
# Create an aggregatable cube from the provided CubeList.
else:
new_cube_list = iris.cube.CubeList()
for sub_cube in cube:
for cube_slice in sub_cube.slices_over(
["forecast_period", "forecast_reference_time"]
):
new_cube_list.append(cube_slice)
new_merged_cube = new_cube_list.merge_cube()
return new_merged_cube
aggregatable_cubes.append(aggregatable_cube)

return aggregatable_cubes


def add_hour_coordinate(
Expand Down
Loading