Skip to content

Commit 44388e5

Browse files
authored
Support new SeparateLines Dataframe Diff output (#209)
* Changes: - add truncate option to dataframe diff output - Remove cell width as it's a duplicate of row width - as Ufansi colors don't change string length - extract cellToString method as it has better implementation when converting dataframe cell to string
1 parent caa0a2e commit 44388e5

File tree

7 files changed

+690
-67
lines changed

7 files changed

+690
-67
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ assertSmallDataFrameEquality(sourceDF, expectedDF)
6868
<p>
6969
<img src="./images/assertSmallDataFrameEquality_DatasetContentMissmatch_message.png" alt="assertSmallDataFrameEquality_DatasetContentMissmatch_message" width="500", height="200"/>
7070
</p>
71+
Or, if you prefer a stacked (separate-lines) layout for wide DataFrames:
72+
```scala
73+
assertSmallDataFrameEquality(..., outputFormat = DataframeDiffOutputFormat.SeparateLines)
74+
```
75+
<p>
76+
<img src="./images/assertSmallWideDataframe.png" alt="assertSmallWideDataframe"/>
77+
</p>
7178

7279
The `assertSmallDatasetEquality` method can be used to compare two Datasets or DataFrames (a DataFrame is a `Dataset[Row]`).
7380
Nicely formatted error messages are displayed when the Datasets are not equal. Here is an example of content mismatch:

core/src/main/scala/com/github/mrpowers/spark/fast/tests/DataFrameComparer.scala

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,27 @@
11
package com.github.mrpowers.spark.fast.tests
22

3+
import com.github.mrpowers.spark.fast.tests.SeqLikesExtensions.SeqExtensions
34
import org.apache.spark.sql.{DataFrame, Row}
5+
import com.github.mrpowers.spark.fast.tests.DataframeDiffOutputFormat.DataframeDiffOutputFormat
6+
47
trait DataFrameComparer extends DatasetComparer {
58

69
/**
710
* Raises an error unless `actualDF` and `expectedDF` are equal
11+
* @param actualDF
12+
* \- actual dataframe
13+
* @param expectedDF
14+
* \- expected dataframe
15+
* @param ignoreNullable
16+
* \- ignore nullable parameter when matching schemas
17+
* @param ignoreColumnNames
18+
* \- ignore column names
19+
* @param orderedComparison
20+
* \- if false sorts actual and expected
21+
* @param ignoreMetadata
22+
* \- don't compare column metadata when matching schemas
23+
* @param truncate
24+
* \- truncate column if length more than specified number
825
*/
926
def assertSmallDataFrameEquality(
1027
actualDF: DataFrame,
@@ -14,18 +31,54 @@ trait DataFrameComparer extends DatasetComparer {
1431
orderedComparison: Boolean = true,
1532
ignoreColumnOrder: Boolean = false,
1633
ignoreMetadata: Boolean = true,
17-
truncate: Int = 500
34+
truncate: Int = 500,
35+
outputFormat: DataframeDiffOutputFormat = DataframeDiffOutputFormat.SideBySide
1836
): Unit = {
19-
assertSmallDatasetEquality(
20-
actualDF,
21-
expectedDF,
22-
ignoreNullable,
23-
ignoreColumnNames,
24-
orderedComparison,
25-
ignoreColumnOrder,
26-
ignoreMetadata,
27-
truncate
28-
)
37+
outputFormat match {
38+
case DataframeDiffOutputFormat.SideBySide =>
39+
assertSmallDatasetEquality(
40+
actualDF,
41+
expectedDF,
42+
ignoreNullable,
43+
ignoreColumnNames,
44+
orderedComparison,
45+
ignoreColumnOrder,
46+
ignoreMetadata,
47+
truncate
48+
)
49+
case DataframeDiffOutputFormat.SeparateLines =>
50+
SchemaComparer.assertSchemaEqual(
51+
actualDF.schema,
52+
expectedDF.schema,
53+
ignoreNullable,
54+
ignoreColumnNames,
55+
ignoreColumnOrder,
56+
ignoreMetadata
57+
)
58+
val actual = if (ignoreColumnOrder) orderColumns(actualDF, expectedDF) else actualDF
59+
if (orderedComparison)
60+
assertSmallDataFrameEquality(actual, expectedDF, truncate)
61+
else
62+
assertSmallDataFrameEquality(
63+
defaultSortDataset(actual),
64+
defaultSortDataset(expectedDF),
65+
truncate
66+
)
67+
}
68+
69+
}
70+
71+
private def assertSmallDataFrameEquality(
72+
actualDF: DataFrame,
73+
expectedDF: DataFrame,
74+
truncate: Int
75+
): Unit = {
76+
val a = actualDF.collect()
77+
val e = expectedDF.collect()
78+
if (!a.toSeq.approximateSameElements(e, (o1: Row, o2: Row) => o1.equals(o2))) {
79+
val msg = "Difference\n" ++ DataframeUtil.showDataframeDiff(a, e, actualDF.schema.fieldNames, truncate)
80+
throw DatasetContentMismatch(msg)
81+
}
2982
}
3083

3184
/**

core/src/main/scala/com/github/mrpowers/spark/fast/tests/DataFramePrettyPrint.scala

Lines changed: 62 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,60 +18,7 @@ object DataFramePrettyPrint {
1818
// For cells that are beyond `truncate` characters, replace it with the
1919
// first `truncate-3` and "..."
2020
val rows: Seq[Seq[String]] = df.schema.fieldNames.toSeq +: data.map { row =>
21-
row.toSeq.map { cell =>
22-
val str = cell match {
23-
case null => "null"
24-
case binary: Array[Byte] =>
25-
binary
26-
.map("%02X".format(_))
27-
.mkString(
28-
"[",
29-
" ",
30-
"]"
31-
)
32-
case array: Array[_] =>
33-
array.mkString(
34-
"[",
35-
", ",
36-
"]"
37-
)
38-
case seq: Seq[_] =>
39-
seq.mkString(
40-
"[",
41-
", ",
42-
"]"
43-
)
44-
case d: Date =>
45-
d.toLocalDate.format(DateTimeFormatter.ISO_DATE)
46-
case r: Row =>
47-
r.schema.fieldNames
48-
.zip(r.toSeq)
49-
.map { case (k, v) =>
50-
s"$k -> $v"
51-
}
52-
.mkString(
53-
"{",
54-
", ",
55-
"}"
56-
)
57-
case _ => cell.toString
58-
}
59-
if (truncate > 0 && str.length > truncate) {
60-
// do not show ellipses for strings shorter than 4 characters.
61-
if (truncate < 4)
62-
str.substring(
63-
0,
64-
truncate
65-
)
66-
else
67-
str.substring(
68-
0,
69-
truncate - 3
70-
) + "..."
71-
} else {
72-
str
73-
}
74-
}: Seq[String]
21+
row.toSeq.map(cellToString(_, truncate)): Seq[String]
7522
}
7623

7724
val sb = new StringBuilder
@@ -160,4 +107,65 @@ object DataFramePrettyPrint {
160107
sb.toString()
161108
}
162109

110+
/**
111+
* Convert dataframe cell to string
112+
* @param cell
113+
* \- cell value
114+
* @param truncate
115+
* \-
116+
*/
117+
private[mrpowers] def cellToString(cell: Any, truncate: Int): String = {
118+
val str = cell match {
119+
case null => "null"
120+
case binary: Array[Byte] =>
121+
binary
122+
.map("%02X".format(_))
123+
.mkString(
124+
"[",
125+
" ",
126+
"]"
127+
)
128+
case array: Array[_] =>
129+
array.mkString(
130+
"[",
131+
", ",
132+
"]"
133+
)
134+
case seq: Seq[_] =>
135+
seq.mkString(
136+
"[",
137+
", ",
138+
"]"
139+
)
140+
case d: Date =>
141+
d.toLocalDate.format(DateTimeFormatter.ISO_DATE)
142+
case r: Row =>
143+
r.schema.fieldNames
144+
.zip(r.toSeq)
145+
.map { case (k, v) =>
146+
s"$k -> $v"
147+
}
148+
.mkString(
149+
"{",
150+
", ",
151+
"}"
152+
)
153+
case _ => cell.toString
154+
}
155+
if (truncate > 0 && str.length > truncate) {
156+
// do not show ellipses for strings shorter than 4 characters.
157+
if (truncate < 4)
158+
str.substring(
159+
0,
160+
truncate
161+
)
162+
else
163+
str.substring(
164+
0,
165+
truncate - 3
166+
) + "..."
167+
} else {
168+
str
169+
}
170+
}
163171
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.github.mrpowers.spark.fast.tests
2+
3+
object DataframeDiffOutputFormat extends Enumeration {
4+
type DataframeDiffOutputFormat = Value
5+
val SideBySide, SeparateLines = Value
6+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package com.github.mrpowers.spark.fast.tests
2+
3+
import com.github.mrpowers.spark.fast.tests.ufansi.Color.{DarkGray, Green, Red}
4+
import com.github.mrpowers.spark.fast.tests.ufansi.FansiExtensions.StrOps
5+
import com.github.mrpowers.spark.fast.tests.ufansi.Str
6+
import org.apache.commons.lang3.StringUtils
7+
import org.apache.spark.sql.Row
8+
9+
object DataframeUtil {
10+
private val EmptyStr = Str("")
11+
private[mrpowers] def showDataframeDiff(
12+
actual: Array[Row],
13+
expected: Array[Row],
14+
fieldNames: Array[String],
15+
truncate: Int = 20,
16+
minColWidth: Int = 3
17+
): String = {
18+
val sb = new StringBuilder
19+
val colWidths = getColWidths(fieldNames, actual.toSeq ++ expected.toSeq, truncate, minColWidth)
20+
// needed to calculate padding for other elements
21+
val largestIndexOffset = {
22+
if (actual.isEmpty && expected.isEmpty) 0
23+
else {
24+
val maxIndex = Math.max(actual.length, expected.length) - 1
25+
val largestIndex = maxIndex.toString + ":" // Largest index that is visible on the side. Like `100:`
26+
largestIndex.length + 1
27+
}
28+
}
29+
30+
val fullJoinWithEquals = actual
31+
.zipAll(expected, Row.empty, Row.empty)
32+
.map { case (actualRow, expectedRow) => (actualRow, expectedRow, actualRow.equals(expectedRow)) }
33+
val diff = fullJoinWithEquals.map { case (actualRow, expectedRow, rowsAreEqual) =>
34+
lazy val paddedActualRow = pad(actualRow.toSeq, colWidths, truncate)
35+
lazy val paddedExpectedRow = pad(expectedRow.toSeq, colWidths, truncate)
36+
if (rowsAreEqual) {
37+
List(DarkGray(paddedActualRow.mkString("|")), DarkGray(paddedActualRow.mkString("|")))
38+
} else {
39+
val actualSeq = actualRow.toSeq
40+
val expectedSeq = expectedRow.toSeq
41+
if (actualSeq.isEmpty)
42+
List(
43+
EmptyStr,
44+
Green(paddedExpectedRow.mkString("", "|", ""))
45+
)
46+
else if (expectedSeq.isEmpty)
47+
List(Red(paddedActualRow.mkString("", "|", "")), EmptyStr)
48+
else {
49+
val withEquals = actualSeq
50+
.zip(expectedSeq)
51+
.map { case (actualRowField, expectedRowField) =>
52+
(actualRowField, expectedRowField, actualRowField == expectedRowField)
53+
}
54+
val allFieldsAreNotEqual = !withEquals.exists(_._3)
55+
if (allFieldsAreNotEqual) {
56+
List(
57+
Red(paddedActualRow.mkString("", "|", "")),
58+
Green(paddedExpectedRow.mkString("", "|", ""))
59+
)
60+
} else {
61+
val coloredDiff = withEquals.zipWithIndex
62+
.map {
63+
case ((actualRowField, expectedRowField, true), i) =>
64+
val paddedActualRow = padAny(actualRowField, colWidths(i), truncate)
65+
val paddedExpected = padAny(expectedRowField, colWidths(i), truncate)
66+
(DarkGray(paddedActualRow), DarkGray(paddedExpected))
67+
case ((actualRowField, expectedRowField, false), i) =>
68+
val paddedActualRow = padAny(actualRowField, colWidths(i), truncate)
69+
val paddedExpected = padAny(expectedRowField, colWidths(i), truncate)
70+
(Red(paddedActualRow), Green(paddedExpected))
71+
}
72+
val start = DarkGray("")
73+
val sep = DarkGray("|")
74+
val end = DarkGray("")
75+
List(
76+
coloredDiff.map(_._1).mkStr(start, sep, end),
77+
coloredDiff.map(_._2).mkStr(start, sep, end)
78+
)
79+
}
80+
}
81+
}
82+
}
83+
84+
val headerWithLeftPadding = pad(fieldNames, colWidths, truncate)
85+
val headerFields = List(headerWithLeftPadding.mkString("|"))
86+
87+
val separatorLine: String =
88+
colWidths
89+
.map("-" * _)
90+
.mkString(StringUtils.leftPad("+", largestIndexOffset), "+", "+\n")
91+
92+
sb.append(separatorLine)
93+
94+
headerFields
95+
.zip(colWidths)
96+
.map { case (cell, colWidth) =>
97+
StringUtils.leftPad(cell, colWidth)
98+
}
99+
.addString(sb, StringUtils.leftPad("|", largestIndexOffset), "|", "|\n")
100+
diff.zipWithIndex.foreach { case (actual :: expected :: Nil, i) =>
101+
def appendRow(row: Str, i: Int): Unit = {
102+
if (row.length > 0) {
103+
val indexString = StringUtils.leftPad(s"${i + 1}:|", largestIndexOffset)
104+
sb.append(indexString)
105+
sb.append(row)
106+
sb.append(s"|:${i + 1}\n")
107+
}
108+
}
109+
appendRow(actual, i)
110+
val rowsAreDifferent = !fullJoinWithEquals(i)._3
111+
if (rowsAreDifferent) {
112+
appendRow(expected, i)
113+
if (i < diff.length - 1)
114+
sb.append(separatorLine)
115+
} else if (i < diff.length - 1 && !fullJoinWithEquals(i + 1)._3) { // if current rows are equal and next ones are not
116+
sb.append(separatorLine)
117+
}
118+
}
119+
sb.append(separatorLine).toString()
120+
}
121+
122+
private def pad(items: Seq[Any], colWidths: Array[Int], truncateColumnLen: Int): Seq[String] =
123+
items.zip(colWidths).map { case (v, colWidth) => padAny(v, colWidth, truncateColumnLen) }
124+
125+
private def padAny(s: Any, width: Int, truncateColumnLen: Int) = {
126+
StringUtils.leftPad(DataFramePrettyPrint.cellToString(s, truncateColumnLen), width)
127+
}
128+
129+
private def getColWidths(fields: Array[String], rows: Seq[Row], truncate: Int, minColWidth: Int) = {
130+
val numCols = if (rows.isEmpty) 0 else rows.map(_.size).max
131+
// Initialise the width of each column to a minimum value
132+
val colWidths = Array.fill(numCols)(minColWidth)
133+
for ((cell, i) <- fields.zipWithIndex) {
134+
colWidths(i) = math.max(colWidths(i), DataFramePrettyPrint.cellToString(cell, truncate).length)
135+
}
136+
// Compute the width of each column
137+
for (row <- rows)
138+
for ((cell, i) <- row.toSeq.zipWithIndex)
139+
colWidths(i) = math.max(colWidths(i), DataFramePrettyPrint.cellToString(cell, truncate).length)
140+
colWidths
141+
}
142+
143+
}

0 commit comments

Comments
 (0)