Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion mapchete_eo/array/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import numpy as np
import numpy.ma as ma
from numpy.typing import DTypeLike
import xarray as xr
from mapchete.types import NodataVal

Expand All @@ -19,7 +20,9 @@


def to_masked_array(
xarr: Union[xr.Dataset, xr.DataArray], copy: bool = False
xarr: Union[xr.Dataset, xr.DataArray],
copy: bool = False,
out_dtype: Optional[DTypeLike] = None,
) -> ma.MaskedArray:
"""Convert xr.DataArray to ma.MaskedArray."""
if isinstance(xarr, xr.Dataset):
Expand All @@ -31,6 +34,9 @@ def to_masked_array(
"Cannot create masked_array because DataArray fill value is None"
)

if out_dtype:
xarr = xarr.astype(out_dtype, copy=False)

if xarr.dtype in _NUMPY_FLOAT_DTYPES:
return ma.masked_values(xarr, fill_value, copy=copy, shrink=False)
else:
Expand Down
30 changes: 29 additions & 1 deletion mapchete_eo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import croniter
from mapchete import Bounds
import numpy as np
import numpy.ma as ma
import xarray as xr
from dateutil.tz import tzutc
Expand All @@ -18,6 +19,8 @@
from mapchete.types import MPathLike, NodataVal, NodataVals
from pydantic import BaseModel
from rasterio.enums import Resampling
from rasterio.features import geometry_mask
from shapely.geometry import mapping
from shapely.geometry.base import BaseGeometry

from mapchete_eo.archives.base import Archive
Expand Down Expand Up @@ -62,6 +65,7 @@ class EODataCube(base.InputTile):
eo_bands: dict
time: List[TimeRange]
area: BaseGeometry
area_pixelbuffer: int = 0

