diff --git a/data_rentgen/consumer/extractors/run.py b/data_rentgen/consumer/extractors/run.py index cd76ed6d..a61e8fe7 100644 --- a/data_rentgen/consumer/extractors/run.py +++ b/data_rentgen/consumer/extractors/run.py @@ -182,6 +182,26 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: spark_application_details = event.run.facets.spark_applicationDetails if spark_application_details: run.user = UserDTO(name=spark_application_details.userName) + # Airflow DAG and task have 'owner' field, but if can be either user or group name, # and also it does not mean that this exact user started this run. + # Airflow uses different facets for versions above provider-openlineage/1.11.0. + airflow_application_details = event.run.facets.airflow + if airflow_application_details and all( + ( + airflow_application_details.dag.owner is not None, + airflow_application_details.dag.owner != "airflow", + ), + ): + run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type] + + airflow_application_dag_details = event.run.facets.airflowDagRun + if airflow_application_dag_details and all( + ( + airflow_application_dag_details.dag.owner is not None, + airflow_application_dag_details.dag.owner != "airflow", + ), + ): + run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type] + + return run diff --git a/data_rentgen/consumer/openlineage/run_facets/airflow.py b/data_rentgen/consumer/openlineage/run_facets/airflow.py index d56405b3..79860aa2 100644 --- a/data_rentgen/consumer/openlineage/run_facets/airflow.py +++ b/data_rentgen/consumer/openlineage/run_facets/airflow.py @@ -14,6 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase): """ dag_id: str + owner: str | None = None class OpenLineageAirflowDagRunType(Enum): diff --git a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py index aa9d1434..bcaa8d9c 100644 --- 
a/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py +++ b/tests/test_consumer/test_extractors/test_extractors_batch_airflow.py @@ -36,6 +36,7 @@ RunDTO, RunStartReasonDTO, RunStatusDTO, + UserDTO, ) @@ -66,7 +67,7 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent: openlineageAdapterVersion=Version("1.10.0"), ), airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -137,7 +138,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent: openlineageAdapterVersion=Version("1.10.0"), ), airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -231,6 +232,10 @@ def extracted_airflow_dag_run( status=RunStatusDTO.SUCCEEDED, started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), start_reason=RunStartReasonDTO.MANUAL, + user=UserDTO( + name="myuser", + id=None, + ), ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc), external_id="manual__2024-07-05T09:04:13:979349+00:00", persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00", @@ -247,6 +252,10 @@ def extracted_airflow_task_run( status=RunStatusDTO.SUCCEEDED, started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), start_reason=RunStartReasonDTO.MANUAL, + user=UserDTO( + name="myuser", + id=None, + ), ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), external_id="manual__2024-07-05T09:04:13:979349+00:00", attempt="1", @@ -300,7 +309,6 @@ def test_extractors_extract_batch_airflow( assert not 
extracted.datasets() assert not extracted.dataset_symlinks() - assert not extracted.users() assert not extracted.schemas() assert not extracted.operations() assert not extracted.inputs() diff --git a/tests/test_consumer/test_extractors/test_extractors_run.py b/tests/test_consumer/test_extractors/test_extractors_run.py index 136637bc..4cd922a0 100644 --- a/tests/test_consumer/test_extractors/test_extractors_run.py +++ b/tests/test_consumer/test_extractors/test_extractors_run.py @@ -174,7 +174,7 @@ def test_extractors_extract_run_airflow_dag_2_3_plus(): openlineageAdapterVersion=Version("1.10.0"), ), airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -237,7 +237,7 @@ def test_extractors_extract_run_airflow_dag_2_x(): openlineageAdapterVersion=Version("1.10.0"), ), airflowDagRun=OpenLineageAirflowDagRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -300,7 +300,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url(): openlineageAdapterVersion=Version("1.10.0"), ), airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.MANUAL, @@ -372,7 +372,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus(): openlineageAdapterVersion=Version("1.9.0"), ), airflow=OpenLineageAirflowTaskRunFacet( - dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + 
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), dagRun=OpenLineageAirflowDagRunInfo( run_id="backfill__2024-07-05T09:04:13:979349+00:00", run_type=OpenLineageAirflowDagRunType.BACKFILL_JOB, @@ -418,6 +418,260 @@ def test_extractors_extract_run_airflow_task_2_9_plus(): def test_extractors_extract_run_airflow_task_2_x(): now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.TASK, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_TASK, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=None, + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + 
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + +def test_extractors_extract_run_airflow_dag_check_with_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.DAG, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.1.4"), + name=OpenLineageProcessingEngineName.AIRFLOW, + openlineageAdapterVersion=Version("1.10.0"), + ), + airflowDagRun=OpenLineageAirflowDagRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="manual__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.MANUAL, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_DAG, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.MANUAL, + user=UserDTO(name="myuser"), + ended_at=now, + external_id="manual__2024-07-05T09:04:13:979349+00:00", + attempt=None, + persistent_log_url=( + "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + 
running_log_url=None, + ) + + +def test_extractors_extract_run_airflow_task_with_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.TASK, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_TASK, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=UserDTO(name="myuser"), + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + +def test_extractors_extract_run_airflow_dag_without_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + 
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=None, + integration=OpenLineageJobIntegrationType.AIRFLOW, + jobType=OpenLineageJobType.DAG, + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.1.4"), + name=OpenLineageProcessingEngineName.AIRFLOW, + openlineageAdapterVersion=Version("1.10.0"), + ), + airflowDagRun=OpenLineageAirflowDagRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="manual__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.MANUAL, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + ), + ), + ), + ) + + assert extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO.AIRFLOW_DAG, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.MANUAL, + user=None, + ended_at=now, + external_id="manual__2024-07-05T09:04:13:979349+00:00", + attempt=None, + persistent_log_url=( + "http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00" + ), + running_log_url=None, + ) + + +def test_extractors_extract_run_airflow_task_without_owner(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91") run = OpenLineageRunEvent( eventType=OpenLineageRunEventType.COMPLETE, eventTime=now, diff --git 
a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py index 10bd9bca..1a9215e8 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py @@ -17,6 +17,7 @@ Run, RunStartReason, RunStatus, + User, ) RESOURCES_PATH = Path(__file__).parent.parent.joinpath("resources").resolve() @@ -76,6 +77,10 @@ async def test_runs_handler_airflow( runs = run_scalars.all() assert len(runs) == 2 + user_query = select(User).where(User.name == "myuser") + user_scalars = await async_session.scalars(user_query) + user = user_scalars.one_or_none() + dag_run = runs[0] assert dag_run.id == UUID("01908223-0782-79b8-9495-b1c38aaee839") assert dag_run.created_at == datetime(2024, 7, 5, 9, 4, 12, 162000, tzinfo=timezone.utc) @@ -97,7 +102,8 @@ async def test_runs_handler_airflow( assert task_run.status == RunStatus.SUCCEEDED assert task_run.started_at == datetime(2024, 7, 5, 9, 4, 20, 783845, tzinfo=timezone.utc) assert task_run.ended_at == datetime(2024, 7, 5, 9, 7, 37, 858423, tzinfo=timezone.utc) - assert task_run.started_by_user_id is None + assert task_run.started_by_user_id is not None + assert task_run.started_by_user_id == user.id assert task_run.start_reason == RunStartReason.MANUAL assert task_run.end_reason is None assert task_run.external_id == "manual__2024-07-05T09:04:12.162809+00:00" diff --git a/tests/test_consumer/test_openlineage/test_run_event_airflow.py b/tests/test_consumer/test_openlineage/test_run_event_airflow.py index 70723263..ee2ec84a 100644 --- a/tests/test_consumer/test_openlineage/test_run_event_airflow.py +++ b/tests/test_consumer/test_openlineage/test_run_event_airflow.py @@ -150,6 +150,7 @@ def test_run_event_airflow_dag_start(): airflowDagRun=OpenLineageAirflowDagRunFacet( dag=OpenLineageAirflowDagInfo( dag_id="mydag", + owner="myuser", ), dagRun=OpenLineageAirflowDagRunInfo( 
run_id="manual__2024-07-05T09:04:12.162809+00:00", @@ -400,6 +401,7 @@ def test_run_event_airflow_task_start(): airflow=OpenLineageAirflowTaskRunFacet( dag=OpenLineageAirflowDagInfo( dag_id="mydag", + owner="myuser", ), dagRun=OpenLineageAirflowDagRunInfo( run_id="manual__2024-07-05T09:04:12.162809+00:00",