Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ci/environment-py37.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies:
- python=3.7
- aiohttp
- boto3
- exifread
- flask
- h5netcdf
- intake
Expand All @@ -13,7 +14,7 @@ dependencies:
- pydap
- pytest
- rasterio
- s3fs
- s3fs >= 2021.08.0
- scikit-image
- xarray >= 0.17
- zarr
Expand Down
3 changes: 2 additions & 1 deletion ci/environment-py38.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies:
- python=3.8
- aiohttp
- boto3
- exifread
- flask
- h5netcdf
- intake
Expand All @@ -13,7 +14,7 @@ dependencies:
- pydap
- pytest
- rasterio
- s3fs
- s3fs >= 2021.08.0
- scikit-image
- xarray >= 0.17
- zarr
Expand Down
3 changes: 2 additions & 1 deletion ci/environment-py39.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies:
- python=3.9
- aiohttp
- boto3
- exifread
- flask
- h5netcdf
- intake
Expand All @@ -13,7 +14,7 @@ dependencies:
- pydap
- pytest
- rasterio
- s3fs
- s3fs >= 2021.08.0
- scikit-image
- xarray >= 0.17
- zarr
Expand Down
3 changes: 2 additions & 1 deletion ci/environment-upstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ dependencies:
- python
- aiohttp
- boto3
- exifread
- flask
- h5netcdf
- netcdf4
- pip
- pydap
- pytest
- rasterio
- s3fs
- s3fs >= 2021.08.0
- scikit-image
- zarr
- pip:
Expand Down
180 changes: 148 additions & 32 deletions intake_xarray/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ def _coerce_shape(array, shape):
return new_array


def _add_leading_dimension(x):
"""Add a new dimension to an array-like"""
return x[None, ...]


def _dask_imread(files, imread=None, preprocess=None, coerce_shape=None):
""" Read a stack of images into a dask array """
from dask.array import Array
Expand All @@ -63,9 +68,6 @@ def _imread(open_file):
with open_file as f:
return imread(f)

def add_leading_dimension(x):
return x[None, ...]

filenames = [f.path for f in files]

name = 'imread-%s' % tokenize(filenames)
Expand All @@ -85,23 +87,23 @@ def add_leading_dimension(x):

