Skip to content

Commit 04d73bb

Browse files
committed
Merge branch 'develop'
2 parents bb01ca3 + 65f50de commit 04d73bb

File tree

18 files changed

+390
-256
lines changed

18 files changed

+390
-256
lines changed

.env.docker

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ DATA_RENTGEN__KAFKA__COMPRESSION=zstd
3737
DATA_RENTGEN__UI__API_BROWSER_URL=http://localhost:8000
3838

3939
# Session
40+
DATA_RENTGEN__SERVER__SESSION__ENABLED=True
4041
DATA_RENTGEN__SERVER__SESSION__SECRET_KEY=session_secret_key
4142

4243
# Keycloak Auth
@@ -54,8 +55,8 @@ DATA_RENTGEN__AUTH__PROVIDER=data_rentgen.server.providers.auth.dummy_provider.D
5455
DATA_RENTGEN__AUTH__ACCESS_TOKEN__SECRET_KEY=access_key_secret
5556

5657
# Personal Tokens
57-
export DATA_RENTGEN__AUTH__PERSONAL_TOKENS__ENABLED=True
58-
export DATA_RENTGEN__AUTH__PERSONAL_TOKENS__SECRET_KEY=pat_secret
58+
DATA_RENTGEN__AUTH__PERSONAL_TOKENS__ENABLED=True
59+
DATA_RENTGEN__AUTH__PERSONAL_TOKENS__SECRET_KEY=pat_secret
5960

6061
# Cors
6162
DATA_RENTGEN__SERVER__CORS__ENABLED=True

.env.local

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export DATA_RENTGEN__SERVER__DEBUG=True
1616
export DATA_RENTGEN__UI__API_BROWSER_URL=http://localhost:8000
1717

1818
# Session
19+
export DATA_RENTGEN__SERVER__SESSION__ENABLED=True
1920
export DATA_RENTGEN__SERVER__SESSION__SECRET_KEY=session_secret_key
2021

2122
# Keycloak Auth

.pre-commit-config.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ repos:
5151
- --no-extra-eol
5252

5353
- repo: https://github.com/asottile/pyupgrade
54-
rev: v3.21.0
54+
rev: v3.21.1
5555
hooks:
5656
- id: pyupgrade
5757
args: [--py37-plus, --keep-runtime-typing]
@@ -62,7 +62,7 @@ repos:
6262
- id: add-trailing-comma
6363

6464
- repo: https://github.com/astral-sh/ruff-pre-commit
65-
rev: v0.14.2
65+
rev: v0.14.5
6666
hooks:
6767
- id: ruff-check
6868
args: [--fix]
@@ -88,7 +88,7 @@ repos:
8888
- tomli
8989

9090
- repo: https://github.com/astral-sh/uv-pre-commit
91-
rev: 0.9.5
91+
rev: 0.9.9
9292
hooks:
9393
- id: uv-lock
9494

README.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,15 @@ Goals
4848
-----
4949

5050
* Collect lineage events produced by OpenLineage clients & integrations.
51-
* Store operation-grained events for better detalization (instead of job grained `Marquez <https://marquezproject.ai/>`_).
51+
* Store operation-grained events for better detalization.
5252
* Provide API for fetching both job/run ↔ dataset lineage and dataset ↔ dataset lineage.
5353

5454
Features
5555
--------
5656

5757
* Support consuming large amounts of lineage events, use Apache Kafka as event buffer.
5858
* Store data in tables partitioned by event timestamp, to speed up lineage graph resolution.
59-
* Lineage graph is build with user-specified time boundaries (unlike Marquez where lineage is build only for last job run).
59+
* Lineage graph is build with user-specified time boundaries.
6060
* Lineage graph can be build with different granularity. e.g. merge all individual Spark commands into Spark applicationId or Spark applicationName.
6161
* Column-level lineage support.
6262
* Authentication support.
@@ -71,7 +71,7 @@ Limitations
7171
-----------
7272

7373
* OpenLineage have integrations with Trino, Debezium and some other lineage sources. DataRentgen support may be added later.
74-
* Unlike Marquez, DataRentgen parses only limited set of facets send by OpenLineage, and doesn't store custom facets. This can be changed in future.
74+
* DataRentgen parses only limited set of OpenLineage facets, and doesn't store custom facets. This can be changed in future.
7575

7676
.. documentation
7777

data_rentgen/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.4.2
1+
0.4.3

