Skip to content

Commit 84d25c3

Browse files
committed
[DOP-26403] Set output type based on SQL query type
1 parent 39e58e4 commit 84d25c3

File tree

10 files changed

+190
-24
lines changed

10 files changed

+190
-24
lines changed

data_rentgen/consumer/extractors/generic/io.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from __future__ import annotations
44

5+
import re
56
from abc import ABC, abstractmethod
67
from datetime import datetime, timedelta
78

@@ -23,6 +24,25 @@
2324
from data_rentgen.openlineage.dataset_facets import OpenLineageSchemaField
2425
from data_rentgen.openlineage.run_event import OpenLineageRunEvent
2526

27+
SQL_QUERY_SYNTAX = re.compile(
28+
r"\b(?P<query_type>MERGE|INSERT|UPDATE|DELETE|CREATE|RENAME|TRUNCATE|DROP(?!\sCOLUMN)|COPY)\s",
29+
re.IGNORECASE | re.DOTALL,
30+
)
31+
# ALTER has the lowest priority: it is checked only when no other query-type keyword matches
32+
ALTER_SYNTAX = re.compile(r"\bALTER\s", re.IGNORECASE | re.DOTALL)
33+
QUERY_TYPE_TO_OUTPUT_TYPE = {
34+
"MERGE": OutputTypeDTO.MERGE,
35+
"INSERT": OutputTypeDTO.APPEND,
36+
"UPDATE": OutputTypeDTO.UPDATE,
37+
"DELETE": OutputTypeDTO.DELETE,
38+
"CREATE": OutputTypeDTO.CREATE,
39+
"RENAME": OutputTypeDTO.RENAME,
40+
"ALTER": OutputTypeDTO.ALTER,
41+
"TRUNCATE": OutputTypeDTO.TRUNCATE,
42+
"DROP": OutputTypeDTO.DROP,
43+
"COPY": OutputTypeDTO.APPEND,
44+
}
45+
2646
METASTORE = DatasetSymlinkTypeDTO.METASTORE
2747
WAREHOUSE = DatasetSymlinkTypeDTO.WAREHOUSE
2848

@@ -121,7 +141,7 @@ def extract_output(
121141
created_at=created_at,
122142
operation=operation,
123143
dataset=resolved_dataset_dto,
124-
type=self._extract_output_type(operation, dataset),
144+
type=self._extract_output_type(operation, dataset) or OutputTypeDTO.UNKNOWN,
125145
schema=self.extract_schema(dataset),
126146
)
127147
if dataset.outputFacets.outputStatistics:
@@ -135,10 +155,21 @@ def _extract_output_type(
135155
self,
136156
operation: OperationDTO,
137157
dataset: OpenLineageOutputDataset,
138-
) -> OutputTypeDTO:
158+
) -> OutputTypeDTO | None:
139159
if dataset.facets.lifecycleStateChange:
140160
return OutputTypeDTO[dataset.facets.lifecycleStateChange.lifecycleStateChange]
141-
return OutputTypeDTO.APPEND
161+
if operation.sql_query:
162+
return self._extract_output_type_from_sql(operation.sql_query.query)
163+
return None
164+
165+
def _extract_output_type_from_sql(self, sql: str) -> OutputTypeDTO | None:
166+
found = SQL_QUERY_SYNTAX.search(sql)
167+
if found:
168+
return QUERY_TYPE_TO_OUTPUT_TYPE[found.group("query_type")]
169+
found = ALTER_SYNTAX.search(sql)
170+
if found:
171+
return OutputTypeDTO.ALTER
172+
return None
142173

