Commit f739fed

Upgraded the Deequ version to 2.0.7 (#200)
- A new Deequ version, 2.0.7, was released for Spark versions 3.1 through 3.5. As part of this change, we upgrade the Deequ dependencies to 2.0.7.
- The new Deequ version contains updated APIs, so we change the PyDeequ code to conform to them. For now, we pass in default parameters for the new arguments; a future release will expose these parameters in PyDeequ's public interface as well.
- Support for the older Spark versions 2.4 and 3.0 is removed, and the GitHub workflow is updated accordingly.
1 parent 59274a5 commit f739fed
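
Every call site in this diff follows the same Py4J pattern: optional Deequ parameters are wrapped as Scala Option values through the gateway's JVM view, and each newly added trailing parameter is passed Option.apply(None) so Deequ falls back to its own default. A minimal sketch of that pattern, assuming a running SparkSession (the "id > 0" predicate is illustrative only):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    jvm = spark.sparkContext._jvm

    # Option.apply(None) yields scala.None, so Deequ applies its default behavior.
    none_opt = jvm.scala.Option.apply(None)

    # Option.apply("...") yields scala.Some(...), e.g. an optional filter predicate.
    where_opt = jvm.scala.Option.apply("id > 0")

    assert none_opt.isEmpty()
    assert where_opt.get() == "id > 0"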

File tree

4 files changed: +66 additions, −93 deletions


.github/workflows/base.yml

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        PYSPARK_VERSION: ["3.0", "3.1.3", "3.2", "3.3"]
+        PYSPARK_VERSION: ["3.1.3", "3.2", "3.3"]
 
     steps:
       - uses: actions/checkout@v3

pydeequ/analyzers.py

Lines changed: 43 additions & 56 deletions
@@ -271,7 +271,11 @@ def _analyzer_jvm(self):
 
         :return self: access the value of the Completeness analyzer.
         """
-        return self._deequAnalyzers.Completeness(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Completeness(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Compliance(_AnalyzerObject):
@@ -303,19 +307,13 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.Compliance(
-                self.instance,
-                self.predicate,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.collection.Seq.empty()
-            )
-        else:
-            return self._deequAnalyzers.Compliance(
-                self.instance,
-                self.predicate,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.Compliance(
+            self.instance,
+            self.predicate,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.collection.Seq.empty(),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Correlation(_AnalyzerObject):
@@ -469,22 +467,14 @@ def _analyzer_jvm(self):
         """
         if not self.maxDetailBins:
             self.maxDetailBins = getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$3")()
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.Histogram(
-                self.column,
-                self._jvm.scala.Option.apply(self.binningUdf),
-                self.maxDetailBins,
-                self._jvm.scala.Option.apply(self.where),
-                getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$5")(),
-                getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$6")()
-            )
-        else:
-            return self._deequAnalyzers.Histogram(
-                self.column,
-                self._jvm.scala.Option.apply(self.binningUdf),
-                self.maxDetailBins,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.Histogram(
+            self.column,
+            self._jvm.scala.Option.apply(self.binningUdf),
+            self.maxDetailBins,
+            self._jvm.scala.Option.apply(self.where),
+            getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$5")(),
+            getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$6")()
+        )
 
 
 class KLLParameters:
@@ -553,7 +543,9 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        return self._deequAnalyzers.Maximum(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Maximum(
+            self.column, self._jvm.scala.Option.apply(self.where), self._jvm.scala.Option.apply(None)
+        )
 
 
 class MaxLength(_AnalyzerObject):
@@ -575,17 +567,11 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.MaxLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.Option.apply(None)
-            )
-        else:
-            return self._deequAnalyzers.MaxLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.MaxLength(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Mean(_AnalyzerObject):
@@ -628,7 +614,9 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        return self._deequAnalyzers.Minimum(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Minimum(
+            self.column, self._jvm.scala.Option.apply(self.where), self._jvm.scala.Option.apply(None)
+        )
 
 
 class MinLength(_AnalyzerObject):
@@ -651,17 +639,11 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.MinLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.Option.apply(None)
-            )
-        else:
-            return self._deequAnalyzers.MinLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.MinLength(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class MutualInformation(_AnalyzerObject):
@@ -725,6 +707,7 @@ def _analyzer_jvm(self):
             # TODO: revisit bc scala constructor does some weird implicit type casting from python str -> java list
            # if we don't cast it to str()
             self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 
@@ -814,7 +797,9 @@ def _analyzer_jvm(self):
         :return self
         """
         return self._deequAnalyzers.Uniqueness(
-            to_scala_seq(self._jvm, self.columns), self._jvm.scala.Option.apply(self.where)
+            to_scala_seq(self._jvm, self.columns),
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 
@@ -839,7 +824,9 @@ def _analyzer_jvm(self):
         :return self
         """
         return self._deequAnalyzers.UniqueValueRatio(
-            to_scala_seq(self._jvm, self.columns), self._jvm.scala.Option.apply(self.where)
+            to_scala_seq(self._jvm, self.columns),
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 class DataTypeInstances(Enum):
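
These constructor changes do not alter PyDeequ's public analyzer API, so existing analyzer code should keep working unchanged. A short usage sketch under that assumption (df and the column names review_id and rating are illustrative):

    from pydeequ.analyzers import AnalysisRunner, AnalyzerContext, Completeness, Maximum

    # Compute completeness and maximum metrics over an existing DataFrame df.
    result = (AnalysisRunner(spark)
              .onData(df)
              .addAnalyzer(Completeness("review_id"))
              .addAnalyzer(Maximum("rating"))
              .run())

    # Collect the computed metrics as a Spark DataFrame.
    AnalyzerContext.successMetricsAsDataFrame(spark, result).show()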

pydeequ/checks.py

Lines changed: 19 additions & 31 deletions
@@ -154,7 +154,7 @@ def isComplete(self, column, hint=None):
         :return: isComplete self: A Check.scala object that asserts on a column completion.
         """
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.isComplete(column, hint)
+        self._Check = self._Check.isComplete(column, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasCompleteness(self, column, assertion, hint=None):
@@ -170,7 +170,7 @@ def hasCompleteness(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.hasCompleteness(column, assertion_func, hint)
+        self._Check = self._Check.hasCompleteness(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def areComplete(self, columns, hint=None):
@@ -234,7 +234,7 @@ def isUnique(self, column, hint=None):
         :return: isUnique self: A Check.scala object that asserts uniqueness in the column.
         """
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.isUnique(column, hint)
+        self._Check = self._Check.isUnique(column, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def isPrimaryKey(self, column, *columns, hint=None):
@@ -297,7 +297,7 @@ def hasUniqueValueRatio(self, columns, assertion, hint=None):
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
         columns_seq = to_scala_seq(self._jvm, columns)
-        self._Check = self._Check.hasUniqueValueRatio(columns_seq, assertion_func, hint)
+        self._Check = self._Check.hasUniqueValueRatio(columns_seq, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasNumberOfDistinctValues(self, column, assertion, binningUdf, maxBins, hint=None):
@@ -418,11 +418,7 @@ def hasMinLength(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        if SPARK_VERSION == "3.3":
-            self._Check = self._Check.hasMinLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
-        else:
-            self._Check = self._Check.hasMinLength(column, assertion_func, hint)
-
+        self._Check = self._Check.hasMinLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMaxLength(self, column, assertion, hint=None):
@@ -437,10 +433,7 @@ def hasMaxLength(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        if SPARK_VERSION == "3.3":
-            self._Check = self._Check.hasMaxLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
-        else:
-            self._Check = self._Check.hasMaxLength(column, assertion_func, hint)
+        self._Check = self._Check.hasMaxLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMin(self, column, assertion, hint=None):
@@ -456,7 +449,7 @@ def hasMin(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.hasMin(column, assertion_func, hint)
+        self._Check = self._Check.hasMin(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMax(self, column, assertion, hint=None):
@@ -472,7 +465,7 @@ def hasMax(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.hasMax(column, assertion_func, hint)
+        self._Check = self._Check.hasMax(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMean(self, column, assertion, hint=None):
@@ -565,21 +558,14 @@ def satisfies(self, columnCondition, constraintName, assertion=None, hint=None):
             else getattr(self._Check, "satisfies$default$3")()
         )
         hint = self._jvm.scala.Option.apply(hint)
-        if SPARK_VERSION == "3.3":
-            self._Check = self._Check.satisfies(
-                columnCondition,
-                constraintName,
-                assertion_func,
-                hint,
-                self._jvm.scala.collection.Seq.empty()
-            )
-        else:
-            self._Check = self._Check.satisfies(
-                columnCondition,
-                constraintName,
-                assertion_func,
-                hint
-            )
+        self._Check = self._Check.satisfies(
+            columnCondition,
+            constraintName,
+            assertion_func,
+            hint,
+            self._jvm.scala.collection.Seq.empty(),
+            self._jvm.scala.Option.apply(None)
+        )
         return self
 
     def hasPattern(self, column, pattern, assertion=None, name=None, hint=None):
@@ -602,7 +588,9 @@ def hasPattern(self, column, pattern, assertion=None, name=None, hint=None):
         name = self._jvm.scala.Option.apply(name)
         hint = self._jvm.scala.Option.apply(hint)
         pattern_regex = self._jvm.scala.util.matching.Regex(pattern, None)
-        self._Check = self._Check.hasPattern(column, pattern_regex, assertion_func, name, hint)
+        self._Check = self._Check.hasPattern(
+            column, pattern_regex, assertion_func, name, hint, self._jvm.scala.Option.apply(None)
+        )
         return self
 
     def containsCreditCardNumber(self, column, assertion=None, hint=None):
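
Likewise, the check methods keep their Python signatures; the extra Option.apply(None) argument is supplied internally. A short verification sketch under that assumption (df, the column names, and the thresholds are illustrative):

    from pydeequ.checks import Check, CheckLevel
    from pydeequ.verification import VerificationSuite, VerificationResult

    check = Check(spark, CheckLevel.Warning, "Review Check")

    # Chain constraints on an existing DataFrame df and run the suite.
    result = (VerificationSuite(spark)
              .onData(df)
              .addCheck(check.isComplete("review_id")
                             .isUnique("review_id")
                             .hasMaxLength("marketplace", lambda length: length <= 2.0))
              .run())

    # Inspect constraint-level results as a Spark DataFrame.
    VerificationResult.checkResultsAsDataFrame(spark, result).show()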

pydeequ/configs.py

Lines changed: 3 additions & 5 deletions
@@ -5,11 +5,9 @@
 
 
 SPARK_TO_DEEQU_COORD_MAPPING = {
-    "3.3": "com.amazon.deequ:deequ:2.0.4-spark-3.3",
-    "3.2": "com.amazon.deequ:deequ:2.0.1-spark-3.2",
-    "3.1": "com.amazon.deequ:deequ:2.0.0-spark-3.1",
-    "3.0": "com.amazon.deequ:deequ:1.2.2-spark-3.0",
-    "2.4": "com.amazon.deequ:deequ:1.1.0_spark-2.4-scala-2.11",
+    "3.3": "com.amazon.deequ:deequ:2.0.7-spark-3.3",
+    "3.2": "com.amazon.deequ:deequ:2.0.7-spark-3.2",
+    "3.1": "com.amazon.deequ:deequ:2.0.7-spark-3.1"
 }
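
This mapping is what PyDeequ uses to pick the Deequ Maven coordinate for the detected Spark version. A session-setup sketch following the pattern in the project README (pydeequ.deequ_maven_coord and pydeequ.f2j_maven_coord are the coordinates the package exports):

    import pydeequ
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             # Pull the version-matched Deequ jar resolved from the mapping above.
             .config("spark.jars.packages", pydeequ.deequ_maven_coord)
             # Exclude a conflicting transitive dependency, per the PyDeequ README.
             .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
             .getOrCreate())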
