# Drunken Data Quality 4.0.0
The core concepts in DDQ are checks, constraints, reporters and the runner.
A check contains a set of constraints for one data frame. Each constraint checks a specific property related to data quality on this data frame. By passing a set of checks to a runner, it executes them and hands them over to a set of reporters.
To create a check, it is sufficient to specify the data frame you would like to work with.
```scala
Check(dataFrame: DataFrame)
```

There are more options available for advanced users:
- You can specify a display name, which will be used instead of the default string representation in reports.
- You can also select a cache method (default is `MEMORY_ONLY`). Caching the data frame makes sense if you execute a lot of different checks on it and it is not cached already. If you don't want to cache, pass `None`.
- If required, you can specify a set of constraints upfront instead of adding them through the fluent interface.
- It is also possible to pick an ID instead of a randomly generated one. The ID is used by sophisticated reporters to identify a check. Don't change it unless you need to.
```scala
Check(
  dataFrame: DataFrame,
  displayName: Option[String] = Option.empty,
  cacheMethod: Option[StorageLevel] = Check.defaultCacheMethod,
  constraints: Seq[Constraint] = Seq.empty,
  id: String = UUID.randomUUID.toString
)
```

A check offers multiple constraints to verify. You can add them using a fluent interface.
Check whether the given constraint is satisfied. You may provide a constraint as a SQL string or a Column instance.
```scala
def satisfies(constraint: String): Check
def satisfies(constraint: Column): Check
```

```scala
Check(customers).satisfies("age > 0")
Check(customers).satisfies(customers("age") > 0)
```

Check whether the given conditional constraint is satisfied. Be aware that it might cause problems with null values: `A -> B` gets translated to `!A || B`, and a comparison involving null yields null, which does not count as satisfied.
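The null caveat can be illustrated with a plain-Scala sketch of SQL's three-valued logic, with `None` standing in for null. The helper names are illustrative and not part of the DDQ API:

```scala
// SQL three-valued logic modeled with Option[Boolean]; None plays null.
// Illustrative helpers only -- not part of the DDQ API.
def not(a: Option[Boolean]): Option[Boolean] = a.map(!_)

def or(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] =
  (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None // null
  }

// A -> B is translated to !A || B
def implies(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] =
  or(not(a), b)

// If the left-hand side is null, the whole implication is null,
// and a null predicate does not count as satisfied:
println(implies(None, Some(false)))  // None
println(implies(Some(false), None))  // Some(true)
```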
```scala
def satisfies(conditional: (Column, Column)): Check
```

```scala
Check(customers).satisfies((customers("age") > 50) -> (customers("seniority") === "high"))
```

Check whether the column with the given name contains only null values.
```scala
def isAlwaysNull(columnName: String): Check
```

```scala
Check(customers).isAlwaysNull("complaint")
```

Check whether the column with the given name contains no null values.
```scala
def isNeverNull(columnName: String): Check
```

```scala
Check(customers).isNeverNull("age")
```

Check whether the column with the given name always matches the specified regular expression.
```scala
def isMatchingRegex(columnName: String, regex: String): Check
```

```scala
Check(customers).isMatchingRegex("email", "^[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,6}$")
```

Check whether the column with the given name contains only values from the specified set.
```scala
def isAnyOf(columnName: String, allowed: Set[Any]): Check
```

```scala
Check(customers).isAnyOf("gender", Set("m", "f"))
```

Check whether the column with the given name can be converted to a date using the specified date format string.
```scala
def isFormattedAsDate(columnName: String, dateFormatString: String): Check
```

```scala
Check(contracts).isFormattedAsDate("signatureDate", "yyyy-MM-dd")
```

Check whether the column with the given name can be converted to the given type.
```scala
def isConvertibleTo(columnName: String, targetType: DataType): Check
```

```scala
Check(transactions).isConvertibleTo("amount", DoubleType)
```

Check whether the number of rows of the table satisfies the given predicate.
```scala
def hasNumRows(expected: (Column) => Column): Check
```

```scala
Check(clicks).hasNumRows(_ > 10)
```

Check whether the given columns are a unique key for this table.
```scala
def hasUniqueKey(columnName: String, columnNames: String*): Check
```

```scala
Check(connections).hasUniqueKey("time", "thread")
```

Check whether the given data set is exactly equal to the checked one. The equality check is performed by first computing a distinct count on both data frames and then computing a pairwise set difference. If both differences yield an empty set, the data frames are equal.
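The algorithm just described can be sketched with plain Scala collections (illustrative only; DDQ performs these steps with DataFrame operations):

```scala
// Equality as described above: equal distinct counts plus two empty
// pairwise set differences. Note that, like the distinct-count
// approach, this ignores duplicate multiplicity.
def equalContents[A](left: Seq[A], right: Seq[A]): Boolean = {
  val (l, r) = (left.toSet, right.toSet)  // "distinct" on both sides
  l.size == r.size && (l diff r).isEmpty && (r diff l).isEmpty
}

println(equalContents(Seq(1, 2, 2), Seq(2, 1)))  // true
println(equalContents(Seq(1, 2), Seq(1, 3)))     // false
```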
```scala
def isEqualTo(other: DataFrame): Check
```

```scala
Check(oldTable).isEqualTo(newTable)
```

Check whether the columns with the given names define a foreign key to the specified reference table. Note that a foreign key needs to be a unique key in the reference table, which will also be checked.
```scala
def hasForeignKey(referenceTable: DataFrame, keyMap: (String, String), keyMaps: (String, String)*): Check
```

```scala
Check(contracts).hasForeignKey(customers, "customerId" -> "id")
```

Check whether a join between this table and the given reference table returns any results. It will also output the percentage of matching keys between the base and the reference table.
```scala
def isJoinableWith(referenceTable: DataFrame, keyMap: (String, String), keyMaps: (String, String)*): Check
```

```scala
Check(contracts).isJoinableWith(customers, "customerId" -> "id")
```

Check whether the columns in the dependent set have a functional dependency on the determinant set. This can be used to check a "foreign key" relationship in a denormalized table.
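A functional dependency holds when each determinant value maps to exactly one dependent value. A plain-Scala sketch on pairs (illustrative, not the DDQ implementation):

```scala
// Functional dependency on plain Scala pairs (determinant, dependent).
// Illustrative only; DDQ performs this check on DataFrame columns.
def functionallyDetermines[A, B](rows: Seq[(A, B)]): Boolean =
  rows.groupBy(_._1).values.forall(_.map(_._2).distinct.size == 1)

// artist.id -> artist.name holds here:
println(functionallyDetermines(Seq((1, "a"), (1, "a"), (2, "b"))))  // true
// but is violated when one id maps to two names:
println(functionallyDetermines(Seq((1, "a"), (1, "x"))))            // false
```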
```scala
def hasFunctionalDependency(determinantSet: Seq[String], dependentSet: Seq[String]): Check
```

```scala
Check(records).hasFunctionalDependency(Seq("artist.id"), Seq("artist.name", "artist.country"))
```

In order to run a set of constraints, just execute the run method on the check. You can pass a list of reporters as well. If no reporter is passed, it will report to the console output stream using a console reporter.
```scala
def run(reporters: Reporter*): CheckResult
```

In order to report one or multiple check results to one or multiple reporters, use the Runner object. The runner will then execute all checks, report the results to all reporters, and return all results in a programmatic way so you can use them for other purposes (e.g. unit testing).
```scala
val check1: Check = ???
val check2: Check = ???
val reporter1: Reporter = ???
val reporter2: Reporter = ???

val result = Runner.run(Seq(check1, check2), Seq(reporter1, reporter2))
```

The console reporter is a simple reporter meant for interactive usage (e.g. on the Spark shell). It prints checks and constraint results to the specified print stream, coloured by ANSI terminal markup.
```scala
ConsoleReporter(
  stream: PrintStream = Console.out
)
```

The markdown reporter is another simple reporter suitable for both interactive and non-interactive usage. It prints the constraint results to the specified print stream in markdown layout. If you want to store the markdown file, you can wrap a FileOutputStream in a PrintStream.
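For example, to direct the output into a file (the file name is arbitrary, and the `println` below merely stands in for the reporter's output):

```scala
import java.io.{FileOutputStream, PrintStream}

// Wrap a FileOutputStream in a PrintStream so a reporter can write to
// a file instead of the console. A MarkdownReporter(stream) would then
// consume the stream; shown here without the DDQ dependency.
val stream = new PrintStream(new FileOutputStream("ddq-report.md"))
stream.println("# DDQ report")  // stand-in for the reporter's output
stream.close()
```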
```scala
MarkdownReporter(stream: PrintStream)
```

The Zeppelin reporter can be used to show the results in a Zeppelin notebook note. Make sure to use exactly one ZeppelinReporter instance per note.

If you are using %pyspark, you need to pass the ZeppelinContext (z) to the ZeppelinReporter. Example:
```python
%pyspark
from pyddq.core import Check
from pyddq.reporters import ZeppelinReporter

df = spark.createDataFrame([(1, "a"), (1, None), (3, "c")])
check = Check(df)
reporter = ZeppelinReporter(z)
check.hasUniqueKey("_1", "_2").isNeverNull("_1").run([reporter])
```

The log4j reporter is more sophisticated. It serializes all available information about the check, the constraints and the results into a JSON string and logs it to the specified logger using the specified level.
```scala
Log4jReporter(
  logger: Logger = org.apache.log4j.Logger.getLogger("DDQ"),
  logLevel: Level = org.apache.log4j.Level.INFO
)
```

If you want to collect, parse, evaluate and visualize the results, the ELK stack might be a good fit. Please refer to the reporting showcase for an example setup.