
Commit b38f676

feat: initial implementation
Signed-off-by: Lucas Roesler <[email protected]>

File tree: 10 files changed, +4080 −0 lines

.gitignore

Lines changed: 3 additions & 0 deletions
```
__pycache__
.venv
.pytest_cache
```

README.md

Lines changed: 46 additions & 0 deletions
# Datahub Postgres View Lineage

An ingestion source to generate lineage for views in a Postgres database.

## Quick Start

First, install [Poetry](https://python-poetry.org/docs/#installation).

Now start a database:

```sh
docker compose -f examples/docker-compose.yaml up -d
```

Create some tables and views in your database using psql:

```sh
psql -U normal_user -d cool_db -h localhost -p 6432 -c "create table if not exists users ( id uuid primary key, name text, email text, age int);"
psql -U normal_user -d cool_db -h localhost -p 6432 -c "create view names as select distinct(name) from users;"
```

Now run the ingestion, sending the output to the console:

```sh
poetry install
poetry run datahub ingest -c examples/recipe.yaml
```

When it succeeds, the output should include:

```sh
Source (datahub_postgres_lineage.ingestion.PostgresLineageSource) report:
{'events_produced': '1',
 'events_produced_per_sec': '1',
 'event_ids': ['urn:li:dataset:(urn:li:dataPlatform:postgres,cool_db.public.names,PROD)-upstreamLineage'],
 'warnings': {},
 'failures': {},
 'soft_deleted_stale_entities': [],
 'tables_scanned': '0',
 'views_scanned': '0',
 'entities_profiled': '0',
 'filtered': [],
 'start_time': '2022-12-16 18:17:57.889983 (now).',
 'running_time': '0.59 seconds'}
```
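
The URN in `event_ids` identifies the `names` view: it is just the platform, the fully qualified dataset name, and the environment. A small sketch (not part of the commit) using the builder helper this package relies on:

```python
from datahub.emitter import mce_builder

# Rebuild the URN reported above from its parts.
urn = mce_builder.make_dataset_urn("postgres", "cool_db.public.names", "PROD")
assert urn == "urn:li:dataset:(urn:li:dataPlatform:postgres,cool_db.public.names,PROD)"
```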

datahub_postgres_lineage/__init__.py

Whitespace-only changes.

datahub_postgres_lineage/ingestion.py

Lines changed: 200 additions & 0 deletions
```python
import logging
from contextlib import contextmanager
from typing import Dict, Iterable, Iterator, List, Optional

from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine import Connection, CursorResult

from datahub.emitter import mce_builder
from datahub.emitter.mcp_builder import (
    mcps_from_mce,
    # wrap_aspect_as_workunit
)
from datahub.ingestion.api.decorators import (
    SupportStatus,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.api.source import TestableSource
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.postgres import PostgresConfig
from datahub.ingestion.source.sql.sql_common import SQLAlchemySource
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)

# from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
#     DatasetProperties,
#     UpstreamLineage,
#     ViewProperties,
# )

logger: logging.Logger = logging.getLogger(__name__)


# Walk pg_depend/pg_rewrite recursively to find, for every view, the tables
# and views it reads from, excluding system schemas and self-references.
VIEW_LINEAGE_QUERY = """
WITH RECURSIVE view_deps AS (
    SELECT DISTINCT dependent_ns.nspname as dependent_schema
        , dependent_view.relname as dependent_view
        , source_ns.nspname as source_schema
        , source_table.relname as source_table
    FROM pg_depend
    JOIN pg_rewrite ON pg_depend.objid = pg_rewrite.oid
    JOIN pg_class as dependent_view ON pg_rewrite.ev_class = dependent_view.oid
    JOIN pg_class as source_table ON pg_depend.refobjid = source_table.oid
    JOIN pg_namespace dependent_ns ON dependent_ns.oid = dependent_view.relnamespace
    JOIN pg_namespace source_ns ON source_ns.oid = source_table.relnamespace
    WHERE NOT (dependent_ns.nspname = source_ns.nspname AND dependent_view.relname = source_table.relname)
    UNION
    SELECT DISTINCT dependent_ns.nspname as dependent_schema
        , dependent_view.relname as dependent_view
        , source_ns.nspname as source_schema
        , source_table.relname as source_table
    FROM pg_depend
    JOIN pg_rewrite ON pg_depend.objid = pg_rewrite.oid
    JOIN pg_class as dependent_view ON pg_rewrite.ev_class = dependent_view.oid
    JOIN pg_class as source_table ON pg_depend.refobjid = source_table.oid
    JOIN pg_namespace dependent_ns ON dependent_ns.oid = dependent_view.relnamespace
    JOIN pg_namespace source_ns ON source_ns.oid = source_table.relnamespace
    INNER JOIN view_deps vd
        ON vd.dependent_schema = source_ns.nspname
        AND vd.dependent_view = source_table.relname
        AND NOT (dependent_ns.nspname = vd.dependent_schema AND dependent_view.relname = vd.dependent_view)
)

SELECT source_table, source_schema, dependent_view, dependent_schema
FROM view_deps
WHERE NOT (source_schema = 'information_schema' OR source_schema = 'pg_catalog')
ORDER BY source_schema, source_table;
"""
```
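
For the quick-start schema in the README (a `names` view over a `users` table), this query should return exactly one dependency row. A minimal sanity check (not part of the commit), assuming the example compose database is up and a Postgres driver such as psycopg2 is installed:

```python
from sqlalchemy import create_engine

# Connection details come from examples/docker-compose.yaml and examples/recipe.yaml.
engine = create_engine("postgresql://normal_user:password@localhost:6432/cool_db")
with engine.connect() as conn:
    for row in conn.execute(VIEW_LINEAGE_QUERY):
        print(row)  # expected: ('users', 'public', 'names', 'public')
```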
```python
class ViewLineageEntry(BaseModel):
    # Note: the field order matches the SELECT list in VIEW_LINEAGE_QUERY,
    # so pydantic is able to parse each result row using parse_obj.
    source_table: str
    source_schema: str
    dependent_view: str
    dependent_schema: str


class ViewLineage(BaseModel):
    lineage: List[ViewLineageEntry]


class PostgresLineageConfig(PostgresConfig):
    pass
```
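
To illustrate how a result row maps onto the model (a sketch, not part of the commit; the values mirror the quick-start example):

```python
entry = ViewLineageEntry.parse_obj(
    {
        "source_table": "users",
        "source_schema": "public",
        "dependent_view": "names",
        "dependent_schema": "public",
    }
)
assert entry.dependent_view == "names"
```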
```python
@platform_name("Postgres")
@config_class(PostgresLineageConfig)
@support_status(SupportStatus.TESTING)
class PostgresLineageSource(
    SQLAlchemySource, StatefulIngestionSourceBase, TestableSource
):
    def __init__(self, config, ctx):
        super().__init__(config, ctx, "postgres")
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        # Parse into the source-specific config class declared via @config_class.
        config = PostgresLineageConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def test_connection(self):
        with self._get_connection() as conn:
            results = conn.execute("SELECT 1")
            row = results.fetchone()
            assert row is not None, "No rows returned from SELECT 1"
            assert row[0] == 1, "SELECT 1 returned unexpected value"

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # https://github.com/datahub-project/datahub/blob/b236d0958c046aa58a62eb44de0176a72c16ba8a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage.py#L166

        logger.info("Getting lineage for postgres")
        data: List[ViewLineageEntry] = []
        with self._get_connection() as conn:
            results: CursorResult = conn.execute(VIEW_LINEAGE_QUERY)

            if results.returns_rows is False:
                return None

            for row in results:
                logger.info(row)
                data.append(ViewLineageEntry.parse_obj(row))

        if len(data) == 0:
            return None

        # The original reference was
        # https://github.com/datahub-project/datahub/blob/9a1f78fc60f692ebbd57c0a8cbabe4bfde44376b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py#L193
        # but the only addition seems to be a call to self.report
        # dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,{},PROD)".format(
        #     "test"
        # )
        # upstream_lineage: Optional[UpstreamLineage] = None
        # yield wrap_aspect_as_workunit(
        #     "dataset", dataset_urn, "upstreamLineage", upstream_lineage
        # )

        lineage_elements: Dict[str, List[str]] = {}
        # Loop over the lineage rows, grouping source tables by dependent view.
        for lineage in data:
            # Check if the dependent view + dependent schema already exists in
            # the dictionary; if not, create a new entry. Use ':::::' to join,
            # as it is unlikely to be part of a schema or view name.
            key = ":::::".join([lineage.dependent_view, lineage.dependent_schema])
            if key not in lineage_elements:
                lineage_elements[key] = []

            # Append the source table's URN to the list.
            lineage_elements[key].append(
                mce_builder.make_dataset_urn(
                    "postgres",
                    ".".join(
                        [
                            self.config.database,
                            lineage.source_schema,
                            lineage.source_table,
                        ]
                    ),
                    self.config.env,
                )
            )

        # Emit one lineage workunit per dependent view.
        for key, source_tables in lineage_elements.items():
            # Split the key back into dependent view and dependent schema.
            dependent_view, dependent_schema = key.split(":::::")

            # Construct the view's URN and its lineage MCE.
            urn = mce_builder.make_dataset_urn(
                "postgres",
                ".".join([self.config.database, dependent_schema, dependent_view]),
                self.config.env,
            )
            lineage_mce = mce_builder.make_lineage_mce(
                source_tables,
                urn,
            )

            for item in mcps_from_mce(lineage_mce):
                wu = item.as_workunit()
                self.report.report_workunit(wu)
                yield wu

    @contextmanager
    def _get_connection(self) -> Iterator[Connection]:
        # This method can be overridden in the case that you want to dynamically
        # run on multiple databases.
        url = self.config.get_sql_alchemy_url()
        logger.debug(f"sql_alchemy_url={url}")
        engine = create_engine(url, **self.config.options)

        with engine.connect() as conn:
            yield conn
```
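
The grouping means a view that reads from several tables yields a single workunit whose lineage aspect lists every upstream. A small sketch of the aspect construction (the `orders` table and `report` view here are hypothetical, not part of the example database):

```python
from datahub.emitter import mce_builder

upstreams = [
    mce_builder.make_dataset_urn("postgres", "cool_db.public.users", "PROD"),
    mce_builder.make_dataset_urn("postgres", "cool_db.public.orders", "PROD"),
]
downstream = mce_builder.make_dataset_urn("postgres", "cool_db.public.report", "PROD")

# One MCE carrying an upstreamLineage aspect with both upstream tables.
lineage_mce = mce_builder.make_lineage_mce(upstreams, downstream)
```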

examples/docker-compose.yaml

Lines changed: 17 additions & 0 deletions
```yaml
version: "3.7"

services:
  sample-db:
    image: postgres:14-alpine
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_USER: normal_user
      POSTGRES_DB: cool_db
      POSTGRES_HOST_AUTH_METHOD: md5
    ports:
      - "6432:5432"
    volumes:
      - pg-lineage:/var/lib/postgresql/data

volumes:
  pg-lineage:
```

examples/recipe.yaml

Lines changed: 13 additions & 0 deletions
```yaml
source:
  type: datahub_postgres_lineage.ingestion.PostgresLineageSource
  config:
    # Coordinates
    host_port: localhost:6432
    database: cool_db

    # Credentials
    username: normal_user
    password: password

sink:
  type: console
```
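
Note the `type` field: for custom sources it is the dotted path to the source class, which must be importable in the environment running the ingest (hence the README's `poetry install` and `poetry run`). Roughly what the framework does with it, as a sketch (not DataHub's actual loader code):

```python
import importlib

source_type = "datahub_postgres_lineage.ingestion.PostgresLineageSource"
module_path, class_name = source_type.rsplit(".", 1)

# Import the module and look up the source class by name.
source_cls = getattr(importlib.import_module(module_path), class_name)
```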
