Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
e642b86
Adding decorator factory to validate_schema to make it work both as …
kunaljubce Mar 29, 2024
3188b54
Fix to execute the validation when func is called and replaced the ol…
kunaljubce Mar 29, 2024
a353a69
Changes to tests to conform to new validate_schema definition
kunaljubce Mar 29, 2024
85cc47a
Updating README description for validate_schema
kunaljubce Mar 29, 2024
e38ba8e
README fix
kunaljubce Mar 29, 2024
e58ccdd
Improved documentation in README
kunaljubce Mar 29, 2024
fe843b5
Added success msg to be printed in case df schema matches the require…
kunaljubce Mar 29, 2024
a964e16
Added an uncommitted directory for developers to store their scripts o…
kunaljubce Mar 29, 2024
c856f79
Minor README documentation update
kunaljubce Mar 29, 2024
151dcc2
Moved uncommitted folder
kunaljubce Mar 29, 2024
b9a6d08
Removing uncommitted dir
kunaljubce Mar 29, 2024
7c8ae16
update column extension function names and desc in readme
Jul 12, 2024
e9f8948
Merge pull request #240 from fatemetardasti96/main
SemyonSinchenko Jul 12, 2024
21d87e5
Static type error fixes
kunaljubce Jul 16, 2024
7ab9a42
Resolved merge conflicts
kunaljubce Jul 16, 2024
1d33b91
Changed _df param name to df_to_be_validated and associated tests cha…
kunaljubce Jul 16, 2024
2fca007
README changes for _df change
kunaljubce Jul 16, 2024
c4cc8af
Remove the print_athena_create_table function
nijanthanvijayakumar Jul 10, 2024
5faadae
Remove deprecated functions exists and forall
nijanthanvijayakumar Jul 8, 2024
0823158
Remove imported and unused Callable module to avoid ruff lint failure
nijanthanvijayakumar Jul 8, 2024
a9b040f
Drop Spark-2 support and update dependencies
SemyonSinchenko Jul 14, 2024
60c7fb7
Update linting CI
SemyonSinchenko Jul 14, 2024
5b545ef
Fix typo in CI
SemyonSinchenko Jul 14, 2024
e505a21
Fix failed tests
SemyonSinchenko Jul 14, 2024
7802545
Updates from review
SemyonSinchenko Jul 14, 2024
e6ee244
Create the first-version of files for Spark-Connect tests
nijanthanvijayakumar Jul 14, 2024
dbd3f66
Address the fixtures issue in the test file
nijanthanvijayakumar Jul 15, 2024
3ef4219
Update the CI workflow to initiate the sparkconnect test on the 1.0
nijanthanvijayakumar Jul 15, 2024
b1573b4
Update the poetry & pyproject with the dependencies for Spark-Connect
nijanthanvijayakumar Jul 15, 2024
fc85013
Update the CI workflow to run Spark-Connect tests only for v3.4+
nijanthanvijayakumar Jul 15, 2024
3e8776a
Update the script to check if Spark-Connect server is running or not
nijanthanvijayakumar Jul 15, 2024
8f76b0c
Remove the spark-connect server run check
nijanthanvijayakumar Jul 15, 2024
0fb197e
Update workflows & pytest to choose the Sparksession instance based o…
nijanthanvijayakumar Jul 15, 2024
b413920
Add a TODO statement so that the spark-connect server check can be ad…
nijanthanvijayakumar Jul 15, 2024
f3cf717
Remove the 1.0 planning branch for the CI file
nijanthanvijayakumar Jul 15, 2024
0ab7493
Attribute the original script that inspired this
nijanthanvijayakumar Jul 15, 2024
3c669fc
Mark recently added deps as optional for Spark-Classic
nijanthanvijayakumar Jul 15, 2024
f62185f
Rename the spark-classic to connect & update makefile to install thes…
nijanthanvijayakumar Jul 15, 2024
93f39d1
update column extension function names and desc in readme
Jul 12, 2024
b9926fd
add acknowledgement
fpgmaas Jul 15, 2024
74545c3
Fix the linting issues in the linting CI workflow
nijanthanvijayakumar Jul 15, 2024
943918a
remove .python-version
fpgmaas Jul 15, 2024
0a71190
apply hotfix
fpgmaas Jul 15, 2024
ad45d31
run lint also on pr
fpgmaas Jul 15, 2024
f04ab78
update column extension function names and desc in readme
Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,28 @@ quinn.validate_presence_of_columns(source_df, ["name", "age", "fun"])

**validate_schema()**

Raises an exception unless `source_df` contains all the `StructFields` defined in the `required_schema`.
Raises an exception unless `source_df` contains all the `StructFields` defined in the `required_schema`. By default, `ignore_nullable` is set to False, so an exception will be raised even if column names and data types match but the nullability conditions do not.

