Commit 0a9c23a

fix(delta): handle missing dependencies (#1419)
1 parent 5ce8c3e commit 0a9c23a

File tree

2 files changed: +67 −1 lines changed


src/datachain/delta.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -200,7 +200,9 @@ def _get_source_info(
         indirect=False,
     )
 
-    source_ds_dep = next((d for d in dependencies if d.name == source_ds.name), None)
+    source_ds_dep = next(
+        (d for d in dependencies if d and d.name == source_ds.name), None
+    )
     if not source_ds_dep:
         # Starting dataset was removed, back off to normal dataset creation
         return None, None, None, None, None
```
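
For context: the pre-fix generator assumed every entry in `dependencies` was a dependency object, but (as the new test below confirms) a dependency whose dataset version has been deleted comes back as `None`, so `d.name` raised `AttributeError`. Below is a minimal, self-contained sketch of the failure mode and the guard; the `Dep` dataclass is a hypothetical stand-in for datachain's real dependency records, not its actual API:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Dep:  # hypothetical stand-in for a datachain dependency record
    name: str


# A dependency whose dataset version was deleted is reported as None.
dependencies: list[Optional[Dep]] = [Dep("other"), None, Dep("source")]

# Pre-fix behavior: d.name blows up on the None placeholder.
# next((d for d in dependencies if d.name == "source"), None)  # AttributeError

# Post-fix behavior: the `d and` guard skips None entries, so the lookup
# degrades gracefully to None when the source dependency is gone.
dep = next((d for d in dependencies if d and d.name == "source"), None)
assert dep is not None and dep.name == "source"
```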

tests/func/test_delta.py

Lines changed: 64 additions & 0 deletions
```diff
@@ -98,6 +98,70 @@ def create_delta_dataset(ds_name):
     create_delta_dataset(ds_name)
 
 
+def test_delta_falls_back_when_dependency_missing(test_session):
+    catalog = test_session.catalog
+
+    source_ds = "delta_removed_dep_source"
+    delta_ds = "delta_removed_dep_result"
+    process_log: list[int] = []
+
+    def record_processing(id: int) -> int:
+        process_log.append(id)
+        return id
+
+    # Create first source dataset and initial delta version that depends on it
+    dc.read_values(id=[1, 2], session=test_session).save(source_ds)
+    dc.read_dataset(
+        source_ds,
+        session=test_session,
+        delta=True,
+        delta_on="id",
+    ).map(processed_id=record_processing).save(delta_ds)
+
+    assert _get_dependencies(catalog, delta_ds, "1.0.0") == [(source_ds, "1.0.0")]
+    assert set(
+        dc.read_dataset(delta_ds, version="1.0.0", session=test_session).to_values("id")
+    ) == {1, 2}
+    assert sorted(process_log[:2]) == [1, 2]
+
+    dc.read_values(id=[1, 2, 10, 20, 30], session=test_session).save(source_ds)
+
+    # Drop the previous version so it is clear the dependency targets 1.0.1
+    dc.delete_dataset(source_ds, version="1.0.0", session=test_session)
+
+    with pytest.raises(DatasetNotFoundError):
+        dc.read_dataset(source_ds, session=test_session, version="1.0.0")
+
+    deps_after_removal = catalog.get_dataset_dependencies(
+        delta_ds,
+        "1.0.0",
+        namespace_name=catalog.metastore.default_project.namespace.name,
+        project_name=catalog.metastore.default_project.name,
+        indirect=False,
+    )
+    assert deps_after_removal == [None]
+
+    dc.read_dataset(
+        source_ds,
+        session=test_session,
+        delta=True,
+        delta_on="id",
+    ).map(processed_id=record_processing).save(delta_ds)
+
+    # Delta logic should fall back to rebuilding from scratch with the new dependency
+    assert _get_dependencies(catalog, delta_ds, "1.0.1") == [(source_ds, "1.0.1")]
+    assert set(
+        dc.read_dataset(delta_ds, version="1.0.1", session=test_session).to_values("id")
+    ) == {1, 2, 10, 20, 30}
+    # Previous version remains intact and still reflects the original source dataset
+    assert set(
+        dc.read_dataset(delta_ds, version="1.0.0", session=test_session).to_values("id")
+    ) == {1, 2}
+    # Fallback rebuilds the dataset, so ids 1 and 2 appear twice across both runs.
+    assert sorted(process_log[:2]) == [1, 2]
+    assert sorted(process_log[2:]) == [1, 2, 10, 20, 30]
+
+
 def test_delta_returns_correct_dataset_on_no_changes(test_session):
     catalog = test_session.catalog
```
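
Taken together, the assertions pin down the contract this commit establishes: a deleted dataset version surfaces as a `None` entry in its dependents' dependency list (`deps_after_removal == [None]`), and the next delta run treats the missing starting dataset as a signal to rebuild from scratch against the current source version instead of raising.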
