Merged
27 commits
9938fb1
Add analytics operator
gopidesupavan Feb 20, 2026
24012b9
Add analytics operator
gopidesupavan Feb 20, 2026
6b37769
Fix tests
gopidesupavan Feb 20, 2026
d1db351
Add docs
gopidesupavan Feb 20, 2026
8472844
update params
gopidesupavan Feb 20, 2026
5303379
update datafusion version
gopidesupavan Feb 21, 2026
8c4a18d
Fix selective checks
gopidesupavan Feb 21, 2026
b5095a8
Fix tests
gopidesupavan Feb 21, 2026
36d088e
Fix mypy checks
gopidesupavan Feb 21, 2026
88abbfa
move examples to example_dag folder
gopidesupavan Feb 21, 2026
9c84af1
Update docs
gopidesupavan Feb 22, 2026
53fbdea
Update docs
gopidesupavan Feb 22, 2026
1dde6d7
Update datasource config to support options parameter
gopidesupavan Feb 22, 2026
ce1b52b
Update docstring
gopidesupavan Feb 22, 2026
bc9dd21
Resolve comments
gopidesupavan Feb 23, 2026
0644756
Resolve comments
gopidesupavan Feb 24, 2026
259a34f
Resolve comments
gopidesupavan Feb 24, 2026
c7bc7c6
fixup tests
gopidesupavan Feb 25, 2026
12ab800
Move analytics operator to common-sql
gopidesupavan Feb 27, 2026
56d171d
Fixup tests
gopidesupavan Feb 27, 2026
5ed8d92
Updated changes imports
gopidesupavan Feb 27, 2026
316057a
Updated changes test paths
gopidesupavan Feb 27, 2026
b141a12
Resolve comments
gopidesupavan Feb 27, 2026
428b8d0
Update endpoint in extras
gopidesupavan Feb 27, 2026
2f12141
Merge branch 'main' into analytics-operator
gopidesupavan Feb 27, 2026
de2931c
Update dependency
gopidesupavan Feb 27, 2026
0351be1
Update dependency
gopidesupavan Feb 28, 2026
2 changes: 2 additions & 0 deletions airflow-core/tests/unit/always/test_project_structure.py
@@ -98,6 +98,8 @@ def test_providers_modules_should_have_tests(self):
"providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_delete_from.py",
"providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_k8s_hashlib_wrapper.py",
"providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py",
"providers/common/sql/tests/unit/common/sql/datafusion/test_base.py",
"providers/common/sql/tests/unit/common/sql/datafusion/test_exceptions.py",
"providers/common/compat/tests/unit/common/compat/lineage/test_entities.py",
"providers/common/compat/tests/unit/common/compat/standard/test_operators.py",
"providers/common/compat/tests/unit/common/compat/standard/test_triggers.py",
3 changes: 3 additions & 0 deletions docs/spelling_wordlist.txt
@@ -453,8 +453,10 @@ datasetId
Datasets
datasets
datasource
DataSourceConfig
Datastore
datastore
datastores
Datasync
datasync
datatransfer
@@ -2062,6 +2064,7 @@ wape
warmup
Wasb
wasb
wasn
weaviate
WebClient
webhdfs
3 changes: 2 additions & 1 deletion providers/common/sql/docs/index.rst
@@ -119,12 +119,13 @@ You can install such cross-provider dependencies when installing from PyPI. For

.. code-block:: bash

pip install apache-airflow-providers-common-sql[common.compat]
pip install apache-airflow-providers-common-sql[amazon]


================================================================================================================== =================
Dependent package Extra
================================================================================================================== =================
`apache-airflow-providers-amazon <https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_ ``amazon``
`apache-airflow-providers-common-compat <https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_ ``common.compat``
`apache-airflow-providers-openlineage <https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_ ``openlineage``
================================================================================================================== =================
65 changes: 65 additions & 0 deletions providers/common/sql/docs/operators.rst
@@ -252,3 +252,68 @@ between two connections.
:dedent: 4
:start-after: [START howto_operator_generic_transfer]
:end-before: [END howto_operator_generic_transfer]

Analytics Operator
~~~~~~~~~~~~~~~~~~

The Analytics operator runs analytic queries on data stored in various datastores. It is a generic operator that can query data in object stores such as S3 and the local file system, with additional backends (GCS, Azure) planned.

When to Use Analytics Operator
------------------------------

The Analytics Operator is ideal for performing efficient, high-performance analytics on large volumes of data across various storage systems. Under the hood it uses Apache DataFusion, a fast, extensible query engine built on Apache Arrow, which enables SQL queries over many data formats and storage systems. DataFusion was chosen for its ability to handle large-scale data processing on a single node, providing low-latency analytics without a full database setup or large compute clusters. For more on DataFusion use cases, see `<https://datafusion.apache.org/user-guide/introduction.html#use-cases>`_.


Supported Storage Systems
-------------------------
- S3
- Local File System

.. note::
    GCS, Azure, HTTP, Delta, and Iceberg are not yet supported but will be added in the future.



Supported File Formats
----------------------
- Parquet
- CSV
- Avro

.. _howto/operator:AnalyticsOperator:

Use the :class:`~airflow.providers.common.sql.operators.analytics.AnalyticsOperator` to run analytic queries.

Parameters
----------
* ``datasource_configs`` (list[DataSourceConfig], required): List of datasource configurations.
* ``queries`` (list[str], required): List of SQL queries to run on the data.
* ``max_rows_check`` (int, optional): Maximum number of rows allowed per query result. Default is 100. If a query returns more rows than this, its result is skipped in the operator's output; this prevents oversized results, which can cause XCom rendering issues in the Airflow UI.
* ``engine`` (DataFusionEngine, optional): Query engine to use. Default is ``datafusion``, currently the only supported engine.
* ``result_output_format`` (str, optional): Output format for the results. Default is ``tabulate``. Supported formats are ``tabulate`` and ``json``.

DataSourceConfig Parameters
---------------------------

* ``conn_id`` (str, required): Connection ID for the storage backend, e.g. ``aws_default`` for S3.
* ``uri`` (str, required): URI of the datasource.
* ``format`` (str, required): Format of the data.
* ``table_name`` (str, required): Name of the table. This can be any valid identifier and must match the table name used in the SQL queries.
* ``storage_type`` (StorageType, optional): Type of storage. Default is None; if not provided, it is inferred from the URI.
* ``options`` (dict, optional): Additional options for the datasource, e.g. partitioning information for a partitioned datasource: ``{"table_partition_cols": [("year", "integer")]}``.
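Putting these parameters together, a hypothetical task definition might look like the following (a sketch only: the bucket, path, and connection ID are illustrative placeholders, and it assumes the provider is installed with the ``datafusion`` extra):

```python
from airflow.providers.common.sql.config import DataSourceConfig
from airflow.providers.common.sql.operators.analytics import AnalyticsOperator

# Hypothetical names: the "aws_default" connection and the S3 path are placeholders.
run_analytics = AnalyticsOperator(
    task_id="run_analytics",
    datasource_configs=[
        DataSourceConfig(
            conn_id="aws_default",
            uri="s3://my-bucket/sales/",
            format="parquet",
            table_name="sales",  # must match the table referenced in the query
        )
    ],
    queries=["SELECT region, SUM(amount) AS total FROM sales GROUP BY region"],
)
```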

S3 Storage
----------
.. exampleinclude:: /../../sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
:language: python
:dedent: 4
:start-after: [START howto_analytics_operator_with_s3]
:end-before: [END howto_analytics_operator_with_s3]

Local File System Storage
-------------------------
.. exampleinclude:: /../../sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
:language: python
:dedent: 4
:start-after: [START howto_analytics_operator_with_local]
:end-before: [END howto_analytics_operator_with_local]
1 change: 1 addition & 0 deletions providers/common/sql/provider.yaml
@@ -103,6 +103,7 @@ operators:
python-modules:
- airflow.providers.common.sql.operators.sql
- airflow.providers.common.sql.operators.generic_transfer
- airflow.providers.common.sql.operators.analytics

dialects:
- dialect-type: default
10 changes: 9 additions & 1 deletion providers/common/sql/pyproject.toml
@@ -85,12 +85,19 @@ dependencies = [
"sqlalchemy" = [
"sqlalchemy>=1.4.54",
]
"amazon" = [
"apache-airflow-providers-amazon"
]
"datafusion" =[
"datafusion>=50.0.0",
]

[dependency-groups]
dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-amazon",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-openlineage",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
@@ -100,7 +107,8 @@ dev = [
"apache-airflow-providers-postgres",
"apache-airflow-providers-odbc",
"apache-airflow-providers-sqlite",
"apache-airflow-providers-common-sql[sqlalchemy]"
"apache-airflow-providers-common-sql[sqlalchemy]",
"datafusion>=50.0.0"
]

# To build docs:
86 changes: 86 additions & 0 deletions providers/common/sql/src/airflow/providers/common/sql/config.py
@@ -0,0 +1,86 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any


@dataclass(frozen=True)
class ConnectionConfig:
    """Configuration for datafusion object store connections."""

    conn_id: str
    credentials: dict[str, Any] = field(default_factory=dict)
    extra_config: dict[str, Any] = field(default_factory=dict)


class FormatType(str, Enum):
    """Supported data formats."""

    PARQUET = "parquet"
    CSV = "csv"
    AVRO = "avro"


class StorageType(str, Enum):
    """Storage types for Data Fusion."""

    S3 = "s3"
    LOCAL = "local"


@dataclass
class DataSourceConfig:
    """
    Configuration for an input data source.

    :param conn_id: The connection ID to use for accessing the data source.
    :param uri: The URI of the data source (e.g., file path, S3 bucket, etc.).
    :param format: The format of the data (e.g., 'parquet', 'csv').
    :param table_name: The name of the table if applicable.
    :param storage_type: The type of storage. Inferred from the URI if not provided.
    :param options: Additional options for the data source, e.g. partition columns
        to apply when registering the data source.
    """

    conn_id: str
    uri: str
    format: str | None = None
    table_name: str | None = None
    storage_type: StorageType | None = None
    options: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if self.storage_type is None:
            self.storage_type = self._extract_storage_type

        if self.storage_type is not None and self.table_name is None:
            raise ValueError(f"table_name must be provided for storage type {self.storage_type}")

    @property
    def _extract_storage_type(self) -> StorageType | None:
        """Infer the storage type from the URI scheme."""
        if self.uri.startswith("s3://"):
            return StorageType.S3
        if self.uri.startswith("file://"):
            return StorageType.LOCAL
        raise ValueError(f"Unsupported storage type for URI: {self.uri}")
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
@@ -0,0 +1,67 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any

from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
    from datafusion import SessionContext

    from airflow.providers.common.sql.config import ConnectionConfig, FormatType, StorageType


class ObjectStorageProvider(LoggingMixin, ABC):
    """Abstract base class for object storage providers."""

    @property
    def get_storage_type(self) -> StorageType:
        """Return storage type handled by this provider (e.g., 's3', 'gcs', 'local')."""
        raise NotImplementedError

    @abstractmethod
    def create_object_store(self, path: str, connection_config: ConnectionConfig | None = None) -> Any:
        """Create and return a DataFusion object store instance."""
        raise NotImplementedError

    @abstractmethod
    def get_scheme(self) -> str:
        """Return URL scheme for this storage type (e.g., 's3://', 'gs://')."""
        raise NotImplementedError

    def get_bucket(self, path: str) -> str | None:
        """Extract the bucket name from the given path."""
        if path and path.startswith(self.get_scheme()):
            path_parts = path[len(self.get_scheme()) :].split("/", 1)
            return path_parts[0]
        return None


class FormatHandler(LoggingMixin, ABC):
    """Abstract base class for format handlers."""

    @property
    def get_format(self) -> FormatType:
        """Return file format type."""
        raise NotImplementedError

    @abstractmethod
    def register_data_source_format(self, ctx: SessionContext, table_name: str, path: str) -> None:
        """Register data source format."""
        raise NotImplementedError