Skip to content

Commit a699341

Browse files
authored
feat: SparkML MultilayerPerceptronClassifier (#570)
* feat: SparkML MultilayerPerceptronClassifier Signed-off-by: Jason Wang <[email protected]> * imports Signed-off-by: Jason Wang <[email protected]>
1 parent 46a2bf1 commit a699341

File tree

4 files changed

+175
-1
lines changed

4 files changed

+175
-1
lines changed

onnxmltools/convert/sparkml/operator_converters/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@
3333
from . import onehot_encoder
3434
from . import vector_assembler
3535
from . import k_means
36-
from . import count_vectorizer
36+
from . import count_vectorizer
37+
from . import mlp_classifier
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
3+
from pyspark.ml.classification import MultilayerPerceptronClassificationModel
4+
5+
from ...common._registration import register_converter, register_shape_calculator
6+
from ...common.data_types import Int64TensorType, FloatTensorType
7+
from ...common.utils import check_input_and_output_numbers, check_input_and_output_types
8+
from ...common._topology import Operator, Scope, ModelComponentContainer
9+
from ....proto import onnx_proto
10+
from typing import List
11+
import numpy as np
12+
13+
14+
def convert_sparkml_mlp_classifier(scope: Scope, operator: Operator, container: ModelComponentContainer):
    """Emit the ONNX nodes for a fitted Spark ML multilayer perceptron classifier.

    Each pair of consecutive layer sizes becomes one ``Gemm`` node. Every
    ``Gemm`` except the last is followed by ``Sigmoid``; the last one is
    followed by ``Softmax``, whose result is the probability tensor. A final
    ``ArgMax`` over axis 1 of the probabilities produces the prediction.

    :param scope: naming scope used to create unique variable and operator names
    :param operator: topology operator holding the Spark model in
        ``raw_operator``; ``inputs[0]`` is the feature tensor, ``outputs[0]``
        receives the prediction and ``outputs[1]``, when declared, receives
        the class probabilities
    :param container: container that collects the generated initializers and nodes
    """
    op: MultilayerPerceptronClassificationModel = operator.raw_operator
    layers: List[int] = op.getLayers()
    weights: np.ndarray = op.weights.toArray()

    # The shape calculator accepts operators declaring one or two outputs.
    # When only the prediction output is declared, the probabilities are kept
    # in an intermediate tensor instead of indexing a missing output.
    if len(operator.outputs) > 1:
        probability_name = operator.outputs[1].full_name
    else:
        probability_name = scope.get_unique_variable_name("probability")

    # The first Gemm reads the operator input; later ones read the previous
    # activation output.
    layer_input: str = operator.inputs[0].full_name
    last_layer_index = len(layers) - 2
    offset = 0

    for i, (fan_in, fan_out) in enumerate(zip(layers[:-1], layers[1:])):
        # The flat weight vector is consumed layer by layer: first
        # fan_in * fan_out weight values, then fan_out bias values.
        weight_count = fan_in * fan_out
        weight_matrix = weights[offset : offset + weight_count].reshape([fan_in, fan_out])
        offset += weight_count
        bias_vector = weights[offset : offset + fan_out]
        offset += fan_out

        weight_variable = scope.get_unique_variable_name("w")
        container.add_initializer(
            weight_variable,
            onnx_proto.TensorProto.FLOAT,
            weight_matrix.shape,
            weight_matrix.flatten().astype(np.float32),
        )

        bias_variable = scope.get_unique_variable_name("b")
        container.add_initializer(
            bias_variable,
            onnx_proto.TensorProto.FLOAT,
            bias_vector.shape,
            bias_vector.astype(np.float32),
        )

        gemm_output_variable = scope.get_unique_variable_name("gemm_output")
        container.add_node(
            op_type="Gemm",
            inputs=[layer_input, weight_variable, bias_variable],
            outputs=[gemm_output_variable],
            op_version=7,
            name=scope.get_unique_operator_name("Gemm"),
        )

        if i == last_layer_index:
            # Output layer: softmax yields the class probabilities.
            container.add_node(
                op_type="Softmax",
                inputs=[gemm_output_variable],
                outputs=[probability_name],
                op_version=1,
                name=scope.get_unique_operator_name("Softmax"),
            )
        else:
            # Hidden layer: the sigmoid output feeds the next Gemm.
            layer_input = scope.get_unique_variable_name("activation_output")
            container.add_node(
                op_type="Sigmoid",
                inputs=[gemm_output_variable],
                outputs=[layer_input],
                op_version=1,
                name=scope.get_unique_operator_name("Sigmoid"),
            )

    # The predicted label is the index of the largest probability in each row.
    container.add_node(
        "ArgMax",
        [probability_name],
        [operator.outputs[0].full_name],
        name=scope.get_unique_operator_name("ArgMax"),
        axis=1,
        keepdims=0,
    )
79+
80+
81+
# Register the converter under the fully qualified name of the Spark model class.
register_converter(
    "pyspark.ml.classification.MultilayerPerceptronClassificationModel",
    convert_sparkml_mlp_classifier,
)
82+
83+
84+
def calculate_mlp_classifier_output_shapes(operator: Operator):
    """Assign the output tensor types of the MLP classifier operator.

    With ``N`` the first dimension of the input, ``outputs[0]`` (prediction)
    becomes an int64 tensor of shape ``[N]``. When a second output is
    declared, ``outputs[1]`` (probability) becomes a float tensor of shape
    ``[N, numClasses]``.

    :param operator: topology operator holding the Spark model in ``raw_operator``
    :raises RuntimeError: if the input tensor is not two-dimensional
    """
    op: MultilayerPerceptronClassificationModel = operator.raw_operator

    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=[1, 2])
    check_input_and_output_types(operator, good_input_types=[FloatTensorType, Int64TensorType])

    input_shape = operator.inputs[0].type.shape
    if len(input_shape) != 2:
        raise RuntimeError("Input must be a [N, C]-tensor")

    N = input_shape[0]
    operator.outputs[0].type = Int64TensorType(shape=[N])

    # The count check above allows a single output, so only type the
    # probability output when it was actually declared.
    if len(operator.outputs) > 1:
        operator.outputs[1].type = FloatTensorType([N, op.numClasses])
