Skip to content

Commit b2c646a

Browse files
authored
feat: Adjust DBT OpenLineage to Airflow 3 and improve logging (apache#47500)
1 parent 1e2660b commit b2c646a

File tree

4 files changed

+95
-34
lines changed

4 files changed

+95
-34
lines changed

providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -284,9 +284,13 @@ def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
284284
"""
285285
from airflow.providers.openlineage.extractors import OperatorLineage
286286

287-
if isinstance(self.run_id, int) and self.wait_for_termination is True:
288-
return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
289-
return OperatorLineage()
287+
if not isinstance(self.run_id, int):
288+
self.log.info("Skipping OpenLineage event extraction: `self.run_id` is not set.")
289+
return OperatorLineage()
290+
if not self.wait_for_termination:
291+
self.log.info("Skipping OpenLineage event extraction: `self.wait_for_termination` is False.")
292+
return OperatorLineage()
293+
return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
290294

291295

292296
class DbtCloudGetJobRunArtifactOperator(BaseOperator):

providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,39 +17,41 @@
1717
from __future__ import annotations
1818

1919
import asyncio
20+
import logging
2021
import re
21-
from contextlib import suppress
2222
from typing import TYPE_CHECKING
2323

24-
from packaging.version import parse
25-
26-
from airflow import __version__ as airflow_version
24+
from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
2725

2826
if TYPE_CHECKING:
29-
from packaging.version import Version
30-
3127
from airflow.models.taskinstance import TaskInstance
3228
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
3329
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
3430
from airflow.providers.openlineage.extractors.base import OperatorLineage
3531

3632

37-
_AIRFLOW_VERSION: Version = parse(parse(airflow_version).base_version)
33+
log = logging.getLogger(__name__)
3834

3935

4036
def _get_logical_date(task_instance):
4137
# todo: remove when min airflow version >= 3.0
42-
if parse("3") > _AIRFLOW_VERSION:
43-
return task_instance.execution_date
44-
return task_instance.logical_date
38+
if AIRFLOW_V_3_0_PLUS:
39+
dagrun = task_instance.get_template_context()["dag_run"]
40+
return dagrun.logical_date or dagrun.run_after
41+
42+
if hasattr(task_instance, "logical_date"):
43+
date = task_instance.logical_date
44+
else:
45+
date = task_instance.execution_date
46+
47+
return date
4548

4649

4750
def _get_try_number(val):
4851
# todo: remove when min airflow version >= 2.10.0
49-
if parse("2.10.0") > _AIRFLOW_VERSION:
50-
return val.try_number - 1
51-
else:
52+
if AIRFLOW_V_2_10_PLUS:
5253
return val.try_number
54+
return val.try_number - 1
5355

5456

5557
def generate_openlineage_events_from_dbt_cloud_run(
@@ -87,6 +89,7 @@ def generate_openlineage_events_from_dbt_cloud_run(
8789
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
8890

8991
# if no account_id set this will fallback
92+
log.debug("Retrieving information about DBT job run.")
9093
job_run = operator.hook.get_job_run(
9194
run_id=operator.run_id, account_id=operator.account_id, include_related=["run_steps,job"]
9295
).json()["data"]
@@ -98,6 +101,7 @@ def generate_openlineage_events_from_dbt_cloud_run(
98101
execute_steps = job["execute_steps"]
99102
run_steps = job_run["run_steps"]
100103

104+
log.debug("Filtering only DBT invocation steps for further processing.")
101105
# filter only dbt invocation steps
102106
steps = []
103107
for run_step in run_steps:
@@ -110,8 +114,15 @@ def generate_openlineage_events_from_dbt_cloud_run(
110114

111115
# catalog is available only if docs are generated
112116
catalog = None
113-
with suppress(Exception):
117+
try:
118+
log.debug("Retrieving information about catalog artifact from DBT.")
114119
catalog = operator.hook.get_job_run_artifact(operator.run_id, path="catalog.json").json()["data"]
120+
except Exception: # type: ignore
121+
log.info(
122+
"Openlineage could not find DBT catalog artifact, usually available when docs are generated."
123+
"Proceeding with metadata extraction. "
124+
"If you see error logs above about `HTTP error: Not Found` it's safe to ignore them."
125+
)
115126

116127
async def get_artifacts_for_steps(steps, artifacts):
117128
"""Get artifacts for a list of steps concurrently."""
@@ -127,16 +138,37 @@ async def get_artifacts_for_steps(steps, artifacts):
127138
return await asyncio.gather(*tasks)
128139

129140
# get artifacts for steps concurrently
141+
log.debug("Retrieving information about artifacts for all job steps from DBT.")
130142
step_artifacts = asyncio.run(
131143
get_artifacts_for_steps(steps=steps, artifacts=["manifest.json", "run_results.json"])
132144
)
133145

146+
log.debug("Preparing OpenLineage parent job information to be included in DBT events.")
147+
# generate same run id of current task instance
148+
parent_run_id = OpenLineageAdapter.build_task_instance_run_id(
149+
dag_id=task_instance.dag_id,
150+
task_id=operator.task_id,
151+
logical_date=_get_logical_date(task_instance),
152+
try_number=_get_try_number(task_instance),
153+
map_index=task_instance.map_index,
154+
)
155+
156+
parent_job = ParentRunMetadata(
157+
run_id=parent_run_id,
158+
job_name=f"{task_instance.dag_id}.{task_instance.task_id}",
159+
job_namespace=namespace(),
160+
)
161+
client = get_openlineage_listener().adapter.get_or_create_openlineage_client()
162+
134163
# process each step in loop, sending generated events in the same order as steps
135-
for artifacts in step_artifacts:
164+
for counter, artifacts in enumerate(step_artifacts, 1):
165+
log.debug("Parsing information about artifact no. %s.", counter)
166+
136167
# process manifest
137168
manifest = artifacts["manifest.json"]
138169

139170
if not artifacts.get("run_results.json", None):
171+
log.debug("No run results found for artifact no. %s. Skipping.", counter)
140172
continue
141173

142174
processor = DbtCloudArtifactProcessor(
@@ -150,26 +182,14 @@ async def get_artifacts_for_steps(steps, artifacts):
150182
catalog=catalog,
151183
)
152184

153-
# generate same run id of current task instance
154-
parent_run_id = OpenLineageAdapter.build_task_instance_run_id(
155-
dag_id=task_instance.dag_id,
156-
task_id=operator.task_id,
157-
logical_date=_get_logical_date(task_instance),
158-
try_number=_get_try_number(task_instance),
159-
map_index=task_instance.map_index,
160-
)
161-
162-
parent_job = ParentRunMetadata(
163-
run_id=parent_run_id,
164-
job_name=f"{task_instance.dag_id}.{task_instance.task_id}",
165-
job_namespace=namespace(),
166-
)
167185
processor.dbt_run_metadata = parent_job
168186

169187
events = processor.parse().events()
170-
171-
client = get_openlineage_listener().adapter.get_or_create_openlineage_client()
188+
log.debug("Found %s OpenLineage events for artifact no. %s.", len(events), counter)
172189

173190
for event in events:
174191
client.emit(event=event)
192+
log.debug("Emitted all OpenLineage events for artifact no. %s.", counter)
193+
194+
log.info("OpenLineage has successfully finished processing information about DBT job run.")
175195
return OperatorLineage()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY
19+
# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS
20+
# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT
21+
# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE
22+
#
23+
from __future__ import annotations
24+
25+
26+
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
27+
from packaging.version import Version
28+
29+
from airflow import __version__
30+
31+
airflow_version = Version(__version__)
32+
return airflow_version.major, airflow_version.minor, airflow_version.micro
33+
34+
35+
AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
36+
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)

tests/always/test_project_structure.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ def test_providers_modules_should_have_tests(self):
108108
"providers/common/messaging/tests/unit/common/messaging/providers/test_base_provider.py",
109109
"providers/common/messaging/tests/unit/common/messaging/providers/test_sqs.py",
110110
"providers/databricks/tests/unit/databricks/test_version_compat.py",
111+
"providers/dbt/cloud/tests/unit/dbt/cloud/test_version_compat.py",
111112
"providers/edge/tests/unit/edge/models/test_edge_job.py",
112113
"providers/edge/tests/unit/edge/models/test_edge_logs.py",
113114
"providers/edge/tests/unit/edge/models/test_edge_worker.py",

0 commit comments

Comments
 (0)