"""
This module defines the workflow for the 'lineage' command.

It combines physical database lineage (from foreign keys) with logical dbt
project lineage (from refs and sources) to generate a single, comprehensive
end-to-end data lineage graph.
"""
68import typer
79from typing import List , Dict , Any , Set
1517
class GlobalLineageGenerator:
    """
    Builds a global lineage graph from multiple sources.

    This class merges physical foreign key relationships from a database with
    logical dependencies from a dbt project (`ref` and `source` calls) into a
    single Mermaid graph. Each node carries exactly one style; when the same
    name is seen with several styles, the highest-priority one wins, so a dbt
    model is rendered as a model even if it also appears as a plain database
    table in a foreign key.
    """

    # Style priorities: a higher number wins when a node is added with
    # several styles. Styles that are not listed get priority 0.
    _STYLE_PRIORITY: Dict[str, int] = {
        "box": 3,     # dbt model (highest priority)
        "source": 2,  # dbt source
        "db": 1,      # db table (lowest priority)
    }

    # Mermaid node definitions keyed by style.
    _NODE_TEMPLATES: Dict[str, str] = {
        "box": '    {name}["{name}"]',        # dbt model
        "db": '    {name}[("{name}")]',       # DB table
        "source": '    {name}(("{name}"))',   # dbt source
    }

    def __init__(self, db_fks: List[Dict[str, str]], dbt_models: List[Dict[str, Any]]):
        """
        Initializes the GlobalLineageGenerator.

        Args:
            db_fks: Foreign key relationships; each entry is read through its
                "from_table" and "to_table" keys.
            dbt_models: Parsed dbt models; each entry is read through its
                "name" key and its optional "dependencies" list.
        """
        self.db_fks = db_fks
        self.dbt_models = dbt_models

        # Maps node name -> assigned style, e.g. {"stg_orders": "box"}
        self.nodes: Dict[str, str] = {}
        # Formatted edge lines; a set so repeated relationships appear once
        self.edges: Set[str] = set()

    def _get_style_priority(self, style: str) -> int:
        """Returns the priority of a node style. Higher numbers win; unknown styles are 0."""
        return self._STYLE_PRIORITY.get(style, 0)

    def _add_node(self, name: str, style: str = "box") -> None:
        """
        Registers a node, keeping the highest-priority style seen for it.

        A node that is not yet known is always stored. A known node only
        changes style when the new style has a strictly higher priority
        than the one already recorded.
        """
        current_style = self.nodes.get(name)
        # -1 ranks below every real style, so the first sighting always wins.
        current_priority = self._get_style_priority(current_style) if current_style else -1

        if self._get_style_priority(style) > current_priority:
            self.nodes[name] = style

    def _add_edge(self, from_node: str, to_node: str, label: str = "") -> None:
        """Adds a formatted edge line to the edge set, with an optional label."""
        if label:
            self.edges.add(f'    {from_node} -- "{label}" --> {to_node}')
        else:
            self.edges.add(f'    {from_node} --> {to_node}')

    def generate_graph(self) -> str:
        """
        Generates the complete Mermaid.js graph string.

        Foreign keys are processed first and dbt dependencies second; the
        priority rule in `_add_node` then settles the final style of every
        node. Node definitions and edges are each emitted in sorted order,
        separated by one blank line.

        Returns:
            A string containing the full Mermaid.js graph definition.

        Raises:
            KeyError: If a foreign key entry lacks "from_table"/"to_table"
                or a dbt model entry lacks "name".
        """
        logger.info("Generating global lineage graph...")

        # 1. Physical lineage: foreign keys between database tables
        for fk in self.db_fks:
            from_table = fk["from_table"]
            to_table = fk["to_table"]

            # 'db' is the lowest-priority style, so dbt can override it later
            self._add_node(from_table, "db")
            self._add_node(to_table, "db")
            self._add_edge(from_table, to_table, "FK")

        # 2. Logical lineage: dbt model dependencies
        for model in self.dbt_models:
            model_name = model["name"]
            self._add_node(model_name, "box")

            for dep in model.get("dependencies", []):
                # A dependency with a dot is treated as a source
                # (e.g. 'jaffle_shop.customers'); anything else is a ref
                # to another dbt model.
                self._add_node(dep, "source" if "." in dep else "box")
                self._add_edge(dep, model_name)

        # 3. Assemble the Mermaid text. Nodes whose style has no template
        # get no explicit definition line.
        node_definitions = [
            self._NODE_TEMPLATES[style].format(name=name)
            for name, style in self.nodes.items()
            if style in self._NODE_TEMPLATES
        ]

        graph_lines = ["graph TD;"]
        graph_lines.extend(sorted(node_definitions))
        graph_lines.append("")  # Spacer for readability
        graph_lines.extend(sorted(self.edges))

        return "\n".join(graph_lines)
73129
74130
75131class LineageWorkflow :
76132 """
77- Manages the workflow for the ' lineage' command.
133+ Manages the end-to-end workflow for the `data-scribe lineage` command.
78134 """
79135 def __init__ (
80136 self ,
@@ -83,14 +139,19 @@ def __init__(
83139 dbt_project_dir : str ,
84140 output_profile : str ,
85141 ):
142+ """
143+ Initializes the LineageWorkflow with parameters from the CLI.
144+ """
86145 self .config_path = config_path
87146 self .db_profile_name = db_profile
88147 self .dbt_project_dir = dbt_project_dir
89148 self .output_profile_name = output_profile
90149 self .config = load_config (config_path )
91150
92151 def run (self ):
93- """Executes the lineage generation workflow."""
152+ """
153+ Executes the full lineage generation and writing workflow.
154+ """
94155
95156 # 1. Get Physical Lineage (FKs) from DB
96157 db_connector = None
@@ -112,7 +173,7 @@ def run(self):
112173 # 2. Get Logical Lineage (refs) from dbt
113174 logger .info (f"Parsing dbt project at '{ self .dbt_project_dir } ' for dependencies..." )
114175 parser = DbtManifestParser (self .dbt_project_dir )
115- dbt_models = parser .models # This now contains 'dependencies'
176+ dbt_models = parser .models
116177 logger .info (f"Parsed { len (dbt_models )} dbt models." )
117178
118179 # 3. Generate Graph
@@ -125,6 +186,7 @@ def run(self):
125186 try :
126187 writer_params = self .config ["output_profiles" ][self .output_profile_name ]
127188 writer_type = writer_params .pop ("type" )
189+ # The workflow requires a 'mermaid' writer type.
128190 if writer_type != "mermaid" :
129191 logger .warning (f"Output profile '{ self .output_profile_name } ' is not type 'mermaid'. Using MermaidWriter anyway." )
130192
0 commit comments