2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
@@ -33,7 +33,7 @@ jobs:
uv run python scripts/sanity_checks.py
- name: Run tests
run: |
uv run pytest tests/ --cov=dataframe_expectations
uv run pytest tests/ -n auto --tb=line --cov=dataframe_expectations

lint:
runs-on: ubuntu-latest
38 changes: 38 additions & 0 deletions README.md
@@ -185,6 +185,44 @@ Some examples of violations:

```

**Tag-based filtering for selective execution:**
```python
from dataframe_expectations import DataFrameExpectationsSuite, TagMatchMode

# Tag expectations with priorities and environments
suite = (
DataFrameExpectationsSuite()
.expect_value_greater_than(column_name="age", value=18, tags=["priority:high", "env:prod"])
.expect_value_not_null(column_name="name", tags=["priority:high"])
.expect_min_rows(min_rows=1, tags=["priority:low", "env:test"])
)

# Run only high-priority checks (OR logic - matches ANY tag)
runner = suite.build(tags=["priority:high"], tag_match_mode=TagMatchMode.ANY)
runner.run(df)

# Run production-critical checks (AND logic - matches ALL tags)
runner = suite.build(tags=["priority:high", "env:prod"], tag_match_mode=TagMatchMode.ALL)
runner.run(df)
```
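
To make the ANY/ALL semantics above concrete, here is an illustrative sketch of which of the three tagged expectations each mode would select. It uses plain Python sets rather than the library's internal `TagSet`, so the matching rule is an assumption read off the comments above, not the actual implementation:

```python
# Tags carried by the three expectations defined above
expectation_tags = [
    {"priority:high", "env:prod"},  # expect_value_greater_than
    {"priority:high"},              # expect_value_not_null
    {"priority:low", "env:test"},   # expect_min_rows
]

selected = {"priority:high", "env:prod"}

# ANY: an expectation is kept if it carries at least one selected tag
matches_any = [bool(tags & selected) for tags in expectation_tags]  # [True, True, False]

# ALL: an expectation is kept only if it carries every selected tag
matches_all = [selected <= tags for tags in expectation_tags]       # [True, False, False]
```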

**Programmatic result inspection:**
```python
# Get detailed results without raising exceptions
result = runner.run(df, raise_on_failure=False)

# Inspect validation outcomes
print(f"Total: {result.total_expectations}, Passed: {result.total_passed}, Failed: {result.total_failed}")
print(f"Pass rate: {result.pass_rate:.2%}")
print(f"Duration: {result.total_duration_seconds:.2f}s")
print(f"Applied filters: {result.applied_filters}")

# Access individual results
for exp_result in result.results:
if exp_result.status == "failed":
print(f"Failed: {exp_result.description} - {exp_result.violation_count} violations")
```
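
Building only on the attributes shown above, here is a hypothetical follow-up sketch (not part of the documented API beyond those attribute names) that collects the outcome into a JSON summary, e.g. to attach as a CI artifact:

```python
import json

# Assumes `result` comes from runner.run(df, raise_on_failure=False) as above
summary = {
    "total": result.total_expectations,
    "passed": result.total_passed,
    "failed": result.total_failed,
    "pass_rate": result.pass_rate,
    "duration_seconds": result.total_duration_seconds,
    "failures": [
        {"description": r.description, "violations": r.violation_count}
        for r in result.results
        if r.status == "failed"
    ],
}
print(json.dumps(summary, indent=2))
```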

### How to contribute?
Contributions are welcome! You can enhance the library by adding new expectations, refining existing ones, or improving the testing framework.

22 changes: 21 additions & 1 deletion dataframe_expectations/__init__.py
@@ -9,4 +9,24 @@
# Catch all exceptions to handle various edge cases in different environments
__version__ = "0.0.0.dev0"

__all__ = []
from dataframe_expectations.core.suite_result import (
ExpectationResult,
SuiteExecutionResult,
serialize_violations,
)
from dataframe_expectations.core.types import TagMatchMode
from dataframe_expectations.suite import (
DataFrameExpectationsSuite,
DataFrameExpectationsSuiteRunner,
DataFrameExpectationsSuiteFailure,
)

__all__ = [
"ExpectationResult",
"SuiteExecutionResult",
"serialize_violations",
"DataFrameExpectationsSuite",
"DataFrameExpectationsSuiteRunner",
"DataFrameExpectationsSuiteFailure",
"TagMatchMode",
]
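
With the expanded `__all__`, the public names become importable from the package root; a minimal usage sketch based solely on the exports listed above:

```python
from dataframe_expectations import (
    DataFrameExpectationsSuite,
    DataFrameExpectationsSuiteFailure,
    SuiteExecutionResult,
    TagMatchMode,
)
```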
14 changes: 13 additions & 1 deletion dataframe_expectations/core/__init__.py
@@ -1,3 +1,15 @@
"""Core base classes and interfaces for DataFrame expectations."""

__all__ = []
from dataframe_expectations.core.suite_result import (
ExpectationResult,
ExpectationStatus,
SuiteExecutionResult,
serialize_violations,
)

__all__ = [
"ExpectationResult",
"ExpectationStatus",
"SuiteExecutionResult",
"serialize_violations",
]
6 changes: 5 additions & 1 deletion dataframe_expectations/core/aggregation_expectation.py
@@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import List, Union
from typing import List, Optional, Union

from dataframe_expectations.core.types import DataFrameLike, DataFrameType
from dataframe_expectations.core.expectation import DataFrameExpectation
@@ -20,6 +20,7 @@ def __init__(
expectation_name: str,
column_names: List[str],
description: str,
tags: Optional[List[str]] = None,
):
"""
Template for implementing DataFrame aggregation expectations, where data is first aggregated
@@ -28,7 +29,10 @@ def __init__(
:param expectation_name: The name of the expectation. This will be used during logging.
:param column_names: The list of column names to aggregate on.
:param description: A description of the expectation used in logging.
:param tags: Optional tags as list of strings in "key:value" format.
Example: ["priority:high", "env:test"]
"""
super().__init__(tags=tags)
self.expectation_name = expectation_name
self.column_names = column_names
self.description = description
6 changes: 5 additions & 1 deletion dataframe_expectations/core/column_expectation.py
@@ -1,4 +1,4 @@
from typing import Callable
from typing import Callable, List, Optional

from dataframe_expectations.core.types import DataFrameLike, DataFrameType
from dataframe_expectations.core.expectation import DataFrameExpectation
@@ -23,6 +23,7 @@ def __init__(
fn_violations_pyspark: Callable,
description: str,
error_message: str,
tags: Optional[List[str]] = None,
):
"""
Template for implementing DataFrame column expectations, where a column value is tested against a
@@ -34,7 +35,10 @@ def __init__(
:param fn_violations_pyspark: Function to find violations in a PySpark DataFrame.
:param description: A description of the expectation used in logging.
:param error_message: The error message to return if the expectation fails.
:param tags: Optional tags as list of strings in "key:value" format.
Example: ["priority:high", "env:test"]
"""
super().__init__(tags=tags)
self.column_name = column_name
self.expectation_name = expectation_name
self.fn_violations_pandas = fn_violations_pandas
51 changes: 34 additions & 17 deletions dataframe_expectations/core/expectation.py
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import cast
from typing import List, Optional, cast

from pandas import DataFrame as PandasDataFrame
from pyspark.sql import DataFrame as PySparkDataFrame
@@ -12,6 +12,7 @@
PySparkConnectDataFrame = None # type: ignore[misc,assignment]

from dataframe_expectations.core.types import DataFrameLike, DataFrameType
from dataframe_expectations.core.tagging import TagSet
from dataframe_expectations.result_message import (
DataFrameExpectationResultMessage,
)
@@ -22,6 +23,20 @@ class DataFrameExpectation(ABC):
Base class for DataFrame expectations.
"""

def __init__(self, tags: Optional[List[str]] = None):
"""
Initialize the base expectation with optional tags.
:param tags: Optional tags as list of strings in "key:value" format.
Example: ["priority:high", "env:test"]
"""
self.__tags = TagSet(tags)

def get_tags(self) -> TagSet:
"""
Returns the tags for this expectation.
"""
return self.__tags

def get_expectation_name(self) -> str:
"""
Returns the class name as the expectation name.
@@ -48,29 +63,31 @@ def infer_data_frame_type(cls, data_frame: DataFrameLike) -> DataFrameType:
"""
Infer the DataFrame type based on the provided DataFrame.
"""
if isinstance(data_frame, PandasDataFrame):
return DataFrameType.PANDAS
elif isinstance(data_frame, PySparkDataFrame):
return DataFrameType.PYSPARK
elif PySparkConnectDataFrame is not None and isinstance(
data_frame, PySparkConnectDataFrame
):
return DataFrameType.PYSPARK
else:
raise ValueError(f"Unsupported DataFrame type: {type(data_frame)}")
match data_frame:
case PandasDataFrame():
return DataFrameType.PANDAS
case PySparkDataFrame():
return DataFrameType.PYSPARK
case _ if PySparkConnectDataFrame is not None and isinstance(
data_frame, PySparkConnectDataFrame
):
return DataFrameType.PYSPARK
case _:
raise ValueError(f"Unsupported DataFrame type: {type(data_frame)}")

def validate(self, data_frame: DataFrameLike, **kwargs):
"""
Validate the DataFrame against the expectation.
"""
data_frame_type = self.infer_data_frame_type(data_frame)

if data_frame_type == DataFrameType.PANDAS:
return self.validate_pandas(data_frame=data_frame, **kwargs)
elif data_frame_type == DataFrameType.PYSPARK:
return self.validate_pyspark(data_frame=data_frame, **kwargs)
else:
raise ValueError(f"Unsupported DataFrame type: {data_frame_type}")
match data_frame_type:
case DataFrameType.PANDAS:
return self.validate_pandas(data_frame=data_frame, **kwargs)
case DataFrameType.PYSPARK:
return self.validate_pyspark(data_frame=data_frame, **kwargs)
case _:
raise ValueError(f"Unsupported DataFrame type: {data_frame_type}")

@abstractmethod
def validate_pandas(