Skip to content

Commit 015f819

Browse files
committed
[DOP-22750] update extractor condition
1 parent 87c3ee3 commit 015f819

File tree

2 files changed

+142
-6
lines changed

2 files changed

+142
-6
lines changed

data_rentgen/consumer/extractors/run.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,21 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
187187
# and also it does not mean that this exact user started this run.
188188
# Airflow uses different facets for versions above provider-openlineage/1.11.0.
189189
airflow_application_details = event.run.facets.airflow
190-
if airflow_application_details and airflow_application_details.dag.owner != "airflow":
191-
if airflow_application_details.dag.owner is not None:
192-
run.user = UserDTO(name=airflow_application_details.dag.owner)
190+
if airflow_application_details and all(
191+
(
192+
airflow_application_details.dag.owner is not None,
193+
airflow_application_details.dag.owner != "airflow",
194+
),
195+
):
196+
run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type]
197+
193198
airflow_application_dag_details = event.run.facets.airflowDagRun
194-
if airflow_application_dag_details and airflow_application_dag_details.dag.owner != "airflow":
195-
if airflow_application_dag_details.dag.owner is not None:
196-
run.user = UserDTO(name=airflow_application_dag_details.dag.owner)
199+
if airflow_application_dag_details and all(
200+
(
201+
airflow_application_dag_details.dag.owner is not None,
202+
airflow_application_dag_details.dag.owner != "airflow",
203+
),
204+
):
205+
run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type]
197206

198207
return run

tests/test_consumer/test_extractors/test_extractors_run.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,133 @@ def test_extractors_extract_run_airflow_task_2_x():
479479
)
480480

481481

482+
def test_extractors_extract_run_airflow_dag_check_with_owner():
483+
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
484+
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
485+
run = OpenLineageRunEvent(
486+
eventType=OpenLineageRunEventType.COMPLETE,
487+
eventTime=now,
488+
job=OpenLineageJob(
489+
namespace="http://airflow-host:8081",
490+
name="mydag",
491+
facets=OpenLineageJobFacets(
492+
jobType=OpenLineageJobTypeJobFacet(
493+
processingType=None,
494+
integration=OpenLineageJobIntegrationType.AIRFLOW,
495+
jobType=OpenLineageJobType.DAG,
496+
),
497+
),
498+
),
499+
run=OpenLineageRun(
500+
runId=run_id,
501+
facets=OpenLineageRunFacets(
502+
processing_engine=OpenLineageProcessingEngineRunFacet(
503+
version=Version("2.1.4"),
504+
name=OpenLineageProcessingEngineName.AIRFLOW,
505+
openlineageAdapterVersion=Version("1.10.0"),
506+
),
507+
airflowDagRun=OpenLineageAirflowDagRunFacet(
508+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
509+
dagRun=OpenLineageAirflowDagRunInfo(
510+
run_id="manual__2024-07-05T09:04:13:979349+00:00",
511+
run_type=OpenLineageAirflowDagRunType.MANUAL,
512+
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
513+
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
514+
),
515+
),
516+
),
517+
),
518+
)
519+
520+
assert extract_run(run) == RunDTO(
521+
id=run_id,
522+
job=JobDTO(
523+
name="mydag",
524+
location=LocationDTO(
525+
type="http",
526+
name="airflow-host:8081",
527+
addresses={"http://airflow-host:8081"},
528+
),
529+
type=JobTypeDTO.AIRFLOW_DAG,
530+
),
531+
status=RunStatusDTO.SUCCEEDED,
532+
started_at=None,
533+
start_reason=RunStartReasonDTO.MANUAL,
534+
user=UserDTO(name="myuser"),
535+
ended_at=now,
536+
external_id="manual__2024-07-05T09:04:13:979349+00:00",
537+
attempt=None,
538+
persistent_log_url=(
539+
"http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
540+
),
541+
running_log_url=None,
542+
)
543+
544+
545+
def test_extractors_extract_run_airflow_task_with_owner():
546+
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
547+
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
548+
run = OpenLineageRunEvent(
549+
eventType=OpenLineageRunEventType.COMPLETE,
550+
eventTime=now,
551+
job=OpenLineageJob(
552+
namespace="http://airflow-host:8081",
553+
name="mydag.mytask",
554+
facets=OpenLineageJobFacets(
555+
jobType=OpenLineageJobTypeJobFacet(
556+
processingType=None,
557+
integration=OpenLineageJobIntegrationType.AIRFLOW,
558+
jobType=OpenLineageJobType.TASK,
559+
),
560+
),
561+
),
562+
run=OpenLineageRun(
563+
runId=run_id,
564+
facets=OpenLineageRunFacets(
565+
airflow=OpenLineageAirflowTaskRunFacet(
566+
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
567+
dagRun=OpenLineageAirflowDagRunInfo(
568+
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
569+
run_type=OpenLineageAirflowDagRunType.SCHEDULED,
570+
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
571+
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
572+
),
573+
task=OpenLineageAirflowTaskInfo(
574+
task_id="mytask",
575+
),
576+
taskInstance=OpenLineageAirflowTaskInstanceInfo(
577+
try_number=1,
578+
),
579+
),
580+
),
581+
),
582+
)
583+
584+
assert extract_run(run) == RunDTO(
585+
id=run_id,
586+
job=JobDTO(
587+
name="mydag.mytask",
588+
location=LocationDTO(
589+
type="http",
590+
name="airflow-host:8081",
591+
addresses={"http://airflow-host:8081"},
592+
),
593+
type=JobTypeDTO.AIRFLOW_TASK,
594+
),
595+
status=RunStatusDTO.SUCCEEDED,
596+
started_at=None,
597+
start_reason=RunStartReasonDTO.AUTOMATIC,
598+
user=UserDTO(name="myuser"),
599+
ended_at=now,
600+
external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
601+
attempt="1",
602+
persistent_log_url=(
603+
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
604+
),
605+
running_log_url=None,
606+
)
607+
608+
482609
def test_extractors_extract_run_airflow_dag_without_owner():
483610
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
484611
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")

0 commit comments

Comments
 (0)