Skip to content

Commit 8b22c20

Browse files
authored
Rewrite for intake2 (#154)
* Rewrite for intake2
1 parent 5365b08 commit 8b22c20

File tree

7 files changed

+98
-184
lines changed

7 files changed

+98
-184
lines changed

.github/workflows/ci.yaml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,10 @@ jobs:
3030
- uses: actions/checkout@v4
3131

3232
- name: set up conda environment
33-
uses: mamba-org/setup-micromamba@v1
33+
uses: conda-incubator/setup-miniconda@v3
3434
with:
3535
environment-file: ci/environment.yml
36-
init-shell: >-
37-
bash
38-
cache-environment: true
39-
cache-downloads: true
40-
post-cleanup: "all"
41-
create-args: |
42-
python=${{ matrix.python-version }}
36+
python-version: ${{ matrix.python-version }}
4337

4438
- name: Install intake-thredds
4539
run: |

.github/workflows/nightly.yaml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,10 @@ jobs:
2222
- uses: actions/checkout@v4
2323

2424
- name: set up conda environment
25-
uses: mamba-org/setup-micromamba@v1
25+
uses: conda-incubator/setup-miniconda@v3
2626
with:
2727
environment-file: ci/environment-upstream-dev.yml
28-
init-shell: >-
29-
bash
30-
cache-environment: true
31-
cache-downloads: true
32-
post-cleanup: "all"
33-
create-args: |
34-
python=3.12
28+
python-version: 3.12
3529

3630
- name: Install intake-thredds
3731
run: |

intake_thredds/cat.py

Lines changed: 45 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,45 @@
1-
from intake.catalog import Catalog
2-
from intake.catalog.local import LocalCatalogEntry
3-
4-
5-
class ThreddsCatalog(Catalog):
6-
"""Intake catalog interface to a thredds catalog.
7-
8-
Parameters
9-
----------
10-
url : str
11-
Location of thredds catalog.
12-
driver : str
13-
Select driver to access data. Choose from 'netcdf' and 'opendap'.
14-
intake_xarray_kwargs : dict
15-
Keyword arguments to pass to intake_xarray DataSource.
16-
**kwargs :
17-
Additional keyword arguments are passed through to the
18-
:py:class:`~intake.catalog.Catalog` base class.
19-
20-
Examples
21-
--------
22-
>>> import intake
23-
>>> cat_url = 'https://psl.noaa.gov/thredds/catalog/Datasets/noaa.ersst/catalog.xml'
24-
>>> cat = intake.open_thredds_cat(cat_url)
25-
"""
26-
27-
name = 'thredds_cat'
28-
29-
def __init__(self, url: str, driver: str = 'opendap', intake_xarray_kwargs=None, **kwargs):
30-
self.url = url
31-
self.driver = driver
32-
self.intake_xarray_kwargs = intake_xarray_kwargs or {'chunks': {}}
33-
super().__init__(**kwargs)
34-
35-
def _load(self):
36-
from siphon.catalog import TDSCatalog
37-
38-
if 'simplecache::' in self.url:
39-
if self.driver == 'netcdf':
40-
self.cache = True
41-
self.url_no_simplecache = self.url.replace('simplecache::', '')
42-
self.metadata.update({'fsspec_pre_url': 'simplecache::'})
43-
else:
44-
raise ValueError(
45-
f'simplecache requires driver="netcdf", found driver="{self.driver}".'
46-
)
47-
else:
48-
self.cache = False
49-
self.url_no_simplecache = self.url
50-
51-
self.cat = TDSCatalog(self.url_no_simplecache)
52-
if self.name is None:
53-
self.name = self.cat.catalog_name
54-
self.metadata.update(self.cat.metadata)
55-
56-
# sub-cats
57-
self._entries = {
58-
r.title: LocalCatalogEntry(
59-
r.title,
60-
'THREDDS cat',
61-
'thredds_cat',
62-
True,
63-
{'url': r.href},
64-
[],
65-
[],
66-
self.metadata,
67-
None,
68-
catalog=self,
69-
)
70-
for r in self.cat.catalog_refs.values()
71-
}
72-
73-
def access_urls(ds, self):
74-
if self.driver == 'opendap':
75-
driver_for_access_urls = 'OPENDAP'
76-
elif self.driver == 'netcdf':
77-
driver_for_access_urls = 'HTTPServer'
78-
url = ds.access_urls[driver_for_access_urls]
79-
if 'fsspec_pre_url' in self.metadata.keys():
80-
url = f'{self.metadata["fsspec_pre_url"]}{url}'
81-
return url
82-
83-
def _update_args(ds):
84-
args = self.intake_xarray_kwargs.copy()
85-
args.update({'urlpath': access_urls(ds, self)})
86-
return args
87-
88-
self._entries.update(
89-
{
90-
ds.name: LocalCatalogEntry(
91-
ds.name,
92-
'THREDDS data',
93-
self.driver,
94-
True,
95-
_update_args(ds),
96-
[],
97-
[],
98-
{},
99-
None,
100-
catalog=self,
101-
)
102-
for ds in self.cat.datasets.values()
103-
}
104-
)
1+
from intake.readers import Service
2+
from intake.readers.catalogs import THREDDSCatalogReader
3+
4+
5+
class ThreddsCatalog:
6+
"""Intake catalog interface to a thredds catalog."""
7+
8+
def __new__(cls, url: str, driver: str = 'opendap', intake_xarray_kwargs=None, metadata=None):
9+
"""
10+
Parameters
11+
----------
12+
url : str
13+
Location of thredds catalog.
14+
driver : str
15+
Select driver to access data. Choose from 'netcdf' and 'opendap'.
16+
intake_xarray_kwargs : dict
17+
Keyword arguments to pass to intake_xarray DataSource.
18+
**kwargs :
19+
Additional keyword arguments are passed through to the
20+
:py:class:`~intake.catalog.Catalog` base class.
21+
22+
Examples
23+
--------
24+
>>> import intake
25+
>>> cat_url = 'https://psl.noaa.gov/thredds/catalog/Datasets/noaa.ersst/catalog.xml'
26+
>>> cat = intake.open_thredds_cat(cat_url)
27+
"""
28+
29+
simplecache = url.startswith('simplecache:')
30+
if simplecache and driver == 'opendap':
31+
raise ValueError('simplecache requires driver="netcdf"')
32+
url = url.removeprefix('simplecache::')
33+
data = Service(url)
34+
reader = THREDDSCatalogReader(data, make=driver[-3:])
35+
cat = reader.read()
36+
if metadata:
37+
cat.metadata.update(metadata)
38+
if simplecache:
39+
for d in cat.data.values():
40+
d.kwargs['url'] = 'simplecache::' + d.kwargs['url']
41+
if intake_xarray_kwargs:
42+
intake_xarray_kwargs.update(intake_xarray_kwargs.pop('xarray_kwargs', {}))
43+
for d in cat.entries.values():
44+
d.kwargs.update(intake_xarray_kwargs)
45+
return cat

intake_thredds/source.py

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
import fnmatch
22

3-
from intake_xarray.base import DataSourceMixin
4-
from tqdm.auto import tqdm
5-
63
from .cat import ThreddsCatalog
74

85

9-
class THREDDSMergedSource(DataSourceMixin):
6+
class THREDDSMergedSource:
107
"""Merges multiple datasets into a single datasets.
118
129
This source takes a THREDDS URL and a path to descend down, and calls the
@@ -50,21 +47,17 @@ class THREDDSMergedSource(DataSourceMixin):
5047
5148
"""
5249

53-
version = '1.0'
54-
container = 'xarray'
55-
name = 'thredds_merged'
56-
partition_access = True
57-
5850
def __init__(
5951
self,
6052
url,
6153
path,
6254
driver='opendap',
63-
xarray_kwargs={},
55+
xarray_kwargs=None,
6456
concat_kwargs=None,
6557
metadata=None,
6658
):
67-
super().__init__(metadata=metadata)
59+
xarray_kwargs = xarray_kwargs or {}
60+
self.metadata = metadata or {}
6861
self.urlpath = url
6962
if 'simplecache::' in url:
7063
self.metadata.update({'fsspec_pre_url': 'simplecache::'})
@@ -78,33 +71,34 @@ def __init__(
7871
self.driver = driver
7972
self.xarray_kwargs = xarray_kwargs
8073
self.concat_kwargs = concat_kwargs
81-
self._ds = None
8274

83-
def _open_dataset(self):
75+
def read(self, xarray_kwargs=None):
8476
import xarray as xr
77+
from tqdm import tqdm
8578

86-
if self._ds is None:
87-
cat = ThreddsCatalog(self.urlpath, driver=self.driver)
88-
for i in range(len(self.path)):
89-
part = self.path[i]
90-
if '*' not in part and '?' not in part:
91-
cat = cat[part](driver=self.driver)
92-
else:
93-
break
94-
path = self.path[i:]
95-
data = [
96-
ds(xarray_kwargs=self.xarray_kwargs).to_dask()
97-
for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)
98-
]
99-
if self.concat_kwargs:
100-
self._ds = xr.concat(data, **self.concat_kwargs)
79+
cat = ThreddsCatalog(self.urlpath, driver=self.driver, metadata=self.metadata)
80+
for i in range(len(self.path)):
81+
part = self.path[i]
82+
if '*' not in part and '?' not in part:
83+
cat = cat[part].read(make=self.driver[-3:])
10184
else:
102-
self._ds = xr.combine_by_coords(data, combine_attrs='override')
85+
break
86+
path = self.path[i:]
87+
data = [
88+
ds(**self.xarray_kwargs).read()
89+
for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)
90+
]
91+
if self.concat_kwargs:
92+
return xr.concat(data, **self.concat_kwargs)
93+
else:
94+
return xr.combine_by_coords(data, combine_attrs='override')
95+
96+
to_dask = read
10397

