
Commit 2978f8b

fix DescribeDeltaHistorySuite
1 parent 3cdeed7 commit 2978f8b

File tree

1 file changed: +18 -9 lines changed


spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMetricsBase.scala

Lines changed: 18 additions & 9 deletions
@@ -237,7 +237,8 @@ trait MergeIntoMetricsBase
       mergeCmdFn: MergeCmd,
       expectedOpMetrics: Map[String, Int],
       testConfig: MergeTestConfiguration,
-      overrideExpectedOpMetrics: Seq[((Boolean, Boolean), (String, Int))] = Seq.empty
+      overrideExpectedOpMetrics: Seq[((Boolean, Boolean), (String, Int))] = Seq.empty,
+      customMetricsChecker: Option[Map[String, String] => Unit] = None
   ): Unit = {
     withSQLConf(
       DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true",
@@ -307,6 +308,8 @@ trait MergeIntoMetricsBase
         checkOperationTimeMetricsInvariant(mergeTimeMetrics, operationMetrics)
         // Check CDF metrics invariants.
         checkMergeOperationCdfMetricsInvariants(operationMetrics, testConfig.cdfEnabled)
+        // Perform custom checks on operationMetrics.
+        customMetricsChecker.map(f => f(operationMetrics))
       }
     }
   }
@@ -1002,7 +1005,6 @@ trait MergeIntoMetricsBase
         "numTargetRowsMatchedDeleted" -> 50,
         "numTargetRowsRemoved" -> -1,
         "numTargetRowsCopied" -> 10,
-        "numTargetFilesAdded" -> 2,
         "numTargetFilesRemoved" -> 3
       )
       runMergeCmdAndTestMetrics(
@@ -1011,13 +1013,20 @@ trait MergeIntoMetricsBase
         mergeCmdFn = mergeCmdFn,
         expectedOpMetrics = expectedOpMetrics,
         testConfig = testConfig,
-        // When cdf=true in this test we hit the corner case where there are duplicate matches with a
-        // delete clause and we generate duplicate cdc data. This is further detailed in
-        // MergeIntoCommand at the definition of isDeleteWithDuplicateMatchesAndCdc. Our fix for this
-        // scenario includes deduplicating the output data which reshuffles the output data.
-        // Thus when the table is not partitioned, the data is rewritten into 1 new file rather than 2
-        overrideExpectedOpMetrics = Seq(((false, true), ("numTargetFilesAdded", 1)))
-      )
+        customMetricsChecker = Some(operationMetrics => {
+          val metricValue = operationMetrics("numTargetFilesAdded")
+          (testConfig.partitioned, testConfig.cdfEnabled) match {
+            // When cdf=true in this test we hit the corner case where there are duplicate matches
+            // with a delete clause and we generate duplicate cdc data. This is further detailed in
+            // MergeIntoCommand at the definition of isDeleteWithDuplicateMatchesAndCdc. Our fix for
+            // this scenario includes deduplicating the output data which reshuffles the output data.
+            // Thus when the table is not partitioned, the data is rewritten into 1 new file rather
+            // than 2.
+            case (false, true) => assert(metricValue == "1")
+            // Depending on the Spark version, for non-partitioned tables we may add 1 or 2 files.
+            case (false, false) => assert(metricValue == "1" || metricValue == "2")
+            case _ => assert(metricValue == "2")
+          }}))
     }}

   /////////////////////////////
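
The substance of this change is the move from a table of per-configuration metric overrides (overrideExpectedOpMetrics) to an optional callback (customMetricsChecker), which lets a test express assertions that cannot be encoded as a single expected value, such as numTargetFilesAdded varying with the Spark version on non-partitioned tables. Below is a minimal, self-contained Scala sketch of that hook pattern; the names CustomMetricsCheckerSketch and runAndCheck are illustrative stand-ins, not part of the commit, and only the shape of the Option[Map[String, String] => Unit] parameter mirrors the real change. Delta's operation history reports metric values as strings, which is why both the diff and this sketch compare against "1" and "2".

// Illustrative sketch only: names below are hypothetical stand-ins for the
// test-harness plumbing in MergeIntoMetricsBase.
object CustomMetricsCheckerSketch {

  // Stand-in for runMergeCmdAndTestMetrics: runs the fixed invariant checks,
  // then invokes the optional caller-supplied checker on the metrics map.
  def runAndCheck(
      operationMetrics: Map[String, String],
      customMetricsChecker: Option[Map[String, String] => Unit] = None): Unit = {
    // ... fixed invariant checks would run here in the real harness ...
    // foreach is the idiomatic way to run a side effect on an Option; the
    // commit uses .map(f => f(...)), which behaves the same here but
    // discards the resulting Option[Unit].
    customMetricsChecker.foreach(f => f(operationMetrics))
  }

  def main(args: Array[String]): Unit = {
    // Delta history reports operation metrics as strings.
    val metrics = Map("numTargetFilesAdded" -> "1", "numTargetFilesRemoved" -> "3")
    runAndCheck(metrics, Some { m =>
      val added = m("numTargetFilesAdded")
      // A version-tolerant assertion, in the spirit of the (false, false) case.
      assert(added == "1" || added == "2", s"unexpected numTargetFilesAdded: $added")
    })
  }
}

A callback keeps this flexibility inside the individual test instead of growing the (partitioned, cdfEnabled) override table with ranges or alternatives, at the cost of each test restating its own match on the configuration.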
