Merged: changes from 7 commits
6 changes: 3 additions & 3 deletions README.md
@@ -55,7 +55,7 @@ uv run pytest tests/ --cov=dataframe_expectations

**Basic usage with Pandas:**
```python
-from dataframe_expectations.expectations_suite import DataFrameExpectationsSuite
+from dataframe_expectations.suite import DataFrameExpectationsSuite
import pandas as pd

# Build a suite with expectations
@@ -82,7 +82,7 @@ runner.run(df)

**PySpark example:**
```python
-from dataframe_expectations.expectations_suite import DataFrameExpectationsSuite
+from dataframe_expectations.suite import DataFrameExpectationsSuite
from pyspark.sql import SparkSession

# Initialize Spark session
@@ -116,7 +116,7 @@ runner.run(df)

**Decorator pattern for automatic validation:**
```python
-from dataframe_expectations.expectations_suite import DataFrameExpectationsSuite
+from dataframe_expectations.suite import DataFrameExpectationsSuite
from pyspark.sql import SparkSession

# Initialize Spark session
22 changes: 9 additions & 13 deletions dataframe_expectations/__init__.py
@@ -1,16 +1,12 @@
-from enum import Enum
-from typing import Union
+"""DataFrame Expectations - A validation library for pandas and PySpark DataFrames."""

-from pandas import DataFrame as PandasDataFrame
-from pyspark.sql import DataFrame as PySparkDataFrame
+try:
+    from importlib.metadata import version

-DataFrameLike = Union[PySparkDataFrame, PandasDataFrame]
+    __version__ = version("dataframe-expectations")
+except Exception:
+    # Package is not installed (e.g., during development or linting)
+    # Catch all exceptions to handle various edge cases in different environments
+    __version__ = "0.0.0.dev0"


-class DataFrameType(str, Enum):
-    """
-    Enum for DataFrame types.
-    """
-
-    PANDAS = "pandas"
-    PYSPARK = "pyspark"
+__all__ = []
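For reviewers trying this locally, a minimal sketch of what the new package root exposes (assuming the distribution is installed under the name `dataframe-expectations`):

```python
import dataframe_expectations

# Resolved via importlib.metadata when the distribution is installed;
# falls back to the "0.0.0.dev0" sentinel otherwise (see the except branch above).
print(dataframe_expectations.__version__)
```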
3 changes: 3 additions & 0 deletions dataframe_expectations/core/__init__.py
@@ -0,0 +1,3 @@
"""Core base classes and interfaces for DataFrame expectations."""

__all__ = []
@@ -1,8 +1,8 @@
from abc import abstractmethod
from typing import List, Union

-from dataframe_expectations import DataFrameLike, DataFrameType
-from dataframe_expectations.expectations import DataFrameExpectation
+from dataframe_expectations.core.types import DataFrameLike, DataFrameType
+from dataframe_expectations.core.expectation import DataFrameExpectation
from dataframe_expectations.result_message import (
    DataFrameExpectationFailureMessage,
    DataFrameExpectationResultMessage,
@@ -1,7 +1,7 @@
from typing import Callable

-from dataframe_expectations import DataFrameLike, DataFrameType
-from dataframe_expectations.expectations import DataFrameExpectation
+from dataframe_expectations.core.types import DataFrameLike, DataFrameType
+from dataframe_expectations.core.expectation import DataFrameExpectation
from dataframe_expectations.result_message import (
    DataFrameExpectationFailureMessage,
    DataFrameExpectationResultMessage,
110 changes: 110 additions & 0 deletions dataframe_expectations/core/expectation.py
@@ -0,0 +1,110 @@
from abc import ABC, abstractmethod
from typing import cast

from pandas import DataFrame as PandasDataFrame
from pyspark.sql import DataFrame as PySparkDataFrame

# Import the connect DataFrame type for Spark Connect
try:
    from pyspark.sql.connect.dataframe import DataFrame as PySparkConnectDataFrame
except ImportError:
    # Fallback for older PySpark versions that don't have connect
    PySparkConnectDataFrame = None  # type: ignore[misc,assignment]

from dataframe_expectations.core.types import DataFrameLike, DataFrameType
from dataframe_expectations.result_message import (
    DataFrameExpectationResultMessage,
)


class DataFrameExpectation(ABC):
    """
    Base class for DataFrame expectations.
    """

    def get_expectation_name(self) -> str:
        """
        Returns the class name as the expectation name.
        """
        return type(self).__name__

    @abstractmethod
    def get_description(self) -> str:
        """
        Returns a description of the expectation.
        """
        raise NotImplementedError(
            f"description method must be implemented for {self.__class__.__name__}"
        )

    def __str__(self):
        """
        Returns a string representation of the expectation.
        """
        return f"{self.get_expectation_name()} ({self.get_description()})"

    @classmethod
    def infer_data_frame_type(cls, data_frame: DataFrameLike) -> DataFrameType:
        """
        Infer the DataFrame type based on the provided DataFrame.
        """
        if isinstance(data_frame, PandasDataFrame):
            return DataFrameType.PANDAS
        elif isinstance(data_frame, PySparkDataFrame):
            return DataFrameType.PYSPARK
        elif PySparkConnectDataFrame is not None and isinstance(
            data_frame, PySparkConnectDataFrame
        ):
            return DataFrameType.PYSPARK
        else:
            raise ValueError(f"Unsupported DataFrame type: {type(data_frame)}")

    def validate(self, data_frame: DataFrameLike, **kwargs):
        """
        Validate the DataFrame against the expectation.
        """
        data_frame_type = self.infer_data_frame_type(data_frame)

        if data_frame_type == DataFrameType.PANDAS:
            return self.validate_pandas(data_frame=data_frame, **kwargs)
        elif data_frame_type == DataFrameType.PYSPARK:
            return self.validate_pyspark(data_frame=data_frame, **kwargs)
        else:
            raise ValueError(f"Unsupported DataFrame type: {data_frame_type}")

    @abstractmethod
    def validate_pandas(
        self, data_frame: DataFrameLike, **kwargs
    ) -> DataFrameExpectationResultMessage:
        """
        Validate a pandas DataFrame against the expectation.
        """
        raise NotImplementedError(
            f"validate_pandas method must be implemented for {self.__class__.__name__}"
        )

    @abstractmethod
    def validate_pyspark(
        self, data_frame: DataFrameLike, **kwargs
    ) -> DataFrameExpectationResultMessage:
        """
        Validate a PySpark DataFrame against the expectation.
        """
        raise NotImplementedError(
            f"validate_pyspark method must be implemented for {self.__class__.__name__}"
        )

    @classmethod
    def num_data_frame_rows(cls, data_frame: DataFrameLike) -> int:
        """
        Count the number of rows in the DataFrame.
        """
        data_frame_type = cls.infer_data_frame_type(data_frame)
        if data_frame_type == DataFrameType.PANDAS:
            # Cast to PandasDataFrame since we know it's a Pandas DataFrame at this point
            return len(cast(PandasDataFrame, data_frame))
        elif data_frame_type == DataFrameType.PYSPARK:
            # Cast to PySparkDataFrame since we know it's a PySpark DataFrame at this point
            return cast(PySparkDataFrame, data_frame).count()
        else:
            raise ValueError(f"Unsupported DataFrame type: {data_frame_type}")
57 changes: 57 additions & 0 deletions dataframe_expectations/core/types.py
@@ -0,0 +1,57 @@
"""Core types, enums, and data models for dataframe-expectations."""

from enum import Enum
from typing import Any, Dict, Union

from pandas import DataFrame as PandasDataFrame
from pydantic import BaseModel, ConfigDict, Field
from pyspark.sql import DataFrame as PySparkDataFrame

# Type aliases
DataFrameLike = Union[PySparkDataFrame, PandasDataFrame]


class DataFrameType(str, Enum):
"""Enum for DataFrame types."""

PANDAS = "pandas"
PYSPARK = "pyspark"


class ExpectationCategory(str, Enum):
"""Categories for expectations."""

COLUMN_EXPECTATIONS = "Column Expectations"
COLUMN_AGGREGATION_EXPECTATIONS = "Column Aggregation Expectations"
DATAFRAME_AGGREGATION_EXPECTATIONS = "DataFrame Aggregation Expectations"


class ExpectationSubcategory(str, Enum):
"""Subcategory of expectations."""

ANY_VALUE = "Any Value"
NUMERICAL = "Numerical"
STRING = "String"
UNIQUE = "Unique"


class ExpectationMetadata(BaseModel):
"""Metadata for a registered expectation."""

suite_method_name: str = Field(
..., description="Method name in ExpectationsSuite (e.g., 'expect_value_greater_than')"
)
pydoc: str = Field(..., description="Human-readable description of the expectation")
category: ExpectationCategory = Field(..., description="Category (e.g., 'Column Expectations')")
subcategory: ExpectationSubcategory = Field(
..., description="Subcategory (e.g., 'Numerical', 'String')"
)
params_doc: Dict[str, str] = Field(..., description="Documentation for each parameter")
params: list = Field(default_factory=list, description="List of required parameter names")
param_types: Dict[str, Any] = Field(
default_factory=dict, description="Type hints for parameters"
)
factory_func_name: str = Field(..., description="Name of the factory function")
expectation_name: str = Field(..., description="Name of the expectation class")

model_config = ConfigDict(frozen=True) # Make model immutable
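To show the model's shape, a hedged construction with illustrative field values (real entries are produced by the registration machinery rather than written by hand; the names below are made up):

```python
from dataframe_expectations.core.types import (
    ExpectationCategory,
    ExpectationMetadata,
    ExpectationSubcategory,
)

meta = ExpectationMetadata(
    suite_method_name="expect_value_greater_than",      # illustrative
    pydoc="Expect every value in a column to exceed a threshold.",
    category=ExpectationCategory.COLUMN_EXPECTATIONS,
    subcategory=ExpectationSubcategory.NUMERICAL,
    params_doc={"column": "Column to check", "value": "Exclusive lower bound"},
    params=["column", "value"],
    param_types={"column": str, "value": float},
    factory_func_name="expectation_value_greater_than",  # hypothetical name
    expectation_name="ExpectationValueGreaterThan",      # hypothetical class
)

# frozen=True makes instances immutable: attribute assignment on `meta`
# raises a pydantic ValidationError.
```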
@@ -1,7 +1,7 @@
from functools import wraps
from typing import Any, Callable, Dict, Optional, Tuple, Type, Union, get_args

-from dataframe_expectations.expectations import DataFrameExpectation
+from dataframe_expectations.core.expectation import DataFrameExpectation


def requires_params(
111 changes: 1 addition & 110 deletions dataframe_expectations/expectations/__init__.py
@@ -1,110 +1 @@
[110 lines removed: the DataFrameExpectation base class previously defined here, identical to the implementation moved to core/expectation.py above, except that it imported DataFrameLike and DataFrameType from the package root rather than from core.types]
+"""Expectations package - contains all expectation implementations."""
3 changes: 3 additions & 0 deletions dataframe_expectations/expectations/aggregation/__init__.py
@@ -0,0 +1,3 @@
"""Aggregation expectations."""

__all__ = []
@@ -4,16 +4,17 @@
from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F

-from dataframe_expectations import DataFrameLike, DataFrameType
-from dataframe_expectations.expectations.aggregation_expectation import (
+from dataframe_expectations.core.aggregation_expectation import (
    DataFrameAggregationExpectation,
)
-from dataframe_expectations.expectations.expectation_registry import (
+from dataframe_expectations.core.types import (
    ExpectationCategory,
    ExpectationSubcategory,
-    register_expectation,
+    DataFrameLike,
+    DataFrameType,
)
-from dataframe_expectations.expectations.utils import requires_params
+from dataframe_expectations.registry import register_expectation
+from dataframe_expectations.core.utils import requires_params
from dataframe_expectations.result_message import (
    DataFrameExpectationFailureMessage,
    DataFrameExpectationResultMessage,