143174
def _schema_field_to_json(self, field: OpenLineageSchemaField):
144175
result: dict = {

data_rentgen/consumer/extractors/impl/dbt.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from __future__ import annotations
44

55
from data_rentgen.consumer.extractors.generic import GenericExtractor
6-
from data_rentgen.dto import DatasetDTO, OperationDTO, RunDTO
7-
from data_rentgen.openlineage.dataset import OpenLineageDataset
6+
from data_rentgen.dto import DatasetDTO, OperationDTO, OutputTypeDTO, RunDTO
7+
from data_rentgen.openlineage.dataset import OpenLineageDataset, OpenLineageOutputDataset
88
from data_rentgen.openlineage.dataset_facets import (
99
OpenLineageColumnLineageDatasetFacetFieldRef,
1010
OpenLineageSymlinkIdentifier,
@@ -41,9 +41,18 @@ def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
4141

4242
def _extract_dataset_ref(
4343
self,
44-
dataset_ref: OpenLineageDataset | OpenLineageColumnLineageDatasetFacetFieldRef | OpenLineageSymlinkIdentifier,
44+
dataset: OpenLineageDataset | OpenLineageColumnLineageDatasetFacetFieldRef | OpenLineageSymlinkIdentifier,
4545
) -> DatasetDTO:
46-
dataset = super()._extract_dataset_ref(dataset_ref)
46+
dataset_dto = super()._extract_dataset_ref(dataset)
4747
# https://github.com/OpenLineage/OpenLineage/pull/3707
48-
dataset.name = dataset.name.replace("None.", "")
49-
return dataset
48+
dataset_dto.name = dataset.name.replace("None.", "")
49+
return dataset_dto
50+
51+
def _extract_output_type(
52+
self,
53+
operation: OperationDTO,
54+
dataset: OpenLineageOutputDataset,
55+
) -> OutputTypeDTO | None:
56+
# by default, model is not materialized, and is either VIEW or INSERT INTO
57+
result = super()._extract_output_type(operation, dataset)
58+
return result or OutputTypeDTO.APPEND

data_rentgen/consumer/extractors/impl/flink.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from __future__ import annotations
44

55
from data_rentgen.consumer.extractors.generic import GenericExtractor
6-
from data_rentgen.dto import DatasetDTO, DatasetSymlinkDTO, RunDTO
7-
from data_rentgen.openlineage.dataset import OpenLineageDataset
6+
from data_rentgen.dto import DatasetDTO, DatasetSymlinkDTO, OperationDTO, OutputTypeDTO, RunDTO
7+
from data_rentgen.openlineage.dataset import OpenLineageDataset, OpenLineageOutputDataset
88
from data_rentgen.openlineage.dataset_facets import (
99
OpenLineageSymlinkIdentifier,
1010
OpenLineageSymlinkType,
@@ -59,3 +59,12 @@ def _extract_dataset_and_symlinks(
5959
if not (identifier.namespace.startswith("kafka://") and identifier.type == OpenLineageSymlinkType.TABLE)
6060
]
6161
return super()._extract_dataset_and_symlinks(dataset, symlink_identifiers)
62+
63+
def _extract_output_type(
64+
self,
65+
operation: OperationDTO,
66+
dataset: OpenLineageOutputDataset,
67+
) -> OutputTypeDTO | None:
68+
# In most real cases, Flink writes to Kafka with APPEND
69+
result = super()._extract_output_type(operation, dataset)
70+
return result or OutputTypeDTO.APPEND

data_rentgen/consumer/extractors/impl/spark.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,16 @@ def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
7777

7878
def _extract_dataset_ref(
7979
self,
80-
dataset_ref: OpenLineageDataset | OpenLineageColumnLineageDatasetFacetFieldRef | OpenLineageSymlinkIdentifier,
80+
dataset: OpenLineageDataset | OpenLineageColumnLineageDatasetFacetFieldRef | OpenLineageSymlinkIdentifier,
8181
) -> DatasetDTO:
82-
dataset = super()._extract_dataset_ref(dataset_ref)
82+
dataset_dto = super()._extract_dataset_ref(dataset)
8383

8484
# convert /some/long/path/with=partition/another=abc to /some/long/path
85-
if "=" in dataset.name and "/" in dataset.name:
86-
name_with_partitions = PARTITION_PATH_PATTERN.match(dataset.name)
85+
if "=" in dataset_dto.name and "/" in dataset_dto.name:
86+
name_with_partitions = PARTITION_PATH_PATTERN.match(dataset_dto.name)
8787
if name_with_partitions:
88-
dataset.name = name_with_partitions.group(1)
89-
return dataset
88+
dataset_dto.name = name_with_partitions.group(1)
89+
return dataset_dto
9090

9191
def _extract_dataset_and_symlinks(
9292
self,
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Increase output.type to int32
4+
5+
Revision ID: fc001835e473
6+
Revises: 85592fce8fb0
7+
Create Date: 2025-09-15 18:53:32.392011
8+
9+
"""
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "fc001835e473"
16+
down_revision = "85592fce8fb0"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
op.alter_column(
23+
"output",
24+
"type",
25+
existing_type=sa.SMALLINT(),
26+
type_=sa.INTEGER(),
27+
existing_nullable=False,
28+
)
29+
30+
31+
def downgrade() -> None:
32+
op.alter_column(
33+
"output",
34+
"type",
35+
existing_type=sa.INTEGER(),
36+
type_=sa.SMALLINT(),
37+
existing_nullable=False,
38+
)

data_rentgen/db/models/output.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from uuid import UUID
99

1010
from sqlalchemy import UUID as SQL_UUID
11-
from sqlalchemy import BigInteger, DateTime, PrimaryKeyConstraint, SmallInteger
11+
from sqlalchemy import BigInteger, DateTime, Integer, PrimaryKeyConstraint
1212
from sqlalchemy.orm import Mapped, mapped_column, relationship
1313
from sqlalchemy_utils import ChoiceType
1414

@@ -21,6 +21,8 @@
2121

2222

2323
class OutputType(IntFlag):
24+
UNKNOWN = 0
25+
2426
APPEND = 1
2527

2628
CREATE = 2
@@ -32,6 +34,10 @@ class OutputType(IntFlag):
3234
DROP = 32
3335
TRUNCATE = 64
3436

37+
DELETE = 128
38+
UPDATE = 256
39+
MERGE = 512
40+
3541

3642
# no foreign keys to avoid scanning all the partitions
3743
class Output(Base):
@@ -102,9 +108,9 @@ class Output(Base):
102108
)
103109

104110
type: Mapped[OutputType] = mapped_column(
105-
ChoiceType(OutputType, impl=SmallInteger()),
111+
ChoiceType(OutputType, impl=Integer()),
106112
nullable=False,
107-
default=OutputType.APPEND,
113+
default=OutputType.UNKNOWN,
108114
doc="Type of the output, e.g. READ, CREATE, APPEND",
109115
)
110116

data_rentgen/dto/output.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616

1717
class OutputTypeDTO(IntFlag):
18+
UNKNOWN = 0
19+
1820
APPEND = 1
1921

2022
CREATE = 2
@@ -26,13 +28,17 @@ class OutputTypeDTO(IntFlag):
2628
DROP = 32
2729
TRUNCATE = 64
2830

31+
DELETE = 128
32+
UPDATE = 256
33+
MERGE = 512
34+
2935

3036
@dataclass(slots=True)
3137
class OutputDTO:
3238
created_at: datetime
3339
operation: OperationDTO
3440
dataset: DatasetDTO
35-
type: OutputTypeDTO
41+
type: OutputTypeDTO = OutputTypeDTO.UNKNOWN
3642
schema: SchemaDTO | None = None
3743
num_rows: int | None = None
3844
num_bytes: int | None = None

data_rentgen/server/schemas/v1/lineage.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ class OutputTypeV1(IntFlag):
136136
DROP = 32
137137
TRUNCATE = 64
138138

139+
DELETE = 128
140+
UPDATE = 256
141+
MERGE = 512
142+
139143
def __str__(self) -> str:
140144
return f"{self.name}"
141145

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Set ``output.type`` based on executed SQL query, e.g. ``INSERT``, ``UPDATE``, ``DELETE``, and so on.

tests/test_consumer/test_extractors/test_extractors_interaction.py

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,16 @@
66
import pytest
77

88
from data_rentgen.consumer.extractors.generic import GenericExtractor
9-
from data_rentgen.dto import DatasetDTO, InputDTO, LocationDTO, OperationDTO, OutputDTO, OutputTypeDTO, SchemaDTO
9+
from data_rentgen.dto import (
10+
DatasetDTO,
11+
InputDTO,
12+
LocationDTO,
13+
OperationDTO,
14+
OutputDTO,
15+
OutputTypeDTO,
16+
SchemaDTO,
17+
SQLQueryDTO,
18+
)
1019
from data_rentgen.openlineage.dataset import (
1120
OpenLineageDataset,
1221
OpenLineageInputDataset,
@@ -199,7 +208,7 @@ def test_extractors_extract_input_for_long_operations():
199208
(None, None, None),
200209
],
201210
)
202-
def test_extractors_extract_output_batch(
211+
def test_extractors_extract_output_batch_with_lifecycle(
203212
lifecycle_state_change: OpenLineageDatasetLifecycleStateChange,
204213
expected_type: OutputTypeDTO,
205214
row_count: int | None,
@@ -247,6 +256,58 @@ def test_extractors_extract_output_batch(
247256
)
248257

249258

259+
@pytest.mark.parametrize(
260+
["sql_query", "expected_type"],
261+
[
262+
("CREATE TABLE AS SELECT * FROM mytable", OutputTypeDTO.CREATE),
263+
("INSERT INTO mytable SELECT * FROM mytable", OutputTypeDTO.APPEND),
264+
("UPDATE mytable SET a=1", OutputTypeDTO.UPDATE),
265+
("DELETE FROM mytable", OutputTypeDTO.DELETE),
266+
("COPY mytable FROM '...'", OutputTypeDTO.APPEND),
267+
("ALTER TABLE mytable RENAME TO mytable_new", OutputTypeDTO.RENAME),
268+
("ALTER TABLE mytable DROP COLUMN a", OutputTypeDTO.ALTER),
269+
("TRUNCATE TABLE mytable", OutputTypeDTO.TRUNCATE),
270+
("TRUNCATE TABLE mytable DROP STORAGE", OutputTypeDTO.TRUNCATE),
271+
("ALTER TABLE mytable TRUNCATE PARTITION (a=1, b=2)", OutputTypeDTO.TRUNCATE),
272+
("DROP TABLE mytable", OutputTypeDTO.DROP),
273+
("DROP TABLE mytable PURGE", OutputTypeDTO.DROP),
274+
("ALTER TABLE mytable DROP PARTITION (a=1, b=2)", OutputTypeDTO.DROP),
275+
("MERGE INTO mytable", OutputTypeDTO.MERGE),
276+
("CALL myproc()", OutputTypeDTO.UNKNOWN),
277+
],
278+
)
279+
def test_extractors_extract_output_batch_with_sql(
280+
sql_query: str,
281+
expected_type: OutputTypeDTO,
282+
):
283+
output = OpenLineageOutputDataset(
284+
namespace="hdfs://test-hadoop:9820",
285+
name="/user/hive/warehouse/mydb.db/mytable",
286+
)
287+
operation = Mock(spec=OperationDTO)
288+
operation.sql_query = SQLQueryDTO(query=sql_query)
289+
290+
event = Mock(spec=OpenLineageRunEvent)
291+
operation.created_at = event.eventTime = datetime(2024, 7, 5, 9, 6, 29, 462000, tzinfo=timezone.utc)
292+
293+
assert GenericExtractor().extract_output(operation, output, event) == (
294+
OutputDTO(
295+
created_at=operation.created_at,
296+
type=expected_type,
297+
operation=operation,
298+
dataset=DatasetDTO(
299+
name="/user/hive/warehouse/mydb.db/mytable",
300+
location=LocationDTO(
301+
type="hdfs",
302+
name="test-hadoop:9820",
303+
addresses={"hdfs://test-hadoop:9820"},
304+
),
305+
),
306+
),
307+
[],
308+
)
309+
310+
250311
def test_extractors_extract_output_for_long_running_operations():
251312
output = OpenLineageOutputDataset(
252313
namespace="hdfs://test-hadoop:9820",
@@ -255,6 +316,7 @@ def test_extractors_extract_output_for_long_running_operations():
255316

256317
# operation is streaming and created long time ago
257318
operation = Mock(spec=OperationDTO)
319+
operation.sql_query = None
258320
operation.created_at = datetime(2024, 7, 5, tzinfo=timezone.utc)
259321

260322
event = Mock(spec=OpenLineageRunEvent)
@@ -264,7 +326,7 @@ def test_extractors_extract_output_for_long_running_operations():
264326
OutputDTO(
265327
# count only whole hours since operation was created
266328
created_at=operation.created_at + timedelta(hours=9),
267-
type=OutputTypeDTO.APPEND,
329+
type=OutputTypeDTO.UNKNOWN,
268330
operation=operation,
269331
dataset=DatasetDTO(
270332
name="/user/hive/warehouse/mydb.db/mytable",

0 commit comments

Comments (0)