97+
98+
99+
# Register the shape calculator under the same fully qualified Spark class name
# that the converter is registered with.
register_shape_calculator(
    "pyspark.ml.classification.MultilayerPerceptronClassificationModel",
    calculate_mlp_classifier_output_shapes,
)

onnxmltools/convert/sparkml/ops_input_output.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ def build_io_name_map():
9797
lambda model: [model.getOrDefault("featuresCol")],
9898
lambda model: [model.getOrDefault("predictionCol"), model.getOrDefault("probabilityCol")],
9999
),
100+
"pyspark.ml.classification.MultilayerPerceptronClassificationModel": (
101+
lambda model: [model.getOrDefault("featuresCol")],
102+
lambda model: [model.getOrDefault("predictionCol"), model.getOrDefault("probabilityCol")],
103+
),
100104
"pyspark.ml.regression.DecisionTreeRegressionModel": (
101105
lambda model: [model.getOrDefault("featuresCol")],
102106
lambda model: [model.getOrDefault("predictionCol")],
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
3+
import sys
4+
import unittest
5+
import inspect
6+
import os
7+
import numpy
8+
import pandas
9+
from pyspark.ml.classification import MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel
10+
from pyspark.ml.linalg import VectorUDT, SparseVector
11+
from onnx.defs import onnx_opset_version
12+
from onnxconverter_common.onnx_ex import DEFAULT_OPSET_NUMBER
13+
from onnxmltools import convert_sparkml
14+
from onnxmltools.convert.common.data_types import FloatTensorType
15+
from tests.sparkml.sparkml_test_utils import save_data_models, run_onnx_model, compare_results
16+
from tests.sparkml import SparkMlTestCase
17+
18+
19+
# Use the lower of the opset reported by onnx.defs and the converter's default opset.
TARGET_OPSET = min(onnx_opset_version(), DEFAULT_OPSET_NUMBER)
20+
21+
22+
class TestSparkmlMLPClassifier(SparkMlTestCase):
    """Checks the ONNX conversion of a Spark ML MultilayerPerceptronClassifier."""

    @unittest.skipIf(sys.version_info < (3, 8), reason="pickle fails on python 3.7")
    def test_model_mlp_classifier_binary_class(self):
        """Fit a two-class MLP, convert it to ONNX and compare both models' outputs."""
        this_script_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
        input_path = os.path.join(this_script_dir, "data", "sample_libsvm_data.txt")
        original_data = self.spark.read.format("libsvm").load(input_path)

        # Truncate the features: keep 100 values (indices 30..129) of each vector.
        self.spark.udf.register(
            "truncateFeatures",
            lambda x: SparseVector(100, range(0, 100), x.toArray()[30:130]),
            VectorUDT(),
        )
        data = original_data.selectExpr("label", "truncateFeatures(features) as features")

        mlp = MultilayerPerceptronClassifier(maxIter=100, tol=0.0001, seed=137, layers=[100, 20, 5, 2])
        model: MultilayerPerceptronClassificationModel = mlp.fit(data)

        # The ONNX input is named 'features' and has one column per model feature.
        C = model.numFeatures
        model_onnx = convert_sparkml(
            model,
            "sparkml multilayer perceptron classifier",
            [("features", FloatTensorType([None, C]))],
            target_opset=TARGET_OPSET,
        )
        self.assertIsNotNone(model_onnx)

        # Run the Spark model; collect the result to pandas once and reuse it
        # for both expected outputs.
        predicted = model.transform(data)
        predicted_pd = predicted.toPandas()

        data_np = data.toPandas().features.apply(lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32)
        expected = [
            predicted_pd.prediction.values.astype(numpy.float32),
            predicted_pd.probability.apply(lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32),
        ]

        paths = save_data_models(data_np, expected, model, model_onnx, basename="SparkmlMLPClassifier")
        onnx_model_path = paths[-1]
        output, output_shapes = run_onnx_model(["prediction", "probability"], data_np, onnx_model_path)
        compare_results(expected, output, decimal=5)
65+
66+
67+
# Run the test cases when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()

0 commit comments

Comments
 (0)