6 changes: 3 additions & 3 deletions README.md
@@ -55,7 +55,7 @@ uv run pytest tests/ --cov=dataframe_expectations

**Basic usage with Pandas:**
```python
from dataframe_expectations.expectations_suite import DataFrameExpectationsSuite
from dataframe_expectations.suite import DataFrameExpectationsSuite
import pandas as pd

# Build a suite with expectations
@@ -82,7 +82,7 @@ runner.run(df)

**PySpark example:**
```python
from dataframe_expectations.expectations_suite import DataFrameExpectationsSuite
from dataframe_expectations.suite import DataFrameExpectationsSuite
from pyspark.sql import SparkSession

# Initialize Spark session
@@ -116,7 +116,7 @@ runner.run(df)

**Decorator pattern for automatic validation:**
```python
from dataframe_expectations.expectations_suite import DataFrameExpectationsSuite
from dataframe_expectations.suite import DataFrameExpectationsSuite
from pyspark.sql import SparkSession

# Initialize Spark session
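For quick reference, a minimal sketch of the renamed import path. This assumes the only change in this PR is the module path (`expectations_suite` to `suite`); the suite-building calls themselves are not shown in the hunks above, so they are left as comments rather than guessed at.

```python
# Sketch only: exercising the new import path introduced by this PR.
import pandas as pd

from dataframe_expectations.suite import DataFrameExpectationsSuite  # was: dataframe_expectations.expectations_suite

df = pd.DataFrame({"age": [25, 32, 41]})

suite = DataFrameExpectationsSuite()  # construction signature assumed; not shown in this diff
# ...add expectations to the suite, build a runner, then validate, e.g.:
# runner.run(df)  # as in the README examples above
```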
22 changes: 9 additions & 13 deletions dataframe_expectations/__init__.py
@@ -1,16 +1,12 @@
from enum import Enum
from typing import Union
"""DataFrame Expectations - A validation library for pandas and PySpark DataFrames."""

from pandas import DataFrame as PandasDataFrame
from pyspark.sql import DataFrame as PySparkDataFrame
try:
from importlib.metadata import version

DataFrameLike = Union[PySparkDataFrame, PandasDataFrame]
__version__ = version("dataframe-expectations")
except Exception:
# Package is not installed (e.g., during development or linting)
# Catch all exceptions to handle various edge cases in different environments
__version__ = "0.0.0.dev0"


class DataFrameType(str, Enum):
"""
Enum for DataFrame types.
"""

PANDAS = "pandas"
PYSPARK = "pyspark"
__all__ = []
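Net effect of this change: the package root now only resolves `__version__`, while the type aliases and the `DataFrameType` enum move to `core/types.py` (see below). A small sanity check of the fallback behavior:

```python
# In an installed environment this prints the version resolved via importlib.metadata;
# in a plain source checkout it falls back to the dev placeholder defined above.
import dataframe_expectations

print(dataframe_expectations.__version__)  # e.g. "0.0.0.dev0" when the package is not installed
```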
3 changes: 3 additions & 0 deletions dataframe_expectations/core/__init__.py
@@ -0,0 +1,3 @@
"""Core base classes and interfaces for DataFrame expectations."""

__all__ = []
(changes to another file; the file name is not shown in this view)
@@ -1,8 +1,8 @@
from abc import abstractmethod
from typing import List, Union

from dataframe_expectations import DataFrameLike, DataFrameType
from dataframe_expectations.expectations import DataFrameExpectation
from dataframe_expectations.core.types import DataFrameLike, DataFrameType
from dataframe_expectations.core.expectation import DataFrameExpectation
from dataframe_expectations.result_message import (
DataFrameExpectationFailureMessage,
DataFrameExpectationResultMessage,
(changes to another file; the file name is not shown in this view)
@@ -1,7 +1,7 @@
from typing import Callable

from dataframe_expectations import DataFrameLike, DataFrameType
from dataframe_expectations.expectations import DataFrameExpectation
from dataframe_expectations.core.types import DataFrameLike, DataFrameType
from dataframe_expectations.core.expectation import DataFrameExpectation
from dataframe_expectations.result_message import (
DataFrameExpectationFailureMessage,
DataFrameExpectationResultMessage,
110 changes: 110 additions & 0 deletions dataframe_expectations/core/expectation.py
@@ -0,0 +1,110 @@
from abc import ABC, abstractmethod
from typing import cast

from pandas import DataFrame as PandasDataFrame
from pyspark.sql import DataFrame as PySparkDataFrame

# Import the connect DataFrame type for Spark Connect
try:
from pyspark.sql.connect.dataframe import DataFrame as PySparkConnectDataFrame
except ImportError:
# Fallback for older PySpark versions that don't have connect
PySparkConnectDataFrame = None # type: ignore[misc,assignment]

from dataframe_expectations.core.types import DataFrameLike, DataFrameType
from dataframe_expectations.result_message import (
DataFrameExpectationResultMessage,
)


class DataFrameExpectation(ABC):
"""
Base class for DataFrame expectations.
"""

def get_expectation_name(self) -> str:
"""
Returns the class name as the expectation name.
"""
return type(self).__name__

@abstractmethod
def get_description(self) -> str:
"""
Returns a description of the expectation.
"""
raise NotImplementedError(
f"description method must be implemented for {self.__class__.__name__}"
)

def __str__(self):
"""
Returns a string representation of the expectation.
"""
return f"{self.get_expectation_name()} ({self.get_description()})"

@classmethod
def infer_data_frame_type(cls, data_frame: DataFrameLike) -> DataFrameType:
"""
Infer the DataFrame type based on the provided DataFrame.
"""
if isinstance(data_frame, PandasDataFrame):
return DataFrameType.PANDAS
elif isinstance(data_frame, PySparkDataFrame):
return DataFrameType.PYSPARK
elif PySparkConnectDataFrame is not None and isinstance(
data_frame, PySparkConnectDataFrame
):
return DataFrameType.PYSPARK
else:
raise ValueError(f"Unsupported DataFrame type: {type(data_frame)}")

def validate(self, data_frame: DataFrameLike, **kwargs):
"""
Validate the DataFrame against the expectation.
"""
data_frame_type = self.infer_data_frame_type(data_frame)

if data_frame_type == DataFrameType.PANDAS:
return self.validate_pandas(data_frame=data_frame, **kwargs)
elif data_frame_type == DataFrameType.PYSPARK:
return self.validate_pyspark(data_frame=data_frame, **kwargs)
else:
raise ValueError(f"Unsupported DataFrame type: {data_frame_type}")

@abstractmethod
def validate_pandas(
self, data_frame: DataFrameLike, **kwargs
) -> DataFrameExpectationResultMessage:
"""
Validate a pandas DataFrame against the expectation.
"""
raise NotImplementedError(
f"validate_pandas method must be implemented for {self.__class__.__name__}"
)

@abstractmethod
def validate_pyspark(
self, data_frame: DataFrameLike, **kwargs
) -> DataFrameExpectationResultMessage:
"""
Validate a PySpark DataFrame against the expectation.
"""
raise NotImplementedError(
f"validate_pyspark method must be implemented for {self.__class__.__name__}"
)

@classmethod
def num_data_frame_rows(cls, data_frame: DataFrameLike) -> int:
"""
Count the number of rows in the DataFrame.
"""
data_frame_type = cls.infer_data_frame_type(data_frame)
if data_frame_type == DataFrameType.PANDAS:
# Cast to PandasDataFrame since we know it's a Pandas DataFrame at this point
return len(cast(PandasDataFrame, data_frame))
elif data_frame_type == DataFrameType.PYSPARK:
# Cast to PySparkDataFrame since we know it's a PySpark DataFrame at this point
return cast(PySparkDataFrame, data_frame).count()
else:
raise ValueError(f"Unsupported DataFrame type: {data_frame_type}")
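A small usage sketch (not part of the PR) exercising the two classmethod helpers defined above; the expected results in the assertions follow directly from the code in this file.

```python
import pandas as pd

from dataframe_expectations.core.expectation import DataFrameExpectation
from dataframe_expectations.core.types import DataFrameType

df = pd.DataFrame({"age": [25, 32, 41]})

# Both helpers are classmethods, so no concrete subclass is needed here.
assert DataFrameExpectation.infer_data_frame_type(df) is DataFrameType.PANDAS
assert DataFrameExpectation.num_data_frame_rows(df) == 3
```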
57 changes: 57 additions & 0 deletions dataframe_expectations/core/types.py
@@ -0,0 +1,57 @@
"""Core types, enums, and data models for dataframe-expectations."""

from enum import Enum
from typing import Any, Dict, Union

from pandas import DataFrame as PandasDataFrame
from pydantic import BaseModel, ConfigDict, Field
from pyspark.sql import DataFrame as PySparkDataFrame

# Type aliases
DataFrameLike = Union[PySparkDataFrame, PandasDataFrame]


class DataFrameType(str, Enum):
"""Enum for DataFrame types."""

PANDAS = "pandas"
PYSPARK = "pyspark"


class ExpectationCategory(str, Enum):
"""Categories for expectations."""

COLUMN_EXPECTATIONS = "Column Expectations"
COLUMN_AGGREGATION_EXPECTATIONS = "Column Aggregation Expectations"
DATAFRAME_AGGREGATION_EXPECTATIONS = "DataFrame Aggregation Expectations"


class ExpectationSubcategory(str, Enum):
"""Subcategory of expectations."""

ANY_VALUE = "Any Value"
NUMERICAL = "Numerical"
STRING = "String"
UNIQUE = "Unique"


class ExpectationMetadata(BaseModel):
"""Metadata for a registered expectation."""

suite_method_name: str = Field(
..., description="Method name in ExpectationsSuite (e.g., 'expect_value_greater_than')"
)
pydoc: str = Field(..., description="Human-readable description of the expectation")
category: ExpectationCategory = Field(..., description="Category (e.g., 'Column Expectations')")
subcategory: ExpectationSubcategory = Field(
..., description="Subcategory (e.g., 'Numerical', 'String')"
)
params_doc: Dict[str, str] = Field(..., description="Documentation for each parameter")
params: list = Field(default_factory=list, description="List of required parameter names")
param_types: Dict[str, Any] = Field(
default_factory=dict, description="Type hints for parameters"
)
factory_func_name: str = Field(..., description="Name of the factory function")
expectation_name: str = Field(..., description="Name of the expectation class")

model_config = ConfigDict(frozen=True) # Make model immutable
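To illustrate the new `ExpectationMetadata` model, here is a hypothetical registration record. The field values are made up for illustration (only `expect_value_greater_than` appears in the field docs above); the field names and types come from the model itself.

```python
from dataframe_expectations.core.types import (
    ExpectationCategory,
    ExpectationMetadata,
    ExpectationSubcategory,
)

meta = ExpectationMetadata(
    suite_method_name="expect_value_greater_than",
    pydoc="Checks that every value in a column exceeds a threshold.",
    category=ExpectationCategory.COLUMN_EXPECTATIONS,
    subcategory=ExpectationSubcategory.NUMERICAL,
    params_doc={"column": "Column to check.", "value": "Exclusive lower bound."},
    params=["column", "value"],
    param_types={"column": str, "value": float},
    factory_func_name="expectation_value_greater_than",  # hypothetical name
    expectation_name="ExpectationValueGreaterThan",      # hypothetical name
)

# frozen=True makes the model immutable, so reassigning a field would raise a validation error:
# meta.pydoc = "..."  # pydantic.ValidationError
```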
(changes to another file; the file name is not shown in this view)
@@ -1,7 +1,7 @@
from functools import wraps
from typing import Any, Callable, Dict, Optional, Tuple, Type, Union, get_args

from dataframe_expectations.expectations import DataFrameExpectation
from dataframe_expectations.core.expectation import DataFrameExpectation


def requires_params(
111 changes: 1 addition & 110 deletions dataframe_expectations/expectations/__init__.py
@@ -1,110 +1 @@
from abc import ABC, abstractmethod
from typing import cast

from pandas import DataFrame as PandasDataFrame
from pyspark.sql import DataFrame as PySparkDataFrame

# Import the connect DataFrame type for Spark Connect
try:
from pyspark.sql.connect.dataframe import DataFrame as PySparkConnectDataFrame
except ImportError:
# Fallback for older PySpark versions that don't have connect
PySparkConnectDataFrame = None # type: ignore[misc,assignment]

from dataframe_expectations import DataFrameLike, DataFrameType
from dataframe_expectations.result_message import (
DataFrameExpectationResultMessage,
)


class DataFrameExpectation(ABC):
"""
Base class for DataFrame expectations.
"""

def get_expectation_name(self) -> str:
"""
Returns the class name as the expectation name.
"""
return type(self).__name__

@abstractmethod
def get_description(self) -> str:
"""
Returns a description of the expectation.
"""
raise NotImplementedError(
f"description method must be implemented for {self.__class__.__name__}"
)

def __str__(self):
"""
Returns a string representation of the expectation.
"""
return f"{self.get_expectation_name()} ({self.get_description()})"

@classmethod
def infer_data_frame_type(cls, data_frame: DataFrameLike) -> DataFrameType:
"""
Infer the DataFrame type based on the provided DataFrame.
"""
if isinstance(data_frame, PandasDataFrame):
return DataFrameType.PANDAS
elif isinstance(data_frame, PySparkDataFrame):
return DataFrameType.PYSPARK
elif PySparkConnectDataFrame is not None and isinstance(
data_frame, PySparkConnectDataFrame
):
return DataFrameType.PYSPARK
else:
raise ValueError(f"Unsupported DataFrame type: {type(data_frame)}")

def validate(self, data_frame: DataFrameLike, **kwargs):
"""
Validate the DataFrame against the expectation.
"""
data_frame_type = self.infer_data_frame_type(data_frame)

if data_frame_type == DataFrameType.PANDAS:
return self.validate_pandas(data_frame=data_frame, **kwargs)
elif data_frame_type == DataFrameType.PYSPARK:
return self.validate_pyspark(data_frame=data_frame, **kwargs)
else:
raise ValueError(f"Unsupported DataFrame type: {data_frame_type}")

@abstractmethod
def validate_pandas(
self, data_frame: DataFrameLike, **kwargs
) -> DataFrameExpectationResultMessage:
"""
Validate a pandas DataFrame against the expectation.
"""
raise NotImplementedError(
f"validate_pandas method must be implemented for {self.__class__.__name__}"
)

@abstractmethod
def validate_pyspark(
self, data_frame: DataFrameLike, **kwargs
) -> DataFrameExpectationResultMessage:
"""
Validate a PySpark DataFrame against the expectation.
"""
raise NotImplementedError(
f"validate_pyspark method must be implemented for {self.__class__.__name__}"
)

@classmethod
def num_data_frame_rows(cls, data_frame: DataFrameLike) -> int:
"""
Count the number of rows in the DataFrame.
"""
data_frame_type = cls.infer_data_frame_type(data_frame)
if data_frame_type == DataFrameType.PANDAS:
# Cast to PandasDataFrame since we know it's a Pandas DataFrame at this point
return len(cast(PandasDataFrame, data_frame))
elif data_frame_type == DataFrameType.PYSPARK:
# Cast to PySparkDataFrame since we know it's a PySpark DataFrame at this point
return cast(PySparkDataFrame, data_frame).count()
else:
raise ValueError(f"Unsupported DataFrame type: {data_frame_type}")
"""Expectations package - contains all expectation implementations."""
3 changes: 3 additions & 0 deletions dataframe_expectations/expectations/aggregation/__init__.py
@@ -0,0 +1,3 @@
"""Aggregation expectations."""

__all__ = []
(changes to another file; the file name is not shown in this view)
@@ -4,16 +4,17 @@
from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F

from dataframe_expectations import DataFrameLike, DataFrameType
from dataframe_expectations.expectations.aggregation_expectation import (
from dataframe_expectations.core.aggregation_expectation import (
DataFrameAggregationExpectation,
)
from dataframe_expectations.expectations.expectation_registry import (
from dataframe_expectations.core.types import (
ExpectationCategory,
ExpectationSubcategory,
register_expectation,
DataFrameLike,
DataFrameType,
)
from dataframe_expectations.expectations.utils import requires_params
from dataframe_expectations.registry import register_expectation
from dataframe_expectations.core.utils import requires_params
from dataframe_expectations.result_message import (
DataFrameExpectationFailureMessage,
DataFrameExpectationResultMessage,