Skip to content

Commit 87c3ee3

Browse files
committed
[DOP-22750] make owner optional, add test for airflow dag/task without owner
1 parent 1e548cc commit 87c3ee3

File tree

4 files changed

+132
-178
lines changed

4 files changed

+132
-178
lines changed

data_rentgen/consumer/extractors/run.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,11 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
188188
# Airflow using different facets for version above provider-opelineage/1.11.0.
189189
airflow_application_details = event.run.facets.airflow
190190
if airflow_application_details and airflow_application_details.dag.owner != "airflow":
191-
run.user = UserDTO(name=airflow_application_details.dag.owner)
191+
if airflow_application_details.dag.owner is not None:
192+
run.user = UserDTO(name=airflow_application_details.dag.owner)
192193
airflow_application_dag_details = event.run.facets.airflowDagRun
193194
if airflow_application_dag_details and airflow_application_dag_details.dag.owner != "airflow":
194-
run.user = UserDTO(name=airflow_application_dag_details.dag.owner)
195+
if airflow_application_dag_details.dag.owner is not None:
196+
run.user = UserDTO(name=airflow_application_dag_details.dag.owner)
195197

196198
return run

data_rentgen/consumer/openlineage/run_facets/airflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase):
1414
"""
1515

1616
dag_id: str
17-
owner: str
17+
owner: str | None = None
1818

1919

2020
class OpenLineageAirflowDagRunType(Enum):

tests/test_consumer/test_extractors/test_extractors_batch_airflow.py

Lines changed: 0 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -80,46 +80,6 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent:
8080
)
8181

8282

83-
@pytest.fixture
84-
def airflow_dag_run_event_start_owner_airflow() -> OpenLineageRunEvent:
85-
event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
86-
run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839")
87-
return OpenLineageRunEvent(
88-
eventType=OpenLineageRunEventType.START,
89-
eventTime=event_time,
90-
job=OpenLineageJob(
91-
namespace="http://airflow-host:8081",
92-
name="mydag",
93-
facets=OpenLineageJobFacets(
94-
jobType=OpenLineageJobTypeJobFacet(
95-
processingType=None,
96-
integration=OpenLineageJobIntegrationType.AIRFLOW,
97-
jobType=OpenLineageJobType.DAG,
98-
),
99-
),
100-
),
101-
run=OpenLineageRun(
102-
runId=run_id,
103-
facets=OpenLineageRunFacets(
104-
processing_engine=OpenLineageProcessingEngineRunFacet(
105-
version=Version("2.9.2"),
106-
name=OpenLineageProcessingEngineName.AIRFLOW,
107-
openlineageAdapterVersion=Version("1.10.0"),
108-
),
109-
airflowDagRun=OpenLineageAirflowDagRunFacet(
110-
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
111-
dagRun=OpenLineageAirflowDagRunInfo(
112-
run_id="manual__2024-07-05T09:04:13:979349+00:00",
113-
run_type=OpenLineageAirflowDagRunType.MANUAL,
114-
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
115-
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
116-
),
117-
),
118-
),
119-
),
120-
)
121-
122-
12383
@pytest.fixture
12484
def airflow_dag_run_event_stop() -> OpenLineageRunEvent:
12585
event_time = datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc)
@@ -200,55 +160,6 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent:
200160
)
201161

202162

203-
@pytest.fixture
204-
def airflow_task_run_event_start_owner_airflow() -> OpenLineageRunEvent:
205-
event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
206-
run_id = UUID("01908223-0782-7fc0-9d69-b1df9dac2c60")
207-
return OpenLineageRunEvent(
208-
eventType=OpenLineageRunEventType.START,
209-
eventTime=event_time,
210-
job=OpenLineageJob(
211-
namespace="http://airflow-host:8081",
212-
name="mydag.mytask",
213-
facets=OpenLineageJobFacets(
214-
jobType=OpenLineageJobTypeJobFacet(
215-
processingType=None,
216-
integration=OpenLineageJobIntegrationType.AIRFLOW,
217-
jobType=OpenLineageJobType.TASK,
218-
),
219-
),
220-
),
221-
run=OpenLineageRun(
222-
runId=run_id,
223-
facets=OpenLineageRunFacets(
224-
processing_engine=OpenLineageProcessingEngineRunFacet(
225-
version=Version("2.9.2"),
226-
name=OpenLineageProcessingEngineName.AIRFLOW,
227-
openlineageAdapterVersion=Version("1.10.0"),
228-
),
229-
airflow=OpenLineageAirflowTaskRunFacet(
230-
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
231-
dagRun=OpenLineageAirflowDagRunInfo(
232-
run_id="manual__2024-07-05T09:04:13:979349+00:00",
233-
run_type=OpenLineageAirflowDagRunType.MANUAL,
234-
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
235-
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
236-
),
237-
task=OpenLineageAirflowTaskInfo(
238-
task_id="mytask",
239-
),
240-
taskInstance=OpenLineageAirflowTaskInstanceInfo(
241-
try_number=1,
242-
log_url=(
243-
"http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask"
244-
),
245-
),
246-
),
247-
),
248-
),
249-
)
250-
251-
252163
@pytest.fixture
253164
def airflow_task_run_event_stop() -> OpenLineageRunEvent:
254165
event_time = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
@@ -311,41 +222,6 @@ def extracted_airflow_task_job(
311222
)
312223

313224

314-
@pytest.fixture
315-
def extracted_airflow_dag_run_owner_airflow(
316-
extracted_airflow_dag_job: JobDTO,
317-
) -> RunDTO:
318-
return RunDTO(
319-
id=UUID("01908223-0782-79b8-9495-b1c38aaee839"),
320-
job=extracted_airflow_dag_job,
321-
status=RunStatusDTO.SUCCEEDED,
322-
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
323-
start_reason=RunStartReasonDTO.MANUAL,
324-
ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc),
325-
external_id="manual__2024-07-05T09:04:13:979349+00:00",
326-
persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00",
327-
)
328-
329-
330-
@pytest.fixture
331-
def extracted_airflow_task_run_owner_airflow(
332-
extracted_airflow_task_job: JobDTO,
333-
) -> RunDTO:
334-
return RunDTO(
335-
id=UUID("01908223-0782-7fc0-9d69-b1df9dac2c60"),
336-
job=extracted_airflow_task_job,
337-
status=RunStatusDTO.SUCCEEDED,
338-
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
339-
start_reason=RunStartReasonDTO.MANUAL,
340-
ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
341-
external_id="manual__2024-07-05T09:04:13:979349+00:00",
342-
attempt="1",
343-
persistent_log_url=(
344-
"http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask"
345-
),
346-
)
347-
348-
349225
@pytest.fixture
350226
def extracted_airflow_dag_run(
351227
extracted_airflow_dag_job: JobDTO,
@@ -437,54 +313,3 @@ def test_extractors_extract_batch_airflow(
437313
assert not extracted.operations()
438314
assert not extracted.inputs()
439315
assert not extracted.outputs()
440-
441-
442-
@pytest.mark.parametrize(
443-
"input_transformation",
444-
[
445-
# receiving data out of order does not change result
446-
pytest.param(
447-
list,
448-
id="preserve order",
449-
),
450-
pytest.param(
451-
reversed,
452-
id="reverse order",
453-
),
454-
],
455-
)
456-
def test_extractors_extract_batch_airflow_with_airflow_owner(
457-
airflow_dag_run_event_start_owner_airflow: OpenLineageRunEvent,
458-
airflow_dag_run_event_stop: OpenLineageRunEvent,
459-
airflow_task_run_event_start_owner_airflow: OpenLineageRunEvent,
460-
airflow_task_run_event_stop: OpenLineageRunEvent,
461-
extracted_airflow_location: LocationDTO,
462-
extracted_airflow_dag_job: JobDTO,
463-
extracted_airflow_task_job: JobDTO,
464-
extracted_airflow_dag_run_owner_airflow: RunDTO,
465-
extracted_airflow_task_run_owner_airflow: RunDTO,
466-
input_transformation,
467-
):
468-
events = [
469-
airflow_dag_run_event_start_owner_airflow,
470-
airflow_task_run_event_start_owner_airflow,
471-
airflow_task_run_event_stop,
472-
airflow_dag_run_event_stop,
473-
]
474-
475-
extracted = extract_batch(input_transformation(events))
476-
477-
assert extracted.locations() == [extracted_airflow_location]
478-
assert extracted.jobs() == [extracted_airflow_dag_job, extracted_airflow_task_job]
479-
assert extracted.runs() == [
480-
extracted_airflow_dag_run_owner_airflow,
481-
extracted_airflow_task_run_owner_airflow,
482-
]
483-
484-
assert not extracted.datasets()
485-
assert not extracted.dataset_symlinks()
486-
assert not extracted.schemas()
487-
assert not extracted.users()
488-
assert not extracted.operations()
489-
assert not extracted.inputs()
490-
assert not extracted.outputs()

tests/test_consumer/test_extractors/test_extractors_run.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,133 @@ def test_extractors_extract_run_airflow_task_2_x():
479479
)
480480

481481

482+
def test_extractors_extract_run_airflow_dag_without_owner():
483+
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
484+
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
485+
run = OpenLineageRunEvent(
486+
eventType=OpenLineageRunEventType.COMPLETE,
487+
eventTime=now,
488+
job=OpenLineageJob(
489+
namespace="http://airflow-host:8081",
490+
name="mydag",
491+
facets=OpenLineageJobFacets(
492+
jobType=OpenLineageJobTypeJobFacet(
493+
processingType=None,
494+
integration=OpenLineageJobIntegrationType.AIRFLOW,
495+
jobType=OpenLineageJobType.DAG,
496+
),
497+
),
498+
),
499+
run=OpenLineageRun(
500+
runId=run_id,
501+
facets=OpenLineageRunFacets(
502+
processing_engine=OpenLineageProcessingEngineRunFacet(
503+
version=Version("2.1.4"),
504+
name=OpenLineageProcessingEngineName.AIRFLOW,
505+
openlineageAdapterVersion=Version("1.10.0"),
506+
),
507+
airflowDagRun=OpenLineageAirflowDagRunFacet(
508+
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
509+
dagRun=OpenLineageAirflowDagRunInfo(
510+
run_id="manual__2024-07-05T09:04:13:979349+00:00",
511+
run_type=OpenLineageAirflowDagRunType.MANUAL,
512+
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
513+
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
514+
),
515+
),
516+
),
517+
),
518+
)
519+
520+
assert extract_run(run) == RunDTO(
521+
id=run_id,
522+
job=JobDTO(
523+
name="mydag",
524+
location=LocationDTO(
525+
type="http",
526+
name="airflow-host:8081",
527+
addresses={"http://airflow-host:8081"},
528+
),
529+
type=JobTypeDTO.AIRFLOW_DAG,
530+
),
531+
status=RunStatusDTO.SUCCEEDED,
532+
started_at=None,
533+
start_reason=RunStartReasonDTO.MANUAL,
534+
user=None,
535+
ended_at=now,
536+
external_id="manual__2024-07-05T09:04:13:979349+00:00",
537+
attempt=None,
538+
persistent_log_url=(
539+
"http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
540+
),
541+
running_log_url=None,
542+
)
543+
544+
545+
def test_extractors_extract_run_airflow_task_without_owner():
546+
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
547+
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
548+
run = OpenLineageRunEvent(
549+
eventType=OpenLineageRunEventType.COMPLETE,
550+
eventTime=now,
551+
job=OpenLineageJob(
552+
namespace="http://airflow-host:8081",
553+
name="mydag.mytask",
554+
facets=OpenLineageJobFacets(
555+
jobType=OpenLineageJobTypeJobFacet(
556+
processingType=None,
557+
integration=OpenLineageJobIntegrationType.AIRFLOW,
558+
jobType=OpenLineageJobType.TASK,
559+
),
560+
),
561+
),
562+
run=OpenLineageRun(
563+
runId=run_id,
564+
facets=OpenLineageRunFacets(
565+
airflow=OpenLineageAirflowTaskRunFacet(
566+
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
567+
dagRun=OpenLineageAirflowDagRunInfo(
568+
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
569+
run_type=OpenLineageAirflowDagRunType.SCHEDULED,
570+
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
571+
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
572+
),
573+
task=OpenLineageAirflowTaskInfo(
574+
task_id="mytask",
575+
),
576+
taskInstance=OpenLineageAirflowTaskInstanceInfo(
577+
try_number=1,
578+
),
579+
),
580+
),
581+
),
582+
)
583+
584+
assert extract_run(run) == RunDTO(
585+
id=run_id,
586+
job=JobDTO(
587+
name="mydag.mytask",
588+
location=LocationDTO(
589+
type="http",
590+
name="airflow-host:8081",
591+
addresses={"http://airflow-host:8081"},
592+
),
593+
type=JobTypeDTO.AIRFLOW_TASK,
594+
),
595+
status=RunStatusDTO.SUCCEEDED,
596+
started_at=None,
597+
start_reason=RunStartReasonDTO.AUTOMATIC,
598+
user=None,
599+
ended_at=now,
600+
external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
601+
attempt="1",
602+
persistent_log_url=(
603+
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
604+
),
605+
running_log_url=None,
606+
)
607+
608+
482609
@pytest.mark.parametrize(
483610
["event_type", "expected_status"],
484611
[

0 commit comments

Comments
 (0)