
Commit 7b263ff

Refactor view sequencing and return sequenced views if recursion is found (#2499)
## Changes

Refactor view sequencing and return sequenced views if recursion is found.

### Linked issues

Resolves #2494

### Functionality

- [x] modified existing workflow: `table-migration`

### Tests

- [x] updated unit tests
1 parent 97d3abc commit 7b263ff

5 files changed: +260 −285 lines changed

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ def _migrate_tables(self, what: What, mounts: list[Mount], hiveserde_in_place_mi
     def _migrate_views(self):
         tables_to_migrate = self._tm.get_tables_to_migrate(self._tc)
         all_tasks = []
-        sequencer = ViewsMigrationSequencer(tables_to_migrate, self.index_full_refresh())
+        sequencer = ViewsMigrationSequencer(tables_to_migrate, migration_index=self.index_full_refresh())
         batches = sequencer.sequence_batches()
         for batch in batches:
             tasks = []

src/databricks/labs/ucx/hive_metastore/view_migrate.py

Lines changed: 90 additions & 72 deletions
@@ -83,86 +83,104 @@ def __eq__(self, other):
 
 class ViewsMigrationSequencer:
 
-    def __init__(self, tables: Collection[TableToMigrate], index: MigrationIndex):
-        self._tables = tables
-        self._index = index
-        self._result_view_list: list[ViewToMigrate] = []
-        self._result_tables_set: set[TableView] = set()
+    def __init__(self, tables_to_migrate: Collection[TableToMigrate], *, migration_index: MigrationIndex | None = None):
+        self._tables = tables_to_migrate  # Also contains views to migrate
+        self._index = migration_index or MigrationIndex([])
+
+    @cached_property
+    def _views(self) -> dict[ViewToMigrate, TableView]:
+        # Views is a mapping as the TableView is required when resolving dependencies
+        views = {}
+        for table_or_view in self._tables:
+            if table_or_view.src.view_text is None:
+                continue
+            view_to_migrate = ViewToMigrate(table_or_view.src, table_or_view.rule)
+            # All views to migrate are stored in the hive_metastore
+            views[view_to_migrate] = TableView("hive_metastore", view_to_migrate.src.database, view_to_migrate.src.name)
+        return views
+
+    def _get_view_to_migrate(self, key: str) -> ViewToMigrate | None:
+        """Get a view to migrate by key"""
+        for view in self._views:
+            if view.src.key == key:
+                return view
+        return None
 
     def sequence_batches(self) -> list[list[ViewToMigrate]]:
-        # sequencing is achieved using a very simple algorithm:
-        # for each view, we register dependencies (extracted from view_text)
-        # then given the remaining set of views to process,
-        # and the growing set of views already processed
-        # we check if each remaining view refers to not yet processed views
-        # if none, then it's safe to add that view to the next batch of views
-        # the complexity for a given set of views v and a dependency depth d looks like Ov^d
-        # this seems enormous but in practice d remains small and v decreases rapidly
-        all_tables: dict[str, TableToMigrate] = {}
-        views = set()
-        for table in self._tables:
-            if table.src.view_text is not None:
-                table = ViewToMigrate(table.src, table.rule)
-            all_tables[table.src.key] = table
-            if isinstance(table, ViewToMigrate):
-                views.add(table)
-        # when migrating views we want them in batches
+        """Sequence the views in batches to migrate them in the right order.
+
+        Batch sequencing uses the following algorithm:
+        0. For each view, we register dependencies (extracted from view_text).
+        1. Then, to create a new batch of views, we take the dependencies that are covered already:
+           1. The migrated tables
+           2. The (growing) set of views from already sequenced previous batches
+           For each remaining view, we check if all its dependencies are covered. If that is the case, then we
+           add that view to the new batch of views.
+        2. We repeat from point 1. until all views are sequenced.
+
+        The complexity for a given set of views v and a dependency depth d looks like O(v^d); this seems enormous,
+        but in practice d remains small and v decreases rapidly.
+        """
         batches: list[list[ViewToMigrate]] = []
-        while len(views) > 0:
-            next_batch = self._next_batch(views)
-            self._result_view_list.extend(next_batch)
-            table_views = {TableView("hive_metastore", t.src.database, t.src.name) for t in next_batch}
-            self._result_tables_set.update(table_views)
-            views.difference_update(next_batch)
-            batches.append(list(next_batch))
+        views_to_migrate = set(self._views.keys())
+        views_sequenced: set[TableView] = set()
+        while len(views_to_migrate) > 0:
+            try:
+                next_batch = self._next_batch(views_to_migrate, views_from_previous_batches=views_sequenced)
+            except RecursionError as e:
+                logger.error(
+                    f"Cannot sequence views {views_to_migrate} given migration index {self._index}", exc_info=e
+                )
+                # By returning the current batches, we can migrate the views that are not causing the recursion
+                return batches
+            for view in next_batch:
+                views_sequenced.add(self._views[view])
+            batches.append(next_batch)
+            views_to_migrate.difference_update(next_batch)
         return batches
 
-    def _next_batch(self, views: set[ViewToMigrate]) -> set[ViewToMigrate]:
+    def _next_batch(
+        self, views: set[ViewToMigrate], *, views_from_previous_batches: set[TableView] | None
+    ) -> list[ViewToMigrate]:
+        """For the sequencing algorithm see the docstring of :meth:`sequence_batches`.
+
+        Raises:
+            RecursionError :
+                If an infinite loop is detected.
+        """
+        views_from_previous_batches = views_from_previous_batches or set()
         # we can't (slightly) optimize by checking len(views) == 0 or 1,
         # because we'd lose the opportunity to check the SQL
-        result: set[ViewToMigrate] = set()
+        result = []
         for view in views:
-            view_deps = set(view.dependencies)
-            self._check_circular_dependency(view, views)
-            if len(view_deps) == 0:
-                result.add(view)
-            else:
-                # does the view have at least one view dependency that is not yet processed ?
-                not_processed_yet = view_deps - self._result_tables_set
-                if len(not_processed_yet) == 0:
-                    result.add(view)
-                    continue
-                if not [
-                    table_view
-                    for table_view in not_processed_yet
-                    if not self._index.is_migrated(table_view.schema, table_view.name)
-                ]:
-                    result.add(view)
-        # prevent infinite loop
-        if len(result) == 0 and len(views) > 0:
-            raise ValueError(f"Invalid table references are preventing migration: {views}")
+            self._check_circular_dependency(view)
+            if len(view.dependencies) == 0:
+                result.append(view)
+                continue
+            # If all dependencies are already processed, we can add the view to the next batch
+            not_processed_yet = set(view.dependencies) - views_from_previous_batches
+            if len(not_processed_yet) == 0:
+                result.append(view)
+                continue
+            if all(self._index.is_migrated(table_view.schema, table_view.name) for table_view in not_processed_yet):
+                result.append(view)
+        if len(result) == 0 and len(views) > 0:  # prevent infinite loop
+            raise RecursionError(f"Unresolved dependencies prevent batch sequencing: {views}")
        return result
 
-    def _check_circular_dependency(self, initial_view, views):
-        queue = []
-        queue.extend(dep for dep in initial_view.dependencies)
-        while queue:
-            current_view = self._get_view_instance(queue.pop(0).key, views)
-            if not current_view:
+    def _check_circular_dependency(self, view: ViewToMigrate) -> None:
+        """Check for circular dependencies in the views to migrate.
+
+        Raises:
+            RecursionError :
+                If a circular dependency is detected between views.
+        """
+        dependency_keys = [dep.key for dep in view.dependencies]
+        while len(dependency_keys) > 0:
+            dependency = self._get_view_to_migrate(dependency_keys.pop(0))
+            if not dependency:  # Only views (to migrate) can cause a circular dependency, tables can be ignored
                 continue
-            if current_view == initial_view:
-                raise ValueError(
-                    f"Circular dependency detected between {initial_view.src.name} and {current_view.src.name} "
-                )
-            queue.extend(dep for dep in current_view.dependencies)
-
-    def _get_view_instance(self, key: str, views: set[ViewToMigrate]) -> ViewToMigrate | None:
-        # This method acts as a mapper between TableView and ViewToMigrate. We check if the key passed matches with
-        # any of the views in the list of views. This means the circular dependency will be identified only
-        # if the dependencies are present in the list of views passed to _next_batch() or the _result_view_list
-        # ToDo: see if a mapper between TableView and ViewToMigrate can be implemented
-        all_views = list(views) + self._result_view_list
-        for view in all_views:
-            if view.src.key == key:
-                return view
-        return None
+            if dependency == view:
+                raise RecursionError(f"Circular dependency detected starting from: {view.src.full_name}")
+            dependency_keys.extend(dep.key for dep in dependency.dependencies)
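The docstring of `sequence_batches` above describes the batching algorithm in the abstract. The following is a minimal, self-contained sketch of the same idea, stripped of the UCX classes and assuming plain string names and a simple dependency mapping; the `sequence_batches` function and all names below are illustrative, not part of the UCX API.

```python
def sequence_batches(dependencies: dict[str, set[str]], migrated: set[str]) -> list[list[str]]:
    """Group views into batches so every dependency of a view is either an
    already-migrated table or a view placed in an earlier batch."""
    remaining = set(dependencies)
    covered = set(migrated)  # grows as each batch is sequenced
    batches: list[list[str]] = []
    while remaining:
        # A view is ready when all its dependencies are already covered
        batch = sorted(view for view in remaining if dependencies[view] <= covered)
        if not batch:
            # No progress is possible: a circular or unresolved dependency remains.
            # Mirroring the refactor, return the batches sequenced so far instead of failing.
            return batches
        batches.append(batch)
        covered.update(batch)
        remaining.difference_update(batch)
    return batches


if __name__ == "__main__":
    deps = {
        "v1": {"t1"},    # depends on an already-migrated table
        "v4": {"v1"},    # depends on another view
        "v10": {"v11"},  # circular pair: never sequenced
        "v11": {"v10"},
    }
    print(sequence_batches(deps, migrated={"t1"}))
    # [['v1'], ['v4']] -- the circular views v10/v11 are left out, like the RecursionError path
```

Like the refactored sequencer, the sketch returns whatever batches were produced before progress stalled, so views that are independent of the cycle can still be migrated.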

tests/unit/hive_metastore/tables/tables_and_views.json

Lines changed: 5 additions & 2 deletions
@@ -88,7 +88,10 @@
         "db": "db1",
         "table": "v14",
         "view_text": "select * from db1.v12"
+    },
+    {
+        "db": "db1",
+        "table": "v15",
+        "view_text": "select * from db1.v1, db1.t1"
     }
-
-
 ]

tests/unit/hive_metastore/test_view_migrate.py

Lines changed: 164 additions & 3 deletions
@@ -1,10 +1,18 @@
-from databricks.labs.ucx.hive_metastore.mapping import Rule
+import itertools
+import json
+import logging
+from pathlib import Path
+from typing import TypeVar
+
+import pytest
+
+from databricks.labs.ucx.hive_metastore.mapping import Rule, TableToMigrate
 from databricks.labs.ucx.hive_metastore.migration_status import MigrationIndex, MigrationStatus
 from databricks.labs.ucx.hive_metastore.tables import Table
-from databricks.labs.ucx.hive_metastore.view_migrate import ViewToMigrate
+from databricks.labs.ucx.hive_metastore.view_migrate import ViewsMigrationSequencer, ViewToMigrate
 
 
-def test_view_to_migrate_sql_migrate_view():
+def test_view_to_migrate_sql_migrate_view_sql():
     expected_query = "CREATE OR REPLACE VIEW IF NOT EXISTS `cat1`.`schema1`.`dest_view1` AS SELECT * FROM `cat1`.`schema1`.`dest_table1`"
     view = Table(
         object_type="VIEW",
@@ -27,3 +35,156 @@ def test_view_to_migrate_sql_migrate_view():
     sql = view_to_migrate.sql_migrate_view(migration_index)
 
     assert sql == expected_query
+
+
+@pytest.fixture(scope="session")
+def samples() -> dict[str, dict[str, str]]:
+    path = Path(Path(__file__).parent, "tables", "tables_and_views.json")
+    samples_with_key = {}
+    with path.open(encoding="utf-8") as f:
+        for sample in json.load(f):
+            key = sample["db"] + "." + sample["table"]
+            samples_with_key[key] = sample
+    return samples_with_key
+
+
+@pytest.fixture
+def tables(request, samples) -> list[TableToMigrate]:
+    tables_to_migrate = []
+    rule = Rule("ws1", "cat1", "schema", "db1", "table1", "table2")
+    for key in request.param:
+        sample = samples[key]
+        table = Table(
+            "hive_metastore",
+            sample["db"],
+            sample["table"],
+            "type",
+            "DELTA" if sample.get("view_text") is None else "VIEW",
+            view_text=sample.get("view_text"),
+        )
+        table_to_migrate = TableToMigrate(table, rule)
+        tables_to_migrate.append(table_to_migrate)
+    return tables_to_migrate
+
+
+@pytest.mark.parametrize("tables", [("db1.t1", "db2.t1")], indirect=True)
+def test_empty_sequence_without_views(tables):
+    migration_index = MigrationIndex(
+        [
+            MigrationStatus("db1", "t1", "cat1", "db2", "t1"),
+            MigrationStatus("db2", "t2", "cat1", "db2", "t1"),
+        ]
+    )
+    sequencer = ViewsMigrationSequencer(tables, migration_index=migration_index)
+    batches = sequencer.sequence_batches()
+
+    assert len(batches) == 0
+
+
+T = TypeVar("T")
+
+
+def flatten(lists: list[list[T]]) -> list[T]:
+    return list(itertools.chain.from_iterable(lists))
+
+
+@pytest.mark.parametrize("tables", [("db1.t1", "db1.v1")], indirect=True)
+def test_sequence_direct_view(tables) -> None:
+    expected = ["hive_metastore.db1.v1"]
+    migration_index = MigrationIndex([MigrationStatus("db1", "t1", "cat1", "db1", "t1")])
+    sequencer = ViewsMigrationSequencer(tables, migration_index=migration_index)
+
+    batches = sequencer.sequence_batches()
+
+    assert [t.src.key for t in flatten(batches)] == expected
+
+
+@pytest.mark.parametrize("tables", [("db1.t1", "db1.v1", "db1.t2", "db1.v2")], indirect=True)
+def test_sequence_direct_views(tables) -> None:
+    expected = ["hive_metastore.db1.v1", "hive_metastore.db1.v2"]
+    migration_index = MigrationIndex(
+        [MigrationStatus("db1", "t1", "cat1", "db1", "t1"), MigrationStatus("db1", "t2", "cat1", "db1", "t2")]
+    )
+    sequencer = ViewsMigrationSequencer(tables, migration_index=migration_index)
+
+    batches = sequencer.sequence_batches()
+
+    assert len(batches) == 1
+    # Sort because the order of the views is not guaranteed as they both depend on just tables
+    assert sorted([t.src.key for t in flatten(batches)]) == expected
+
+
+@pytest.mark.parametrize("tables", [("db1.t1", "db1.v1", "db1.v4")], indirect=True)
+def test_sequence_indirect_views(tables) -> None:
+    expected = ["hive_metastore.db1.v1", "hive_metastore.db1.v4"]
+    migration_index = MigrationIndex([MigrationStatus("db1", "t1", "cat1", "db1", "t1")])
+    sequencer = ViewsMigrationSequencer(tables, migration_index=migration_index)
+
+    batches = sequencer.sequence_batches()
+
+    assert len(batches) == 2
+    assert [t.src.key for t in flatten(batches)] == expected
+
+
+@pytest.mark.parametrize("tables", [("db1.t1", "db1.v1", "db1.v4", "db1.v5", "db1.v6", "db1.v7")], indirect=True)
+def test_sequence_deep_indirect_views(tables) -> None:
+    expected = [
+        "hive_metastore.db1.v1",
+        "hive_metastore.db1.v4",
+        "hive_metastore.db1.v7",
+        "hive_metastore.db1.v6",
+        "hive_metastore.db1.v5",
+    ]
+    migration_index = MigrationIndex([MigrationStatus("db1", "t1", "cat1", "db1", "t1")])
+    sequencer = ViewsMigrationSequencer(tables, migration_index=migration_index)
+
+    batches = sequencer.sequence_batches()
+
+    assert len(batches) == 5
+    assert [t.src.key for t in flatten(batches)] == expected
+
+
+@pytest.mark.parametrize("tables", [("db1.v1", "db1.v15")], indirect=True)
+def test_sequence_view_with_view_and_table_dependency(tables) -> None:
+    expected = ["hive_metastore.db1.v1", "hive_metastore.db1.v15"]
+    migration_index = MigrationIndex([MigrationStatus("db1", "t1", "cat1", "db1", "t1")])
+    sequencer = ViewsMigrationSequencer(tables, migration_index=migration_index)
+
+    batches = sequencer.sequence_batches()
+
+    assert len(batches) == 2
+    assert [t.src.key for t in flatten(batches)] == expected
+
+
+@pytest.mark.parametrize("tables", [("db1.v8",)], indirect=True)
+def test_sequence_view_with_invalid_query_raises_value_error(tables) -> None:
+    sequencer = ViewsMigrationSequencer(tables)
+
+    with pytest.raises(ValueError) as error:
+        sequencer.sequence_batches()
+    assert "Could not analyze view SQL:" in str(error)
+
+
+@pytest.mark.parametrize("tables", [("db1.v9",)], indirect=True)
+def test_sequencing_logs_unresolved_dependencies(caplog, tables) -> None:
+    sequencer = ViewsMigrationSequencer(tables)
+
+    with caplog.at_level(logging.ERROR, logger="databricks.labs.ucx.hive_metastore.view_migrate"):
+        sequencer.sequence_batches()
+    assert "Unresolved dependencies prevent batch sequencing:" in caplog.text
+
+
+@pytest.mark.parametrize(
+    "tables",
+    [
+        ("db1.v10", "db1.v11"),
+        ("db1.v12", "db1.v13", "db1.v14"),
+    ],
+    indirect=True,
+)
+def test_sequencing_logs_circular_dependency(caplog, tables) -> None:
+    sequencer = ViewsMigrationSequencer(tables)
+
+    with caplog.at_level(logging.ERROR, logger="databricks.labs.ucx.hive_metastore.view_migrate"):
+        sequencer.sequence_batches()
+    assert "Circular dependency detected starting from:" in caplog.text
