forked from mrpowers-io/quinn
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_dataframe_validator.py
More file actions
84 lines (72 loc) · 3.87 KB
/
test_dataframe_validator.py
File metadata and controls
84 lines (72 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import pytest
from pyspark.sql.types import StructType, StructField, StringType, LongType
import semver
import quinn
from .spark import spark
def describe_validate_presence_of_columns():
    """Specs for ``quinn.validate_presence_of_columns``."""

    def it_raises_if_a_required_column_is_missing():
        """Asking for a column the DataFrame does not have raises with a descriptive message."""
        rows = [("jose", 1), ("li", 2), ("luisa", 3)]
        people_df = spark.createDataFrame(rows, ["name", "age"])
        required_cols = ["name", "age", "fun"]
        expected_message = "The ['fun'] columns are not included in the DataFrame with the following columns ['name', 'age']"  # noqa

        with pytest.raises(quinn.DataFrameMissingColumnError) as raised:
            quinn.validate_presence_of_columns(people_df, required_cols)

        # Inspect the captured exception only after the context manager has
        # exited; code placed after the raising call inside it never runs.
        assert raised.value.args[0] == expected_message

    def it_does_nothing_if_all_required_columns_are_present():
        """Every required column exists, so the call completes without raising."""
        people_df = spark.createDataFrame(
            [("jose", 1), ("li", 2), ("luisa", 3)],
            ["name", "age"],
        )
        quinn.validate_presence_of_columns(people_df, ["name"])
def describe_validate_schema():
    """Specs for ``quinn.validate_schema``."""

    def it_raises_when_struct_field_is_missing1():
        """A required StructField that the DataFrame lacks raises with a descriptive message."""
        rows = [("jose", 1), ("li", 2), ("luisa", 3)]
        people_df = spark.createDataFrame(rows, ["name", "age"])
        wanted_fields = [
            StructField("name", StringType(), True),
            StructField("city", StringType(), True),
        ]
        required_schema = StructType(wanted_fields)

        with pytest.raises(quinn.DataFrameMissingStructFieldError) as raised:
            quinn.validate_schema(required_schema, _df=people_df)

        # The message embeds the StructField/StructType text, which is
        # formatted differently from Spark 3.3.0 onward, so the expected
        # string is chosen from the running Spark version.
        running_version = semver.Version.parse(spark.version)
        cutover_version = semver.Version.parse("3.3.0")
        if running_version < cutover_version:  # before Spark 3.3
            expected_message = "The [StructField(city,StringType,true)] StructFields are not included in the DataFrame with the following StructFields StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))"  # noqa
        else:  # Spark 3.3+
            expected_message = "The [StructField('city', StringType(), True)] StructFields are not included in the DataFrame with the following StructFields StructType([StructField('name', StringType(), True), StructField('age', LongType(), True)])"  # noqa

        assert raised.value.args[0] == expected_message

    def it_does_nothing_when_the_schema_matches():
        """A required schema identical to the DataFrame's schema passes silently."""
        people_df = spark.createDataFrame(
            [("jose", 1), ("li", 2), ("luisa", 3)],
            ["name", "age"],
        )
        matching_fields = [
            StructField("name", StringType(), True),
            StructField("age", LongType(), True),
        ]
        quinn.validate_schema(StructType(matching_fields), _df=people_df)

    def nullable_column_mismatches_are_ignored():
        """With ``ignore_nullable=True`` a field differing only in nullability passes."""
        people_df = spark.createDataFrame(
            [("jose", 1), ("li", 2), ("luisa", 3)],
            ["name", "age"],
        )
        # "age" is declared non-nullable here, unlike in the matching-schema spec.
        strict_fields = [
            StructField("name", StringType(), True),
            StructField("age", LongType(), False),
        ]
        quinn.validate_schema(
            StructType(strict_fields),
            ignore_nullable=True,
            _df=people_df,
        )
def describe_validate_absence_of_columns():
    """Specs for ``quinn.validate_absence_of_columns``."""

    def it_raises_when_a_unallowed_column_is_present():
        """A prohibited column that exists in the DataFrame raises with a descriptive message."""
        rows = [("jose", 1), ("li", 2), ("luisa", 3)]
        people_df = spark.createDataFrame(rows, ["name", "age"])
        prohibited_cols = ["age", "cool"]
        expected_message = "The ['age'] columns are not allowed to be included in the DataFrame with the following columns ['name', 'age']"  # noqa

        with pytest.raises(quinn.DataFrameProhibitedColumnError) as raised:
            quinn.validate_absence_of_columns(people_df, prohibited_cols)

        # Inspect the captured exception only after the context manager has
        # exited; code placed after the raising call inside it never runs.
        assert raised.value.args[0] == expected_message

    def it_does_nothing_when_no_unallowed_columns_are_present():
        """None of the prohibited columns exist, so the call completes without raising."""
        people_df = spark.createDataFrame(
            [("jose", 1), ("li", 2), ("luisa", 3)],
            ["name", "age"],
        )
        quinn.validate_absence_of_columns(people_df, ["favorite_color"])