Skip to content

Commit 138ec7f

Browse files
committed
[DOP-28706] Make name unique check case-insensitive
1 parent 4d7256d commit 138ec7f

File tree

12 files changed

+138
-24
lines changed

12 files changed

+138
-24
lines changed

data_rentgen/consumer/extractors/generic/dataset.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ def _extract_dataset_location(
4343
self,
4444
dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier | OpenLineageColumnLineageDatasetFacetFieldRef,
4545
) -> LocationDTO:
46-
namespace = dataset.namespace
46+
# hostname and scheme are normalized to lowercase for uniqueness
47+
namespace = dataset.namespace.lower()
4748
if namespace == "file":
4849
# TODO: remove after https://github.com/OpenLineage/OpenLineage/issues/2709
4950
namespace = "file://"

data_rentgen/consumer/extractors/generic/job.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobD
3636
)
3737

3838
def _extract_job_location(self, job: OpenLineageJob | OpenLineageParentJob) -> LocationDTO:
39-
url = urlparse(job.namespace)
39+
# hostname and scheme are normalized to lowercase for uniqueness
40+
url = urlparse(job.namespace.lower())
4041
scheme = url.scheme or "unknown"
4142
netloc = url.netloc or url.path
4243
return LocationDTO(
@@ -50,6 +51,7 @@ def _extract_job_type(self, job: OpenLineageJob) -> JobTypeDTO | None:
5051
integration_type = job.facets.jobType.integration
5152
job_type = job.facets.jobType.jobType
5253
type_ = f"{integration_type}_{job_type}" if job_type else integration_type
54+
# job_type are always upper case
5355
return JobTypeDTO(type=type_.upper())
5456

5557
return None
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Make unique names constraints case-insensitive
4+
5+
Revision ID: 17481d3b2466
6+
Revises: fc001835e473
7+
Create Date: 2025-09-16 11:18:01.308085
8+
9+
"""
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "17481d3b2466"
16+
down_revision = "fc001835e473"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
    """Make unique-name constraints case-insensitive.

    Normalizes existing data, then replaces the plain unique constraints /
    indexes with unique functional indexes on ``lower(<column>)``.

    NOTE(review): the data normalization below can raise a unique-violation
    if rows already exist that differ only by case — confirm production data
    is clean (or deduplicate) before running this migration.
    """
    # Normalize stored values so they match the new case-insensitive indexes.
    # location/address are lowered; job_type values are upper case by convention.
    op.execute(sa.text("UPDATE location SET type = lower(type), name = lower(name)"))
    op.execute(sa.text("UPDATE address SET url = lower(url)"))
    op.execute(sa.text("UPDATE job_type SET type = upper(type)"))

    # dataset: unique on (location_id, lower(name)); the new composite index
    # also covers lookups by location_id, so the old single-column indexes go.
    op.create_index(
        "ix__dataset__location_id__name_lower",
        "dataset",
        ["location_id", sa.literal_column("lower(name)")],
        unique=True,
    )
    op.drop_constraint(op.f("uq__dataset__location_id_name"), "dataset", type_="unique")
    op.drop_index(op.f("ix__dataset__location_id"), table_name="dataset")
    op.drop_index(op.f("ix__dataset__name"), table_name="dataset")

    # job: same scheme as dataset.
    op.create_index(
        "ix__job__location_id_name_lower",
        "job",
        ["location_id", sa.literal_column("lower(name)")],
        unique=True,
    )
    op.drop_constraint(op.f("uq__job__location_id_name"), "job", type_="unique")
    op.drop_index(op.f("ix__job__location_id"), table_name="job")
    op.drop_index(op.f("ix__job__name"), table_name="job")

    # job_type: mirror of the operations reversed in downgrade(). Without this
    # section downgrade() would fail (it drops ix__job_type__type_lower and
    # recreates uq__job_type__type / ix__job_type__type).
    op.create_index("ix__job_type__type_lower", "job_type", [sa.literal_column("lower(type)")], unique=True)
    op.drop_constraint(op.f("uq__job_type__type"), "job_type", type_="unique")
    op.drop_index(op.f("ix__job_type__type"), table_name="job_type")

    # tag: unique on lower(name).
    op.create_index("ix__tag__name_lower", "tag", [sa.literal_column("lower(name)")], unique=True)
    op.drop_constraint(op.f("uq__tag__name"), "tag", type_="unique")

    # tag_value: unique on (tag_id, lower(value)); composite index covers tag_id lookups.
    op.create_index(
        "ix__tag_value__tag_id_value_lower",
        "tag_value",
        ["tag_id", sa.literal_column("lower(value)")],
        unique=True,
    )
    op.drop_constraint(op.f("uq__tag_value__tag_id_value"), "tag_value", type_="unique")
    op.drop_index(op.f("ix__tag_value__tag_id"), table_name="tag_value")

    # user: unique on lower(name), replacing the plain unique index.
    op.create_index("ix__user__name_lower", "user", [sa.literal_column("lower(name)")], unique=True)
    op.drop_index(op.f("ix__user__name"), table_name="user")
60+
61+
62+
def downgrade() -> None:
    """Restore the case-sensitive unique constraints and plain indexes.

    Reverses upgrade()'s DDL in the opposite order (user, tag_value, tag,
    job_type, job, dataset). The data normalization performed by upgrade()
    (lowercased location/address values, uppercased job_type values) is NOT
    reversible — original casing is lost.

    NOTE(review): upgrade() as shown never creates ix__job_type__type_lower
    nor drops uq__job_type__type / ix__job_type__type, so the job_type block
    below would fail after that upgrade — confirm the job_type section is
    mirrored in upgrade().
    """
    op.create_index(op.f("ix__user__name"), "user", ["name"], unique=True)
    op.drop_index("ix__user__name_lower", table_name="user")

    op.create_unique_constraint(
        op.f("uq__tag_value__tag_id_value"),
        "tag_value",
        ["tag_id", "value"],
        postgresql_nulls_not_distinct=False,
    )
    op.create_index(op.f("ix__tag_value__tag_id"), "tag_value", ["tag_id"], unique=False)
    op.drop_index("ix__tag_value__tag_id_value_lower", table_name="tag_value")

    op.create_unique_constraint(op.f("uq__tag__name"), "tag", ["name"])
    op.drop_index("ix__tag__name_lower", table_name="tag")

    op.create_unique_constraint(op.f("uq__job_type__type"), "job_type", ["type"])
    op.create_index(op.f("ix__job_type__type"), "job_type", ["type"], unique=False)
    op.drop_index("ix__job_type__type_lower", table_name="job_type")

    op.create_index(op.f("ix__job__location_id"), "job", ["location_id"], unique=False)
    op.create_index(op.f("ix__job__name"), "job", ["name"], unique=False)
    op.create_unique_constraint(
        op.f("uq__job__location_id_name"),
        "job",
        ["location_id", "name"],
    )
    # NOTE(review): name uses a single underscore before "name" while the dataset
    # index below uses a double one (ix__dataset__location_id__name_lower) —
    # inconsistent but must match the names used in upgrade() and the models.
    op.drop_index("ix__job__location_id_name_lower", table_name="job")

    op.create_unique_constraint(
        op.f("uq__dataset__location_id_name"),
        "dataset",
        ["location_id", "name"],
    )
    op.create_index(op.f("ix__dataset__name"), "dataset", ["name"], unique=False)
    op.create_index(op.f("ix__dataset__location_id"), "dataset", ["location_id"], unique=False)
    op.drop_index("ix__dataset__location_id__name_lower", table_name="dataset")

data_rentgen/db/models/dataset.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from __future__ import annotations
55

6-
from sqlalchemy import BigInteger, Column, Computed, ForeignKey, Index, String, Table, UniqueConstraint
6+
from sqlalchemy import BigInteger, Column, Computed, ForeignKey, Index, String, Table, column, func
77
from sqlalchemy.dialects.postgresql import TSVECTOR
88
from sqlalchemy.orm import Mapped, mapped_column, relationship
99

@@ -15,7 +15,7 @@
1515
class Dataset(Base):
1616
__tablename__ = "dataset"
1717
__table_args__ = (
18-
UniqueConstraint("location_id", "name"),
18+
Index("ix__dataset__location_id__name_lower", "location_id", func.lower(column("name")), unique=True),
1919
Index("ix__dataset__search_vector", "search_vector", postgresql_using="gin"),
2020
)
2121

@@ -24,7 +24,6 @@ class Dataset(Base):
2424
location_id: Mapped[int] = mapped_column(
2525
BigInteger,
2626
ForeignKey("location.id", ondelete="CASCADE"),
27-
index=True,
2827
nullable=False,
2928
doc="Where dataset's data is actually located (database address, filesystem address)",
3029
)
@@ -36,7 +35,6 @@ class Dataset(Base):
3635

3736
name: Mapped[str] = mapped_column(
3837
String,
39-
index=True,
4038
nullable=False,
4139
doc="Dataset name, e.g. table name or filesystem path",
4240
)

data_rentgen/db/models/job.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from __future__ import annotations
55

6-
from sqlalchemy import BigInteger, Column, Computed, ForeignKey, Index, String, UniqueConstraint, select
6+
from sqlalchemy import BigInteger, Column, Computed, ForeignKey, Index, String, column, func, select
77
from sqlalchemy.dialects.postgresql import TSVECTOR
88
from sqlalchemy.orm import Mapped, column_property, mapped_column, relationship
99

@@ -15,7 +15,7 @@
1515
class Job(Base):
1616
__tablename__ = "job"
1717
__table_args__ = (
18-
UniqueConstraint("location_id", "name"),
18+
Index("ix__job__location_id_name_lower", "location_id", func.lower(column("name")), unique=True),
1919
Index("ix__job__search_vector", "search_vector", postgresql_using="gin"),
2020
)
2121

@@ -24,15 +24,13 @@ class Job(Base):
2424
location_id: Mapped[int] = mapped_column(
2525
BigInteger,
2626
ForeignKey("location.id", ondelete="CASCADE"),
27-
index=True,
2827
nullable=False,
2928
doc="Where job is located (Airflow instance, Spark cluster)",
3029
)
3130
location: Mapped[Location] = relationship(Location, lazy="noload")
3231

3332
name: Mapped[str] = mapped_column(
3433
String,
35-
index=True,
3634
nullable=False,
3735
doc="Job name, e.g. Airflow DAG name + task name, or Spark applicationName",
3836
)

data_rentgen/db/models/tag.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from typing import TYPE_CHECKING
66

7-
from sqlalchemy import BigInteger, Computed, Index, String
7+
from sqlalchemy import BigInteger, Computed, Index, String, column, func
88
from sqlalchemy.dialects.postgresql import TSVECTOR
99
from sqlalchemy.orm import Mapped, mapped_column, relationship
1010

@@ -16,10 +16,13 @@
1616

1717
class Tag(Base):
1818
__tablename__ = "tag"
19-
__table_args__ = (Index("ix__tag__search_vector", "search_vector", postgresql_using="gin"),)
19+
__table_args__ = (
20+
Index("ix__tag__name_lower", func.lower(column("name")), unique=True),
21+
Index("ix__tag__search_vector", "search_vector", postgresql_using="gin"),
22+
)
2023

2124
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
22-
name: Mapped[str] = mapped_column(String(32), nullable=False, unique=True)
25+
name: Mapped[str] = mapped_column(String(32), nullable=False)
2326
tag_values: Mapped[list[TagValue]] = relationship(
2427
"TagValue",
2528
lazy="noload",
@@ -29,7 +32,7 @@ class Tag(Base):
2932
search_vector: Mapped[str] = mapped_column(
3033
TSVECTOR,
3134
Computed(
32-
"to_tsvector('simple'::regconfig, name || ' ' || translate(name, '.', ' '))",
35+
"to_tsvector('simple'::regconfig, name || ' ' || translate(name, '/.', ' '))",
3336
persisted=True,
3437
),
3538
nullable=False,

data_rentgen/db/models/tag_value.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from __future__ import annotations
44

5-
from sqlalchemy import BigInteger, Computed, ForeignKey, Index, String, UniqueConstraint
5+
from sqlalchemy import BigInteger, Computed, ForeignKey, Index, String, column, func
66
from sqlalchemy.dialects.postgresql import TSVECTOR
77
from sqlalchemy.orm import Mapped, mapped_column, relationship
88

@@ -13,15 +13,14 @@
1313
class TagValue(Base):
1414
__tablename__ = "tag_value"
1515
__table_args__ = (
16-
UniqueConstraint("tag_id", "value"),
16+
Index("ix__tag_value__tag_id_value_lower", "tag_id", func.lower(column("value")), unique=True),
1717
Index("ix__tag_value__search_vector", "search_vector", postgresql_using="gin"),
1818
)
1919

2020
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
2121
tag_id: Mapped[int] = mapped_column(
2222
BigInteger,
2323
ForeignKey("tag.id", ondelete="CASCADE"),
24-
index=True,
2524
nullable=False,
2625
)
2726

@@ -36,7 +35,7 @@ class TagValue(Base):
3635
search_vector: Mapped[str] = mapped_column(
3736
TSVECTOR,
3837
Computed(
39-
"to_tsvector('simple'::regconfig, value || ' ' || translate(value, '.', ' '))",
38+
"to_tsvector('simple'::regconfig, value || ' ' || translate(value, '/.', ' '))",
4039
persisted=True,
4140
),
4241
nullable=False,

data_rentgen/db/models/user.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from __future__ import annotations
44

5-
from sqlalchemy import BigInteger, String
5+
from sqlalchemy import BigInteger, Index, String, column, func
66
from sqlalchemy.orm import Mapped, mapped_column
77

88
from data_rentgen.db.models.base import Base
99

1010

1111
class User(Base):
1212
__tablename__ = "user"
13+
__table_args__ = (Index("ix__user__name_lower", func.lower(column("name")), unique=True),)
1314

1415
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
15-
name: Mapped[str] = mapped_column(String, index=True, unique=True)
16+
name: Mapped[str] = mapped_column(String, nullable=False)

data_rentgen/db/repositories/dataset.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ async def get_stats_by_location_ids(self, location_ids: Collection[int]) -> dict
144144
return {row.location_id: row for row in query_result.all()}
145145

146146
async def _get(self, dataset: DatasetDTO) -> Dataset | None:
147-
statement = select(Dataset).where(Dataset.location_id == dataset.location.id, Dataset.name == dataset.name)
147+
statement = select(Dataset).where(
148+
Dataset.location_id == dataset.location.id,
149+
func.lower(Dataset.name) == dataset.name.lower(),
150+
)
148151
return await self._session.scalar(statement)
149152

150153
async def _create(self, dataset: DatasetDTO) -> Dataset:

data_rentgen/db/repositories/job.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ async def get_stats_by_location_ids(self, location_ids: Collection[int]) -> dict
123123
return {row.location_id: row for row in query_result.all()}
124124

125125
async def _get(self, job: JobDTO) -> Job | None:
126-
statement = select(Job).where(Job.location_id == job.location.id, Job.name == job.name)
126+
statement = select(Job).where(
127+
Job.location_id == job.location.id,
128+
func.lower(Job.name) == job.name.lower(),
129+
)
127130
return await self._session.scalar(statement)
128131

129132
async def _create(self, job: JobDTO) -> Job:

0 commit comments

Comments
 (0)