Skip to content

Commit 22cb741

Browse files
authored
Make sqlalchemy optional dependency for openlineage provider (#59921)
* Make sqlalchemy optional dependency for openlineage provider * Refactor sqlalchemy imports to be lazy in openlineage utils * The function is already guarded by 'if not AIRFLOW_V_3_0_PLUS:' so the inner check for AIRFLOW_V_3_0_PLUS was unreachable dead code. * Remove sqlalchemy-specific ImportError handling in get_openlineage_facets_with_sql
1 parent 945f981 commit 22cb741

File tree

3 files changed

+32
-13
lines changed

3 files changed

+32
-13
lines changed

providers/openlineage/pyproject.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ dependencies = [
6666
"openlineage-python>=1.41.0",
6767
]
6868

69+
# The optional dependencies should be modified in place in the generated file
70+
# Any change in the dependencies is preserved when the file is regenerated
71+
[project.optional-dependencies]
72+
"sqlalchemy" = [
73+
"sqlalchemy>=1.4.49",
74+
]
75+
6976
[dependency-groups]
7077
dev = [
7178
"apache-airflow",

providers/openlineage/src/airflow/providers/openlineage/utils/sql.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
from attrs import define
2626
from openlineage.client.event_v2 import Dataset
2727
from openlineage.client.facet_v2 import schema_dataset
28-
from sqlalchemy import Column, MetaData, Table, and_, or_, union_all
28+
29+
from airflow.exceptions import AirflowOptionalProviderFeatureException
2930

3031
if TYPE_CHECKING:
32+
from sqlalchemy import Table
3133
from sqlalchemy.engine import Engine
3234
from sqlalchemy.sql.elements import ColumnElement
3335

@@ -157,6 +159,13 @@ def create_information_schema_query(
157159
sqlalchemy_engine: Engine | None = None,
158160
) -> str:
159161
"""Create query for getting table schemas from information schema."""
162+
try:
163+
from sqlalchemy import Column, MetaData, Table, union_all
164+
except ImportError:
165+
raise AirflowOptionalProviderFeatureException(
166+
"sqlalchemy is required for SQL schema query generation. "
167+
"Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
168+
)
160169
metadata = MetaData()
161170
select_statements = []
162171
# Don't iterate over tables hierarchy, just pass it to query single information schema table
@@ -217,6 +226,13 @@ def create_filter_clauses(
217226
therefore it is expected the table has them defined.
218227
:param uppercase_names: if True use schema and table names uppercase
219228
"""
229+
try:
230+
from sqlalchemy import and_, or_
231+
except ImportError:
232+
raise AirflowOptionalProviderFeatureException(
233+
"sqlalchemy is required for SQL filter clause generation. "
234+
"Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
235+
)
220236
table_schema_column_name = information_schema_table.columns[ColumnIndex.SCHEMA].name
221237
table_name_column_name = information_schema_table.columns[ColumnIndex.TABLE_NAME].name
222238
try:

providers/openlineage/src/airflow/providers/openlineage/utils/utils.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from openlineage.client.utils import RedactMixin
3232

3333
from airflow import __version__ as AIRFLOW_VERSION
34+
from airflow.exceptions import AirflowOptionalProviderFeatureException
3435

3536
# TODO: move this maybe to Airflow's logic?
3637
from airflow.models import DagRun, TaskInstance, TaskReschedule
@@ -537,24 +538,19 @@ def is_selective_lineage_enabled(obj: DAG | SerializedDAG | AnyOperator) -> bool
537538

538539
@provide_session
539540
def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
540-
from sqlalchemy import exists, select
541+
try:
542+
from sqlalchemy import exists, select
543+
except ImportError:
544+
raise AirflowOptionalProviderFeatureException(
545+
"sqlalchemy is required for checking task instance reschedule status. "
546+
"Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
547+
)
541548

542549
if not isinstance(ti.task, BaseSensorOperator):
543550
return False
544551

545552
if not ti.task.reschedule:
546553
return False
547-
if AIRFLOW_V_3_0_PLUS:
548-
return (
549-
session.scalar(
550-
select(
551-
exists().where(
552-
TaskReschedule.ti_id == ti.id, TaskReschedule.try_number == ti.try_number
553-
)
554-
)
555-
)
556-
is True
557-
)
558554
return (
559555
session.scalar(
560556
select(

0 commit comments

Comments
 (0)