Skip to content

Commit b9d33ff

Browse files
authored
[DOP-22750] Add owner parsing for airflow run events (#135)
* [DOP-22750] Add owner parsing for airflow run events * [DOP-22750] Update tests * [DOP-22750] add unit tests for 'airflow' owner * [DOP-22750] make owner optional, add test for airflow dag/task without owner * [DOP-22750] update extractor condition
1 parent 4731716 commit b9d33ff

File tree

6 files changed

+299
-8
lines changed

6 files changed

+299
-8
lines changed

data_rentgen/consumer/extractors/run.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,26 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
182182
spark_application_details = event.run.facets.spark_applicationDetails
183183
if spark_application_details:
184184
run.user = UserDTO(name=spark_application_details.userName)
185+
185186
# Airflow DAG and task have an 'owner' field, but it can be either a user or a group name,
186187
# and also it does not mean that this exact user started this run.
188+
# Airflow uses different facets for versions above provider-openlineage/1.11.0.
189+
airflow_application_details = event.run.facets.airflow
190+
if airflow_application_details and all(
191+
(
192+
airflow_application_details.dag.owner is not None,
193+
airflow_application_details.dag.owner != "airflow",
194+
),
195+
):
196+
run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type]
197+
198+
airflow_application_dag_details = event.run.facets.airflowDagRun
199+
if airflow_application_dag_details and all(
200+
(
201+
airflow_application_dag_details.dag.owner is not None,
202+
airflow_application_dag_details.dag.owner != "airflow",
203+
),
204+
):
205+
run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type]
206+
187207
return run

