Skip to content

Commit ee0c287

Browse files
authored
Merge pull request #11 from elementary-data/enrich_graph_with_operational_context
Enrich graph with operational context
2 parents 62cf690 + e6e82a7 commit ee0c287

File tree

6 files changed

+105
-17
lines changed

6 files changed

+105
-17
lines changed

lineage/lineage_graph.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from sqllineage.exceptions import SQLLineageException
99
from pyvis.network import Network
1010
import webbrowser
11+
12+
from lineage.query_context import QueryContext
1113
from lineage.utils import get_logger
1214
from sqllineage.models import Schema, Table
1315
from tqdm import tqdm
@@ -118,7 +120,9 @@ def _name_qualification(self, table: Table, database_name: str, schema_name: str
118120

119121
return str(table).rsplit('.', 1)[-1]
120122

121-
def _update_lineage_graph(self, analyzed_statements: [LineageResult], database_name: str, schema_name: str) -> None:
123+
def _update_lineage_graph(self, analyzed_statements: [LineageResult], query_context: QueryContext) -> None:
124+
database_name = query_context.queried_database
125+
schema_name = query_context.queried_schema
122126
for analyzed_statement in analyzed_statements:
123127
# Handle drop tables, if they exist in the statement
124128
dropped_tables = analyzed_statement.drop
@@ -139,9 +143,9 @@ def _update_lineage_graph(self, analyzed_statements: [LineageResult], database_n
139143
targets = {self._name_qualification(target, database_name, schema_name)
140144
for target in analyzed_statement.write}
141145

142-
self._add_nodes_and_edges(sources, targets)
146+
self._add_nodes_and_edges(sources, targets, query_context)
143147

144-
def _add_nodes_and_edges(self, sources: {str}, targets: {str}) -> None:
148+
def _add_nodes_and_edges(self, sources: {str}, targets: {str}, query_context: QueryContext) -> None:
145149
if None in sources:
146150
sources.remove(None)
147151
if None in targets:
@@ -155,10 +159,10 @@ def _add_nodes_and_edges(self, sources: {str}, targets: {str}) -> None:
155159
self._lineage_graph.add_nodes_from(sources)
156160
elif len(targets) > 0 and len(sources) == 0:
157161
if self._show_isolated_nodes:
158-
self._lineage_graph.add_nodes_from(targets)
162+
self._lineage_graph.add_nodes_from(targets, title=query_context.to_html())
159163
else:
160164
self._lineage_graph.add_nodes_from(sources)
161-
self._lineage_graph.add_nodes_from(targets)
165+
self._lineage_graph.add_nodes_from(targets, title=query_context.to_html())
162166
for source, target in itertools.product(sources, targets):
163167
self._lineage_graph.add_edge(source, target)
164168

@@ -191,15 +195,15 @@ def _remove_node(self, node: str) -> None:
191195

192196
def init_graph_from_query_list(self, queries: [tuple]) -> None:
193197
logger.debug(f'Loading {len(queries)} queries into the lineage graph')
194-
for query, database_name, schema_name in tqdm(queries, desc="Updating lineage graph", colour='green'):
198+
for query, query_context in tqdm(queries, desc="Updating lineage graph", colour='green'):
195199
try:
196200
analyzed_statements = self._parse_query(query)
197201
except SQLLineageException as exc:
198202
logger.debug(f'SQLLineageException was raised while parsing this query -\n{query}\n'
199203
f'Error was -\n{exc}.')
200204
continue
201205

202-
self._update_lineage_graph(analyzed_statements, database_name, schema_name)
206+
self._update_lineage_graph(analyzed_statements, query_context)
203207

204208
logger.debug(f'Finished updating lineage graph!')
205209

lineage/query_context.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from datetime import datetime
2+
from typing import Optional, Union
3+
import dateutil.parser
4+
5+
6+
class QueryContext(object):
7+
def __init__(self, queried_database: Optional[str] = None, queried_schema: Optional[str] = None,
8+
query_time: Optional[datetime] = None, query_volume: Optional[int] = None,
9+
query_type: Optional[str] = None, user_name: Optional[str] = None,
10+
role_name: Optional[str] = None) -> None:
11+
self.queried_database = queried_database
12+
self.queried_schema = queried_schema
13+
self.query_time = query_time
14+
self.query_volume = query_volume
15+
self.query_type = query_type
16+
self.user_name = user_name
17+
self.role_name = role_name
18+
19+
def to_dict(self) -> dict:
20+
return {'queried_database': self.queried_database,
21+
'queried_schema': self.queried_schema,
22+
'query_time': self._query_time_to_str(self.query_time),
23+
'query_volume': self.query_volume,
24+
'query_type': self.query_type,
25+
'user_name': self.user_name,
26+
'role_name': self.role_name}
27+
28+
@staticmethod
29+
def _query_time_to_str(query_time: Optional[datetime], fmt: str = None) -> Optional[str]:
30+
if query_time is None:
31+
return None
32+
33+
if fmt is None:
34+
return query_time.isoformat()
35+
36+
return query_time.strftime(fmt)
37+
38+
@staticmethod
39+
def _html_param_with_default(param: Union[str, int], default: Union[str, int] = 'unknown') -> Union[str, int]:
40+
return default if param is None else param
41+
42+
def to_html(self) -> str:
43+
query_type = self._html_param_with_default(self.query_type)
44+
user_name = self._html_param_with_default(self.user_name)
45+
role_name = self._html_param_with_default(self.role_name)
46+
query_time = self._query_time_to_str(self.query_time, fmt='%Y-%m-%d %H:%M:%S')
47+
query_volume = self._html_param_with_default(self.query_volume, 0)
48+
volume_color = "DarkSlateGrey"
49+
if query_volume == 0:
50+
volume_color = "tomato"
51+
52+
return f"""
53+
<html>
54+
<body>
55+
<div style="font-family:arial;color:DarkSlateGrey;font-size:110%;">
56+
<strong>
57+
Last update</br>
58+
</strong>
59+
<div style="min-width:62px;display:inline-block">Type:</div> {query_type}</br>
60+
<div style="min-width:62px;display:inline-block">User:</div> {user_name}</br>
61+
<div style="min-width:62px;display:inline-block">Role:</div> {role_name}</br>
62+
<div style="min-width:62px;display:inline-block">Time:</div> {query_time}</br>
63+
<div style="min-width:62px;display:inline-block;">Volume:</div> <a style="color:{volume_color}">{query_volume} rows</a>
64+
</div>
65+
</body>
66+
</html>
67+
"""
68+
69+
@staticmethod
70+
def from_dict(query_context_dict: dict) -> 'QueryContext':
71+
if 'query_time' in query_context_dict and query_context_dict['query_time'] is not None:
72+
query_context_dict['query_time'] = dateutil.parser.parse(query_context_dict['query_time'])
73+
return QueryContext(**query_context_dict)

lineage/query_history.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from datetime import datetime, timedelta
22
from typing import Optional
3+
4+
from lineage.query_context import QueryContext
35
from lineage.utils import is_flight_mode_on
46
import json
57
import os
@@ -18,13 +20,18 @@ def __init__(self, con, should_export_query_history: bool = True) -> None:
1820
def _serialize_query_history(self, queries: [str]) -> None:
1921
if self.should_export_query_history:
2022
with open(self.LATEST_QUERY_HISTORY_FILE, 'w') as query_history_file:
21-
json.dump(queries, query_history_file)
23+
serialized_queries = []
24+
for query, query_context in queries:
25+
serialized_queries.append((query, query_context.to_dict()))
26+
json.dump(serialized_queries, query_history_file)
2227

2328
def _deserialize_query_history(self) -> [str]:
2429
queries = []
2530
if os.path.exists(self.LATEST_QUERY_HISTORY_FILE):
2631
with open(self.LATEST_QUERY_HISTORY_FILE, 'r') as query_history_file:
27-
queries = json.load(query_history_file)
32+
deserialized_queries = json.load(query_history_file)
33+
for query, query_context_dict in deserialized_queries:
34+
queries.append((query, QueryContext.from_dict(query_context_dict)))
2835
return queries
2936

3037
@staticmethod
@@ -34,7 +41,7 @@ def _include_end_date(end_date: datetime) -> Optional[datetime]:
3441

3542
return end_date
3643

37-
def extract_queries(self, start_date: datetime, end_date: datetime) -> [str]:
44+
def extract_queries(self, start_date: datetime, end_date: datetime) -> [tuple]:
3845
if is_flight_mode_on():
3946
queries = self._deserialize_query_history()
4047
else:

lineage/snowflake_query_history.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from datetime import datetime, date, timedelta
22

33
from lineage.exceptions import ConfigError
4+
from lineage.query_context import QueryContext
45
from lineage.query_history import QueryHistory
56
from lineage.utils import get_logger
67

@@ -14,7 +15,7 @@ class SnowflakeQueryHistory(QueryHistory):
1415
# the query was really executed on the configured db and filter it accordingly.
1516

1617
INFORMATION_SCHEMA_QUERY_HISTORY = """
17-
select query_text, database_name, schema_name
18+
select query_text, database_name, schema_name, end_time, rows_produced, query_type, user_name, role_name
1819
from table(information_schema.query_history(
1920
end_time_range_start=>to_timestamp_ltz(:2),
2021
{end_time_range_end_expr},
@@ -31,7 +32,8 @@ class SnowflakeQueryHistory(QueryHistory):
3132
QUERY_HISTORY_SOURCE_INFORMATION_SCHEMA = 'information_schema'
3233

3334
ACCOUNT_USAGE_QUERY_HISTORY = """
34-
select query_text, database_name, schema_name
35+
select query_text, database_name, schema_name, end_time, rows_inserted + rows_produced, query_type, user_name,
36+
role_name
3537
from snowflake.account_usage.query_history
3638
where end_time >= :2 and {end_time_range_end_expr}
3739
and execution_status = 'SUCCESS' and query_type not in
@@ -88,7 +90,7 @@ def _query_history_table(self, start_date: datetime, end_date: datetime) -> [tup
8890
logger.debug("Finished executing snowflake history query")
8991
rows = cursor.fetchall()
9092
for row in rows:
91-
queries.append((row[0], row[1], row[2]))
93+
queries.append((row[0], QueryContext(row[1], row[2], row[3], row[4], row[5], row[6], row[7])))
9294
logger.debug("Finished fetching snowflake history query results")
9395

9496
return queries

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
setup(
2121
name='elementary-lineage',
2222
description='Presenting data lineage based on your data warehouse query history',
23-
version='0.0.8',
23+
version='0.0.9',
2424
packages=find_packages(),
2525
include_package_data=True,
2626
python_requires='>=3.6.2',

tests/test_lineage_graph.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from sqllineage.models import Schema
66

77
from lineage.lineage_graph import LineageGraph
8+
from lineage.query_context import QueryContext
89

910

1011
def create_lineage_result(read, write):
@@ -74,13 +75,14 @@ def test_lineage_graph_add_nodes_and_edges(sources, targets, edges, show_isolate
7475
di_graph_mock = mock.create_autospec(nx.DiGraph)
7576
reference._lineage_graph = di_graph_mock
7677

77-
reference._add_nodes_and_edges(sources, targets)
78+
empty_query_context = QueryContext()
79+
reference._add_nodes_and_edges(sources, targets, empty_query_context)
7880

7981
node_calls = []
8082
if len(sources) > 0:
8183
node_calls.append(mock.call(sources))
8284
if len(targets) > 0:
83-
node_calls.append(mock.call(targets))
85+
node_calls.append(mock.call(targets, title=empty_query_context.to_html()))
8486

8587
edge_calls = []
8688
for edge in edges:
@@ -142,7 +144,7 @@ def test_lineage_graph_remove_node(show_isolated_nodes):
142144
def test_lineage_graph_init_graph_from_query_list_with_loops(queries, show_isolated_nodes):
143145
reference = LineageGraph(profile_database_name='elementary_db', show_isolated_nodes=show_isolated_nodes)
144146

145-
query_list = [(query, 'elementary_db', 'elementary_schema') for query in queries]
147+
query_list = [(query, QueryContext('elementary_db', 'elementary_schema')) for query in queries]
146148
try:
147149
reference.init_graph_from_query_list(query_list)
148150
except Exception as exc:

0 commit comments

Comments
 (0)