data_rentgen/consumer/extractors/generic/operation.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from __future__ import annotations
44

5+
import codecs
6+
import re
57
from abc import ABC, abstractmethod
68
from textwrap import dedent
79

@@ -17,6 +19,13 @@
1719
OpenLineageRunEventType,
1820
)
1921

22+
# https://www.ascii-code.com/, but left \n intact
23+
ASCII_UNPRINTABLE = re.compile(r"[\x00-\x09\x0b-\x1f\x7f]", re.UNICODE)
24+
25+
26+
def encode_char(char: re.Match[str]) -> str:
27+
return codecs.encode(char.group(0), "unicode-escape").decode("utf-8")
28+
2029

2130
class OperationExtractorMixin(ABC):
2231
@abstractmethod
@@ -74,5 +83,7 @@ def _enrich_operation_status(self, operation: OperationDTO, event: OpenLineageRu
7483
def _extract_sql_query(self, event: OpenLineageRunEvent) -> SQLQueryDTO | None:
7584
if event.job.facets.sql:
7685
query = dedent(event.job.facets.sql.query).strip()
86+
# https://stackoverflow.com/questions/56237415/removing-encoding-utf8-0x00-chars-from-pandas-dataframe-for-psycopg2-cursor
87+
query = ASCII_UNPRINTABLE.sub(encode_char, query)
7788
return SQLQueryDTO(query=query)
7889
return None

data_rentgen/consumer/subscribers.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ async def report_malformed(
102102
await publisher.publish(
103103
message.value,
104104
key=message.key,
105-
partition=message.partition,
106105
timestamp_ms=message.timestamp,
107106
headers=headers or None,
108107
reply_to=message_id,
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Truncate inputs with too much size in bytes
4+
5+
Revision ID: 8e8891273099
6+
Revises: 102502e85b2d
7+
Create Date: 2025-11-21 18:28:52.279644
8+
9+
"""
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "8e8891273099"
16+
down_revision = "102502e85b2d"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
op.execute(sa.text("UPDATE input SET num_bytes = NULL WHERE num_bytes >= 9223372036854775807"))
23+
24+
25+
def downgrade() -> None:
26+
pass
Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
11
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from pydantic import Field
4+
from pydantic import Field, PositiveInt, field_validator
55

66
from data_rentgen.openlineage.dataset_facets.base import (
77
OpenLineageInputDatasetFacet,
88
)
99

10+
MAX_LONG = 2**63 - 1
11+
1012

1113
class OpenLineageInputStatisticsInputDatasetFacet(OpenLineageInputDatasetFacet):
1214
"""Dataset facet describing Input statistics.
1315
See [InputStatisticsInputDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/InputStatisticsInputDatasetFacet.json).
1416
"""
1517

16-
rows: int | None = Field(default=None, alias="rowCount", examples=[1_000_000])
17-
bytes: int | None = Field(default=None, alias="size", examples=[2**30])
18-
files: int | None = Field(default=None, alias="fileCount", examples=[0])
18+
rows: PositiveInt | None = Field(default=None, alias="rowCount", examples=[1_000_000])
19+
bytes: PositiveInt | None = Field(default=None, alias="size", examples=[2**30])
20+
files: PositiveInt | None = Field(default=None, alias="fileCount", examples=[0])
21+
22+
@field_validator("bytes", "rows", "files", mode="after")
23+
@classmethod
24+
def value_must_be_sane(cls, value: int | None):
25+
if value and value >= MAX_LONG:
26+
# https://github.com/apache/spark/blob/v3.5.7/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2565
27+
# https://github.com/apache/spark/blob/v3.5.7/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L209
28+
return None
29+
return value

data_rentgen/server/middlewares/session.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88

99
def apply_session_middleware(app: FastAPI, settings: SessionSettings) -> FastAPI:
1010
"""Add SessionMiddleware middleware to the application."""
11+
if not settings.enabled:
12+
return app
1113

12-
settings_dict = settings.model_dump(exclude={"secret_key"})
13-
settings_dict["secret_key"] = settings.secret_key.get_secret_value()
14+
settings_dict = settings.model_dump(exclude={"secret_key", "enabled"})
15+
settings_dict["secret_key"] = settings.secret_key.get_secret_value() # type: ignore[union-attr]
1416

1517
app.add_middleware(SessionMiddleware, **settings_dict)
1618
return app

0 commit comments

Comments
 (0)