Commit bca4746

Spark ML: additional ops (#262)

* SparkML POC; unit tests pass for all 4 operators
* Added Pipeline logic + tests; diverged Pipeline conversion from sklearn; all unit tests pass
* Adding documentation; changing locations of some files
* Fixes after relocating file; added profiling test to sparkml
* Fixing broken link
* Added verification step to the profiling test for sparkml
* Removed the zipmap step from LogisticRegression conversion; conversion has sped up already
* Fixing profile sparkml to include all columns
* Removing individual shape_calculator files and merging their code with the converters
* Adding Binarizer to SparkML
* Removing individual shape_calculator files and merging their code with the converters
* Adding MLeap to sparkml profiler; generating plot graphics
* Fixing the SPARK_HOME detection code
* Fixing sparkml test base for start_spark() args
* Adding Normalizer to Spark ML converter
* Updating gitignore
* Fixing unit tests for sparkml
* Adding cmake exclusions to gitignore
* Cherry-picking, with slight modifications, changes from the sparkml branch related to the profile pipeline
* Fixing imports based on review comments
* Fixing unit tests
* Restructuring code
* Adding LinearSVC operator
* Adding Scaler (MinMax, MaxAbs, Standard) converters to Spark ML
* Adding sys_platform qualifiers to requirements
* Adding Imputer to SparkML
* Adding Imputer to SparkML
* Removing sklearn from this project's requirements
* Adding LinearRegressor to Spark ML
* Code cleanup
* Proper renaming of files
* Adding GeneralizedLinearRegression to Spark ML
* Moved sparkml utils from where it caused dependency errors
* Skipping tests for py2.7; skipping failed pipeline tests for now until more investigation is done
* Disabling all pipeline tests in spark since they cause issues on the build machine
* Disabling all pipeline tests in spark since they cause issues on the build machine
* Removing onnxruntime from requirements-dev
* Formatting
* Interim check-in
* Adding VectorIndexer conversion; allowing the Spark Session to be passed to the parser/converter code; untested DecisionTree code (needs more work)
* Adding proposal for missing ops
* First seemingly working version of DecisionTree; needs more testing
* Fixing the test case for DecisionTree; DecisionTree conversion is complete
* Merging from additional-ops with squash
* Fixing RandomForest regressor unit test
* Leaving out Imputer tests because of a bug in pyspark
* Fixing import issues
* Skipping tests under python2
* Fixing missing import
* Fixing imports
* Skipping tests that require opset 9
* Fixing the environment marker for py2.7
* Enabling coremltools in the requirements again
* Adding Bucketizer converter to Sparkml
* Adding converters for StopWordsRemover and VectorSlicer
* Adding converter for Spark ML NaiveBayes
* Fixing build issues for my PR
* Adding Tokenizer and PolynomialExpansion operator converters
* Adding PCA converter to Sparkml
* Removing comments
* Adding DCT converter to sparkml
* Adding GBTClassifier converter to Spark ML; some re-orging of files
* Adding converter for GBTRegressor; some code cleanup
* Adding converter for OneVsRest in Spark ML; some formatting changes
* RandomForests were missing from the init file
* Adding script to read a pipeline as saved by R on Spark, and run it in the runtime
* Adding converter for ChiSqSelector in Spark ML
* Formatting
* Added converter for IndexToString; currently cannot handle cases where Labels are not specified in the Spark model
* Adding converter for Word2Vec in Spark ML
* Cleanup
* Removing a hardcoded value
* Adding converters for ElementwiseProduct and MinHashLSH in Spark ML
* Cleanup
* Cleanup
* Adding converter for AFTSurvivalRegression in Spark ML
* Adding converter for BucketedRandomProjectionLSH in Spark ML
* Using separators for Tokenizer conversion in Spark ML
* Path cleanup
* Excluding test from py2.7
* Fixing failures in LinearClassifier tests
* Excluding word2vec test from automatic run due to pyspark issues
* Fixing unit test errors
* Removing verbose prints
* Changing the test run and validation routines to account for output names; retrofitting all test cases accordingly
* Fixing an issue in MinHashLSH which is related to the Spark version
* Reducing the ReduceMin version to 1
* Formatting
* Fixing ReduceMin issue in min_hash_lsh
* Formatting and cleanup
* Copyright headers
* Fixed accidental line deletion
1 parent e8e3ab1 commit bca4746

File tree

92 files changed: +5055 additions, -584 deletions


.gitignore

Lines changed: 7 additions & 0 deletions
@@ -37,12 +37,19 @@ temp_*.onnx
 tests/sklearn/tests/*
 tests_backend/*.onnx
 tests/*/tests/*
+tests/*/tests_dump/*
 tests/build/*
 tests/tests/
+derby.log
+metastore_db/
 tests/*/tests/*.pkl
 tests/*/tests/*.onnx
 tests/*/tests/*.keras
 docs/auto_examples/*
 docs/examples/dense121.onnx
 docs/examples/graph.dot
 docs/examples/graph.dot.png
+# CMake
+CMakeCache.txt
+CMakeFiles/
+

onnxmltools/convert/main.py

Lines changed: 3 additions & 2 deletions
@@ -65,13 +65,14 @@ def convert_sklearn(model, name=None, initial_types=None, doc_string='', target_
                    custom_conversion_functions, custom_shape_calculators)

 def convert_sparkml(model, name=None, initial_types=None, doc_string='', target_opset=None,
-                    targeted_onnx=onnx.__version__, custom_conversion_functions=None, custom_shape_calculators=None):
+                    targeted_onnx=onnx.__version__, custom_conversion_functions=None,
+                    custom_shape_calculators=None, spark_session=None):
     if not utils.sparkml_installed():
         raise RuntimeError('Spark is not installed. Please install Spark to use this feature.')

     from .sparkml.convert import convert
     return convert(model, name, initial_types, doc_string, target_opset, targeted_onnx,
-                   custom_conversion_functions, custom_shape_calculators)
+                   custom_conversion_functions, custom_shape_calculators, spark_session)

 def convert_xgboost(*args, **kwargs):
     if not utils.xgboost_installed():
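
For orientation, here is a minimal usage sketch of the new spark_session argument. It is not part of this commit; the column names, tensor shapes, and the FloatTensorType import path are illustrative assumptions.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import FloatTensorType

spark = SparkSession.builder.appName("sparkml-onnx-example").getOrCreate()
df = spark.createDataFrame([(0.0, 1.0, 0.0), (1.0, 0.0, 1.0), (0.5, 0.5, 0.0)],
                           ["a", "b", "label"])

# Fit a small pipeline: assemble two numeric columns, then classify.
pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["a", "b"], outputCol="features"),
    LogisticRegression(featuresCol="features", labelCol="label"),
])
model = pipeline.fit(df)

# initial_types names the raw inputs of the pipeline; spark_session is the
# keyword argument introduced by this change.
initial_types = [("a", FloatTensorType([1, 1])), ("b", FloatTensorType([1, 1]))]
onnx_model = convert_sparkml(model, name="sparkml-pipeline",
                             initial_types=initial_types,
                             spark_session=spark)

Passing the session matters presumably for converters that need to query the JVM-side model (for example the DecisionTree work mentioned in the commit message); operators that do not need it can simply ignore it.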

onnxmltools/convert/sparkml/__init__.py

Lines changed: 3 additions & 1 deletion
@@ -5,4 +5,6 @@
 # --------------------------------------------------------------------------

 from .convert import convert
-from .utils import *
+from .utils import buildInitialTypesSimple, getTensorTypeFromSpark, buildInputDictSimple
+from .ops_names import get_sparkml_operator_name
+from .ops_input_output import get_input_names, get_output_names
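
The helpers re-exported here give a shortcut for building conversion inputs from a test DataFrame. A hedged sketch, assuming buildInitialTypesSimple and buildInputDictSimple each take a Spark DataFrame as their only argument (their exact signatures live in utils.py, which is not shown in this diff):

from pyspark.sql import SparkSession
from onnxmltools.convert.sparkml import buildInitialTypesSimple, buildInputDictSimple

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["x1", "x2"])

# Derive the initial_types argument for convert_sparkml from the DataFrame schema,
# and a matching input feed (e.g. for scoring the converted model) from the same data.
initial_types = buildInitialTypesSimple(df)
input_feed = buildInputDictSimple(df)
print(initial_types, list(input_feed))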

onnxmltools/convert/sparkml/_parse.py

Lines changed: 16 additions & 168 deletions
@@ -3,154 +3,15 @@
 # Licensed under the MIT License. See License.txt in the project root for
 # license information.
 # --------------------------------------------------------------------------
+from .ops_names import get_sparkml_operator_name
+from .ops_input_output import get_input_names, get_output_names

 from ..common._container import SparkmlModelContainer
 from ..common._topology import *

 from pyspark.ml import PipelineModel


-from pyspark.ml.feature import Binarizer
-from pyspark.ml.feature import BucketedRandomProjectionLSHModel
-from pyspark.ml.feature import Bucketizer
-from pyspark.ml.feature import ChiSqSelectorModel
-from pyspark.ml.feature import CountVectorizerModel
-from pyspark.ml.feature import DCT
-from pyspark.ml.feature import ElementwiseProduct
-from pyspark.ml.feature import HashingTF
-from pyspark.ml.feature import IDFModel
-from pyspark.ml.feature import ImputerModel
-from pyspark.ml.feature import IndexToString
-from pyspark.ml.feature import MaxAbsScalerModel
-from pyspark.ml.feature import MinHashLSHModel
-from pyspark.ml.feature import MinMaxScalerModel
-from pyspark.ml.feature import NGram
-from pyspark.ml.feature import Normalizer
-from pyspark.ml.feature import OneHotEncoderModel
-from pyspark.ml.feature import PCAModel
-from pyspark.ml.feature import PolynomialExpansion
-from pyspark.ml.feature import QuantileDiscretizer
-from pyspark.ml.feature import RegexTokenizer
-from pyspark.ml.feature import RFormulaModel
-from pyspark.ml.feature import SQLTransformer
-from pyspark.ml.feature import StandardScalerModel
-from pyspark.ml.feature import StopWordsRemover
-from pyspark.ml.feature import StringIndexerModel
-from pyspark.ml.feature import Tokenizer
-from pyspark.ml.feature import VectorAssembler
-from pyspark.ml.feature import VectorIndexerModel
-from pyspark.ml.feature import VectorSlicer
-from pyspark.ml.feature import Word2VecModel
-
-from pyspark.ml.classification import LinearSVCModel
-from pyspark.ml.classification import LogisticRegressionModel
-from pyspark.ml.classification import DecisionTreeClassifier
-from pyspark.ml.classification import GBTClassifier
-from pyspark.ml.classification import RandomForestClassifier
-from pyspark.ml.classification import NaiveBayesModel
-from pyspark.ml.classification import MultilayerPerceptronClassifier
-from pyspark.ml.classification import OneVsRestModel
-
-from pyspark.ml.regression import AFTSurvivalRegressionModel
-from pyspark.ml.regression import DecisionTreeRegressor
-from pyspark.ml.regression import GBTRegressionModel
-from pyspark.ml.regression import GeneralizedLinearRegressionModel
-from pyspark.ml.regression import IsotonicRegressionModel
-from pyspark.ml.regression import LinearRegressionModel
-from pyspark.ml.regression import RandomForestRegressor
-
-from pyspark.ml.clustering import BisectingKMeans
-from pyspark.ml.clustering import KMeans
-from pyspark.ml.clustering import GaussianMixture
-from pyspark.ml.clustering import LDA
-
-# In most cases, spark-ml operator produces only one output. However, each classifier has basically two outputs:
-# 1. prediction: selected label
-# 2. rawPrediction: Dense vector containing values for each class
-# Here is a list of supported spark-ml classifiers.
-# In the parsing stage, we produce two outputs for objects included in the following list and
-# one output for everything not in the list.
-sparkml_classifier_list = [LinearSVCModel, LogisticRegressionModel, DecisionTreeClassifier, GBTClassifier,
-                           RandomForestClassifier, NaiveBayesModel, MultilayerPerceptronClassifier, OneVsRestModel]
-
-# Associate spark-ml types with our operator names. If two spark-ml models share a single name, it means their
-# are equivalent in terms of conversion.
-
-def build_sparkml_operator_name_map():
-    res = {k: "pyspark.ml.feature." + k.__name__ for k in [
-        Binarizer, BucketedRandomProjectionLSHModel, Bucketizer,
-        ChiSqSelectorModel, CountVectorizerModel, DCT, ElementwiseProduct, HashingTF, IDFModel, ImputerModel,
-        IndexToString, MaxAbsScalerModel, MinHashLSHModel, MinMaxScalerModel, NGram, Normalizer, OneHotEncoderModel,
-        PCAModel, PolynomialExpansion, QuantileDiscretizer, RegexTokenizer, RFormulaModel, SQLTransformer,
-        StandardScalerModel, StopWordsRemover, StringIndexerModel, Tokenizer, VectorAssembler, VectorIndexerModel,
-        VectorSlicer, Word2VecModel
-    ]}
-    res.update({k: "pyspark.ml.classification." + k.__name__ for k in [
-        LinearSVCModel, LogisticRegressionModel, DecisionTreeClassifier, GBTClassifier, RandomForestClassifier,
-        NaiveBayesModel, MultilayerPerceptronClassifier, OneVsRestModel
-    ]})
-    res.update({k: "pyspark.ml.regression." + k.__name__ for k in [
-        AFTSurvivalRegressionModel, DecisionTreeRegressor, GBTRegressionModel, GBTRegressionModel,
-        GeneralizedLinearRegressionModel, IsotonicRegressionModel, LinearRegressionModel, RandomForestRegressor
-    ]})
-    return res
-
-
-sparkml_operator_name_map = build_sparkml_operator_name_map()
-
-def build_io_name_map():
-    map = {
-        "pyspark.ml.classification.LogisticRegressionModel": (
-            lambda model: [model.getOrDefault("featuresCol")],
-            lambda model: [model.getOrDefault("predictionCol"), model.getOrDefault("probabilityCol")]
-        ),
-        "pyspark.ml.feature.OneHotEncoderModel": (
-            lambda model: model.getOrDefault("inputCols"),
-            lambda model: model.getOrDefault("outputCols")
-        ),
-        "pyspark.ml.feature.StringIndexerModel": (
-            lambda model: [model.getOrDefault("inputCol")],
-            lambda model: [model.getOrDefault("outputCol")]
-        ),
-        "pyspark.ml.feature.VectorAssembler": (
-            lambda model: model.getOrDefault("inputCols"),
-            lambda model: [model.getOrDefault("outputCol")]
-        )
-    }
-    return map
-
-io_name_map = build_io_name_map()
-
-def _get_input_names(model):
-    '''
-    Returns the name(s) of the input(s) for a SparkML operator
-    :param model: SparkML Model
-    :return: list of input names
-    '''
-    return io_name_map[_get_sparkml_operator_name(type(model))][0](model)
-
-
-def _get_output_names(model):
-    '''
-    Returns the name(s) of the output(s) for a SparkML operator
-    :param model: SparkML Model
-    :return: list of output names
-    '''
-    return io_name_map[_get_sparkml_operator_name(type(model))][1](model)
-
-
-def _get_sparkml_operator_name(model_type):
-    '''
-    Get operator name of the input argument
-
-    :param model_type: A spark-ml object (LinearRegression, StringIndexer, ...)
-    :return: A string which stands for the type of the input model in our conversion framework
-    '''
-    if model_type not in sparkml_operator_name_map:
-        raise ValueError("No proper operator name found for '%s'" % model_type)
-    return sparkml_operator_name_map[model_type]
-
-
 def _get_variable_for_input(scope, input_name, global_inputs, output_dict):
     '''
     Find the corresponding Variable for a given raw operator (model) name
@@ -175,7 +36,8 @@ def _get_variable_for_input(scope, input_name, global_inputs, output_dict):
     #
     return scope.declare_local_variable(input_name)

-def _parse_sparkml_simple_model(scope, model, global_inputs, output_dict):
+
+def _parse_sparkml_simple_model(spark, scope, model, global_inputs, output_dict):
     '''
     This function handles all non-pipeline models.

@@ -185,33 +47,18 @@ def _parse_sparkml_simple_model(scope, model, global_inputs, output_dict):
     :param output_dict: An accumulated list of output_original_name->(ref_count, variable)
     :return: A list of output variables which will be passed to next stage
     '''
-    this_operator = scope.declare_local_operator(_get_sparkml_operator_name(type(model)), model)
-    raw_input_names = _get_input_names(model)
+    this_operator = scope.declare_local_operator(get_sparkml_operator_name(type(model)), model)
+    this_operator.raw_params = {'SparkSession': spark}
+    raw_input_names = get_input_names(model)
     this_operator.inputs = [_get_variable_for_input(scope, x, global_inputs, output_dict) for x in raw_input_names]
-    raw_output_names = _get_output_names(model)
+    raw_output_names = get_output_names(model)
     for output_name in raw_output_names:
         variable = scope.declare_local_variable(output_name, FloatTensorType())
         this_operator.outputs.append(variable)
         output_dict[variable.raw_name] = [0, variable]


-    # if type(model) in sparkml_classifier_list:
-    #     # For classifiers, we may have two outputs, one for label and the other one for probabilities of all classes.
-    #     # Notice that their types here are not necessarily correct and they will be fixed in shape inference phase
-    #     label_variable = scope.declare_local_variable('label', FloatTensorType())
-    #     probability_map_variable = scope.declare_local_variable('probabilities', FloatTensorType())
-    #     this_operator.outputs.append(label_variable)
-    #     this_operator.outputs.append(probability_map_variable)
-    #     output_dict[label_variable.raw_name] = [0, label_variable]
-    #     output_dict[probability_map_variable.raw_name] = [0, probability_map_variable]
-    # else:
-    #     # We assume that all spark-ml operator can only produce a single float tensor.
-    #     variable = scope.declare_local_variable('output', FloatTensorType())
-    #     this_operator.outputs.append(variable)
-    #     output_dict[variable.raw_name] = [0, variable]
-
-
-def _parse_sparkml_pipeline(scope, model, global_inputs, output_dict):
+def _parse_sparkml_pipeline(spark, scope, model, global_inputs, output_dict):
     '''
     The basic ideas of spark-ml parsing:
     1. Sequentially go though all stages defined in the considered spark-ml pipeline
@@ -224,9 +71,10 @@ def _parse_sparkml_pipeline(scope, model, global_inputs, output_dict):
     :return: A list of output variables produced by the input pipeline
     '''
     for stage in model.stages:
-        _parse_sparkml(scope, stage, global_inputs, output_dict)
+        _parse_sparkml(spark, scope, stage, global_inputs, output_dict)
+

-def _parse_sparkml(scope, model, global_inputs, output_dict):
+def _parse_sparkml(spark, scope, model, global_inputs, output_dict):
     '''
     This is a delegate function. It doesn't nothing but invoke the correct parsing function according to the input
     model's type.
@@ -236,12 +84,12 @@ def _parse_sparkml(scope, model, global_inputs, output_dict):
     :return: The output variables produced by the input model
     '''
     if isinstance(model, PipelineModel):
-        return _parse_sparkml_pipeline(scope, model, global_inputs, output_dict)
+        return _parse_sparkml_pipeline(spark, scope, model, global_inputs, output_dict)
     else:
-        return _parse_sparkml_simple_model(scope, model, global_inputs, output_dict)
+        return _parse_sparkml_simple_model(spark, scope, model, global_inputs, output_dict)


-def parse_sparkml(model, initial_types=None, target_opset=None,
+def parse_sparkml(spark, model, initial_types=None, target_opset=None,
                   custom_conversion_functions=None, custom_shape_calculators=None):
     # Put spark-ml object into an abstract container so that our framework can work seamlessly on models created
     # with different machine learning tools.
@@ -270,7 +118,7 @@ def parse_sparkml(model, initial_types=None, target_opset=None,

     # Parse the input spark-ml model as a Topology object.
     output_dict = {}
-    _parse_sparkml(scope, model, inputs, output_dict)
+    _parse_sparkml(spark, scope, model, inputs, output_dict)
     outputs = []
     for k, v in output_dict.items():
         if v[0] == 0:  # ref count is zero
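
The _get_* lookups removed above now live in ops_names.py and ops_input_output.py and are exported from the package __init__. A small sketch of the expected behaviour, using VectorAssembler, which was already present in the removed io_name_map; the printed values follow that removed code and are assumptions about the relocated helpers.

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from onnxmltools.convert.sparkml import get_sparkml_operator_name, get_input_names, get_output_names

spark = SparkSession.builder.getOrCreate()   # pyspark.ml objects need an active session
assembler = VectorAssembler(inputCols=["a", "b"], outputCol="features")

print(get_sparkml_operator_name(type(assembler)))   # 'pyspark.ml.feature.VectorAssembler'
print(get_input_names(assembler))                   # ['a', 'b']
print(get_output_names(assembler))                  # ['features']

Note also that _parse_sparkml_simple_model now stores the session on each operator as raw_params['SparkSession'], so individual converters can retrieve it later.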

onnxmltools/convert/sparkml/convert.py

Lines changed: 3 additions & 5 deletions
@@ -8,14 +8,12 @@
 from ...proto import onnx, get_opset_number_from_onnx
 from ..common._topology import convert_topology
 from ._parse import parse_sparkml
-
-# Invoke the registration of all our converters and shape calculators
-from . import shape_calculators
 from . import operator_converters


 def convert(model, name=None, initial_types=None, doc_string='', target_opset=None,
-            targeted_onnx=onnx.__version__, custom_conversion_functions=None, custom_shape_calculators=None):
+            targeted_onnx=onnx.__version__, custom_conversion_functions=None, custom_shape_calculators=None,
+            spark_session=None):
     '''
     This function produces an equivalent ONNX model of the given spark-ml model. The supported spark-ml
     modules are listed below.
@@ -67,7 +65,7 @@ def convert(model, name=None, initial_types=None, doc_string='', target_opset=No

     target_opset = target_opset if target_opset else get_opset_number_from_onnx()
     # Parse spark-ml model as our internal data structure (i.e., Topology)
-    topology = parse_sparkml(model, initial_types, target_opset, custom_conversion_functions, custom_shape_calculators)
+    topology = parse_sparkml(spark_session, model, initial_types, target_opset, custom_conversion_functions, custom_shape_calculators)

     # Infer variable shapes
     topology.compile()
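
As a follow-up sketch only: persisting and scoring a converted model. It assumes onnx_model came from the convert_sparkml example after the onnxmltools/convert/main.py diff above, that onnxruntime is installed (it is deliberately absent from requirements-dev, per the commit message), and that the graph input and output names match the initial_types and column names used there.

import numpy
import onnxruntime
from onnxmltools.utils import save_model

save_model(onnx_model, "sparkml_pipeline.onnx")           # onnx_model from the earlier sketch
sess = onnxruntime.InferenceSession("sparkml_pipeline.onnx")
feeds = {"a": numpy.array([[0.3]], dtype=numpy.float32),
         "b": numpy.array([[0.7]], dtype=numpy.float32)}
print(sess.run(None, feeds))                              # assumed: prediction and probability outputs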
