|
class IntakeXarraySourceAdapter:
    """Compatibility shim exposing the classic intake DataSource API.

    Wraps ``self.reader`` (presumably an intake v2 reader — it is set
    outside this class; TODO confirm against the caller) so that legacy
    entry points (``read``, ``discover``, ``read_chunked``, ``to_dask``,
    ``get``) keep working against the reader-based interface.
    """

    # Classic intake source metadata attributes.
    container = "xarray"
    name = "xarray"
    version = ""

    def to_dask(self):
        """Return the xarray object with lazily-loaded (chunked) variables.

        When the underlying reader already carries an explicit ``chunks``
        keyword, it is used as-is; otherwise the reader is re-invoked with
        ``chunks={}`` so the result comes back dask-backed.
        """
        if "chunks" in self.reader.kwargs:
            return self.reader.read()
        return self.reader(chunks={}).read()

    def __call__(self, *args, **kwargs):
        # Legacy sources were "re-opened" by calling them; the adapter is
        # already usable, so calling it (with any arguments) is a no-op.
        return self

    # ``get`` was historically an alias for calling the source.
    get = __call__

    def read(self):
        """Return the xarray object with all data loaded into memory."""
        # chunks=None requests an eager (non-dask) load from the reader.
        return self.reader(chunks=None).read()

    # ``discover`` returns the same fully-loaded object as ``read``.
    discover = read

    # ``read_chunked`` is the lazy variant, identical to ``to_dask``.
    read_chunked = to_dask
0 commit comments