Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 720c94f

Browse files
ajaysaini725 authored and jkbradley committed
[SPARK-21027][ML][PYTHON] Added tunable parallelism to one vs. rest in both Scala mllib and Pyspark
## What changes were proposed in this pull request? Added tunable parallelism to the pyspark implementation of one vs. rest classification. Added a parallelism parameter to the Scala implementation of one vs. rest along with functionality for using the parameter to tune the level of parallelism. I am taking over PR apache#18281 because the original author is busy but we need to merge this PR soon. After this has been merged, we can close apache#18281. ## How was this patch tested? Test suite added. Author: Ajay Saini <[email protected]> Author: WeichenXu <[email protected]> Closes apache#19110 from WeichenXu123/spark-21027.
1 parent 515910e commit 720c94f

File tree

7 files changed

+146
-28
lines changed

7 files changed

+146
-28
lines changed

docs/ml-guide.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,24 @@ MLlib is under active development.
105105
The APIs marked `Experimental`/`DeveloperApi` may change in future releases,
106106
and the migration guide below will explain all changes between releases.
107107

108+
## From 2.2 to 2.3
109+
110+
### Breaking changes
111+
112+
There are no breaking changes.
113+
114+
### Deprecations and changes of behavior
115+
116+
**Deprecations**
117+
118+
There are no deprecations.
119+
120+
**Changes of behavior**
121+
122+
* [SPARK-21027](https://issues.apache.org/jira/browse/SPARK-21027):
123+
The default parallelism used in `OneVsRest` is now set to 1 (i.e. serial). In 2.2 and earlier versions,
124+
the level of parallelism of `OneVsRest` was set to the size of the default threadpool in Scala.
125+
108126
## From 2.1 to 2.2
109127

110128
### Breaking changes

mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
package org.apache.spark.ml.classification
1919

20-
import java.util.{List => JList}
2120
import java.util.UUID
2221

23-
import scala.collection.JavaConverters._
22+
import scala.concurrent.Future
23+
import scala.concurrent.duration.Duration
2424
import scala.language.existentials
2525

2626
import org.apache.hadoop.fs.Path
@@ -34,12 +34,13 @@ import org.apache.spark.ml._
3434
import org.apache.spark.ml.attribute._
3535
import org.apache.spark.ml.linalg.Vector
3636
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
37-
import org.apache.spark.ml.param.shared.HasWeightCol
37+
import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
3838
import org.apache.spark.ml.util._
3939
import org.apache.spark.sql.{DataFrame, Dataset, Row}
4040
import org.apache.spark.sql.functions._
4141
import org.apache.spark.sql.types._
4242
import org.apache.spark.storage.StorageLevel
43+
import org.apache.spark.util.ThreadUtils
4344

4445
private[ml] trait ClassifierTypeTrait {
4546
// scalastyle:off structural.type
@@ -273,7 +274,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] {
273274
@Since("1.4.0")
274275
final class OneVsRest @Since("1.4.0") (
275276
@Since("1.4.0") override val uid: String)
276-
extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {
277+
extends Estimator[OneVsRestModel] with OneVsRestParams with HasParallelism with MLWritable {
277278

278279
@Since("1.4.0")
279280
def this() = this(Identifiable.randomUID("oneVsRest"))
@@ -296,6 +297,16 @@ final class OneVsRest @Since("1.4.0") (
296297
@Since("1.5.0")
297298
def setPredictionCol(value: String): this.type = set(predictionCol, value)
298299

300+
/**
301+
* The implementation of parallel one vs. rest runs the classification for
302+
* each class in a separate thread.
303+
*
304+
* @group expertSetParam
305+
*/
306+
def setParallelism(value: Int): this.type = {
307+
set(parallelism, value)
308+
}
309+
299310
/**
300311
* Sets the value of param [[weightCol]].
301312
*
@@ -318,7 +329,7 @@ final class OneVsRest @Since("1.4.0") (
318329
transformSchema(dataset.schema)
319330

320331
val instr = Instrumentation.create(this, dataset)
321-
instr.logParams(labelCol, featuresCol, predictionCol)
332+
instr.logParams(labelCol, featuresCol, predictionCol, parallelism)
322333
instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName)
323334

324335
// determine number of classes either from metadata if provided, or via computation.
@@ -352,8 +363,10 @@ final class OneVsRest @Since("1.4.0") (
352363
multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
353364
}
354365

366+
val executionContext = getExecutionContext
367+
355368
// create k columns, one for each binary classifier.
356-
val models = Range(0, numClasses).par.map { index =>
369+
val modelFutures = Range(0, numClasses).map { index =>
357370
// generate new label metadata for the binary problem.
358371
val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
359372
val labelColName = "mc2b$" + index
@@ -364,14 +377,18 @@ final class OneVsRest @Since("1.4.0") (
364377
paramMap.put(classifier.labelCol -> labelColName)
365378
paramMap.put(classifier.featuresCol -> getFeaturesCol)
366379
paramMap.put(classifier.predictionCol -> getPredictionCol)
367-
if (weightColIsUsed) {
368-
val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
369-
paramMap.put(classifier_.weightCol -> getWeightCol)
370-
classifier_.fit(trainingDataset, paramMap)
371-
} else {
372-
classifier.fit(trainingDataset, paramMap)
373-
}
374-
}.toArray[ClassificationModel[_, _]]
380+
Future {
381+
if (weightColIsUsed) {
382+
val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
383+
paramMap.put(classifier_.weightCol -> getWeightCol)
384+
classifier_.fit(trainingDataset, paramMap)
385+
} else {
386+
classifier.fit(trainingDataset, paramMap)
387+
}
388+
}(executionContext)
389+
}
390+
val models = modelFutures
391+
.map(ThreadUtils.awaitResult(_, Duration.Inf)).toArray[ClassificationModel[_, _]]
375392
instr.logNumFeatures(models.head.numFeatures)
376393

377394
if (handlePersistence) {

mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ import org.apache.spark.ml.feature.StringIndexer
2525
import org.apache.spark.ml.linalg.Vectors
2626
import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
2727
import org.apache.spark.ml.util.{DefaultReadWriteTest, MetadataUtils, MLTestingUtils}
28+
import org.apache.spark.ml.util.TestingUtils._
2829
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
2930
import org.apache.spark.mllib.evaluation.MulticlassMetrics
3031
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
3132
import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
3233
import org.apache.spark.mllib.util.MLlibTestSparkContext
33-
import org.apache.spark.mllib.util.TestingUtils._
3434
import org.apache.spark.rdd.RDD
3535
import org.apache.spark.sql.Dataset
3636
import org.apache.spark.sql.functions._
@@ -98,7 +98,45 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
9898
// bound how much error we allow compared to multinomial logistic regression.
9999
val expectedMetrics = new MulticlassMetrics(results)
100100
val ovaMetrics = new MulticlassMetrics(ovaResults)
101-
assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
101+
assert(expectedMetrics.confusionMatrix.asML ~== ovaMetrics.confusionMatrix.asML absTol 400)
102+
}
103+
104+
test("one-vs-rest: tuning parallelism does not change output") {
105+
val ovaPar1 = new OneVsRest()
106+
.setClassifier(new LogisticRegression)
107+
108+
val ovaModelPar1 = ovaPar1.fit(dataset)
109+
110+
val transformedDatasetPar1 = ovaModelPar1.transform(dataset)
111+
112+
val ovaResultsPar1 = transformedDatasetPar1.select("prediction", "label").rdd.map {
113+
row => (row.getDouble(0), row.getDouble(1))
114+
}
115+
116+
val ovaPar2 = new OneVsRest()
117+
.setClassifier(new LogisticRegression)
118+
.setParallelism(2)
119+
120+
val ovaModelPar2 = ovaPar2.fit(dataset)
121+
122+
val transformedDatasetPar2 = ovaModelPar2.transform(dataset)
123+
124+
val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map {
125+
row => (row.getDouble(0), row.getDouble(1))
126+
}
127+
128+
val metricsPar1 = new MulticlassMetrics(ovaResultsPar1)
129+
val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
130+
assert(metricsPar1.confusionMatrix == metricsPar2.confusionMatrix)
131+
132+
ovaModelPar1.models.zip(ovaModelPar2.models).foreach {
133+
case (lrModel1: LogisticRegressionModel, lrModel2: LogisticRegressionModel) =>
134+
assert(lrModel1.coefficients ~== lrModel2.coefficients relTol 1E-3)
135+
assert(lrModel1.intercept ~== lrModel2.intercept relTol 1E-3)
136+
case other =>
137+
throw new AssertionError(s"Loaded OneVsRestModel expected model of type" +
138+
s" LogisticRegressionModel but found ${other.getClass.getName}")
139+
}
102140
}
103141

104142
test("one-vs-rest: pass label metadata correctly during train") {

python/pyspark/ml/classification.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#
1717

1818
import operator
19+
from multiprocessing.pool import ThreadPool
1920

2021
from pyspark import since, keyword_only
2122
from pyspark.ml import Estimator, Model
@@ -1567,7 +1568,7 @@ def getClassifier(self):
15671568

15681569

15691570
@inherit_doc
1570-
class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
1571+
class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable):
15711572
"""
15721573
.. note:: Experimental
15731574
@@ -1612,22 +1613,23 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
16121613

16131614
@keyword_only
16141615
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
1615-
classifier=None, weightCol=None):
1616+
classifier=None, weightCol=None, parallelism=1):
16161617
"""
16171618
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
1618-
classifier=None, weightCol=None)
1619+
classifier=None, weightCol=None, parallelism=1):
16191620
"""
16201621
super(OneVsRest, self).__init__()
1622+
self._setDefault(parallelism=1)
16211623
kwargs = self._input_kwargs
16221624
self._set(**kwargs)
16231625

16241626
@keyword_only
16251627
@since("2.0.0")
1626-
def setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
1627-
classifier=None, weightCol=None):
1628+
def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
1629+
classifier=None, weightCol=None, parallelism=1):
16281630
"""
1629-
setParams(self, featuresCol=None, labelCol=None, predictionCol=None, \
1630-
classifier=None, weightCol=None):
1631+
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
1632+
classifier=None, weightCol=None, parallelism=1):
16311633
Sets params for OneVsRest.
16321634
"""
16331635
kwargs = self._input_kwargs
@@ -1674,8 +1676,9 @@ def trainSingleClass(index):
16741676
paramMap[classifier.weightCol] = weightCol
16751677
return classifier.fit(trainingDataset, paramMap)
16761678

1677-
# TODO: Parallel training for all classes.
1678-
models = [trainSingleClass(i) for i in range(numClasses)]
1679+
pool = ThreadPool(processes=min(self.getParallelism(), numClasses))
1680+
1681+
models = pool.map(trainSingleClass, range(numClasses))
16791682

16801683
if handlePersistence:
16811684
multiclassLabeled.unpersist()
@@ -1709,8 +1712,9 @@ def _from_java(cls, java_stage):
17091712
labelCol = java_stage.getLabelCol()
17101713
predictionCol = java_stage.getPredictionCol()
17111714
classifier = JavaParams._from_java(java_stage.getClassifier())
1715+
parallelism = java_stage.getParallelism()
17121716
py_stage = cls(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
1713-
classifier=classifier)
1717+
classifier=classifier, parallelism=parallelism)
17141718
py_stage._resetUid(java_stage.uid())
17151719
return py_stage
17161720

@@ -1723,6 +1727,7 @@ def _to_java(self):
17231727
_java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest",
17241728
self.uid)
17251729
_java_obj.setClassifier(self.getClassifier()._to_java())
1730+
_java_obj.setParallelism(self.getParallelism())
17261731
_java_obj.setFeaturesCol(self.getFeaturesCol())
17271732
_java_obj.setLabelCol(self.getLabelCol())
17281733
_java_obj.setPredictionCol(self.getPredictionCol())

python/pyspark/ml/param/_shared_params_code_gen.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,9 @@ def get$Name(self):
152152
("varianceCol", "column name for the biased sample variance of prediction.",
153153
None, "TypeConverters.toString"),
154154
("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
155-
"TypeConverters.toInt")]
155+
"TypeConverters.toInt"),
156+
("parallelism", "the number of threads to use when running parallel algorithms (>= 1).",
157+
"1", "TypeConverters.toInt")]
156158

157159
code = []
158160
for name, doc, defaultValueStr, typeConverter in shared:

python/pyspark/ml/param/shared.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,30 @@ def getAggregationDepth(self):
608608
return self.getOrDefault(self.aggregationDepth)
609609

610610

611+
class HasParallelism(Params):
612+
"""
613+
Mixin for param parallelism: the number of threads to use when running parallel algorithms (>= 1).
614+
"""
615+
616+
parallelism = Param(Params._dummy(), "parallelism", "the number of threads to use when running parallel algorithms (>= 1).", typeConverter=TypeConverters.toInt)
617+
618+
def __init__(self):
619+
super(HasParallelism, self).__init__()
620+
self._setDefault(parallelism=1)
621+
622+
def setParallelism(self, value):
623+
"""
624+
Sets the value of :py:attr:`parallelism`.
625+
"""
626+
return self._set(parallelism=value)
627+
628+
def getParallelism(self):
629+
"""
630+
Gets the value of parallelism or its default value.
631+
"""
632+
return self.getOrDefault(self.parallelism)
633+
634+
611635
class DecisionTreeParams(Params):
612636
"""
613637
Mixin for Decision Tree parameters.

python/pyspark/ml/tests.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1548,11 +1548,25 @@ def test_output_columns(self):
15481548
(2.0, Vectors.dense(0.5, 0.5))],
15491549
["label", "features"])
15501550
lr = LogisticRegression(maxIter=5, regParam=0.01)
1551-
ovr = OneVsRest(classifier=lr)
1551+
ovr = OneVsRest(classifier=lr, parallelism=1)
15521552
model = ovr.fit(df)
15531553
output = model.transform(df)
15541554
self.assertEqual(output.columns, ["label", "features", "prediction"])
15551555

1556+
def test_parallelism_doesnt_change_output(self):
1557+
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
1558+
(1.0, Vectors.sparse(2, [], [])),
1559+
(2.0, Vectors.dense(0.5, 0.5))],
1560+
["label", "features"])
1561+
ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1)
1562+
modelPar1 = ovrPar1.fit(df)
1563+
ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
1564+
modelPar2 = ovrPar2.fit(df)
1565+
for i, model in enumerate(modelPar1.models):
1566+
self.assertTrue(np.allclose(model.coefficients.toArray(),
1567+
modelPar2.models[i].coefficients.toArray(), atol=1E-4))
1568+
self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))
1569+
15561570
def test_support_for_weightCol(self):
15571571
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0),
15581572
(1.0, Vectors.sparse(2, [], []), 1.0),

0 commit comments

Comments
 (0)