Skip to content

Commit 52c56c4

Browse files
authored
Listing refactoring and optimizations (#1114)
1 parent b7d1ca0 commit 52c56c4

File tree

9 files changed

+56
-50
lines changed

9 files changed

+56
-50
lines changed

src/datachain/catalog/catalog.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
)
6767
from datachain.dataset import DatasetListVersion
6868
from datachain.job import Job
69+
from datachain.lib.listing_info import ListingInfo
6970
from datachain.listing import Listing
7071

7172
logger = logging.getLogger("datachain")
@@ -1116,13 +1117,16 @@ def get_dataset_dependencies(
11161117
return direct_dependencies
11171118

11181119
def ls_datasets(
1119-
self, include_listing: bool = False, studio: bool = False
1120+
self,
1121+
prefix: Optional[str] = None,
1122+
include_listing: bool = False,
1123+
studio: bool = False,
11201124
) -> Iterator[DatasetListRecord]:
11211125
from datachain.remote.studio import StudioClient
11221126

11231127
if studio:
11241128
client = StudioClient()
1125-
response = client.ls_datasets()
1129+
response = client.ls_datasets(prefix=prefix)
11261130
if not response.ok:
11271131
raise DataChainError(response.message)
11281132
if not response.data:
@@ -1133,6 +1137,8 @@ def ls_datasets(
11331137
for d in response.data
11341138
if not d.get("name", "").startswith(QUERY_DATASET_PREFIX)
11351139
)
1140+
elif prefix:
1141+
datasets = self.metastore.list_datasets_by_prefix(prefix)
11361142
else:
11371143
datasets = self.metastore.list_datasets()
11381144

@@ -1142,39 +1148,55 @@ def ls_datasets(
11421148

11431149
def list_datasets_versions(
11441150
self,
1151+
prefix: Optional[str] = None,
11451152
include_listing: bool = False,
1153+
with_job: bool = True,
11461154
studio: bool = False,
11471155
) -> Iterator[tuple[DatasetListRecord, "DatasetListVersion", Optional["Job"]]]:
11481156
"""Iterate over all dataset versions with related jobs."""
11491157
datasets = list(
1150-
self.ls_datasets(include_listing=include_listing, studio=studio)
1158+
self.ls_datasets(
1159+
prefix=prefix, include_listing=include_listing, studio=studio
1160+
)
11511161
)
11521162

11531163
# preselect dataset versions jobs from db to avoid multiple queries
1154-
jobs_ids: set[str] = {
1155-
v.job_id for ds in datasets for v in ds.versions if v.job_id
1156-
}
11571164
jobs: dict[str, Job] = {}
1158-
if jobs_ids:
1159-
jobs = {j.id: j for j in self.metastore.list_jobs_by_ids(list(jobs_ids))}
1165+
if with_job:
1166+
jobs_ids: set[str] = {
1167+
v.job_id for ds in datasets for v in ds.versions if v.job_id
1168+
}
1169+
if jobs_ids:
1170+
jobs = {
1171+
j.id: j for j in self.metastore.list_jobs_by_ids(list(jobs_ids))
1172+
}
11601173

11611174
for d in datasets:
11621175
yield from (
1163-
(d, v, jobs.get(str(v.job_id)) if v.job_id else None)
1176+
(d, v, jobs.get(str(v.job_id)) if with_job and v.job_id else None)
11641177
for v in d.versions
11651178
)
11661179

1167-
def listings(self):
1180+
def listings(self, prefix: Optional[str] = None) -> list["ListingInfo"]:
11681181
"""
11691182
Returns list of ListingInfo objects which are representing specific
11701183
storage listing datasets
11711184
"""
1172-
from datachain.lib.listing import is_listing_dataset
1185+
from datachain.lib.listing import LISTING_PREFIX, is_listing_dataset
11731186
from datachain.lib.listing_info import ListingInfo
11741187

1188+
if prefix and not prefix.startswith(LISTING_PREFIX):
1189+
prefix = LISTING_PREFIX + prefix
1190+
1191+
listing_datasets_versions = self.list_datasets_versions(
1192+
prefix=prefix,
1193+
include_listing=True,
1194+
with_job=False,
1195+
)
1196+
11751197
return [
11761198
ListingInfo.from_models(d, v, j)
1177-
for d, v, j in self.list_datasets_versions(include_listing=True)
1199+
for d, v, j in listing_datasets_versions
11781200
if is_listing_dataset(d.name)
11791201
]
11801202

src/datachain/dataset.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def dataset_name(self) -> str:
9393
if self.type == DatasetDependencyType.DATASET:
9494
return self.name
9595

96-
list_dataset_name, _, _ = parse_listing_uri(self.name.strip("/"), {})
96+
list_dataset_name, _, _ = parse_listing_uri(self.name.strip("/"))
9797
assert list_dataset_name
9898
return list_dataset_name
9999

src/datachain/lib/listing.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,10 @@ def _file_c(name: str) -> Column:
107107
return dc.filter(pathfunc.parent(_file_c("path")) == path.lstrip("/").rstrip("/*"))
108108

109109

110-
def parse_listing_uri(uri: str, client_config) -> tuple[str, str, str]:
110+
def parse_listing_uri(uri: str) -> tuple[str, str, str]:
111111
"""
112112
Parsing uri and returns listing dataset name, listing uri and listing path
113113
"""
114-
client_config = client_config or {}
115114
storage_uri, path = Client.parse_url(uri)
116115
if uses_glob(path):
117116
lst_uri_path = posixpath.dirname(path)
@@ -175,7 +174,7 @@ def get_listing(
175174
_, path = Client.parse_url(uri)
176175
return None, uri, path, False
177176

178-
ds_name, list_uri, list_path = parse_listing_uri(uri, client_config)
177+
ds_name, list_uri, list_path = parse_listing_uri(uri)
179178
listing = None
180179
listings = [
181180
ls for ls in catalog.listings() if not ls.is_expired and ls.contains(ds_name)

src/datachain/remote/studio.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,10 @@ def ls(self, paths: Iterable[str]) -> Iterator[tuple[str, Response[LsData]]]:
282282
response = self._send_request_msgpack("datachain/ls", {"source": path})
283283
yield path, response
284284

285-
def ls_datasets(self) -> Response[LsData]:
286-
return self._send_request("datachain/datasets", {}, method="GET")
285+
def ls_datasets(self, prefix: Optional[str] = None) -> Response[LsData]:
286+
return self._send_request(
287+
"datachain/datasets", {"prefix": prefix}, method="GET"
288+
)
287289

288290
def edit_dataset(
289291
self,

tests/func/test_catalog.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616

1717
def listing_stats(uri, catalog):
18-
list_dataset_name, _, _ = parse_listing_uri(uri, catalog.client_config)
18+
list_dataset_name, _, _ = parse_listing_uri(uri)
1919
dataset = catalog.get_dataset(list_dataset_name)
2020
dataset_version = dataset.get_version(dataset.latest_version)
2121
return dataset_version.num_objects, dataset_version.size

tests/func/test_datachain.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,11 @@ def test_read_storage_reindex(tmp_dir, test_session):
114114

115115

116116
def test_read_storage_reindex_expired(tmp_dir, test_session):
117-
catalog = test_session.catalog
118117
tmp_dir = tmp_dir / "parquets"
119118
os.mkdir(tmp_dir)
120119
uri = tmp_dir.as_uri()
121120

122-
lst_ds_name = parse_listing_uri(uri, catalog.client_config)[0]
121+
lst_ds_name = parse_listing_uri(uri)[0]
123122

124123
pd.DataFrame({"name": ["Alice", "Bob"]}).to_parquet(tmp_dir / "test1.parquet")
125124
assert dc.read_storage(uri, session=test_session).count() == 1
@@ -144,10 +143,9 @@ def test_read_storage_partials(cloud_test_catalog):
144143
ctc = cloud_test_catalog
145144
src_uri = ctc.src_uri
146145
session = ctc.session
147-
catalog = session.catalog
148146

149147
def _list_dataset_name(uri: str) -> str:
150-
name = parse_listing_uri(uri, catalog.client_config)[0]
148+
name = parse_listing_uri(uri)[0]
151149
assert name
152150
return name
153151

@@ -188,10 +186,9 @@ def test_read_storage_partials_with_update(cloud_test_catalog):
188186
ctc = cloud_test_catalog
189187
src_uri = ctc.src_uri
190188
session = ctc.session
191-
catalog = session.catalog
192189

193190
def _list_dataset_name(uri: str) -> str:
194-
name = parse_listing_uri(uri, catalog.client_config)[0]
191+
name = parse_listing_uri(uri)[0]
195192
assert name
196193
return name
197194

@@ -222,15 +219,15 @@ def test_read_storage_listing_happens_once(cloud_test_catalog, cloud_type):
222219
dc_dogs = chain.filter(dc.C("file.path").glob("dogs*"))
223220
dc_cats.union(dc_dogs).save(ds_name)
224221

225-
lst_ds_name = parse_listing_uri(uri, ctc.session.catalog.client_config)[0]
222+
lst_ds_name = parse_listing_uri(uri)[0]
226223
assert _get_listing_datasets(ctc.session) == [f"{lst_ds_name}@v1.0.0"]
227224

228225

229226
def test_read_storage_dependencies(cloud_test_catalog, cloud_type):
230227
ctc = cloud_test_catalog
231228
src_uri = ctc.src_uri
232229
uri = f"{src_uri}/cats"
233-
dep_name, _, _ = parse_listing_uri(uri, ctc.catalog.client_config)
230+
dep_name, _, _ = parse_listing_uri(uri)
234231
ds_name = "dep"
235232
dc.read_storage(uri, session=ctc.session).save(ds_name)
236233
dependencies = ctc.session.catalog.get_dataset_dependencies(ds_name, "1.0.0")
@@ -244,7 +241,7 @@ def test_persist_not_affects_dependencies(tmp_dir, test_session):
244241
(tmp_dir / f"file{i}.txt").write_text(f"file{i}")
245242

246243
uri = tmp_dir.as_uri()
247-
dep_name, _, _ = parse_listing_uri(uri, test_session.catalog.client_config)
244+
dep_name, _, _ = parse_listing_uri(uri)
248245
chain = dc.read_storage(uri, session=test_session) # .persist()
249246
# calling multiple persists to create temp datasets
250247
chain = chain.persist()

tests/func/test_dataset_query.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -965,9 +965,7 @@ def test_dataset_dependencies_one_storage_as_dependency(
965965
ds_name = uuid.uuid4().hex
966966
catalog = cloud_test_catalog.catalog
967967
listing = catalog.listings()[0]
968-
dep_name, _, _ = parse_listing_uri(
969-
cloud_test_catalog.src_uri, catalog.client_config
970-
)
968+
dep_name, _, _ = parse_listing_uri(cloud_test_catalog.src_uri)
971969

972970
DatasetQuery(cats_dataset.name, catalog=catalog).save(ds_name)
973971

@@ -996,9 +994,7 @@ def test_dataset_dependencies_one_registered_dataset_as_dependency(
996994
catalog = cloud_test_catalog.catalog
997995
listing = catalog.listings()[0]
998996

999-
dep_name, _, _ = parse_listing_uri(
1000-
cloud_test_catalog.src_uri, catalog.client_config
1001-
)
997+
dep_name, _, _ = parse_listing_uri(cloud_test_catalog.src_uri)
1002998

1003999
DatasetQuery(name=dogs_dataset.name, catalog=catalog).save(ds_name)
10041000

@@ -1044,9 +1040,7 @@ def test_dataset_dependencies_multiple_direct_dataset_dependencies(
10441040
ds_name = uuid.uuid4().hex
10451041
catalog = cloud_test_catalog.catalog
10461042
listing = catalog.listings()[0]
1047-
dep_name, _, _ = parse_listing_uri(
1048-
cloud_test_catalog.src_uri, catalog.client_config
1049-
)
1043+
dep_name, _, _ = parse_listing_uri(cloud_test_catalog.src_uri)
10501044

10511045
dogs = DatasetQuery(name=dogs_dataset.name, version="1.0.0", catalog=catalog)
10521046
cats = DatasetQuery(name=cats_dataset.name, version="1.0.0", catalog=catalog)
@@ -1116,9 +1110,7 @@ def test_dataset_dependencies_multiple_union(
11161110
ds_name = uuid.uuid4().hex
11171111
catalog = cloud_test_catalog.catalog
11181112
listing = catalog.listings()[0]
1119-
dep_name, _, _ = parse_listing_uri(
1120-
cloud_test_catalog.src_uri, catalog.client_config
1121-
)
1113+
dep_name, _, _ = parse_listing_uri(cloud_test_catalog.src_uri)
11221114

11231115
dogs = DatasetQuery(name=dogs_dataset.name, version="1.0.0", catalog=catalog)
11241116
cats = DatasetQuery(name=cats_dataset.name, version="1.0.0", catalog=catalog)

tests/func/test_datasets.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -666,12 +666,12 @@ def test_dataset_storage_dependencies(cloud_test_catalog, cloud_type, indirect):
666666
session = ctc.session
667667
catalog = session.catalog
668668
uri = cloud_test_catalog.src_uri
669-
dep_name, _, _ = parse_listing_uri(ctc.src_uri, catalog.client_config)
669+
dep_name, _, _ = parse_listing_uri(ctc.src_uri)
670670

671671
ds_name = "some_ds"
672672
dc.read_storage(uri, session=session).save(ds_name)
673673

674-
lst_ds_name, _, _ = parse_listing_uri(uri, catalog.client_config)
674+
lst_ds_name, _, _ = parse_listing_uri(uri)
675675
lst_dataset = catalog.metastore.get_dataset(lst_ds_name)
676676

677677
assert [

tests/func/test_listing.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ def test_listing_generator(cloud_test_catalog, cloud_type):
3636
)
3737
def test_parse_listing_uri(cloud_test_catalog, cloud_type):
3838
ctc = cloud_test_catalog
39-
catalog = ctc.catalog
40-
dataset_name, listing_uri, listing_path = parse_listing_uri(
41-
f"{ctc.src_uri}/dogs", catalog.client_config
42-
)
39+
dataset_name, listing_uri, listing_path = parse_listing_uri(f"{ctc.src_uri}/dogs")
4340
assert dataset_name == f"lst__{ctc.src_uri}/dogs/"
4441
assert listing_uri == f"{ctc.src_uri}/dogs/"
4542
if cloud_type == "file":
@@ -55,10 +52,7 @@ def test_parse_listing_uri(cloud_test_catalog, cloud_type):
5552
)
5653
def test_parse_listing_uri_with_glob(cloud_test_catalog):
5754
ctc = cloud_test_catalog
58-
catalog = ctc.catalog
59-
dataset_name, listing_uri, listing_path = parse_listing_uri(
60-
f"{ctc.src_uri}/dogs/*", catalog.client_config
61-
)
55+
dataset_name, listing_uri, listing_path = parse_listing_uri(f"{ctc.src_uri}/dogs/*")
6256
assert dataset_name == f"lst__{ctc.src_uri}/dogs/"
6357
assert listing_uri == f"{ctc.src_uri}/dogs"
6458
assert listing_path == "dogs/*"

0 commit comments

Comments (0)