diff --git a/splearn/pipeline.py b/splearn/pipeline.py index 0d1df9f..1f55157 100644 --- a/splearn/pipeline.py +++ b/splearn/pipeline.py @@ -5,9 +5,8 @@ import numpy as np import scipy.sparse as sp from sklearn.externals import six -from sklearn.externals.joblib import Parallel, delayed from sklearn.pipeline import FeatureUnion, Pipeline, _name_estimators -from splearn.rdd import ArrayRDD, DictRDD +from splearn.rdd import DictRDD class SparkPipeline(Pipeline): @@ -218,6 +217,7 @@ class SparkFeatureUnion(FeatureUnion): List of transformer objects to be applied to the data. The first half of each tuple is the name of the transformer. n_jobs: int, optional + Ignored on Spark; useful only after conversion to a scikit-learn object. Number of jobs to run in parallel (default 1). transformer_weights: dict, optional Multiplicative weights for features per transformer. @@ -239,9 +239,11 @@ def fit(self, Z, **fit_params): step, param = pname.split('__', 1) fit_params_steps[step][param] = pval - transformers = Parallel(n_jobs=self.n_jobs, backend="threading")( - delayed(_fit_one_transformer)(trans, Z, **fit_params_steps[name]) - for name, trans in self.transformer_list) + transformers = [_fit_one_transformer(trans, + Z, + **fit_params_steps[name]) + for name, trans in self.transformer_list] + self._update_transformer_list(transformers) return self