```python
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, _df=source_df)
```

You can also set `ignore_nullable` to True, so the validation will happen only on column names and data types, not on nullability.

```python
quinn.validate_schema(required_schema, ignore_nullable=True, _df=source_df)
```

> [!TIP]
> This function can also be used as a decorator to other functions that return a dataframe. This can help validate the schema of the returned df. When used as a decorator, you don't need to pass the `_df` argument as this validation is performed on the df returned by the base function on which the decorator is applied.
>
> ```python
> @quinn.validate_schema(required_schema, ignore_nullable=True)
> def get_df():
> return df
> ```


**validate_absence_of_columns()**

Raises an exception if `source_df` contains `age` or `cool` columns.
Expand Down
54 changes: 36 additions & 18 deletions quinn/dataframe_validator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations
from __future__ import annotations # noqa: I001

import copy
from typing import TYPE_CHECKING
from typing import Any, Callable, TYPE_CHECKING

if TYPE_CHECKING:
from pyspark.sql import DataFrame
Expand Down Expand Up @@ -37,40 +37,58 @@ def validate_presence_of_columns(df: DataFrame, required_col_names: list[str]) -
if missing_col_names:
raise DataFrameMissingColumnError(error_message)


def validate_schema(
    required_schema: StructType,
    ignore_nullable: bool = False,
    _df: DataFrame | None = None,
) -> Callable[..., DataFrame] | DataFrame:
    """Validate that a DataFrame's schema contains every StructField in ``required_schema``.

    Implemented as a decorator factory, so it can be used either as a standalone
    function (by passing the DataFrame via ``_df``) or as a decorator applied to
    a function that returns a DataFrame.

    :param required_schema: StructType required for the DataFrame
    :type required_schema: StructType
    :param ignore_nullable: (Optional) A flag for if nullable fields should be
        ignored during validation
    :type ignore_nullable: bool, optional
    :param _df: DataFrame to validate, mandatory when called as a function. Not
        required when used as a decorator
    :type _df: DataFrame, optional

    :return: the decorator when used as a decorator factory, otherwise the
        validated DataFrame itself
    :raises DataFrameMissingStructFieldError: if any StructFields from the required
        schema are not included in the DataFrame schema
    """
    # Local import keeps the module-level import block unchanged.
    from functools import wraps

    def decorator(func: Callable[..., DataFrame]) -> Callable[..., DataFrame]:
        @wraps(func)  # preserve the wrapped function's name/docstring
        def wrapper(*args: object, **kwargs: object) -> DataFrame:
            dataframe = func(*args, **kwargs)
            # Deep-copy both schemas so nullability can be normalized below
            # without mutating the caller's objects.
            _all_struct_fields = copy.deepcopy(dataframe.schema)
            _required_schema = copy.deepcopy(required_schema)

            if ignore_nullable:
                # Neutralize nullability on both sides so only column names
                # and data types take part in the comparison.
                for x in _all_struct_fields:
                    x.nullable = None

                for x in _required_schema:
                    x.nullable = None

            missing_struct_fields = [x for x in _required_schema if x not in _all_struct_fields]
            error_message = f"The {missing_struct_fields} StructFields are not included in the DataFrame with the following StructFields {_all_struct_fields}"  # noqa: E501

            if missing_struct_fields:
                raise DataFrameMissingStructFieldError(error_message)

            print("Success! DataFrame matches the required schema!")

            return dataframe

        return wrapper

    if _df is None:
        # No DataFrame supplied: being used as a decorator factory.
        return decorator

    # DataFrame supplied: validate it immediately and return it.
    return decorator(lambda: _df)()


def validate_absence_of_columns(df: DataFrame, prohibited_col_names: list[str]) -> None:
Expand Down
6 changes: 3 additions & 3 deletions tests/test_dataframe_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def it_raises_when_struct_field_is_missing1():
]
)
with pytest.raises(quinn.DataFrameMissingStructFieldError) as excinfo:
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, _df=source_df)

current_spark_version = semver.Version.parse(spark.version)
spark_330 = semver.Version.parse("3.3.0")
Expand All @@ -53,7 +53,7 @@ def it_does_nothing_when_the_schema_matches():
StructField("age", LongType(), True),
]
)
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, _df=source_df)

def nullable_column_mismatches_are_ignored():
data = [("jose", 1), ("li", 2), ("luisa", 3)]
Expand All @@ -64,7 +64,7 @@ def nullable_column_mismatches_are_ignored():
StructField("age", LongType(), False),
]
)
quinn.validate_schema(source_df, required_schema, ignore_nullable=True)
quinn.validate_schema(required_schema, ignore_nullable=True, _df=source_df)


def describe_validate_absence_of_columns():
Expand Down