10498

10599
def _match(cat, patterns):
106100
out = []
107-
for name in cat:
101+
for name in cat.entries:
108102
if fnmatch.fnmatch(name, patterns[0]):
109103
if len(patterns) == 1:
110104
out.append(cat[name](chunks={}))

requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
aiohttp >=3.7
22
fsspec >=0.8.5
33
h5netcdf >=0.8.1
4-
intake-xarray >=0.3
5-
intake>=0.6.6,<2
4+
intake-xarray
5+
intake>2
66
pydap
77
siphon
88
tqdm

tests/test_cat.py

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import intake
2-
import intake_xarray
32
import pytest
43
import xarray as xr
54

@@ -13,9 +12,8 @@ def thredds_cat_url():
1312
def test_ThreddsCatalog_init_catalog(thredds_cat_url):
1413
"""Test initialization of ThreddsCatalog."""
1514
cat = intake.open_thredds_cat(thredds_cat_url)
16-
assert isinstance(cat, intake.catalog.Catalog)
17-
assert cat.metadata == cat.cat.metadata
18-
assert cat.discover()['container'] == 'catalog'
15+
assert isinstance(cat, intake.Catalog)
16+
assert cat.metadata
1917

2018
assert 'err.mnmean.v3.nc' in cat
2119

@@ -28,42 +26,32 @@ def test_ThreddsCatalog(thredds_cat_url, driver):
2826
"""Test entry.to_dask() is xr.Dataset and allows opendap and netcdf as source."""
2927
cat = intake.open_thredds_cat(thredds_cat_url, driver=driver)
3028
entry = cat['sst.mon.19712000.ltm.v3.nc']
31-
if driver == 'opendap':
32-
assert isinstance(entry, intake_xarray.opendap.OpenDapSource)
33-
elif driver == 'netcdf':
34-
assert isinstance(entry, intake_xarray.netcdf.NetCDFSource)
35-
d = entry.describe()
36-
assert d['name'] == 'sst.mon.19712000.ltm.v3.nc'
37-
assert d['container'] == 'xarray'
38-
assert d['plugin'] == [driver]
29+
assert isinstance(entry, intake.readers.XArrayDatasetReader)
3930
if driver == 'opendap':
4031
loc = 'dodsC'
4132
elif driver == 'netcdf':
4233
loc = 'fileServer'
4334
assert (
44-
d['args']['urlpath']
35+
entry.data.url
4536
== f'https://psl.noaa.gov/thredds/{loc}/Datasets/noaa.ersst/sst.mon.19712000.ltm.v3.nc'
4637
)
47-
ds = entry(chunks={}).to_dask()
38+
ds = entry.read()
4839
assert isinstance(ds, xr.Dataset)
4940

