
Commit 0461482

WeichenXu123 authored and jkbradley committed
[SPARK-21088][ML] CrossValidator, TrainValidationSplit support collect all models when fitting: Python API
## What changes were proposed in this pull request?

Add a Python API for collecting sub-models during CrossValidator/TrainValidationSplit fitting.

## How was this patch tested?

Unit tests added.

Author: WeichenXu <[email protected]>

Closes apache#19627 from WeichenXu123/expose-model-list-py.
1 parent 5003736 commit 0461482
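Below, a minimal usage sketch of the new Python API (hedged: `dataset` is an assumed DataFrame with "features" and "label" columns, mirroring the unit tests in this commit; everything else is standard pyspark.ml):

    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()

    # collectSubModels=True keeps every model trained during tuning;
    # by default only the best model survives fitting.
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                        numFolds=3, collectSubModels=True)
    cvModel = cv.fit(dataset)  # `dataset`: assumed input DataFrame

    # subModels is indexed [fold][paramMapIndex]
    print(len(cvModel.subModels))     # 3 (numFolds)
    print(len(cvModel.subModels[0]))  # 2 (param maps in the grid)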

7 files changed: +211 −29 lines changed


mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala

Lines changed: 11 additions & 0 deletions

@@ -270,6 +270,17 @@ class CrossValidatorModel private[ml] (
     this
   }

+  // A Python-friendly auxiliary method
+  private[tuning] def setSubModels(subModels: JList[JList[Model[_]]])
+    : CrossValidatorModel = {
+    _subModels = if (subModels != null) {
+      Some(subModels.asScala.toArray.map(_.asScala.toArray))
+    } else {
+      None
+    }
+    this
+  }
+
   /**
    * @return submodels represented in two dimension array. The index of outer array is the
    *         fold index, and the index of inner array corresponds to the ordering of
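The helper above is "Python-friendly" because values crossing the Py4J bridge arrive as java.util.List rather than Scala arrays. A rough sketch of how the Python wrapper side might hand sub-models back to the JVM through it (hypothetical illustration, not the patch's actual wrapper code; `_to_java` is the usual pyspark.ml hook on JavaParams):

    # Hypothetical helper; names here are illustrative, not from this diff.
    def _copy_sub_models_to_java(py_model, java_model):
        if py_model.subModels is not None:
            # Nested Python lists auto-convert to java.util.List over Py4J,
            # matching the JList[JList[Model[_]]] signature of setSubModels.
            java_model.setSubModels(
                [[sub._to_java() for sub in fold] for fold in py_model.subModels])
        else:
            java_model.setSubModels(None)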

mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala

Lines changed: 11 additions & 0 deletions

@@ -262,6 +262,17 @@ class TrainValidationSplitModel private[ml] (
     this
   }

+  // A Python-friendly auxiliary method
+  private[tuning] def setSubModels(subModels: JList[Model[_]])
+    : TrainValidationSplitModel = {
+    _subModels = if (subModels != null) {
+      Some(subModels.asScala.toArray)
+    } else {
+      None
+    }
+    this
+  }
+
   /**
    * @return submodels represented in array. The index of array corresponds to the ordering of
    *         estimatorParamMaps
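Note the signature difference from the CrossValidator variant: TrainValidationSplit performs a single train/validation split, so it collects one sub-model per param map rather than one per (fold, param map) pair. Illustrated shapes (hedged; model variables as fitted in the unit tests below):

    # cvModel.subModels[i][j] -> model for fold i, param map j (2-D)
    # tvsModel.subModels[j]   -> model for param map j (1-D)
    first_cv_sub = cvModel.subModels[0][0]
    first_tvs_sub = tvsModel.subModels[0]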

python/pyspark/ml/param/_shared_params_code_gen.py

Lines changed: 5 additions & 0 deletions

@@ -157,6 +157,11 @@ def get$Name(self):
         "TypeConverters.toInt"),
        ("parallelism", "the number of threads to use when running parallel algorithms (>= 1).",
         "1", "TypeConverters.toInt"),
+       ("collectSubModels", "Param for whether to collect a list of sub-models trained during " +
+        "tuning. If set to false, then only the single best sub-model will be available after " +
+        "fitting. If set to true, then all sub-models will be available. Warning: For large " +
+        "models, collecting all sub-models can cause OOMs on the Spark driver.",
+        "False", "TypeConverters.toBoolean"),
        ("loss", "the loss function to be optimized.", None, "TypeConverters.toString")]

    code = []

python/pyspark/ml/param/shared.py

Lines changed: 24 additions & 0 deletions

@@ -655,6 +655,30 @@ def getParallelism(self):
         return self.getOrDefault(self.parallelism)


+class HasCollectSubModels(Params):
+    """
+    Mixin for param collectSubModels: Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver.
+    """
+
+    collectSubModels = Param(Params._dummy(), "collectSubModels", "Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver.", typeConverter=TypeConverters.toBoolean)
+
+    def __init__(self):
+        super(HasCollectSubModels, self).__init__()
+        self._setDefault(collectSubModels=False)
+
+    def setCollectSubModels(self, value):
+        """
+        Sets the value of :py:attr:`collectSubModels`.
+        """
+        return self._set(collectSubModels=value)
+
+    def getCollectSubModels(self):
+        """
+        Gets the value of collectSubModels or its default value.
+        """
+        return self.getOrDefault(self.collectSubModels)
+
+
 class HasLoss(Params):
     """
     Mixin for param loss: the loss function to be optimized.
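The generated mixin gives the tuning estimators the standard param accessor pattern; a short hedged usage sketch (with `tvs` constructed as in the tests below):

    # Equivalent to passing collectSubModels=True to the constructor.
    tvs.setCollectSubModels(True)
    assert tvs.getCollectSubModels() is True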

python/pyspark/ml/tests.py

Lines changed: 78 additions & 0 deletions

@@ -1018,6 +1018,50 @@ def test_parallel_evaluation(self):
         cvParallelModel = cv.fit(dataset)
         self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)

+    def test_expose_sub_models(self):
+        temp_path = tempfile.mkdtemp()
+        dataset = self.spark.createDataFrame(
+            [(Vectors.dense([0.0]), 0.0),
+             (Vectors.dense([0.4]), 1.0),
+             (Vectors.dense([0.5]), 0.0),
+             (Vectors.dense([0.6]), 1.0),
+             (Vectors.dense([1.0]), 1.0)] * 10,
+            ["features", "label"])
+
+        lr = LogisticRegression()
+        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
+        evaluator = BinaryClassificationEvaluator()
+
+        numFolds = 3
+        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
+                            numFolds=numFolds, collectSubModels=True)
+
+        def checkSubModels(subModels):
+            self.assertEqual(len(subModels), numFolds)
+            for i in range(numFolds):
+                self.assertEqual(len(subModels[i]), len(grid))
+
+        cvModel = cv.fit(dataset)
+        checkSubModels(cvModel.subModels)
+
+        # Test the default value for option "persistSubModels" to be "true"
+        testSubPath = temp_path + "/testCrossValidatorSubModels"
+        savingPathWithSubModels = testSubPath + "cvModel3"
+        cvModel.save(savingPathWithSubModels)
+        cvModel3 = CrossValidatorModel.load(savingPathWithSubModels)
+        checkSubModels(cvModel3.subModels)
+        cvModel4 = cvModel3.copy()
+        checkSubModels(cvModel4.subModels)
+
+        savingPathWithoutSubModels = testSubPath + "cvModel2"
+        cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
+        cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels)
+        self.assertEqual(cvModel2.subModels, None)
+
+        for i in range(numFolds):
+            for j in range(len(grid)):
+                self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid)
+
     def test_save_load_nested_estimator(self):
         temp_path = tempfile.mkdtemp()
         dataset = self.spark.createDataFrame(

@@ -1186,6 +1230,40 @@ def test_parallel_evaluation(self):
         tvsParallelModel = tvs.fit(dataset)
         self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)

+    def test_expose_sub_models(self):
+        temp_path = tempfile.mkdtemp()
+        dataset = self.spark.createDataFrame(
+            [(Vectors.dense([0.0]), 0.0),
+             (Vectors.dense([0.4]), 1.0),
+             (Vectors.dense([0.5]), 0.0),
+             (Vectors.dense([0.6]), 1.0),
+             (Vectors.dense([1.0]), 1.0)] * 10,
+            ["features", "label"])
+        lr = LogisticRegression()
+        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
+        evaluator = BinaryClassificationEvaluator()
+        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
+                                   collectSubModels=True)
+        tvsModel = tvs.fit(dataset)
+        self.assertEqual(len(tvsModel.subModels), len(grid))
+
+        # Test the default value for option "persistSubModels" to be "true"
+        testSubPath = temp_path + "/testTrainValidationSplitSubModels"
+        savingPathWithSubModels = testSubPath + "cvModel3"
+        tvsModel.save(savingPathWithSubModels)
+        tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels)
+        self.assertEqual(len(tvsModel3.subModels), len(grid))
+        tvsModel4 = tvsModel3.copy()
+        self.assertEqual(len(tvsModel4.subModels), len(grid))
+
+        savingPathWithoutSubModels = testSubPath + "cvModel2"
+        tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
+        tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels)
+        self.assertEqual(tvsModel2.subModels, None)
+
+        for i in range(len(grid)):
+            self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid)
+
     def test_save_load_nested_estimator(self):
         # This tests saving and loading the trained model only.
         # Save/load for TrainValidationSplit will be added later: SPARK-13786
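Both tests also exercise the "persistSubModels" writer option that the Scala implementation already honors: sub-models are saved by default when present and can be dropped at save time. A hedged one-liner (`model` and `path` assumed):

    # Skip persisting sub-models to keep the saved model small;
    # after reload, `subModels` will be None.
    model.write().option("persistSubModels", "false").save(path)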
