Skip to content

Commit 98d6d73

Browse files
authored
Merge pull request #10 from elementary-data/table_filters
Table filters
2 parents bc4f87f + e122f55 commit 98d6d73

File tree

3 files changed

+164
-3
lines changed

3 files changed

+164
-3
lines changed

lineage/lineage_graph.py

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import networkx as nx
55
import sqlparse
6+
from lineage.exceptions import ConfigError
67
from sqllineage.core import LineageAnalyzer, LineageResult
78
from sqllineage.exceptions import SQLLineageException
89
from pyvis.network import Network
@@ -16,7 +17,10 @@
1617
GRAPH_VISUALIZATION_OPTIONS = """{
1718
"edges": {
1819
"color": {
19-
"inherit": true
20+
"color": "rgba(23,107,215,1)",
21+
"highlight": "rgba(23,107,215,1)",
22+
"hover": "rgba(23,107,215,1)",
23+
"inherit": false
2024
},
2125
"dashes": true,
2226
"smooth": {
@@ -38,8 +42,12 @@
3842
}
3943
},
4044
"interaction": {
45+
"hover": true,
4146
"navigationButtons": true,
42-
"multiselect": true
47+
"multiselect": true,
48+
"keyboard": {
49+
"enabled": true
50+
}
4351
},
4452
"physics": {
4553
"enabled": false,
@@ -53,6 +61,12 @@
5361

5462

5563
class LineageGraph(object):
64+
UPSTREAM_DIRECTION = 'upstream'
65+
DOWNSTREAM_DIRECTION = 'downstream'
66+
BOTH_DIRECTIONS = 'both'
67+
SELECTED_NODE_COLOR = '#0925C7'
68+
SELECTED_NODE_TITLE = 'Selected table<br/>'
69+
5670
def __init__(self, profile_database_name: str, profile_schema_name: str = None, show_isolated_nodes: bool = False,
5771
full_table_names: bool = False) -> None:
5872
self._lineage_graph = nx.DiGraph()
@@ -189,6 +203,48 @@ def init_graph_from_query_list(self, queries: [tuple]) -> None:
189203

190204
logger.debug(f'Finished updating lineage graph!')
191205

206+
def filter_on_table(self, selected_table: str, direction: str = None, depth: int = None) -> None:
207+
logger.debug(f'Filtering lineage graph on table - {selected_table}')
208+
resolved_selected_table_name = self._name_qualification(Table(selected_table), self._profile_database_name,
209+
self._profile_schema_name)
210+
logger.debug(f'Qualified table name - {resolved_selected_table_name}')
211+
if resolved_selected_table_name is None:
212+
raise ConfigError(f'Could not resolve table name - {selected_table}, please make sure to '
213+
f'specify a table name that exists in the database configured in your profiles file.')
214+
215+
if direction == self.DOWNSTREAM_DIRECTION:
216+
self._lineage_graph = self._downstream_graph(resolved_selected_table_name, depth)
217+
elif direction == self.UPSTREAM_DIRECTION:
218+
self._lineage_graph = self._upstream_graph(resolved_selected_table_name, depth)
219+
elif direction == self.BOTH_DIRECTIONS:
220+
downstream_graph = self._downstream_graph(resolved_selected_table_name, depth)
221+
upstream_graph = self._upstream_graph(resolved_selected_table_name, depth)
222+
self._lineage_graph = nx.compose(upstream_graph, downstream_graph)
223+
else:
224+
raise ConfigError(f'Direction must be one of the following - {self.UPSTREAM_DIRECTION}|'
225+
f'{self.DOWNSTREAM_DIRECTION}|{self.BOTH_DIRECTIONS}, '
226+
f'Got - {direction} instead.')
227+
228+
self._update_selected_node_attributes(resolved_selected_table_name)
229+
logger.debug(f'Finished filtering lineage graph on table - {selected_table}')
230+
pass
231+
232+
def _downstream_graph(self, source_node: str, depth: Optional[int]) -> nx.DiGraph:
233+
logger.debug(f'Building a downstream graph for - {source_node}, depth - {depth}')
234+
return nx.bfs_tree(G=self._lineage_graph, source=source_node, depth_limit=depth)
235+
236+
def _upstream_graph(self, target_node: str, depth: Optional[int]) -> nx.DiGraph:
237+
logger.debug(f'Building an upstream graph for - {target_node}, depth - {depth}')
238+
reversed_lineage_graph = self._lineage_graph.reverse(copy=True)
239+
return nx.bfs_tree(G=reversed_lineage_graph, source=target_node, depth_limit=depth).reverse(copy=False)
240+
241+
def _update_selected_node_attributes(self, selected_node: str) -> None:
242+
if self._lineage_graph.has_node(selected_node):
243+
node = self._lineage_graph.nodes[selected_node]
244+
node_title = node.get('title', '')
245+
node.update({'color': self.SELECTED_NODE_COLOR,
246+
'title': self.SELECTED_NODE_TITLE + node_title})
247+
192248
def draw_graph(self, should_open_browser: bool = True) -> None:
193249
# Visualize the graph
194250
net = Network(height="100%", width="100%", directed=True)

lineage/main.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,20 +94,51 @@ def handle_parse_result(self, ctx, opts, args):
9494
default=False,
9595
help="Indicates if the lineage should filter only on tables in the configured profile's schema."
9696
)
97+
@click.option(
98+
'--table', '-t',
99+
type=str,
100+
help="Filter on a table to see upstream and downstream dependencies of this table (see also direction param)."
101+
" Table name format could be a full name like <db_name>.<schema_name>.<table_name>, a partial name like "
102+
"<schema_name>.<table_name> or only a table name <table_name>. If the database name wasn't part of the name "
103+
"the profiles database name will be used, if the schema name wasn't part of the name the profiles schema name "
104+
"will be used.",
105+
default=None
106+
)
107+
@click.option('--direction',
108+
type=click.Choice([LineageGraph.UPSTREAM_DIRECTION, LineageGraph.DOWNSTREAM_DIRECTION,
109+
LineageGraph.BOTH_DIRECTIONS]),
110+
help="Sets direction of dependencies when filtering on a specific table (default is both, "
111+
"meaning showing both upstream and downstream dependencies of this table).",
112+
default='both',
113+
cls=RequiredIf,
114+
required_if='table')
115+
@click.option('--depth',
116+
type=int,
117+
help="Sets how many levels of dependencies to show when filtering on a specific table "
118+
"(default is showing all levels of dependencies).",
119+
default=None,
120+
cls=RequiredIf,
121+
required_if='table')
97122
def main(start_date: datetime, end_date: datetime, profiles_dir: str, profile_name: str, open_browser: bool,
98-
export_query_history: bool, full_table_names: bool, ignore_schema: bool) -> None:
123+
export_query_history: bool, full_table_names: bool, ignore_schema: bool, table: str, direction: str,
124+
depth: int) -> None:
99125
"""
100126
For more details check out our documentation here - https://docs.elementary-data.com/
101127
"""
102128
click.echo(f"Any feedback and suggestions are welcomed! join our community here - "
103129
f"https://bit.ly/slack-elementary\n")
130+
104131
query_history = QueryHistoryFactory(profiles_dir, profile_name, export_query_history).create_query_history()
105132
queries = query_history.extract_queries(start_date, end_date)
133+
106134
lineage_graph = LineageGraph(show_isolated_nodes=False,
107135
profile_database_name=query_history.get_database_name(),
108136
profile_schema_name=query_history.get_schema_name() if not ignore_schema else None,
109137
full_table_names=full_table_names)
110138
lineage_graph.init_graph_from_query_list(queries)
139+
if table is not None:
140+
lineage_graph.filter_on_table(table, direction, depth)
141+
111142
lineage_graph.draw_graph(should_open_browser=open_browser)
112143

113144

tests/test_lineage_graph.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,77 @@ def test_lineage_graph_name_qualification(resolved_table_name, show_full_table_n
200200
reference = LineageGraph(profile_database_name='elementary_db', profile_schema_name='elementary_sc',
201201
full_table_names=show_full_table_names)
202202
assert reference._name_qualification(Table(resolved_table_name), '', '') == expected_result
203+
204+
205+
def compare_edges(directed_graph: nx.DiGraph, edges: [set]) -> bool:
206+
for edge in directed_graph.edges:
207+
if (edge[0], edge[1]) not in edges:
208+
return False
209+
return True
210+
211+
212+
def create_directed_graph_from_edge_list(edges: [list]) -> nx.DiGraph:
213+
G = nx.DiGraph()
214+
for edge in edges:
215+
G.add_node(edge[0])
216+
G.add_node(edge[1])
217+
G.add_edge(edge[0], edge[1])
218+
return G
219+
220+
221+
@pytest.mark.parametrize("edges, selected_node, depth, expected_remaining_edges", [
222+
(nx.path_graph(5).edges, 3, None, {(3, 4)}),
223+
(nx.path_graph(5).edges, 3, 1, {(3, 4)}),
224+
(nx.path_graph(7).edges, 3, 1, {(3, 4)}),
225+
([(0, 1), (1, 2), (0, 3), (3, 4), (3, 2), (2, 5), (4, 6)], 3, None, {(3, 4), (3, 2), (2, 5), (4, 6)}),
226+
([(0, 1), (1, 2), (0, 3), (3, 4), (3, 2), (2, 5), (4, 6)], 3, 1, {(3, 4), (3, 2)})
227+
])
228+
def test_lineage_graph_downstream_graph(edges, selected_node, depth, expected_remaining_edges):
229+
reference = LineageGraph(profile_database_name='elementary_db')
230+
reference._lineage_graph.add_edges_from(edges)
231+
reference._lineage_graph = reference._downstream_graph(selected_node, depth)
232+
assert compare_edges(reference._lineage_graph, expected_remaining_edges)
233+
234+
235+
@pytest.mark.parametrize("edges, selected_node, depth, expected_remaining_edges", [
236+
(nx.path_graph(5).edges, 3, None, {(0, 1), (1, 2), (2, 3)}),
237+
(nx.path_graph(5).edges, 3, 1, {(2, 3)}),
238+
(nx.path_graph(7).edges, 3, 1, {(2, 3)}),
239+
([(0, 1), (1, 2), (2, 3), (0, 3), (3, 4), (2, 5), (4, 6)], 3, None, {(0, 3), (2, 3), (1, 2), (0, 1)}),
240+
([(0, 1), (1, 2), (2, 3), (0, 3), (3, 4), (2, 5), (4, 6)], 3, 1, {(0, 3), (2, 3)}),
241+
])
242+
def test_lineage_graph_upstream_graph(edges, selected_node, depth, expected_remaining_edges):
243+
reference = LineageGraph(profile_database_name='elementary_db')
244+
reference._lineage_graph.add_edges_from(edges)
245+
reference._lineage_graph = reference._upstream_graph(selected_node, depth)
246+
assert compare_edges(reference._lineage_graph, expected_remaining_edges)
247+
248+
249+
@pytest.mark.parametrize("profile_database_name, profile_schema_name, full_table_names, edges, selected_node, "
250+
"direction, depth, expected_remaining_edges", [
251+
('db', 'sc', True, [('db.sc.t1', 'db.sc.t2'), ('db.sc.t1', 'db.sc.t3')],
252+
't3', 'upstream', None,
253+
{('db.sc.t1', 'db.sc.t3')}),
254+
('db', 'sc', True, [('db.sc.t1', 'db.sc.t2'), ('db.sc.t1', 'db.sc.t3')],
255+
'db.sc.t3', 'upstream', None,
256+
{('db.sc.t1', 'db.sc.t3')}),
257+
('db', 'sc', True, [('db.sc.t1', 'db.sc.t2'), ('db.sc.t1', 'db.sc.t3'),
258+
('db.sc.t3', 'db.sc.t4')],
259+
'sc.t1', 'downstream', 1,
260+
{('db.sc.t1', 'db.sc.t3'), ('db.sc.t1', 'db.sc.t2')}),
261+
('db', 'sc', False, [('t1', 't2'), ('t1', 't3')],
262+
't3', 'upstream', None,
263+
{('t1', 't3')}),
264+
('db', 'sc', False, [('t1', 't2'), ('t1', 't3'), ('t3', 't4'), ('t4', 't5'), ('t2', 't4'),
265+
('t4', 't6'), ('t6', 't8')],
266+
't4', 'both', 1,
267+
{('t3', 't4'), ('t4', 't5'), ('t2', 't4'), ('t4', 't6')}),
268+
])
269+
def test_lineage_graph_filter_on_table(profile_database_name, profile_schema_name, full_table_names, edges,
270+
selected_node, direction, depth, expected_remaining_edges):
271+
reference = LineageGraph(profile_database_name=profile_database_name, profile_schema_name=profile_schema_name,
272+
full_table_names=full_table_names)
273+
reference._lineage_graph = create_directed_graph_from_edge_list(edges)
274+
reference.filter_on_table(selected_node, direction, depth)
275+
assert compare_edges(reference._lineage_graph, expected_remaining_edges)
276+

0 commit comments

Comments
 (0)