5041

51-
def test_ThreddsCatalog_simplecache_netcdf(thredds_cat_url):
42+
def test_ThreddsCatalog_simplecache_netcdf(thredds_cat_url, tmpdir):
5243
"""Test that ThreddsCatalog allows simplecache:: in url if netcdf as source."""
5344
import os
5445

5546
import fsspec
5647

57-
fsspec.config.conf['simplecache'] = {'cache_storage': 'my_caching_folder', 'same_names': True}
48+
fsspec.config.conf['simplecache'] = {'cache_storage': str(tmpdir), 'same_names': True}
5849
cat = intake.open_thredds_cat(f'simplecache::{thredds_cat_url}', driver='netcdf')
5950
entry = cat['sst.mon.19712000.ltm.v3.nc']
60-
ds = entry(chunks={}).to_dask()
51+
ds = entry().read()
6152
assert isinstance(ds, xr.Dataset)
62-
# test files present
63-
cached_file = 'my_caching_folder/sst.mon.19712000.ltm.v3.nc'
53+
cached_file = f'{tmpdir}/sst.mon.19712000.ltm.v3.nc'
6454
assert os.path.exists(cached_file)
65-
os.remove(cached_file)
66-
assert not os.path.exists(cached_file)
6755

6856

6957
def test_ThreddsCatalog_simplecache_fails_opendap(thredds_cat_url):
@@ -83,5 +71,5 @@ def test_ThreddsCatalog_intake_xarray_kwargs():
8371
},
8472
)
8573
entry = cat['sst.mon.19712000.ltm.v3.nc']
86-
ds = entry(chunks={}).to_dask()
74+
ds = entry(chunks={}).read()
8775
assert isinstance(ds, xr.Dataset)

0 commit comments

Comments
 (0)