def __init__(
self,
Expand Down Expand Up @@ -367,6 +371,29 @@ def default_read_values(
nodatavals=nodatavals,
merge_products_by=merge_products_by,
merge_method=merge_method,
read_mask=self.get_read_mask(),
)

def get_read_mask(self) -> np.ndarray:
"""
Determine read mask according to input area.

This will generate a numpy array where pixel overlapping the input area
are set True and thus will get filled by the read function. Pixel outside
of the area are not considered for reading.

On staged reading, i.e. first checking the product masks to assess valid
pixels, this will avoid reading product bands in cases the product only covers
pixels outside of the intended reading area.
"""
area = self.area.buffer(self.area_pixelbuffer * self.tile.pixel_x_size)
if area.is_empty:
return np.zeros((self.tile.shape), dtype=bool)
return geometry_mask(
geometries=[mapping(area)],
out_shape=self.tile.shape,
transform=self.tile.transform,
invert=True,
)


Expand Down Expand Up @@ -443,8 +470,9 @@ def _init_area(self, input_params: dict) -> BaseGeometry:
input_params.get("delimiters", {}).get("bounds"),
crs=getattr(input_params.get("pyramid"), "crs"),
),
raise_if_empty=False,
)
return process_area.intersection(
process_area = process_area.intersection(
reproject_geometry(
configured_area,
src_crs=configured_area_crs or self.crs,
Expand Down
8 changes: 5 additions & 3 deletions mapchete_eo/io/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def item_to_np_array(
return out


def expand_params(param, length):
def expand_params(param: Any, length: int) -> List[Any]:
"""
Expand parameters if they are not a list.
"""
Expand Down Expand Up @@ -104,8 +104,10 @@ def get_item_property(
| ``collection`` | The collection ID of an Item's collection. |
+--------------------+--------------------------------------------------------+
"""
if property in ["year", "month", "day", "date", "datetime"]:
if item.datetime is None:
if property == "id":
return item.id
elif property in ["year", "month", "day", "date", "datetime"]:
if item.datetime is None: # pragma: no cover
raise ValueError(
f"STAC item has no datetime attached, thus cannot get property {property}"
)
Expand Down
101 changes: 66 additions & 35 deletions mapchete_eo/io/levelled_cubes.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,50 @@ def read_levelled_cube_to_np_array(
raise_empty: bool = True,
out_dtype: DTypeLike = np.uint16,
out_fill_value: NodataVal = 0,
read_mask: Optional[np.ndarray] = None,
) -> ma.MaskedArray:
"""
Read products as slices into a cube by filling up nodata gaps with next slice.

If a read_mask is provided, only the pixels marked True are considered to be read.
"""
if len(products) == 0:
if len(products) == 0: # pragma: no cover
raise NoSourceProducts("no products to read")

bands = assets or eo_bands
if bands is None:
if bands is None: # pragma: no cover
raise ValueError("either assets or eo_bands have to be set")

out_shape = (target_height, len(bands), *grid.shape)

# 2D read_mask shape
if read_mask is None:
read_mask = np.ones(grid.shape, dtype=bool)
elif read_mask.ndim != 2: # pragma: no cover
raise ValueError(
"read_mask must be 2-dimensional, not %s-dimensional",
read_mask.ndim,
)
out: ma.MaskedArray = ma.masked_array(
data=np.zeros(out_shape, dtype=out_dtype),
mask=np.ones(out_shape, dtype=out_dtype),
data=np.full(out_shape, out_fill_value, dtype=out_dtype),
mask=np.ones(out_shape, dtype=bool),
fill_value=out_fill_value,
)

if not read_mask.any():
logger.debug("nothing to read")
return out

# extrude mask to match each layer
layer_read_mask = np.stack([read_mask for _ in bands])

def _cube_read_mask() -> np.ndarray:
# This is only needed for debug output, thus there is no need to materialize always
return np.stack([layer_read_mask for _ in range(target_height)])

logger.debug(
"empty cube with shape %s has %s",
"empty cube with shape %s has %s and %s pixels to be filled",
out.shape,
pretty_bytes(out.size * out.itemsize),
_cube_read_mask().sum(),
)

logger.debug("sort products into slices ...")
Expand All @@ -76,25 +99,25 @@ def read_levelled_cube_to_np_array(
slices_read_count, slices_skip_count = 0, 0

# pick slices one by one
for slice_count, slice in enumerate(slices, 1):
for slice_count, slice_ in enumerate(slices, 1):
# all filled up? let's get outta here!
if not out.mask.any():
logger.debug("cube is full, quitting!")
logger.debug("cube has no pixels to be filled, quitting!")
break

# generate 2D mask of holes to be filled in output cube
cube_nodata_mask = out.mask.any(axis=0).any(axis=0)
cube_nodata_mask = np.logical_and(out.mask.any(axis=0).any(axis=0), read_mask)

# read slice
try:
logger.debug(
"see if slice %s %s has some of the %s unmasked pixels for cube",
slice_count,
slice,
slice_,
cube_nodata_mask.sum(),
)
with slice.cached():
slice_array = slice.read(
with slice_.cached():
slice_array = slice_.read(
merge_method=merge_method,
product_read_kwargs=dict(
product_read_kwargs,
Expand All @@ -104,17 +127,18 @@ def read_levelled_cube_to_np_array(
resampling=resampling,
nodatavals=nodatavals,
raise_empty=raise_empty,
target_mask=~cube_nodata_mask.copy(),
read_mask=cube_nodata_mask.copy(),
out_dtype=out_dtype,
),
)
slices_read_count += 1
except (EmptySliceException, CorruptedSlice) as exc:
logger.debug("skipped slice %s: %s", slice, str(exc))
logger.debug("skipped slice %s: %s", slice_, str(exc))
slices_skip_count += 1
continue

# if slice was not empty, fill pixels into cube
logger.debug("add slice %s array to cube", slice)
logger.debug("add slice %s array to cube", slice_)

# iterate through layers of cube
for layer_index in range(target_height):
Expand All @@ -124,34 +148,35 @@ def read_levelled_cube_to_np_array(
continue

# determine empty patches of current layer
empty_patches = out[layer_index].mask.copy()
pixels_for_layer = (~slice_array[empty_patches].mask).sum()
empty_patches = np.logical_and(out[layer_index].mask, layer_read_mask)
remaining_pixels_for_layer = (~slice_array[empty_patches].mask).sum()

# when slice has nothing to offer for this layer, skip
if pixels_for_layer == 0:
if remaining_pixels_for_layer == 0:
logger.debug(
"layer %s: slice has no pixels for this layer, jump to next",
layer_index,
)
continue

# insert slice data into empty patches of layer
logger.debug(
"layer %s: fill with %s pixels ...",
layer_index,
pixels_for_layer,
remaining_pixels_for_layer,
)
# insert slice data into empty patches of layer
out[layer_index][empty_patches] = slice_array[empty_patches]
masked_pixels = out[layer_index].mask.sum()
total_pixels = out[layer_index].size
percent_full = round(
100 * ((total_pixels - masked_pixels) / total_pixels), 2
)

# report on layer fill status
logger.debug(
"layer %s: %s%% filled (%s empty pixels remaining)",
"layer %s: %s",
layer_index,
percent_full,
out[layer_index].mask.sum(),
_percent_full(
remaining=np.logical_and(
out[layer_index].mask, layer_read_mask
).sum(),
total=layer_read_mask.sum(),
),
)

# remove slice values which were just inserted for next layer
Expand All @@ -161,13 +186,13 @@ def read_levelled_cube_to_np_array(
logger.debug("slice fully inserted into cube, skipping")
break

masked_pixels = out.mask.sum()
total_pixels = out.size
percent_full = round(100 * ((total_pixels - masked_pixels) / total_pixels), 2)
# report on layer fill status
logger.debug(
"cube is %s%% filled (%s empty pixels remaining)",
percent_full,
masked_pixels,
"cube is %s",
_percent_full(
remaining=np.logical_and(out.mask, _cube_read_mask()).sum(),
total=_cube_read_mask().sum(),
),
)

logger.debug(
Expand Down Expand Up @@ -197,6 +222,7 @@ def read_levelled_cube_to_xarray(
band_axis_name: str = "bands",
x_axis_name: str = "x",
y_axis_name: str = "y",
read_mask: Optional[np.ndarray] = None,
) -> xr.Dataset:
"""
Read products as slices into a cube by filling up nodata gaps with next slice.
Expand All @@ -218,6 +244,7 @@ def read_levelled_cube_to_xarray(
sort=sort,
product_read_kwargs=product_read_kwargs,
raise_empty=raise_empty,
read_mask=read_mask,
),
slice_names=[f"layer-{ii}" for ii in range(target_height)],
band_names=variables,
Expand All @@ -226,3 +253,7 @@ def read_levelled_cube_to_xarray(
x_axis_name=x_axis_name,
y_axis_name=y_axis_name,
)


def _percent_full(remaining: int, total: int, ndigits: int = 2) -> str:
return f"{round(100 * (total - remaining) / total, ndigits=ndigits)}% full ({remaining} remaining emtpy pixels)"
Loading