Skip to content

Commit 19f2162

Browse files
committed
[DOP-22750] Add owner parsing for airflow run events
1 parent 4731716 commit 19f2162

File tree

6 files changed

+24
-8
lines changed

6 files changed

+24
-8
lines changed

data_rentgen/consumer/extractors/run.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,15 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
182182
spark_application_details = event.run.facets.spark_applicationDetails
183183
if spark_application_details:
184184
run.user = UserDTO(name=spark_application_details.userName)
185+
185186
# Airflow DAG and task have 'owner' field, but if can be either user or group name,
186187
# and also it does not mean that this exact user started this run.
188+
# Airflow using different facets for version above provider-opelineage/1.11.0.
189+
airflow_application_details = event.run.facets.airflow
190+
if airflow_application_details and airflow_application_details.dag.owner != "airflow":
191+
run.user = UserDTO(name=airflow_application_details.dag.owner)
192+
airflow_application_dag_details = event.run.facets.airflowDagRun
193+
if airflow_application_dag_details and airflow_application_dag_details.dag.owner != "airflow":
194+
run.user = UserDTO(name=airflow_application_dag_details.dag.owner)
195+
187196
return run

data_rentgen/consumer/openlineage/run_facets/airflow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase):
1414
"""
1515

1616
dag_id: str
17+
owner: str
1718

1819

1920
class OpenLineageAirflowDagRunType(Enum):

tests/test_consumer/test_extractors/test_extractors_batch_airflow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent:
6666
openlineageAdapterVersion=Version("1.10.0"),
6767
),
6868
airflowDagRun=OpenLineageAirflowDagRunFacet(
69-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
69+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
7070
dagRun=OpenLineageAirflowDagRunInfo(
7171
run_id="manual__2024-07-05T09:04:13:979349+00:00",
7272
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -137,7 +137,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent:
137137
openlineageAdapterVersion=Version("1.10.0"),
138138
),
139139
airflow=OpenLineageAirflowTaskRunFacet(
140-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
140+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
141141
dagRun=OpenLineageAirflowDagRunInfo(
142142
run_id="manual__2024-07-05T09:04:13:979349+00:00",
143143
run_type=OpenLineageAirflowDagRunType.MANUAL,

tests/test_consumer/test_extractors/test_extractors_run.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def test_extractors_extract_run_airflow_dag_2_3_plus():
174174
openlineageAdapterVersion=Version("1.10.0"),
175175
),
176176
airflowDagRun=OpenLineageAirflowDagRunFacet(
177-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
177+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
178178
dagRun=OpenLineageAirflowDagRunInfo(
179179
run_id="manual__2024-07-05T09:04:13:979349+00:00",
180180
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -237,7 +237,7 @@ def test_extractors_extract_run_airflow_dag_2_x():
237237
openlineageAdapterVersion=Version("1.10.0"),
238238
),
239239
airflowDagRun=OpenLineageAirflowDagRunFacet(
240-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
240+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
241241
dagRun=OpenLineageAirflowDagRunInfo(
242242
run_id="manual__2024-07-05T09:04:13:979349+00:00",
243243
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -300,7 +300,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url():
300300
openlineageAdapterVersion=Version("1.10.0"),
301301
),
302302
airflow=OpenLineageAirflowTaskRunFacet(
303-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
303+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
304304
dagRun=OpenLineageAirflowDagRunInfo(
305305
run_id="manual__2024-07-05T09:04:13:979349+00:00",
306306
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -372,7 +372,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
372372
openlineageAdapterVersion=Version("1.9.0"),
373373
),
374374
airflow=OpenLineageAirflowTaskRunFacet(
375-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
375+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
376376
dagRun=OpenLineageAirflowDagRunInfo(
377377
run_id="backfill__2024-07-05T09:04:13:979349+00:00",
378378
run_type=OpenLineageAirflowDagRunType.BACKFILL_JOB,
@@ -436,7 +436,7 @@ def test_extractors_extract_run_airflow_task_2_x():
436436
runId=run_id,
437437
facets=OpenLineageRunFacets(
438438
airflow=OpenLineageAirflowTaskRunFacet(
439-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
439+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
440440
dagRun=OpenLineageAirflowDagRunInfo(
441441
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
442442
run_type=OpenLineageAirflowDagRunType.SCHEDULED,

tests/test_consumer/test_handlers/test_runs_handler_airflow.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
Run,
1818
RunStartReason,
1919
RunStatus,
20+
User,
2021
)
2122

2223
RESOURCES_PATH = Path(__file__).parent.parent.joinpath("resources").resolve()
@@ -97,7 +98,10 @@ async def test_runs_handler_airflow(
9798
assert task_run.status == RunStatus.SUCCEEDED
9899
assert task_run.started_at == datetime(2024, 7, 5, 9, 4, 20, 783845, tzinfo=timezone.utc)
99100
assert task_run.ended_at == datetime(2024, 7, 5, 9, 7, 37, 858423, tzinfo=timezone.utc)
100-
assert task_run.started_by_user_id is None
101+
user_query = select(User).where(User.name == "myuser")
102+
user_scalars = await async_session.scalars(user_query)
103+
user = user_scalars.one_or_none()
104+
assert task_run.started_by_user_id == user.id
101105
assert task_run.start_reason == RunStartReason.MANUAL
102106
assert task_run.end_reason is None
103107
assert task_run.external_id == "manual__2024-07-05T09:04:12.162809+00:00"

tests/test_consumer/test_openlineage/test_run_event_airflow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ def test_run_event_airflow_dag_start():
150150
airflowDagRun=OpenLineageAirflowDagRunFacet(
151151
dag=OpenLineageAirflowDagInfo(
152152
dag_id="mydag",
153+
owner="myuser",
153154
),
154155
dagRun=OpenLineageAirflowDagRunInfo(
155156
run_id="manual__2024-07-05T09:04:12.162809+00:00",
@@ -400,6 +401,7 @@ def test_run_event_airflow_task_start():
400401
airflow=OpenLineageAirflowTaskRunFacet(
401402
dag=OpenLineageAirflowDagInfo(
402403
dag_id="mydag",
404+
owner="myuser",
403405
),
404406
dagRun=OpenLineageAirflowDagRunInfo(
405407
run_id="manual__2024-07-05T09:04:12.162809+00:00",

0 commit comments

Comments
 (0)