Skip to content

Commit 04bc6b8

Browse files
feat(gc): include STALE and REMOVING dataset versions in GC cleanup (#1621)
* feat(gc): include STALE dataset versions in GC cleanup

  Studio's lifecycle system (datachain-ai/studio#12515) marks excess dataset
  versions as STALE instead of deleting them directly. The datachain GC must
  then perform the actual warehouse and metastore deletion. Add
  DatasetStatus.STALE to get_incomplete_dataset_versions() so STALE versions
  with a finished or absent job are returned and removed by
  cleanup_failed_dataset_versions().

* Update metastore.py

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* feat(dataset): add REMOVING status for versions marked for deletion

  Introduce DatasetStatus.REMOVING so versions marked for deletion are
  distinct from STALE. Include REMOVING in the get_incomplete_dataset_versions
  cleanup query and update tests accordingly.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 2ba2f6b commit 04bc6b8

File tree

3 files changed: +56 −5 lines

src/datachain/data_storage/metastore.py

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -329,14 +329,17 @@ def get_incomplete_dataset_versions(
329329
self, job_id: str | None = None
330330
) -> list[tuple[DatasetRecord, str]]:
331331
"""
332-
Get incomplete dataset versions to clean up.
332+
Get incomplete, stale, or removed dataset versions to clean up.
333333
334334
When job_id is provided, returns versions belonging to that specific
335335
job (used during job failure cleanup).
336336
337-
When job_id is None, returns all incomplete dataset versions
338-
whose associated job is finished, plus versions with no job_id
339-
that are older than STALE_CREATED_THRESHOLD_HOURS (used by gc).
337+
When job_id is None, returns all versions that are safe to delete:
338+
- Status CREATED, FAILED, STALE where either:
339+
- the associated job has finished, or
340+
- there is no associated job (job_id is NULL) and the version is
341+
older than STALE_CREATED_THRESHOLD_HOURS
342+
- Status REMOVING: marked for deletion
340343
341344
Returns:
342345
List of (DatasetRecord, version_string) tuples. Each DatasetRecord
@@ -1654,7 +1657,14 @@ def get_incomplete_dataset_versions(
16541657
)
16551658
)
16561659
.where(
1657-
dv.c.status.in_([DatasetStatus.CREATED, DatasetStatus.FAILED]),
1660+
dv.c.status.in_(
1661+
[
1662+
DatasetStatus.CREATED,
1663+
DatasetStatus.FAILED,
1664+
DatasetStatus.STALE,
1665+
DatasetStatus.REMOVING,
1666+
]
1667+
),
16581668
or_(
16591669
# job is finished
16601670
j.c.status.in_(

src/datachain/dataset.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -247,6 +247,7 @@ class DatasetStatus:
247247
FAILED = 3
248248
COMPLETE = 4
249249
STALE = 6
250+
REMOVING = 7
250251

251252

252253
@dataclass
@@ -340,6 +341,7 @@ def is_final_status(self) -> bool:
340341
DatasetStatus.FAILED,
341342
DatasetStatus.COMPLETE,
342343
DatasetStatus.STALE,
344+
DatasetStatus.REMOVING,
343345
]
344346

345347
def update(self, **kwargs):

tests/unit/test_dataset_status_management.py

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -300,3 +300,42 @@ def test_public_api_move_dataset_rejects_non_complete(
300300
# Should raise error for FAILED dataset
301301
with pytest.raises(DatasetNotFoundError):
302302
move_dataset(dataset_failed.name, "new_name_failed", session=test_session)
303+
304+
305+
@pytest.fixture
306+
def dataset_marked_for_removal(test_session, job) -> DatasetRecord:
307+
ds = dc.read_values(value=["v1"], session=test_session).save(
308+
"ds_marked_for_removal"
309+
)
310+
dataset = ds.dataset
311+
assert dataset is not None
312+
dv = test_session.catalog.metastore._datasets_versions
313+
test_session.catalog.metastore.db.execute(
314+
dv.update()
315+
.where(dv.c.dataset_id == dataset.id)
316+
.values(status=DatasetStatus.REMOVING)
317+
)
318+
return test_session.catalog.get_dataset(dataset.name, include_incomplete=True)
319+
320+
321+
def test_get_incomplete_dataset_versions_includes_marked_for_removal(
322+
test_session, job, dataset_marked_for_removal
323+
):
324+
to_clean = test_session.catalog.metastore.get_incomplete_dataset_versions()
325+
assert dataset_marked_for_removal.name not in {ds.name for ds, _ in to_clean}
326+
327+
test_session.catalog.metastore.set_job_status(job.id, JobStatus.COMPLETE)
328+
to_clean = test_session.catalog.metastore.get_incomplete_dataset_versions()
329+
assert dataset_marked_for_removal.name in {ds.name for ds, _ in to_clean}
330+
331+
332+
def test_cleanup_failed_dataset_versions_removes_marked_for_removal(
333+
test_session, job, dataset_marked_for_removal
334+
):
335+
test_session.catalog.metastore.set_job_status(job.id, JobStatus.COMPLETE)
336+
337+
num_removed = test_session.catalog.cleanup_failed_dataset_versions()
338+
assert num_removed == 1
339+
340+
with pytest.raises(DatasetNotFoundError):
341+
test_session.catalog.get_dataset(dataset_marked_for_removal.name)

Comments (0)