data_rentgen/consumer/openlineage/run_facets/airflow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase):
1414
"""
1515

1616
dag_id: str
17+
owner: str | None = None
1718

1819

1920
class OpenLineageAirflowDagRunType(Enum):

tests/test_consumer/test_extractors/test_extractors_batch_airflow.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
RunDTO,
3737
RunStartReasonDTO,
3838
RunStatusDTO,
39+
UserDTO,
3940
)
4041

4142

@@ -66,7 +67,7 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent:
6667
openlineageAdapterVersion=Version("1.10.0"),
6768
),
6869
airflowDagRun=OpenLineageAirflowDagRunFacet(
69-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
70+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
7071
dagRun=OpenLineageAirflowDagRunInfo(
7172
run_id="manual__2024-07-05T09:04:13:979349+00:00",
7273
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -137,7 +138,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent:
137138
openlineageAdapterVersion=Version("1.10.0"),
138139
),
139140
airflow=OpenLineageAirflowTaskRunFacet(
140-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
141+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
141142
dagRun=OpenLineageAirflowDagRunInfo(
142143
run_id="manual__2024-07-05T09:04:13:979349+00:00",
143144
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -231,6 +232,10 @@ def extracted_airflow_dag_run(
231232
status=RunStatusDTO.SUCCEEDED,
232233
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
233234
start_reason=RunStartReasonDTO.MANUAL,
235+
user=UserDTO(
236+
name="myuser",
237+
id=None,
238+
),
234239
ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc),
235240
external_id="manual__2024-07-05T09:04:13:979349+00:00",
236241
persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00",
@@ -247,6 +252,10 @@ def extracted_airflow_task_run(
247252
status=RunStatusDTO.SUCCEEDED,
248253
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
249254
start_reason=RunStartReasonDTO.MANUAL,
255+
user=UserDTO(
256+
name="myuser",
257+
id=None,
258+
),
250259
ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
251260
external_id="manual__2024-07-05T09:04:13:979349+00:00",
252261
attempt="1",
@@ -300,7 +309,6 @@ def test_extractors_extract_batch_airflow(
300309

301310
assert not extracted.datasets()
302311
assert not extracted.dataset_symlinks()
303-
assert not extracted.users()
304312
assert not extracted.schemas()
305313
assert not extracted.operations()
306314
assert not extracted.inputs()

tests/test_consumer/test_extractors/test_extractors_run.py

Lines changed: 258 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def test_extractors_extract_run_airflow_dag_2_3_plus():
174174
openlineageAdapterVersion=Version("1.10.0"),
175175
),
176176
airflowDagRun=OpenLineageAirflowDagRunFacet(
177-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
177+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
178178
dagRun=OpenLineageAirflowDagRunInfo(
179179
run_id="manual__2024-07-05T09:04:13:979349+00:00",
180180
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -237,7 +237,7 @@ def test_extractors_extract_run_airflow_dag_2_x():
237237
openlineageAdapterVersion=Version("1.10.0"),
238238
),
239239
airflowDagRun=OpenLineageAirflowDagRunFacet(
240-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
240+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
241241
dagRun=OpenLineageAirflowDagRunInfo(
242242
run_id="manual__2024-07-05T09:04:13:979349+00:00",
243243
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -300,7 +300,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url():
300300
openlineageAdapterVersion=Version("1.10.0"),
301301
),
302302
airflow=OpenLineageAirflowTaskRunFacet(
303-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
303+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
304304
dagRun=OpenLineageAirflowDagRunInfo(
305305
run_id="manual__2024-07-05T09:04:13:979349+00:00",
306306
run_type=OpenLineageAirflowDagRunType.MANUAL,
@@ -372,7 +372,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
372372
openlineageAdapterVersion=Version("1.9.0"),
373373
),
374374
airflow=OpenLineageAirflowTaskRunFacet(
375-
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
375+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
376376
dagRun=OpenLineageAirflowDagRunInfo(
377377
run_id="backfill__2024-07-05T09:04:13:979349+00:00",
378378
run_type=OpenLineageAirflowDagRunType.BACKFILL_JOB,
@@ -418,6 +418,260 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
418418
def test_extractors_extract_run_airflow_task_2_x():
419419
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
420420
run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610")
421+
run = OpenLineageRunEvent(
422+
eventType=OpenLineageRunEventType.COMPLETE,
423+
eventTime=now,
424+
job=OpenLineageJob(
425+
namespace="http://airflow-host:8081",
426+
name="mydag.mytask",
427+
facets=OpenLineageJobFacets(
428+
jobType=OpenLineageJobTypeJobFacet(
429+
processingType=None,
430+
integration=OpenLineageJobIntegrationType.AIRFLOW,
431+
jobType=OpenLineageJobType.TASK,
432+
),
433+
),
434+
),
435+
run=OpenLineageRun(
436+
runId=run_id,
437+
facets=OpenLineageRunFacets(
438+
airflow=OpenLineageAirflowTaskRunFacet(
439+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
440+
dagRun=OpenLineageAirflowDagRunInfo(
441+
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
442+
run_type=OpenLineageAirflowDagRunType.SCHEDULED,
443+
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
444+
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
445+
),
446+
task=OpenLineageAirflowTaskInfo(
447+
task_id="mytask",
448+
),
449+
taskInstance=OpenLineageAirflowTaskInstanceInfo(
450+
try_number=1,
451+
),
452+
),
453+
),
454+
),
455+
)
456+
457+
assert extract_run(run) == RunDTO(
458+
id=run_id,
459+
job=JobDTO(
460+
name="mydag.mytask",
461+
location=LocationDTO(
462+
type="http",
463+
name="airflow-host:8081",
464+
addresses={"http://airflow-host:8081"},
465+
),
466+
type=JobTypeDTO.AIRFLOW_TASK,
467+
),
468+
status=RunStatusDTO.SUCCEEDED,
469+
started_at=None,
470+
start_reason=RunStartReasonDTO.AUTOMATIC,
471+
user=None,
472+
ended_at=now,
473+
external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
474+
attempt="1",
475+
persistent_log_url=(
476+
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
477+
),
478+
running_log_url=None,
479+
)
480+
481+
482+
def test_extractors_extract_run_airflow_dag_check_with_owner():
483+
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
484+
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
485+
run = OpenLineageRunEvent(
486+
eventType=OpenLineageRunEventType.COMPLETE,
487+
eventTime=now,
488+
job=OpenLineageJob(
489+
namespace="http://airflow-host:8081",
490+
name="mydag",
491+
facets=OpenLineageJobFacets(
492+
jobType=OpenLineageJobTypeJobFacet(
493+
processingType=None,
494+
integration=OpenLineageJobIntegrationType.AIRFLOW,
495+
jobType=OpenLineageJobType.DAG,
496+
),
497+
),
498+
),
499+
run=OpenLineageRun(
500+
runId=run_id,
501+
facets=OpenLineageRunFacets(
502+
processing_engine=OpenLineageProcessingEngineRunFacet(
503+
version=Version("2.1.4"),
504+
name=OpenLineageProcessingEngineName.AIRFLOW,
505+
openlineageAdapterVersion=Version("1.10.0"),
506+
),
507+
airflowDagRun=OpenLineageAirflowDagRunFacet(
508+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
509+
dagRun=OpenLineageAirflowDagRunInfo(
510+
run_id="manual__2024-07-05T09:04:13:979349+00:00",
511+
run_type=OpenLineageAirflowDagRunType.MANUAL,
512+
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
513+
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
514+
),
515+
),
516+
),
517+
),
518+
)
519+
520+
assert extract_run(run) == RunDTO(
521+
id=run_id,
522+
job=JobDTO(
523+
name="mydag",
524+
location=LocationDTO(
525+
type="http",
526+
name="airflow-host:8081",
527+
addresses={"http://airflow-host:8081"},
528+
),
529+
type=JobTypeDTO.AIRFLOW_DAG,
530+
),
531+
status=RunStatusDTO.SUCCEEDED,
532+
started_at=None,
533+
start_reason=RunStartReasonDTO.MANUAL,
534+
user=UserDTO(name="myuser"),
535+
ended_at=now,
536+
external_id="manual__2024-07-05T09:04:13:979349+00:00",
537+
attempt=None,
538+
persistent_log_url=(
539+
"http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
540+
),
541+
running_log_url=None,
542+
)
543+
544+
545+
def test_extractors_extract_run_airflow_task_with_owner():
546+
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
547+
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
548+
run = OpenLineageRunEvent(
549+
eventType=OpenLineageRunEventType.COMPLETE,
550+
eventTime=now,
551+
job=OpenLineageJob(
552+
namespace="http://airflow-host:8081",
553+
name="mydag.mytask",
554+
facets=OpenLineageJobFacets(
555+
jobType=OpenLineageJobTypeJobFacet(
556+
processingType=None,
557+
integration=OpenLineageJobIntegrationType.AIRFLOW,
558+
jobType=OpenLineageJobType.TASK,
559+
),
560+
),
561+
),
562+
run=OpenLineageRun(
563+
runId=run_id,
564+
facets=OpenLineageRunFacets(
565+
airflow=OpenLineageAirflowTaskRunFacet(
566+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
567+
dagRun=OpenLineageAirflowDagRunInfo(
568+
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
569+
run_type=OpenLineageAirflowDagRunType.SCHEDULED,
570+
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
571+
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
572+
),
573+
task=OpenLineageAirflowTaskInfo(
574+
task_id="mytask",
575+
),
576+
taskInstance=OpenLineageAirflowTaskInstanceInfo(
577+
try_number=1,
578+
),
579+
),
580+
),
581+
),
582+
)
583+
584+
assert extract_run(run) == RunDTO(
585+
id=run_id,
586+
job=JobDTO(
587+
name="mydag.mytask",
588+
location=LocationDTO(
589+
type="http",
590+
name="airflow-host:8081",
591+
addresses={"http://airflow-host:8081"},
592+
),
593+
type=JobTypeDTO.AIRFLOW_TASK,
594+
),
595+
status=RunStatusDTO.SUCCEEDED,
596+
started_at=None,
597+
start_reason=RunStartReasonDTO.AUTOMATIC,
598+
user=UserDTO(name="myuser"),
599+
ended_at=now,
600+
external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
601+
attempt="1",
602+
persistent_log_url=(
603+
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
604+
),
605+
running_log_url=None,
606+
)
607+
608+
609+
def test_extractors_extract_run_airflow_dag_without_owner():
610+
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
611+
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
612+
run = OpenLineageRunEvent(
613+
eventType=OpenLineageRunEventType.COMPLETE,
614+
eventTime=now,
615+
job=OpenLineageJob(
616+
namespace="http://airflow-host:8081",
617+
name="mydag",
618+
facets=OpenLineageJobFacets(
619+
jobType=OpenLineageJobTypeJobFacet(
620+
processingType=None,
621+
integration=OpenLineageJobIntegrationType.AIRFLOW,
622+
jobType=OpenLineageJobType.DAG,
623+
),
624+
),
625+
),
626+
run=OpenLineageRun(
627+
runId=run_id,
628+
facets=OpenLineageRunFacets(
629+
processing_engine=OpenLineageProcessingEngineRunFacet(
630+
version=Version("2.1.4"),
631+
name=OpenLineageProcessingEngineName.AIRFLOW,
632+
openlineageAdapterVersion=Version("1.10.0"),
633+
),
634+
airflowDagRun=OpenLineageAirflowDagRunFacet(
635+
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
636+
dagRun=OpenLineageAirflowDagRunInfo(
637+
run_id="manual__2024-07-05T09:04:13:979349+00:00",
638+
run_type=OpenLineageAirflowDagRunType.MANUAL,
639+
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
640+
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
641+
),
642+
),
643+
),
644+
),
645+
)
646+
647+
assert extract_run(run) == RunDTO(
648+
id=run_id,
649+
job=JobDTO(
650+
name="mydag",
651+
location=LocationDTO(
652+
type="http",
653+
name="airflow-host:8081",
654+
addresses={"http://airflow-host:8081"},
655+
),
656+
type=JobTypeDTO.AIRFLOW_DAG,
657+
),
658+
status=RunStatusDTO.SUCCEEDED,
659+
started_at=None,
660+
start_reason=RunStartReasonDTO.MANUAL,
661+
user=None,
662+
ended_at=now,
663+
external_id="manual__2024-07-05T09:04:13:979349+00:00",
664+
attempt=None,
665+
persistent_log_url=(
666+
"http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
667+
),
668+
running_log_url=None,
669+
)
670+
671+
672+
def test_extractors_extract_run_airflow_task_without_owner():
673+
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
674+
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
421675
run = OpenLineageRunEvent(
422676
eventType=OpenLineageRunEventType.COMPLETE,
423677
eventTime=now,

0 commit comments

Comments
 (0)