if coerce_shape is not None:
if preprocess:
values = [(add_leading_dimension,
values = [(_add_leading_dimension,
(preprocess,
(reshape,
(_imread, f))))
for f in files]
else:
values = [(add_leading_dimension,
values = [(_add_leading_dimension,
(reshape,
(_imread, f)))
for f in files]
elif preprocess:
values = [(add_leading_dimension,
values = [(_add_leading_dimension,
(preprocess,
(_imread, f)))
for f in files]
else:
values = [(add_leading_dimension,
values = [(_add_leading_dimension,
(_imread, f))
for f in files]
dsk = dict(zip(keys, values))
Expand All @@ -111,7 +113,54 @@ def add_leading_dimension(x):
return Array(dsk, name, chunks, sample.dtype)


def reader(file, chunks, imread=None, preprocess=None, coerce_shape=None):
def _dask_exifread(files, exif_tags):
"""Construct a dask Array to read each tag in `exif_tags` (list of
str) from the EXIF data of the images in `files`
"""
from numpy import array
from dask.array import Array
from dask.base import tokenize
from exifread import process_file as read_exif

def _read_exif(open_file):
# Using the context manager (as below) occasionally results
# in 'I/O operation on closed file' and similar errors
# with open_file as f:
# return read_exif(f)
#
f = open_file.open()
return read_exif(f)

if not isinstance(exif_tags, list):
sample = _read_exif(files[0])
exif_tags = sample.keys()

ntags = len(exif_tags)

def extract_tags(d):
return array([d.get(tag) for tag in exif_tags])

filenames = [f.path for f in files]
name = 'exifread-%s' % tokenize(filenames)

keys = [(name, i, 0) for i in range(len(files))]
values = [(_add_leading_dimension,
(extract_tags,
(_read_exif, f)))
for f in files]

dsk = dict(zip(keys, values))

chunks = ((1,) * len(files), (ntags,))

exif_data = Array(dsk, name, chunks, object)

return {'EXIF ' + tag: exif_data[:,i] for i, tag in enumerate(exif_tags)}


def reader(
file, chunks, imread=None, preprocess=None, coerce_shape=None, exif_tags=None
):
"""Read a file object and output an dask xarray object

NOTE: inspired by dask.array.image.imread but altering the input to accept
Expand All @@ -135,14 +184,24 @@ def reader(file, chunks, imread=None, preprocess=None, coerce_shape=None):
coerce_shape : tuple len 2 (optional)
Optionally coerce the shape of the height and width of the image
by setting `coerce_shape` to desired shape.
exif_tags : boolean or list of str (optional)
Controls whether exif tags are extracted from the images. If a
list, the elements are treated as the particular tags to
extract from each image. For any other truthy value, all tags
that were able to be extracted from a sample image are used.
When tags are extracted, an xarray Dataset is returned, with
each exif tag in a corresponding data variable of the Dataset,
(of type `Optional[exifread.classes.IfdTag]`), and the image
data in a data variable 'raster'.

Returns
-------
Dask xarray.DataArray of the image. Treated as one chunk unless
chunks kwarg is specified.
Dask xarray.DataArray or xarray.Dataset of the image, and
(optionally) the value of any requested EXIF tags. Treated as one
chunk unless chunks kwarg is specified.
"""
import numpy as np
from xarray import DataArray
from xarray import DataArray, Dataset

if not imread:
from skimage.io import imread
Expand All @@ -164,10 +223,22 @@ def reader(file, chunks, imread=None, preprocess=None, coerce_shape=None):
coords['channel'] = np.arange(nchannel)
dims += ('channel',)

return DataArray(array, coords=coords, dims=dims).chunk(chunks=chunks)
if exif_tags:
exif_dict = _dask_exifread([file], exif_tags)
exif_dict_ds = {tag: ((), arr[0]) for tag, arr in exif_dict.items()}

return Dataset(
{
'raster': (dims, array),
**exif_dict_ds,
},
coords=coords,
).chunk(chunks=chunks)
else:
return DataArray(array, coords=coords, dims=dims).chunk(chunks=chunks)


def multireader(files, chunks, concat_dim, **kwargs):
def multireader(files, chunks, concat_dim, exif_tags, **kwargs):
"""Read a stack of images into a dask xarray object

NOTE: copied from dask.array.image.imread but altering the input to accept
Expand Down Expand Up @@ -196,32 +267,58 @@ def multireader(files, chunks, concat_dim, **kwargs):
coerce_shape : iterable of len 2 (optional)
Optionally coerce the shape of the height and width of the image
by setting `coerce_shape` to desired shape.
exif_tags : boolean or list of str (optional)
Controls whether exif tags are extracted from the images. If a
list, the elements are treated as the particular tags to
extract from each image. For any other truthy value, all tags
that were able to be extracted from a sample image are used.
When tags are extracted, an xarray Dataset is returned, with
each exif tag in a corresponding data variable of the Dataset,
(of type `Optional[exifread.classes.IfdTag]`), and the image
data in a data variable 'raster'.

Returns
-------
Dask xarray.DataArray of all images stacked along the first dimension.
All images will be treated as individual chunks unless
chunks kwarg is specified.
A Dask xarray.DataArray or xarray.Dataset, of all images stacked
along the first dimension, and (optionally) the value of any
requested EXIF tags. All images will be treated as individual
chunks unless chunks kwarg is specified.
"""
import numpy as np
from xarray import DataArray
from xarray import DataArray, Dataset

dask_array = _dask_imread(files, **kwargs)

ny, nx = dask_array.shape[1:3]
coords = {'y': np.arange(ny),
'x': np.arange(nx)}
if isinstance(concat_dim, list):
dims = ('dim_0', 'y', 'x')
dims = ('dim_0',)
else:
dims = (concat_dim, 'y', 'x')
dims = (concat_dim,)
coords = {concat_dim: np.arange(dask_array.shape[0]),
**coords}

raster_dims = dims + ('y', 'x')
if len(dask_array.shape) == 4:
nchannel = dask_array.shape[3]
coords['channel'] = np.arange(nchannel)
dims += ('channel',)

return DataArray(dask_array, coords=coords, dims=dims).chunk(chunks=chunks)
raster_dims += ('channel',)

if exif_tags:
exif_dict = _dask_exifread(files, exif_tags)
exif_dict_ds = {tag: (dims, arr) for tag, arr in exif_dict.items()}
return Dataset(
{
'raster': (raster_dims, dask_array),
**exif_dict_ds,
},
coords=coords,
).chunk(chunks=chunks)
else:
return DataArray(
dask_array, coords=coords, dims=raster_dims
).chunk(chunks=chunks)


class ImageSource(DataSourceMixin, PatternMixin):
Expand Down Expand Up @@ -268,17 +365,28 @@ class ImageSource(DataSourceMixin, PatternMixin):
coerce_shape : iterable of len 2 (optional)
Optionally coerce the shape of the height and width of the image
by setting `coerce_shape` to desired shape.
exif_tags : boolean or list of str (optional)
Controls whether exif tags are extracted from the images. If a
list, the elements are treated as the particular tags to
extract from each image. For any other truthy value, all tags
that were able to be extracted from a sample image are used.
When tags are extracted, an xarray Dataset is returned, with
each exif tag in a corresponding data variable of the Dataset,
(of type `Optional[exifread.classes.IfdTag]`), and the image
data in a data variable 'raster'.

"""
name = 'xarray_image'

def __init__(self, urlpath, chunks=None, concat_dim='concat_dim',
metadata=None, path_as_pattern=True,
storage_options=None, **kwargs):
storage_options=None, exif_tags=None, **kwargs):
self.path_as_pattern = path_as_pattern
self.urlpath = urlpath
self.chunks = chunks
self.concat_dim = concat_dim
self.storage_options = storage_options or {}
self.exif_tags = exif_tags
self._kwargs = kwargs
self._ds = None
super(ImageSource, self).__init__(metadata=metadata)
Expand All @@ -297,7 +405,9 @@ def _open_files(self, files):
import pandas as pd
from xarray import DataArray

out = multireader(files, self.chunks, self.concat_dim, **self._kwargs)
out = multireader(
files, self.chunks, self.concat_dim, self.exif_tags, **self._kwargs
)
if not self.pattern:
return out

Expand Down Expand Up @@ -325,7 +435,7 @@ def _open_files(self, files):
k: DataArray(v, dims=self.concat_dim)
for k, v in field_values.items()
}
return out.assign_coords(**coords).chunk(self.chunks)
return out.assign_coords(**coords).chunk(self.chunks).unify_chunks()

def _open_dataset(self):
"""
Expand All @@ -338,7 +448,9 @@ def _open_dataset(self):
if len(files) == 0:
raise Exception("No files found at {}".format(self.urlpath))
if len(files) == 1:
self._ds = reader(files[0], self.chunks, **self._kwargs)
self._ds = reader(
files[0], self.chunks, exif_tags=self.exif_tags, **self._kwargs
)
else:
self._ds = self._open_files(files)

Expand All @@ -353,8 +465,12 @@ def _get_schema(self):
if self._ds is None:
self._open_dataset()

# convert to dataset for serialization
ds2 = xr.Dataset({'raster': self._ds})
# coerce to dataset for serialization
if isinstance(self._ds, xr.Dataset):
ds2 = self._ds
else:
ds2 = xr.Dataset({'raster': self._ds})

metadata = {
'dims': dict(ds2.dims),
'data_vars': {k: list(ds2[k].coords)
Expand All @@ -364,7 +480,7 @@ def _get_schema(self):
}
if getattr(self, 'on_server', False):
metadata['internal'] = serialize_zarr_ds(ds2)
for k, v in self._ds.attrs.items():
for k, v in ds2.raster.attrs.items():
try:
# ensure only sending serializable attrs from remote
msgpack.packb(v)
Expand All @@ -373,9 +489,9 @@ def _get_schema(self):
pass
self._schema = Schema(
datashape=None,
dtype=str(self._ds.dtype),
shape=self._ds.shape,
npartitions=self._ds.data.npartitions,
dtype=str(ds2.raster.dtype),
shape=ds2.raster.shape,
npartitions=ds2.raster.data.npartitions,
extra_metadata=metadata)

return self._schema
Loading