
Commit 8733710

gucciwang and Calvin Wang authored
hasPattern Support (#36)
* Refactor testing modules + testing spark3 bump
* hasPattern Check support
* Patch Check tests

Co-authored-by: Calvin Wang <[email protected]>
1 parent 522f9bf commit 8733710

File tree: 11 files changed, +197 -33 lines


pydeequ/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -19,7 +19,8 @@
 from pydeequ.analyzers import AnalysisRunner
 from pydeequ.checks import Check, CheckLevel

-deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3"
+deequ_maven_coord = "com.amazon.deequ:deequ:1.1.0_spark-2.4-scala-2.11"
+# deequ_maven_coord = "com.amazon.deequ:deequ:1.1.0_spark-3.0-scala-2.12"
 f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all"

 class PyDeequSession:
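The coordinate now pins deequ 1.1.0 to a specific Spark/Scala build, with the Spark 3.0 / Scala 2.12 coordinate left commented out as the alternative. A minimal sketch of how this constant typically reaches a SparkSession — the .config() keys are standard Spark packaging options assumed here, not shown in this diff:

# Minimal sketch, assuming the standard Spark packaging options;
# swap in the commented Spark 3.0 coordinate above to target Spark 3.
from pyspark.sql import SparkSession
import pydeequ

spark = (SparkSession.builder
         .master('local[*]')
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())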

pydeequ/checks.py

Lines changed: 13 additions & 4 deletions
@@ -557,7 +557,13 @@ def hasPattern(self, column, pattern, assertion=None, name=None, hint=None):
         :param str hint: A hint that states why a constraint could have failed.
         :return: hasPattern self: A Check object that runs the condition on the column.
         """
-        pass
+        assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) if assertion \
+            else getattr(self._Check, "hasPattern$default$2")()
+        name = self._jvm.scala.Option.apply(name)
+        hint = self._jvm.scala.Option.apply(hint)
+        pattern_regex = self._jvm.scala.util.matching.Regex(pattern, None)
+        self._Check = self._Check.hasPattern(column, pattern_regex, assertion_func, name, hint)
+        return self

     def containsCreditCardNumber(self, column, assertion=None, hint=None):
         """
@@ -733,19 +739,22 @@ def isGreaterThanOrEqualTo(self, columnA, columnB, assertion=None, hint=None):
         self._Check = self._Check.isGreaterThanOrEqualTo(columnA, columnB, assertion_func, hint)
         return self

-    def isContainedIn(self, column, allowed_values):
+    def isContainedIn(self, column, allowed_values, assertion=None, hint=None):
         """
         Asserts that every non-null value in a column is contained in a set of predefined values
-
         :param str column: Column in DataFrame to run the assertion on.
         :param list[str] allowed_values: A function that accepts allowed values for the column.
+        :param lambda assertion: A function that accepts an int or float parameter.
         :param str hint: A hint that states why a constraint could have failed.
         :return: isContainedIn self: A Check object that runs the assertion on the columns.
         """
         arr = self._spark_session.sparkContext._gateway.new_array(self._jvm.java.lang.String, len(allowed_values))
         for i in range(0, len(allowed_values)):
             arr[i] = allowed_values[i]
-        self._Check = self._Check.isContainedIn(column, arr)
+        assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) if assertion \
+            else getattr(self._Check, "IsOne")()
+        hint = self._jvm.scala.Option.apply(hint)
+        self._Check = self._Check.isContainedIn(column, arr, assertion_func, hint)
        return self

     def evaluate(self, context):
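Taken together: hasPattern converts the Python pattern string into a scala.util.matching.Regex through the JVM gateway and wraps the optional assertion in a ScalaFunction1 (falling back to the Scala default argument), while isContainedIn gains optional assertion and hint parameters. A hedged usage sketch — `spark` and `df` are assumed to exist, and the column names are invented for illustration:

# Usage sketch for the new API surface; `spark`/`df` and the column
# names ("email", "status") are hypothetical stand-ins.
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

check = (Check(spark, CheckLevel.Warning, "pattern checks")
         .hasPattern("email", r".*@example\.com", assertion=lambda x: x >= 0.5)
         .isContainedIn("status", ["active", "inactive"],
                        assertion=lambda x: x == 1.0,
                        hint="status must be a known value"))

result = VerificationSuite(spark).onData(df).addCheck(check).run()
VerificationResult.checkResultsAsDataFrame(spark, result).show()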

tests/test_analyzers.py

Lines changed: 1 addition & 3 deletions
@@ -1,14 +1,12 @@
 import unittest
 from pyspark.sql import SparkSession, Row, DataFrame
 from pydeequ.analyzers import *
-from pydeequ import PyDeequSession
+from pydeequ import *

 class TestAnalyzers(unittest.TestCase):

     @classmethod
     def setUpClass(cls):
-        deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3" # TODO: get Maven Coord from Configs
-        f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all" # This package is excluded because it causes an error in the SparkSession fig
         cls.spark = (SparkSession
                      .builder
                      .master('local[*]')
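The same refactor repeats across the test modules below: the per-file Maven coordinate locals are deleted, and `from pydeequ import *` supplies `deequ_maven_coord` and `f2j_maven_coord` to the unchanged builder configuration instead. A hedged sketch of what the imported names resolve to after this commit:

# After the refactor these names come from pydeequ/__init__.py:
from pydeequ import deequ_maven_coord, f2j_maven_coord

assert deequ_maven_coord == "com.amazon.deequ:deequ:1.1.0_spark-2.4-scala-2.11"
assert f2j_maven_coord == "net.sourceforge.f2j:arpack_combined_all"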

tests/test_anomaly_detection.py

Lines changed: 1 addition & 2 deletions
@@ -4,12 +4,11 @@
 from pydeequ.anomaly_detection import *
 from pydeequ.repository import *
 from pydeequ.analyzers import *
+from pydeequ import *

 class TestAnomalies(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
-        deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3"
-        f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all" # This package is excluded because it causes an error in the SparkSession fig
         cls.spark = (SparkSession
                      .builder
                      .master('local[*]')

tests/test_checks.py

Lines changed: 49 additions & 8 deletions
@@ -2,14 +2,11 @@
 from pyspark.sql import SparkSession, Row, DataFrame
 from pydeequ.verification import *
 from pydeequ.checks import *
-import py4j
+from pydeequ import *

 class TestChecks(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
-        # TODO share spark context between test cases?
-        deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3" # TODO get Maven Coord from Configs
-        f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all" # This package is excluded because it causes an error in the SparkSession fig
         cls.spark = (SparkSession
                      .builder
                      .master('local[*]')
@@ -23,9 +20,9 @@ def setUpClass(cls):
                      .getOrCreate())
         cls.sc = cls.spark.sparkContext
         cls.df = cls.sc.parallelize([
-            Row(a="foo", b=1, c=5, d=5, e=3, f=1, g='a', h=0, creditCard="5130566665286573", email="[email protected]", ssn="123-45-6789", URL="http://[email protected]:8080", boolean="true"),
-            Row(a="bar", b=2, c=6, d=5, e=2, f=2, g='b', h=-1, creditCard="4532677117740914", email="[email protected]", ssn="123456789", URL="http://foo.com/(something)?after=parens", boolean="false"),
-            Row(a="baz", b=3, c=None, d=5, e=1, f=1, g=None, h=2, creditCard="340145324521741", email="yourusername@example.com", ssn="000-00-0000", URL ="http://[email protected]:8080", boolean="true")]).toDF()
+            Row(a="foo", b=1, c=5, d=5, e=3, f=1, g='a', h=0, creditCard="5130566665286573", email="[email protected]", ssn="123-45-6789", URL="http://[email protected]:8080", boolean="true"),
+            Row(a="bar", b=2, c=6, d=5, e=2, f=2, g='b', h=-1, creditCard="4532677117740914", email="[email protected]", ssn="123456789", URL="http://foo.com/(something)?after=parens", boolean="false"),
+            Row(a="baz", b=3, c=None, d=5, e=1, f=1, g=None, h=2, creditCard="340145324521741", email="yourusername@meow.com", ssn="000-00-0000", URL ="http://[email protected]:8080", boolean="true")]).toDF()

     @classmethod
     def tearDownClass(cls):
@@ -55,6 +52,16 @@ def hasSize(self, assertion, hint = None):
         df = VerificationResult.checkResultsAsDataFrame(self.spark, result)
         return df.select('constraint_status').collect()

+    def hasPattern(self, column, pattern, assertion=None, name=None, hint=None):
+        check = Check(self.spark, CheckLevel.Warning, "test hasPattern")
+
+        result = VerificationSuite(self.spark).onData(self.df) \
+            .addCheck((check.hasPattern(column, pattern, assertion, name, hint))) \
+            .run()
+
+        df = VerificationResult.checkResultsAsDataFrame(self.spark, result)
+        return df.select('constraint_status').collect()
+
     def containsCreditCardNumber(self, column, assertion=None, hint=None):
         check = Check(self.spark, CheckLevel.Warning, "test containsCreditCardNumber")
         result = VerificationSuite(self.spark).onData(self.df) \
@@ -405,6 +412,40 @@ def test_fail_hasSize(self):
         self.assertEqual(self.hasSize(lambda x: (x >2.0), "size of dataframe should be 3"),
                          [Row(constraint_status='Failure')])

+    def test_hasPattern(self):
+        self.assertEqual(self.hasPattern(column='email',
+                                         pattern=r".*@meow.com",
+                                         assertion=lambda x: x == 1/3),
+                         [Row(constraint_status='Success')])
+
+        self.assertEqual(self.hasPattern(column='creditCard',
+                                         pattern=r"\(|\)|\d{16}",
+                                         assertion=lambda x: x == 0.0),
+                         [Row(constraint_status='Failure')])
+
+        self.assertEqual(self.hasPattern(column='email',
+                                         pattern=r"""(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])""",
+                                         assertion=lambda x: x == 1.0),
+                         [Row(constraint_status='Success')])
+
+    @unittest.expectedFailure
+    def test_fail_hasPattern(self):
+        self.assertEqual(self.hasPattern(column='email',
+                                         pattern=r".*@meow.com",
+                                         assertion=lambda x: x == 2 / 3),
+                         [Row(constraint_status='Success')])
+
+        self.assertEqual(self.hasPattern(column='creditCard',
+                                         pattern=r"\(|\)|\d{16}",
+                                         assertion=lambda x: x == 1.0),
+                         [Row(constraint_status='Failure')])
+
+        self.assertEqual(self.hasPattern(column='email',
+                                         pattern=r"""(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])""",
+                                         assertion=lambda x: x == 0.0),
+                         [Row(constraint_status='Success')])
+
     def test_containsCreditCardNumber(self):
         self.assertEqual(self.containsCreditCardNumber("creditCard"), [Row(constraint_status='Success')])
         self.assertEqual(self.containsCreditCardNumber("creditCard", lambda x: x == 1.0, "All rows contain a credit card number"),
@@ -763,7 +804,7 @@ def test_fail_hasMinLength(self):
                          [Row(constraint_status='Success')])

     def test_hasMaxLength(self):
-        self.assertEqual(self.hasMaxLength("email", lambda x: x == 24, "Column email has 24 characters max"),
+        self.assertEqual(self.hasMaxLength("email", lambda x: x == 21, "Column email has 24 characters max"),
                          [Row(constraint_status='Success')])
         self.assertEqual(self.hasMaxLength('email', lambda x: x == 25, "does not meet criteria"),
                          [Row(constraint_status='Failure')])
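Two details worth calling out in these tests. First, hasPattern's assertion receives the fraction of rows whose column value matches the regex: the fixture has three rows and exactly one @meow.com address, hence lambda x: x == 1/3. Second, the hasMaxLength expectation drops from 24 to 21 because the longest email in the fixture is now "yourusername@meow.com" (21 characters), though the hint string in the committed test still says 24. A hedged restatement of the fraction semantics (`spark` and the three-row `df` are the fixture above):

# Hedged illustration of the matching-fraction semantics:
# three fixture rows, one email ending in @meow.com -> fraction 1/3.
check = Check(spark, CheckLevel.Warning, "fraction demo") \
    .hasPattern("email", r".*@meow.com", lambda x: x == 1/3)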

tests/test_pandas_utils.py

Lines changed: 1 addition & 3 deletions
@@ -6,16 +6,14 @@
 from pydeequ.profiles import ColumnProfilerRunBuilder, ColumnProfilerRunner
 from pydeequ.verification import *
 from pydeequ.checks import *
-from pydeequ import PyDeequSession
+from pydeequ import *
 from pandas import DataFrame as pandasDF
 import numpy as np

 class TestPandasUtils(unittest.TestCase):

     @classmethod
     def setUpClass(cls):
-        deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3" # TODO: get Maven Coord from Configs
-        f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all" # This package is excluded because it causes an error in the SparkSession fig
         cls.spark = (SparkSession
                      .builder
                      .master('local[*]')

tests/test_profiles.py

Lines changed: 1 addition & 3 deletions
@@ -2,13 +2,11 @@
 from pydeequ.profiles import ColumnProfilerRunBuilder, ColumnProfilerRunner
 from pydeequ.analyzers import KLLParameters
 from pyspark.sql import SparkSession, Row
-
+from pydeequ import *

 class TestProfiles(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
-        deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3"
-        f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all" # This package is excluded because it causes an error in the SparkSession fig
         cls.spark = (SparkSession
                      .builder
                      .master('local[*]')

tests/test_repository.py

Lines changed: 1 addition & 2 deletions
@@ -4,13 +4,12 @@
 from pydeequ.repository import *
 from pydeequ.verification import *
 from pydeequ.checks import *
+from pydeequ import *


 class TestRepository(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
-        deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3"
-        f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all" # This package is excluded because it causes an error in the SparkSession fig
         cls.spark = (SparkSession
                      .builder
                      .master('local[*]')

tests/test_scala_utils.py

Lines changed: 2 additions & 4 deletions
@@ -1,14 +1,12 @@
 import unittest
 from pydeequ.scala_utils import ScalaFunction1, ScalaFunction2
 from pyspark.sql import SparkSession
-
+from pydeequ import *

 class TestScalaUtils(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
-        # TODO share spark context between test cases?
-        deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3-rc2" # TODO get Maven Coord from Configs
-        f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all" # This package is excluded because it causes an error in the SparkSession fig
+        # deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3-rc2" # TODO This ran rc2?
         cls.spark = (SparkSession
                      .builder
                      .master('local[*]')

tests/test_suggestions.py

Lines changed: 1 addition & 3 deletions
@@ -2,13 +2,11 @@
 from pyspark.sql import SparkSession, Row, DataFrame
 from pydeequ.suggestions import *
 import json
-
+from pydeequ import *

 class TestSuggestions(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
-        deequ_maven_coord = "com.amazon.deequ:deequ:1.0.3"
-        f2j_maven_coord = "net.sourceforge.f2j:arpack_combined_all" # This package is excluded because it causes an error in the SparkSession fig
         cls.spark = (SparkSession
                      .builder
                      .master('local[*]')
