@@ -479,6 +479,133 @@ def test_extractors_extract_run_airflow_task_2_x():
479479 )
480480
481481
def test_extractors_extract_run_airflow_dag_check_with_owner():
    """Check extraction of a COMPLETE Airflow DAG event whose DAG info names an owner.

    The event carries an ``airflowDagRun`` facet with ``owner="myuser"`` and a
    MANUAL run type; the expected ``RunDTO`` exposes that owner as ``user`` and
    reports a MANUAL start reason.
    """
    now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
    # Both ends of the data interval use the same instant in this fixture.
    interval_point = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)

    # --- input event, assembled piece by piece ---
    dag_job = OpenLineageJob(
        namespace="http://airflow-host:8081",
        name="mydag",
        facets=OpenLineageJobFacets(
            jobType=OpenLineageJobTypeJobFacet(
                processingType=None,
                integration=OpenLineageJobIntegrationType.AIRFLOW,
                jobType=OpenLineageJobType.DAG,
            ),
        ),
    )
    engine_facet = OpenLineageProcessingEngineRunFacet(
        version=Version("2.1.4"),
        name=OpenLineageProcessingEngineName.AIRFLOW,
        openlineageAdapterVersion=Version("1.10.0"),
    )
    dag_run_facet = OpenLineageAirflowDagRunFacet(
        dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
        dagRun=OpenLineageAirflowDagRunInfo(
            run_id="manual__2024-07-05T09:04:13:979349+00:00",
            run_type=OpenLineageAirflowDagRunType.MANUAL,
            data_interval_start=interval_point,
            data_interval_end=interval_point,
        ),
    )
    event = OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.COMPLETE,
        eventTime=now,
        job=dag_job,
        run=OpenLineageRun(
            runId=run_id,
            facets=OpenLineageRunFacets(
                processing_engine=engine_facet,
                airflowDagRun=dag_run_facet,
            ),
        ),
    )

    actual = extract_run(event)

    # --- expected DTO ---
    expected_job = JobDTO(
        name="mydag",
        location=LocationDTO(
            type="http",
            name="airflow-host:8081",
            addresses={"http://airflow-host:8081"},
        ),
        type=JobTypeDTO.AIRFLOW_DAG,
    )
    graph_url = "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
    expected = RunDTO(
        id=run_id,
        job=expected_job,
        status=RunStatusDTO.SUCCEEDED,
        started_at=None,
        start_reason=RunStartReasonDTO.MANUAL,
        user=UserDTO(name="myuser"),
        ended_at=now,
        external_id="manual__2024-07-05T09:04:13:979349+00:00",
        attempt=None,
        persistent_log_url=graph_url,
        running_log_url=None,
    )

    assert actual == expected
543+
544+
def test_extractors_extract_run_airflow_task_with_owner():
    """Check extraction of a COMPLETE Airflow task event whose DAG info names an owner.

    The event carries an ``airflow`` task-run facet with ``owner="myuser"``, a
    SCHEDULED run type and ``try_number=1``; the expected ``RunDTO`` exposes the
    owner as ``user``, an AUTOMATIC start reason and ``attempt="1"``.
    """
    now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
    run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
    # Both ends of the data interval use the same instant in this fixture.
    interval_point = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)

    # --- input event, assembled piece by piece ---
    task_job = OpenLineageJob(
        namespace="http://airflow-host:8081",
        name="mydag.mytask",
        facets=OpenLineageJobFacets(
            jobType=OpenLineageJobTypeJobFacet(
                processingType=None,
                integration=OpenLineageJobIntegrationType.AIRFLOW,
                jobType=OpenLineageJobType.TASK,
            ),
        ),
    )
    task_run_facet = OpenLineageAirflowTaskRunFacet(
        dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
        dagRun=OpenLineageAirflowDagRunInfo(
            run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
            run_type=OpenLineageAirflowDagRunType.SCHEDULED,
            data_interval_start=interval_point,
            data_interval_end=interval_point,
        ),
        task=OpenLineageAirflowTaskInfo(
            task_id="mytask",
        ),
        taskInstance=OpenLineageAirflowTaskInstanceInfo(
            try_number=1,
        ),
    )
    event = OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.COMPLETE,
        eventTime=now,
        job=task_job,
        run=OpenLineageRun(
            runId=run_id,
            facets=OpenLineageRunFacets(
                airflow=task_run_facet,
            ),
        ),
    )

    actual = extract_run(event)

    # --- expected DTO ---
    expected_job = JobDTO(
        name="mydag.mytask",
        location=LocationDTO(
            type="http",
            name="airflow-host:8081",
            addresses={"http://airflow-host:8081"},
        ),
        type=JobTypeDTO.AIRFLOW_TASK,
    )
    log_url = "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
    expected = RunDTO(
        id=run_id,
        job=expected_job,
        status=RunStatusDTO.SUCCEEDED,
        started_at=None,
        start_reason=RunStartReasonDTO.AUTOMATIC,
        user=UserDTO(name="myuser"),
        ended_at=now,
        external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
        attempt="1",
        persistent_log_url=log_url,
        running_log_url=None,
    )

    assert actual == expected
607+
608+
482609def test_extractors_extract_run_airflow_dag_without_owner ():
483610 now = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc )
484611 run_id = UUID ("1efc1e4c-04e5-6cc0-b991-358ae6c316c8" )
0 commit comments