@@ -80,6 +80,46 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent:
8080 )
8181
8282
83+ @pytest .fixture
84+ def airflow_dag_run_event_start_owner_airflow () -> OpenLineageRunEvent :
85+ event_time = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc )
86+ run_id = UUID ("01908223-0782-79b8-9495-b1c38aaee839" )
87+ return OpenLineageRunEvent (
88+ eventType = OpenLineageRunEventType .START ,
89+ eventTime = event_time ,
90+ job = OpenLineageJob (
91+ namespace = "http://airflow-host:8081" ,
92+ name = "mydag" ,
93+ facets = OpenLineageJobFacets (
94+ jobType = OpenLineageJobTypeJobFacet (
95+ processingType = None ,
96+ integration = OpenLineageJobIntegrationType .AIRFLOW ,
97+ jobType = OpenLineageJobType .DAG ,
98+ ),
99+ ),
100+ ),
101+ run = OpenLineageRun (
102+ runId = run_id ,
103+ facets = OpenLineageRunFacets (
104+ processing_engine = OpenLineageProcessingEngineRunFacet (
105+ version = Version ("2.9.2" ),
106+ name = OpenLineageProcessingEngineName .AIRFLOW ,
107+ openlineageAdapterVersion = Version ("1.10.0" ),
108+ ),
109+ airflowDagRun = OpenLineageAirflowDagRunFacet (
110+ dag = OpenLineageAirflowDagInfo (dag_id = "mydag" , owner = "airflow" ),
111+ dagRun = OpenLineageAirflowDagRunInfo (
112+ run_id = "manual__2024-07-05T09:04:13:979349+00:00" ,
113+ run_type = OpenLineageAirflowDagRunType .MANUAL ,
114+ data_interval_start = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc ),
115+ data_interval_end = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc ),
116+ ),
117+ ),
118+ ),
119+ ),
120+ )
121+
122+
83123@pytest .fixture
84124def airflow_dag_run_event_stop () -> OpenLineageRunEvent :
85125 event_time = datetime (2024 , 7 , 5 , 9 , 8 , 5 , 691973 , tzinfo = timezone .utc )
@@ -160,6 +200,55 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent:
160200 )
161201
162202
203+ @pytest .fixture
204+ def airflow_task_run_event_start_owner_airflow () -> OpenLineageRunEvent :
205+ event_time = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc )
206+ run_id = UUID ("01908223-0782-7fc0-9d69-b1df9dac2c60" )
207+ return OpenLineageRunEvent (
208+ eventType = OpenLineageRunEventType .START ,
209+ eventTime = event_time ,
210+ job = OpenLineageJob (
211+ namespace = "http://airflow-host:8081" ,
212+ name = "mydag.mytask" ,
213+ facets = OpenLineageJobFacets (
214+ jobType = OpenLineageJobTypeJobFacet (
215+ processingType = None ,
216+ integration = OpenLineageJobIntegrationType .AIRFLOW ,
217+ jobType = OpenLineageJobType .TASK ,
218+ ),
219+ ),
220+ ),
221+ run = OpenLineageRun (
222+ runId = run_id ,
223+ facets = OpenLineageRunFacets (
224+ processing_engine = OpenLineageProcessingEngineRunFacet (
225+ version = Version ("2.9.2" ),
226+ name = OpenLineageProcessingEngineName .AIRFLOW ,
227+ openlineageAdapterVersion = Version ("1.10.0" ),
228+ ),
229+ airflow = OpenLineageAirflowTaskRunFacet (
230+ dag = OpenLineageAirflowDagInfo (dag_id = "mydag" , owner = "airflow" ),
231+ dagRun = OpenLineageAirflowDagRunInfo (
232+ run_id = "manual__2024-07-05T09:04:13:979349+00:00" ,
233+ run_type = OpenLineageAirflowDagRunType .MANUAL ,
234+ data_interval_start = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc ),
235+ data_interval_end = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc ),
236+ ),
237+ task = OpenLineageAirflowTaskInfo (
238+ task_id = "mytask" ,
239+ ),
240+ taskInstance = OpenLineageAirflowTaskInstanceInfo (
241+ try_number = 1 ,
242+ log_url = (
243+ "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask"
244+ ),
245+ ),
246+ ),
247+ ),
248+ ),
249+ )
250+
251+
163252@pytest .fixture
164253def airflow_task_run_event_stop () -> OpenLineageRunEvent :
165254 event_time = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc )
@@ -222,6 +311,41 @@ def extracted_airflow_task_job(
222311 )
223312
224313
314+ @pytest .fixture
315+ def extracted_airflow_dag_run_owner_airflow (
316+ extracted_airflow_dag_job : JobDTO ,
317+ ) -> RunDTO :
318+ return RunDTO (
319+ id = UUID ("01908223-0782-79b8-9495-b1c38aaee839" ),
320+ job = extracted_airflow_dag_job ,
321+ status = RunStatusDTO .SUCCEEDED ,
322+ started_at = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc ),
323+ start_reason = RunStartReasonDTO .MANUAL ,
324+ ended_at = datetime (2024 , 7 , 5 , 9 , 8 , 5 , 691973 , tzinfo = timezone .utc ),
325+ external_id = "manual__2024-07-05T09:04:13:979349+00:00" ,
326+ persistent_log_url = "http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00" ,
327+ )
328+
329+
330+ @pytest .fixture
331+ def extracted_airflow_task_run_owner_airflow (
332+ extracted_airflow_task_job : JobDTO ,
333+ ) -> RunDTO :
334+ return RunDTO (
335+ id = UUID ("01908223-0782-7fc0-9d69-b1df9dac2c60" ),
336+ job = extracted_airflow_task_job ,
337+ status = RunStatusDTO .SUCCEEDED ,
338+ started_at = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc ),
339+ start_reason = RunStartReasonDTO .MANUAL ,
340+ ended_at = datetime (2024 , 7 , 5 , 9 , 4 , 13 , 979349 , tzinfo = timezone .utc ),
341+ external_id = "manual__2024-07-05T09:04:13:979349+00:00" ,
342+ attempt = "1" ,
343+ persistent_log_url = (
344+ "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask"
345+ ),
346+ )
347+
348+
225349@pytest .fixture
226350def extracted_airflow_dag_run (
227351 extracted_airflow_dag_job : JobDTO ,
@@ -313,3 +437,54 @@ def test_extractors_extract_batch_airflow(
313437 assert not extracted .operations ()
314438 assert not extracted .inputs ()
315439 assert not extracted .outputs ()
440+
441+
442+ @pytest .mark .parametrize (
443+ "input_transformation" ,
444+ [
445+ # receiving data out of order does not change result
446+ pytest .param (
447+ list ,
448+ id = "preserve order" ,
449+ ),
450+ pytest .param (
451+ reversed ,
452+ id = "reverse order" ,
453+ ),
454+ ],
455+ )
456+ def test_extractors_extract_batch_airflow_with_airflow_owner (
457+ airflow_dag_run_event_start_owner_airflow : OpenLineageRunEvent ,
458+ airflow_dag_run_event_stop : OpenLineageRunEvent ,
459+ airflow_task_run_event_start_owner_airflow : OpenLineageRunEvent ,
460+ airflow_task_run_event_stop : OpenLineageRunEvent ,
461+ extracted_airflow_location : LocationDTO ,
462+ extracted_airflow_dag_job : JobDTO ,
463+ extracted_airflow_task_job : JobDTO ,
464+ extracted_airflow_dag_run_owner_airflow : RunDTO ,
465+ extracted_airflow_task_run_owner_airflow : RunDTO ,
466+ input_transformation ,
467+ ):
468+ events = [
469+ airflow_dag_run_event_start_owner_airflow ,
470+ airflow_task_run_event_start_owner_airflow ,
471+ airflow_task_run_event_stop ,
472+ airflow_dag_run_event_stop ,
473+ ]
474+
475+ extracted = extract_batch (input_transformation (events ))
476+
477+ assert extracted .locations () == [extracted_airflow_location ]
478+ assert extracted .jobs () == [extracted_airflow_dag_job , extracted_airflow_task_job ]
479+ assert extracted .runs () == [
480+ extracted_airflow_dag_run_owner_airflow ,
481+ extracted_airflow_task_run_owner_airflow ,
482+ ]
483+
484+ assert not extracted .datasets ()
485+ assert not extracted .dataset_symlinks ()
486+ assert not extracted .schemas ()
487+ assert not extracted .users ()
488+ assert not extracted .operations ()
489+ assert not extracted .inputs ()
490+ assert not extracted .outputs ()
0 commit comments