Skip to content

Commit 97340df

Browse files
committed
[DOP-31810] Rename run.expected_%_time fields to expected_%_at
1 parent 192c045 commit 97340df

File tree

13 files changed

+68
-40
lines changed

13 files changed

+68
-40
lines changed

data_rentgen/consumer/extractors/generic/run.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,9 @@ def _enrich_nominal_times(self, run: RunDTO, event: OpenLineageRunEvent) -> RunD
140140
if not event.run.facets.nominalTime:
141141
return run
142142

143-
run.nominal_start_time = event.run.facets.nominalTime.nominalStartTime
144-
run.nominal_end_time = event.run.facets.nominalTime.nominalEndTime
145-
if run.nominal_start_time == run.nominal_end_time:
146-
run.nominal_end_time = None
143+
run.expected_start_at = event.run.facets.nominalTime.nominalStartTime
144+
run.expected_end_at = event.run.facets.nominalTime.nominalEndTime
145+
if run.expected_start_at == run.expected_end_at:
146+
run.expected_end_at = None
147147

148148
return run
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# SPDX-FileCopyrightText: 2024-present MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Rename run.expected_%_time columns
4+
5+
Revision ID: 0e9bb788b04b
6+
Revises: b6a12e72a04e
7+
Create Date: 2026-02-03 11:10:05.902453
8+
9+
"""
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "0e9bb788b04b"
16+
down_revision = "b6a12e72a04e"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
op.execute(sa.text("ALTER TABLE run RENAME COLUMN expected_start_time TO expected_start_at"))
23+
op.execute(sa.text("ALTER TABLE run RENAME COLUMN expected_end_time TO expected_end_at"))
24+
25+
26+
def downgrade() -> None:
27+
op.execute(sa.text("ALTER TABLE run RENAME COLUMN expected_start_at TO expected_start_time"))
28+
op.execute(sa.text("ALTER TABLE run RENAME COLUMN expected_end_at TO expected_end_time"))

data_rentgen/db/models/run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,12 @@ class Run(Base):
152152
nullable=True,
153153
doc="End reason of the run, e.g. exception string",
154154
)
155-
expected_start_time: Mapped[datetime] = mapped_column(
155+
expected_start_at: Mapped[datetime] = mapped_column(
156156
DateTime(timezone=True),
157157
nullable=True,
158158
doc="Timestamp representing the nominal start time (included) of the run. AKA the schedule time",
159159
)
160-
expected_end_time: Mapped[datetime] = mapped_column(
160+
expected_end_at: Mapped[datetime] = mapped_column(
161161
DateTime(timezone=True),
162162
nullable=True,
163163
doc=(

data_rentgen/db/repositories/run.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@
6161
started_by_user_id=bindparam("started_by_user_id"),
6262
start_reason=bindparam("start_reason"),
6363
ended_at=bindparam("ended_at"),
64-
expected_start_time=bindparam("expected_start_time"),
65-
expected_end_time=bindparam("expected_end_time"),
64+
expected_start_at=bindparam("expected_start_at"),
65+
expected_end_at=bindparam("expected_end_at"),
6666
)
6767
inserted_row = insert_statement.excluded
6868
insert_statement = insert_statement.on_conflict_do_update(
@@ -79,8 +79,8 @@
7979
"attempt": func.coalesce(inserted_row.attempt, Run.attempt),
8080
"persistent_log_url": func.coalesce(inserted_row.persistent_log_url, Run.persistent_log_url),
8181
"running_log_url": func.coalesce(inserted_row.running_log_url, Run.running_log_url),
82-
"expected_start_time": func.coalesce(inserted_row.expected_start_time, Run.expected_start_time),
83-
"expected_end_time": func.coalesce(inserted_row.expected_end_time, Run.expected_end_time),
82+
"expected_start_at": func.coalesce(inserted_row.expected_start_at, Run.expected_start_at),
83+
"expected_end_at": func.coalesce(inserted_row.expected_end_at, Run.expected_end_at),
8484
},
8585
)
8686

@@ -275,8 +275,8 @@ async def create(self, run: RunDTO) -> Run:
275275
attempt=run.attempt,
276276
persistent_log_url=run.persistent_log_url,
277277
running_log_url=run.running_log_url,
278-
expected_start_time=run.nominal_start_time,
279-
expected_end_time=run.nominal_end_time,
278+
expected_start_at=run.expected_start_at,
279+
expected_end_at=run.expected_end_at,
280280
)
281281
self._session.add(result)
282282
await self._session.flush([result])
@@ -304,8 +304,8 @@ async def update(
304304
"attempt": new.attempt,
305305
"persistent_log_url": new.persistent_log_url,
306306
"running_log_url": new.running_log_url,
307-
"expected_start_time": new.nominal_start_time,
308-
"expected_end_time": new.nominal_end_time,
307+
"expected_start_at": new.expected_start_at,
308+
"expected_end_at": new.expected_end_at,
309309
}
310310
for col_name, value in optional_fields.items():
311311
if value is not None:
@@ -335,8 +335,8 @@ async def create_or_update_bulk(self, runs: list[RunDTO]) -> None:
335335
"attempt": run.attempt,
336336
"persistent_log_url": run.persistent_log_url,
337337
"running_log_url": run.running_log_url,
338-
"expected_start_time": run.nominal_start_time,
339-
"expected_end_time": run.nominal_end_time,
338+
"expected_start_at": run.expected_start_at,
339+
"expected_end_at": run.expected_end_at,
340340
}
341341
for run in runs
342342
],

data_rentgen/db/scripts/seed/airflow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ def generate_airflow_run(dag_id: str, task_id: str, created_at: datetime, ended_
5959
persistent_log_url=dag_ui,
6060
status=RunStatusDTO.SUCCEEDED,
6161
start_reason=RunStartReasonDTO.AUTOMATIC,
62-
nominal_start_time=created_at.replace(minute=(created_at.minute // 5) * 5, second=0, microsecond=0),
63-
nominal_end_time=ended_at.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1),
62+
expected_start_at=created_at.replace(minute=(created_at.minute // 5) * 5, second=0, microsecond=0),
63+
expected_end_at=ended_at.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1),
6464
)
6565

6666
task_job = JobDTO(

data_rentgen/dto/run.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ class RunDTO:
4444
attempt: str | None = None
4545
running_log_url: str | None = None
4646
persistent_log_url: str | None = None
47-
nominal_start_time: datetime | None = None
48-
nominal_end_time: datetime | None = None
47+
expected_start_at: datetime | None = None
48+
expected_end_at: datetime | None = None
4949

5050
def __post_init__(self):
5151
self.created_at = extract_timestamp_from_uuid(self.id)
@@ -74,6 +74,6 @@ def merge(self, new: RunDTO) -> RunDTO:
7474
self.attempt = new.attempt or self.attempt
7575
self.running_log_url = new.running_log_url or self.running_log_url
7676
self.persistent_log_url = new.persistent_log_url or self.persistent_log_url
77-
self.nominal_start_time = new.nominal_start_time or self.nominal_start_time
78-
self.nominal_end_time = new.nominal_end_time or self.nominal_end_time
77+
self.expected_start_at = new.expected_start_at or self.expected_start_at
78+
self.expected_end_at = new.expected_end_at or self.expected_end_at
7979
return self

data_rentgen/server/schemas/v1/run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ class RunResponseV1(BaseModel):
8181
start_reason: str | None = Field(description="Start reason of the Run", default=None)
8282
ended_at: datetime | None = Field(description="End time of the Run", default=None)
8383
end_reason: str | None = Field(description="End reason of the Run", default=None)
84-
expected_start_time: datetime | None = Field(description="Run expected start time", default=None)
85-
expected_end_time: datetime | None = Field(description="Run expected end time", default=None)
84+
expected_start_at: datetime | None = Field(description="Run expected start time", default=None)
85+
expected_end_at: datetime | None = Field(description="Run expected end time", default=None)
8686

8787
model_config = ConfigDict(from_attributes=True)
8888

docs/entities/index.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ It contains following fields:
223223
- ``started_at: timestamp | None`` - timestamp when OpenLineage event with ``eventType=START`` was received.
224224
- ``started_by_user: User | None`` - Spark session started as specific OS user/Kerberos principal.
225225
- ``start_reason: Enum | None`` - "why this Run was started?":
226-
- ``expected_start_time: timestamp | None`` - scheduled/expected start time of the run, which may differ from ``started_at`` (actual start time).
227-
- ``expected_end_time: timestamp | None`` - scheduled/expected end time of the run, which may differ from ``ended_at`` (actual end time).
226+
- ``expected_start_at: timestamp | None`` - scheduled/expected start time of the run, which may differ from ``started_at`` (actual start time).
227+
- ``expected_end_at: timestamp | None`` - scheduled/expected end time of the run, which may differ from ``ended_at`` (actual end time).
228228

229229
- ``MANUAL``
230230
- ``AUTOMATIC`` - e.g. by schedule or triggered by another run.

docs/reference/database/structure.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ Database structure
6969
start_reason: varchar(32)
7070
ended_at: timestamptz null
7171
end_reason: text null
72-
expected_start_time: timestamptz null
73-
expected_end_time: timestamptz null
72+
expected_start_at: timestamptz null
73+
expected_end_at: timestamptz null
7474
external_id: text null
7575
attempt: varchar(64) null
7676
persistent_log_url: timestamptz null

tests/test_consumer/test_extractors/test_extractors_run_airflow.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,7 +1374,7 @@ def test_extractors_extract_run_airflow_task_tags_below_2_10(tags: list[str] | s
13741374

13751375

13761376
@pytest.mark.parametrize(
1377-
["start_time", "end_time", "expected_start_time", "expected_end_time"],
1377+
["start_time", "end_time", "expected_start_at", "expected_end_at"],
13781378
[
13791379
(
13801380
datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
@@ -1399,8 +1399,8 @@ def test_extractors_extract_run_airflow_task_tags_below_2_10(tags: list[str] | s
13991399
def test_extractors_extract_run_airflow_task_nominal_times(
14001400
start_time: datetime | None,
14011401
end_time: datetime | None,
1402-
expected_start_time: datetime | None,
1403-
expected_end_time: datetime | None,
1402+
expected_start_at: datetime | None,
1403+
expected_end_at: datetime | None,
14041404
):
14051405
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
14061406
run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839")
@@ -1482,6 +1482,6 @@ def test_extractors_extract_run_airflow_task_nominal_times(
14821482
"http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=scheduled__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask&map_index=-1"
14831483
),
14841484
running_log_url=None,
1485-
nominal_start_time=expected_start_time,
1486-
nominal_end_time=expected_end_time,
1485+
expected_start_at=expected_start_at,
1486+
expected_end_at=expected_end_at,
14871487
)

0 commit comments

Comments
 (0)