Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions data_rentgen/consumer/extractors/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,26 @@ def enrich_run_user(run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
spark_application_details = event.run.facets.spark_applicationDetails
if spark_application_details:
run.user = UserDTO(name=spark_application_details.userName)

# Airflow DAG and task have 'owner' field, but if can be either user or group name,
# and also it does not mean that this exact user started this run.
# Airflow using different facets for version above provider-opelineage/1.11.0.
airflow_application_details = event.run.facets.airflow
if airflow_application_details and all(
(
airflow_application_details.dag.owner is not None,
airflow_application_details.dag.owner != "airflow",
),
):
run.user = UserDTO(name=airflow_application_details.dag.owner) # type: ignore[arg-type]

airflow_application_dag_details = event.run.facets.airflowDagRun
if airflow_application_dag_details and all(
(
airflow_application_dag_details.dag.owner is not None,
airflow_application_dag_details.dag.owner != "airflow",
),
):
run.user = UserDTO(name=airflow_application_dag_details.dag.owner) # type: ignore[arg-type]

return run
1 change: 1 addition & 0 deletions data_rentgen/consumer/openlineage/run_facets/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class OpenLineageAirflowDagInfo(OpenLineageBase):
"""

dag_id: str
owner: str | None = None


class OpenLineageAirflowDagRunType(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
RunDTO,
RunStartReasonDTO,
RunStatusDTO,
UserDTO,
)


Expand Down Expand Up @@ -66,7 +67,7 @@ def airflow_dag_run_event_start() -> OpenLineageRunEvent:
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -137,7 +138,7 @@ def airflow_task_run_event_start() -> OpenLineageRunEvent:
openlineageAdapterVersion=Version("1.10.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -231,6 +232,10 @@ def extracted_airflow_dag_run(
status=RunStatusDTO.SUCCEEDED,
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
start_reason=RunStartReasonDTO.MANUAL,
user=UserDTO(
name="myuser",
id=None,
),
ended_at=datetime(2024, 7, 5, 9, 8, 5, 691973, tzinfo=timezone.utc),
external_id="manual__2024-07-05T09:04:13:979349+00:00",
persistent_log_url="http://airflow-host:8081/dags/mydag/grid?dag_run_id=manual__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00",
Expand All @@ -247,6 +252,10 @@ def extracted_airflow_task_run(
status=RunStatusDTO.SUCCEEDED,
started_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
start_reason=RunStartReasonDTO.MANUAL,
user=UserDTO(
name="myuser",
id=None,
),
ended_at=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
external_id="manual__2024-07-05T09:04:13:979349+00:00",
attempt="1",
Expand Down Expand Up @@ -300,7 +309,6 @@ def test_extractors_extract_batch_airflow(

assert not extracted.datasets()
assert not extracted.dataset_symlinks()
assert not extracted.users()
assert not extracted.schemas()
assert not extracted.operations()
assert not extracted.inputs()
Expand Down
262 changes: 258 additions & 4 deletions tests/test_consumer/test_extractors/test_extractors_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def test_extractors_extract_run_airflow_dag_2_3_plus():
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_extractors_extract_run_airflow_dag_2_x():
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -300,7 +300,7 @@ def test_extractors_extract_run_airflow_task_with_ti_persistent_log_url():
openlineageAdapterVersion=Version("1.10.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
Expand Down Expand Up @@ -372,7 +372,7 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
openlineageAdapterVersion=Version("1.9.0"),
),
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="backfill__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.BACKFILL_JOB,
Expand Down Expand Up @@ -418,6 +418,260 @@ def test_extractors_extract_run_airflow_task_2_9_plus():
def test_extractors_extract_run_airflow_task_2_x():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("01908223-0e9b-7c52-9856-6cecfc842610")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
job=OpenLineageJob(
namespace="http://airflow-host:8081",
name="mydag.mytask",
facets=OpenLineageJobFacets(
jobType=OpenLineageJobTypeJobFacet(
processingType=None,
integration=OpenLineageJobIntegrationType.AIRFLOW,
jobType=OpenLineageJobType.TASK,
),
),
),
run=OpenLineageRun(
runId=run_id,
facets=OpenLineageRunFacets(
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.SCHEDULED,
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
),
task=OpenLineageAirflowTaskInfo(
task_id="mytask",
),
taskInstance=OpenLineageAirflowTaskInstanceInfo(
try_number=1,
),
),
),
),
)

assert extract_run(run) == RunDTO(
id=run_id,
job=JobDTO(
name="mydag.mytask",
location=LocationDTO(
type="http",
name="airflow-host:8081",
addresses={"http://airflow-host:8081"},
),
type=JobTypeDTO.AIRFLOW_TASK,
),
status=RunStatusDTO.SUCCEEDED,
started_at=None,
start_reason=RunStartReasonDTO.AUTOMATIC,
user=None,
ended_at=now,
external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
attempt="1",
persistent_log_url=(
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
),
running_log_url=None,
)


def test_extractors_extract_run_airflow_dag_check_with_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
job=OpenLineageJob(
namespace="http://airflow-host:8081",
name="mydag",
facets=OpenLineageJobFacets(
jobType=OpenLineageJobTypeJobFacet(
processingType=None,
integration=OpenLineageJobIntegrationType.AIRFLOW,
jobType=OpenLineageJobType.DAG,
),
),
),
run=OpenLineageRun(
runId=run_id,
facets=OpenLineageRunFacets(
processing_engine=OpenLineageProcessingEngineRunFacet(
version=Version("2.1.4"),
name=OpenLineageProcessingEngineName.AIRFLOW,
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
),
),
),
),
)

assert extract_run(run) == RunDTO(
id=run_id,
job=JobDTO(
name="mydag",
location=LocationDTO(
type="http",
name="airflow-host:8081",
addresses={"http://airflow-host:8081"},
),
type=JobTypeDTO.AIRFLOW_DAG,
),
status=RunStatusDTO.SUCCEEDED,
started_at=None,
start_reason=RunStartReasonDTO.MANUAL,
user=UserDTO(name="myuser"),
ended_at=now,
external_id="manual__2024-07-05T09:04:13:979349+00:00",
attempt=None,
persistent_log_url=(
"http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
),
running_log_url=None,
)


def test_extractors_extract_run_airflow_task_with_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
job=OpenLineageJob(
namespace="http://airflow-host:8081",
name="mydag.mytask",
facets=OpenLineageJobFacets(
jobType=OpenLineageJobTypeJobFacet(
processingType=None,
integration=OpenLineageJobIntegrationType.AIRFLOW,
jobType=OpenLineageJobType.TASK,
),
),
),
run=OpenLineageRun(
runId=run_id,
facets=OpenLineageRunFacets(
airflow=OpenLineageAirflowTaskRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="myuser"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.SCHEDULED,
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
),
task=OpenLineageAirflowTaskInfo(
task_id="mytask",
),
taskInstance=OpenLineageAirflowTaskInstanceInfo(
try_number=1,
),
),
),
),
)

assert extract_run(run) == RunDTO(
id=run_id,
job=JobDTO(
name="mydag.mytask",
location=LocationDTO(
type="http",
name="airflow-host:8081",
addresses={"http://airflow-host:8081"},
),
type=JobTypeDTO.AIRFLOW_TASK,
),
status=RunStatusDTO.SUCCEEDED,
started_at=None,
start_reason=RunStartReasonDTO.AUTOMATIC,
user=UserDTO(name="myuser"),
ended_at=now,
external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
attempt="1",
persistent_log_url=(
"http://airflow-host:8081/log?&dag_id=mydag&task_id=mytask&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
),
running_log_url=None,
)


def test_extractors_extract_run_airflow_dag_without_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e4c-04e5-6cc0-b991-358ae6c316c8")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
job=OpenLineageJob(
namespace="http://airflow-host:8081",
name="mydag",
facets=OpenLineageJobFacets(
jobType=OpenLineageJobTypeJobFacet(
processingType=None,
integration=OpenLineageJobIntegrationType.AIRFLOW,
jobType=OpenLineageJobType.DAG,
),
),
),
run=OpenLineageRun(
runId=run_id,
facets=OpenLineageRunFacets(
processing_engine=OpenLineageProcessingEngineRunFacet(
version=Version("2.1.4"),
name=OpenLineageProcessingEngineName.AIRFLOW,
openlineageAdapterVersion=Version("1.10.0"),
),
airflowDagRun=OpenLineageAirflowDagRunFacet(
dag=OpenLineageAirflowDagInfo(dag_id="mydag"),
dagRun=OpenLineageAirflowDagRunInfo(
run_id="manual__2024-07-05T09:04:13:979349+00:00",
run_type=OpenLineageAirflowDagRunType.MANUAL,
data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
),
),
),
),
)

assert extract_run(run) == RunDTO(
id=run_id,
job=JobDTO(
name="mydag",
location=LocationDTO(
type="http",
name="airflow-host:8081",
addresses={"http://airflow-host:8081"},
),
type=JobTypeDTO.AIRFLOW_DAG,
),
status=RunStatusDTO.SUCCEEDED,
started_at=None,
start_reason=RunStartReasonDTO.MANUAL,
user=None,
ended_at=now,
external_id="manual__2024-07-05T09:04:13:979349+00:00",
attempt=None,
persistent_log_url=(
"http://airflow-host:8081/graph?dag_id=mydag&execution_date=2024-07-05T09%3A04%3A13.979349%2B00%3A00"
),
running_log_url=None,
)


def test_extractors_extract_run_airflow_task_without_owner():
now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
run_id = UUID("1efc1e7f-4015-6970-b4f9-12e828cb9b91")
run = OpenLineageRunEvent(
eventType=OpenLineageRunEventType.COMPLETE,
eventTime=now,
Expand Down
Loading
Loading