Commit 680ead9

Implementing dataset semver (#1082)
1 parent 3d48a91 commit 680ead9

34 files changed: +653 −629 lines changed

docs/examples.md

Lines changed: 1 addition & 1 deletion

@@ -167,7 +167,7 @@ dialog-rating@v2
 By default, when a saved dataset is loaded, the latest version is fetched but another version can be requested:
 
 ```python
-ds = dc.read_dataset("dialog-rating", version=1)
+ds = dc.read_dataset("dialog-rating", version="1.0.0")
 ```
 
 ### Chain execution, optimization and parallelism
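
As a quick sanity check of the documented behavior above, a minimal usage sketch (assuming the `import datachain as dc` convention used elsewhere in the docs; the dataset name is just the one from the example):

```python
import datachain as dc

# No version argument: the latest version of the saved dataset is fetched.
ds_latest = dc.read_dataset("dialog-rating")

# After this change, a specific version is pinned with a semver string
# rather than the integer accepted previously.
ds_pinned = dc.read_dataset("dialog-rating", version="1.0.0")
```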

src/datachain/catalog/catalog.py

Lines changed: 20 additions & 91 deletions

@@ -33,6 +33,7 @@
 from datachain.client import Client
 from datachain.dataset import (
     DATASET_PREFIX,
+    DEFAULT_DATASET_VERSION,
     QUERY_DATASET_PREFIX,
     DatasetDependency,
     DatasetListRecord,
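
For context while reviewing: the new `DEFAULT_DATASET_VERSION` constant replaces the literal `1` used further down. Its value is not visible in this diff; a plausible definition, given the semver scheme, would be:

```python
# In datachain/dataset.py (assumed value, not shown in this diff):
DEFAULT_DATASET_VERSION = "1.0.0"
```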
@@ -154,9 +155,9 @@ def __init__(
         metastore: "AbstractMetastore",
         warehouse: "AbstractWarehouse",
         remote_ds_name: str,
-        remote_ds_version: int,
+        remote_ds_version: str,
         local_ds_name: str,
-        local_ds_version: int,
+        local_ds_version: str,
         schema: dict[str, Union[SQLType, type[SQLType]]],
         max_threads: int = PULL_DATASET_MAX_THREADS,
         progress_bar=None,
@@ -286,7 +287,7 @@ class NodeGroup:
     # (not including the bucket name or s3:// prefix)
     source_path: str = ""
     dataset_name: Optional[str] = None
-    dataset_version: Optional[int] = None
+    dataset_version: Optional[str] = None
     instantiated_nodes: Optional[list[NodeWithPath]] = None
 
     @property
@@ -607,7 +608,7 @@ def enlist_source(
         return lst, client, list_path
 
     def _remove_dataset_rows_and_warehouse_info(
-        self, dataset: DatasetRecord, version: int, **kwargs
+        self, dataset: DatasetRecord, version: str, **kwargs
     ):
         self.warehouse.drop_dataset_rows_table(dataset, version)
         self.update_dataset_version_with_warehouse_info(
@@ -767,7 +768,7 @@ def _row_to_node(d: dict[str, Any]) -> Node:
     def create_dataset(
         self,
         name: str,
-        version: Optional[int] = None,
+        version: Optional[str] = None,
         *,
         columns: Sequence[Column],
         feature_schema: Optional[dict] = None,
@@ -783,18 +784,17 @@ def create_dataset(
         Creates new dataset of a specific version.
         If dataset is not yet created, it will create it with version 1
         If version is None, then next unused version is created.
-        If version is given, then it must be an unused version number.
+        If version is given, then it must be an unused version.
         """
         assert [c.name for c in columns if c.name != "sys__id"], f"got {columns=}"
         if not listing and Client.is_data_source_uri(name):
             raise RuntimeError(
                 "Cannot create dataset that starts with source prefix, e.g s3://"
             )
-        default_version = 1
+        default_version = DEFAULT_DATASET_VERSION
         try:
             dataset = self.get_dataset(name)
-            default_version = dataset.next_version
-
+            default_version = dataset.next_version_patch
             if (description or attrs) and (
                 dataset.description != description or dataset.attrs != attrs
             ):
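
The switch from `next_version` to `next_version_patch` means re-saving an existing dataset now bumps the patch component by default. The property itself is not shown in this diff; a standalone sketch of the semantics its name implies:

```python
def next_version_patch(version: str) -> str:
    """Bump the patch component of a semver string: "1.2.3" -> "1.2.4".

    A sketch of what DatasetRecord.next_version_patch presumably computes
    from the latest existing version; the real property may differ.
    """
    major, minor, patch = (int(part) for part in version.split("."))
    return f"{major}.{minor}.{patch + 1}"


assert next_version_patch("1.0.0") == "1.0.1"
```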
@@ -846,7 +846,7 @@ def create_dataset(
     def create_new_dataset_version(
         self,
         dataset: DatasetRecord,
-        version: int,
+        version: str,
         *,
         columns: Sequence[Column],
         sources="",
@@ -892,7 +892,7 @@ def create_new_dataset_version(
         return dataset
 
     def update_dataset_version_with_warehouse_info(
-        self, dataset: DatasetRecord, version: int, rows_dropped=False, **kwargs
+        self, dataset: DatasetRecord, version: str, rows_dropped=False, **kwargs
     ) -> None:
         from datachain.query.dataset import DatasetQuery
 
@@ -959,7 +959,7 @@ def update_dataset(
         return dataset
 
     def remove_dataset_version(
-        self, dataset: DatasetRecord, version: int, drop_rows: Optional[bool] = True
+        self, dataset: DatasetRecord, version: str, drop_rows: Optional[bool] = True
     ) -> None:
         """
         Deletes one single dataset version.
@@ -1037,82 +1037,11 @@ def create_dataset_from_sources(
 
         return self.get_dataset(name)
 
-    def register_dataset(
-        self,
-        dataset: DatasetRecord,
-        version: int,
-        target_dataset: DatasetRecord,
-        target_version: Optional[int] = None,
-    ) -> DatasetRecord:
-        """
-        Registers dataset version of one dataset as dataset version of another
-        one (it can be new version of existing one).
-        It also removes original dataset version
-        """
-        target_version = target_version or target_dataset.next_version
-
-        if not target_dataset.is_valid_next_version(target_version):
-            raise DatasetInvalidVersionError(
-                f"Version {target_version} must be higher than the current latest one"
-            )
-
-        dataset_version = dataset.get_version(version)
-        if not dataset_version:
-            raise DatasetVersionNotFoundError(
-                f"Dataset {dataset.name} does not have version {version}"
-            )
-
-        if not dataset_version.is_final_status():
-            raise ValueError("Cannot register dataset version in non final status")
-
-        # copy dataset version
-        target_dataset = self.metastore.create_dataset_version(
-            target_dataset,
-            target_version,
-            sources=dataset_version.sources,
-            status=dataset_version.status,
-            query_script=dataset_version.query_script,
-            error_message=dataset_version.error_message,
-            error_stack=dataset_version.error_stack,
-            script_output=dataset_version.script_output,
-            created_at=dataset_version.created_at,
-            finished_at=dataset_version.finished_at,
-            schema=dataset_version.serialized_schema,
-            num_objects=dataset_version.num_objects,
-            size=dataset_version.size,
-            preview=dataset_version.preview,
-            job_id=dataset_version.job_id,
-        )
-
-        # to avoid re-creating rows table, we are just renaming it for a new version
-        # of target dataset
-        self.warehouse.rename_dataset_table(
-            dataset.name,
-            target_dataset.name,
-            old_version=version,
-            new_version=target_version,
-        )
-        self.metastore.update_dataset_dependency_source(
-            dataset,
-            version,
-            new_source_dataset=target_dataset,
-            new_source_dataset_version=target_version,
-        )
-
-        if dataset.id == target_dataset.id:
-            # we are updating the same dataset so we need to refresh it to have newly
-            # added version in step before
-            dataset = self.get_dataset(dataset.name)
-
-        self.remove_dataset_version(dataset, version, drop_rows=False)
-
-        return self.get_dataset(target_dataset.name)
-
     def get_dataset(self, name: str) -> DatasetRecord:
         return self.metastore.get_dataset(name)
 
     def get_dataset_with_remote_fallback(
-        self, name: str, version: Optional[int] = None
+        self, name: str, version: Optional[str] = None
     ) -> DatasetRecord:
         try:
             ds = self.get_dataset(name)
@@ -1157,7 +1086,7 @@ def get_remote_dataset(self, name: str) -> DatasetRecord:
         return DatasetRecord.from_dict(dataset_info)
 
     def get_dataset_dependencies(
-        self, name: str, version: int, indirect=False
+        self, name: str, version: str, indirect=False
     ) -> list[Optional[DatasetDependency]]:
         dataset = self.get_dataset(name)
 
@@ -1175,7 +1104,7 @@ def get_dataset_dependencies(
             if d.is_dataset:
                 # only datasets can have dependencies
                 d.dependencies = self.get_dataset_dependencies(
-                    d.name, int(d.version), indirect=indirect
+                    d.name, d.version, indirect=indirect
                 )
 
         return direct_dependencies
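
With the `int(d.version)` cast gone, versions flow through as strings end to end. One thing to keep in mind when reviewing related code: plain string comparison does not order semver correctly, so any ordering logic has to parse the components first. A quick illustration (not code from this PR):

```python
# Lexicographic string comparison mis-orders semver versions:
assert "1.10.0" < "1.9.0"  # True for strings, wrong semantically

# Parsing into integer tuples gives the intended ordering:
def semver_key(version: str) -> tuple[int, int, int]:
    major, minor, patch = (int(part) for part in version.split("."))
    return (major, minor, patch)

assert semver_key("1.10.0") > semver_key("1.9.0")
```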
@@ -1244,7 +1173,7 @@ def listings(self):
         ]
 
     def ls_dataset_rows(
-        self, name: str, version: int, offset=None, limit=None
+        self, name: str, version: str, offset=None, limit=None
     ) -> list[dict]:
         from datachain.query.dataset import DatasetQuery
 
@@ -1282,7 +1211,7 @@ def export_dataset_table(
         self,
         bucket_uri: str,
         name: str,
-        version: int,
+        version: str,
         client_config=None,
     ) -> list[str]:
         dataset = self.get_dataset(name)
@@ -1291,14 +1220,14 @@ def export_dataset_table(
             bucket_uri, dataset, version, client_config
         )
 
-    def dataset_table_export_file_names(self, name: str, version: int) -> list[str]:
+    def dataset_table_export_file_names(self, name: str, version: str) -> list[str]:
         dataset = self.get_dataset(name)
         return self.warehouse.dataset_table_export_file_names(dataset, version)
 
     def remove_dataset(
         self,
         name: str,
-        version: Optional[int] = None,
+        version: Optional[str] = None,
         force: Optional[bool] = False,
         studio: Optional[bool] = False,
     ):
@@ -1372,7 +1301,7 @@ def pull_dataset( # noqa: C901, PLR0915
         remote_ds_uri: str,
         output: Optional[str] = None,
         local_ds_name: Optional[str] = None,
-        local_ds_version: Optional[int] = None,
+        local_ds_version: Optional[str] = None,
         cp: bool = False,
         force: bool = False,
         *,

src/datachain/cli/commands/datasets.py

Lines changed: 1 addition & 1 deletion

@@ -127,7 +127,7 @@ def _datasets_tabulate_row(name, both, local_version, studio_version):
 def rm_dataset(
     catalog: "Catalog",
     name: str,
-    version: Optional[int] = None,
+    version: Optional[str] = None,
     force: Optional[bool] = False,
     studio: bool = False,
     local: bool = False,

src/datachain/cli/commands/show.py

Lines changed: 1 addition & 1 deletion

@@ -10,7 +10,7 @@
 def show(
     catalog: "Catalog",
     name: str,
-    version: Optional[int] = None,
+    version: Optional[str] = None,
     limit: int = 10,
     offset: int = 0,
     columns: Sequence[str] = (),

src/datachain/cli/parser/__init__.py

Lines changed: 2 additions & 2 deletions

@@ -302,7 +302,7 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915
         "--version",
         action="store",
         default=None,
-        type=int,
+        type=str,
         help="Dataset version",
     )
     rm_dataset_parser.add_argument(
@@ -495,7 +495,7 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915
         "--version",
         action="store",
         default=None,
-        type=int,
+        type=str,
         help="Dataset version",
     )
     show_parser.add_argument("--schema", action="store_true", help="Show schema")
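
With `type=str`, argparse now accepts any string for `--version`, so malformed input is only caught later in the stack. One option (a sketch, not something this PR adds) would be a semver-validating `type` callable:

```python
import argparse
import re

SEMVER_RE = re.compile(r"\d+\.\d+\.\d+")

def semver(value: str) -> str:
    # Reject anything that is not MAJOR.MINOR.PATCH.
    if not SEMVER_RE.fullmatch(value):
        raise argparse.ArgumentTypeError(f"not a semver version: {value!r}")
    return value

parser = argparse.ArgumentParser()
parser.add_argument("--version", action="store", default=None, type=semver)
print(parser.parse_args(["--version", "1.0.0"]).version)  # -> 1.0.0
```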
