Commit 75a1830

[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M
## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram

## How was this patch tested?

It is a bunch of tests!

Author: Joseph K. Bradley <[email protected]>

Closes apache#20964 from jkbradley/SPARK-22883-part2.
1 parent 3cb8204 commit 75a1830
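
All of the suites below switch from MLlibTestSparkContext plus collect()-based checks to the MLTest.testTransformer helper, which exercises the transformer against both a batch DataFrame and a streaming source. A minimal sketch of the resulting pattern (the suite name and data values are illustrative only; the MaxAbsScaler usage and column layout mirror the actual diff further down):

```scala
package org.apache.spark.ml.feature

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.Row

// Hypothetical suite illustrating the testTransformer pattern adopted in this commit.
class ExampleStreamingSuite extends MLTest with DefaultReadWriteTest {

  import testImplicits._

  test("MaxAbsScaler output matches expected values (batch and streaming)") {
    // Each row pairs the input vector with the value the fitted model should produce.
    val df = Seq(
      (Vectors.dense(1.0, -2.0), Vectors.dense(0.5, -1.0)),
      (Vectors.dense(2.0, 0.0), Vectors.dense(1.0, 0.0))
    ).toDF("features", "expected")

    val model = new MaxAbsScaler()
      .setInputCol("features")
      .setOutputCol("scaled")
      .fit(df)

    // testTransformer applies the model to df both as a static DataFrame and as a
    // streaming source, then runs the row-level check on the selected columns.
    testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
      case Row(expected: Vector, actual: Vector) =>
        assert(expected === actual)
    }
  }
}
```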

File tree: 7 files changed, +89 −57 lines


mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala

Lines changed: 6 additions & 8 deletions
@@ -17,17 +17,15 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row
 
-class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class IDFSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       Vectors.dense(0.0, 1.0, 2.0, 3.0),
       Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
     )
-    val numOfData = data.size
+    val numOfData = data.length
     val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       math.log((numOfData + 1.0) / (x + 1.0))
     })
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
     MLTestingUtils.checkCopyAndUids(idfEst, idfModel)
 
-    idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+    testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
       case Row(x: Vector, y: Vector) =>
         assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
     }
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       Vectors.dense(0.0, 1.0, 2.0, 3.0),
       Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
     )
-    val numOfData = data.size
+    val numOfData = data.length
     val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
     })
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       .setMinDocFreq(1)
       .fit(df)
 
-    idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+    testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
       case Row(x: Vector, y: Vector) =>
         assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
     }

mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala

Lines changed: 25 additions & 6 deletions
@@ -16,13 +16,12 @@
  */
 package org.apache.spark.ml.feature
 
-import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.SparkException
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.sql.{DataFrame, Row}
 
-class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class ImputerSuite extends MLTest with DefaultReadWriteTest {
 
   test("Imputer for Double with default missing Value NaN") {
     val df = spark.createDataFrame( Seq(
@@ -76,6 +75,28 @@ class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with Default
     ImputerSuite.iterateStrategyTest(imputer, df)
   }
 
+  test("Imputer should work with Structured Streaming") {
+    val localSpark = spark
+    import localSpark.implicits._
+    val df = Seq[(java.lang.Double, Double)](
+      (4.0, 4.0),
+      (10.0, 10.0),
+      (10.0, 10.0),
+      (Double.NaN, 8.0),
+      (null, 8.0)
+    ).toDF("value", "expected_mean_value")
+    val imputer = new Imputer()
+      .setInputCols(Array("value"))
+      .setOutputCols(Array("out"))
+      .setStrategy("mean")
+    val model = imputer.fit(df)
+    testTransformer[(java.lang.Double, Double)](df, model, "expected_mean_value", "out") {
+      case Row(exp: java.lang.Double, out: Double) =>
+        assert((exp.isNaN && out.isNaN) || (exp == out),
+          s"Imputed values differ. Expected: $exp, actual: $out")
+    }
+  }
+
   test("Imputer throws exception when surrogate cannot be computed") {
     val df = spark.createDataFrame( Seq(
       (0, Double.NaN, 1.0, 1.0),
@@ -164,8 +185,6 @@ object ImputerSuite {
    * @param df DataFrame with columns "id", "value", "expected_mean", "expected_median"
    */
   def iterateStrategyTest(imputer: Imputer, df: DataFrame): Unit = {
-    val inputCols = imputer.getInputCols
-
     Seq("mean", "median").foreach { strategy =>
       imputer.setStrategy(strategy)
       val model = imputer.fit(df)

mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala

Lines changed: 24 additions & 22 deletions
@@ -19,15 +19,15 @@ package org.apache.spark.ml.feature
 
 import scala.collection.mutable.ArrayBuilder
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.col
 
-class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class InteractionSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -63,24 +63,25 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
 
   test("numeric interaction") {
     val data = Seq(
-      (2, Vectors.dense(3.0, 4.0)),
-      (1, Vectors.dense(1.0, 5.0))
-    ).toDF("a", "b")
+      (2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
+      (1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
+    ).toDF("a", "b", "expected")
     val groupAttr = new AttributeGroup(
       "b",
       Array[Attribute](
         NumericAttribute.defaultAttr.withName("foo"),
         NumericAttribute.defaultAttr.withName("bar")))
     val df = data.select(
       col("a").as("a", NumericAttribute.defaultAttr.toMetadata()),
-      col("b").as("b", groupAttr.toMetadata()))
+      col("b").as("b", groupAttr.toMetadata()),
+      col("expected"))
     val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
+    testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
+      case Row(features: Vector, expected: Vector) =>
+        assert(features === expected)
+    }
+
     val res = trans.transform(df)
-    val expected = Seq(
-      (2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
-      (1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
-    ).toDF("a", "b", "features")
-    assert(res.collect() === expected.collect())
     val attrs = AttributeGroup.fromStructField(res.schema("features"))
     val expectedAttrs = new AttributeGroup(
       "features",
@@ -92,9 +93,9 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
 
   test("nominal interaction") {
     val data = Seq(
-      (2, Vectors.dense(3.0, 4.0)),
-      (1, Vectors.dense(1.0, 5.0))
-    ).toDF("a", "b")
+      (2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
+      (1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
+    ).toDF("a", "b", "expected")
     val groupAttr = new AttributeGroup(
       "b",
       Array[Attribute](
@@ -103,14 +104,15 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
     val df = data.select(
       col("a").as(
         "a", NominalAttribute.defaultAttr.withValues(Array("up", "down", "left")).toMetadata()),
-      col("b").as("b", groupAttr.toMetadata()))
+      col("b").as("b", groupAttr.toMetadata()),
+      col("expected"))
     val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
+    testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
+      case Row(features: Vector, expected: Vector) =>
+        assert(features === expected)
+    }
+
     val res = trans.transform(df)
-    val expected = Seq(
-      (2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
-      (1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
-    ).toDF("a", "b", "features")
-    assert(res.collect() === expected.collect())
     val attrs = AttributeGroup.fromStructField(res.schema("features"))
     val expectedAttrs = new AttributeGroup(
       "features",

mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala

Lines changed: 7 additions & 7 deletions
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.sql.Row
 
-class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class MaxAbsScalerSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -45,9 +44,10 @@ class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
       .setOutputCol("scaled")
 
     val model = scaler.fit(df)
-    model.transform(df).select("expected", "scaled").collect()
-      .foreach { case Row(vector1: Vector, vector2: Vector) =>
-      assert(vector1.equals(vector2), s"MaxAbsScaler ut error: $vector2 should be $vector1")
+    testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+      case Row(expectedVec: Vector, actualVec: Vector) =>
+        assert(expectedVec === actualVec,
+          s"MaxAbsScaler error: Expected $expectedVec but computed $actualVec")
     }
 
     MLTestingUtils.checkCopyAndUids(scaler, model)

mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala

Lines changed: 20 additions & 5 deletions
@@ -17,14 +17,13 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
+import org.apache.spark.sql.{Dataset, Row}
 
-class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+
+class MinHashLSHSuite extends MLTest with DefaultReadWriteTest {
 
   @transient var dataset: Dataset[_] = _
 
@@ -175,4 +174,20 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     assert(precision == 1.0)
     assert(recall >= 0.7)
   }
+
+  test("MinHashLSHModel.transform should work with Structured Streaming") {
+    val localSpark = spark
+    import localSpark.implicits._
+
+    val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0)))
+    model.set(model.inputCol, "keys")
+    testTransformer[Tuple1[Vector]](dataset.toDF(), model, "keys", model.getOutputCol) {
+      case Row(_: Vector, output: Seq[_]) =>
+        assert(output.length === model.randCoefficients.length)
+        // no AND-amplification yet: SPARK-18450, so each hash output is of length 1
+        output.foreach {
+          case hashOutput: Vector => assert(hashOutput.size === 1)
+        }
+    }
+  }
 }

mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala

Lines changed: 6 additions & 8 deletions
@@ -17,13 +17,11 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.sql.Row
 
-class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class MinMaxScalerSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -48,9 +46,9 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
       .setMax(5)
 
     val model = scaler.fit(df)
-    model.transform(df).select("expected", "scaled").collect()
-      .foreach { case Row(vector1: Vector, vector2: Vector) =>
-      assert(vector1.equals(vector2), "Transformed vector is different with expected.")
+    testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+      case Row(vector1: Vector, vector2: Vector) =>
+        assert(vector1 === vector2, "Transformed vector is different with expected.")
     }
 
     MLTestingUtils.checkCopyAndUids(scaler, model)
@@ -114,7 +112,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
     val model = scaler.fit(df)
     model.transform(df).select("expected", "scaled").collect()
       .foreach { case Row(vector1: Vector, vector2: Vector) =>
-      assert(vector1.equals(vector2), "Transformed vector is different with expected.")
+      assert(vector1 === vector2, "Transformed vector is different with expected.")
     }
   }
 }

mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ class NGramSuite extends MLTest with DefaultReadWriteTest {
 
   def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
     testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") {
-      case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) =>
+      case Row(actualNGrams : Seq[_], wantedNGrams: Seq[_]) =>
         assert(actualNGrams === wantedNGrams)
     }
   }
