Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/benchmark/run_amlb.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class BenchmarkRunConfig:

def _default_sampling_config(seed: int) -> Dict[str, Any]:
return {
'strategy_kind': 'subset',
'provider': 'sampling_zoo',
'strategy': 'random',
'strategy_params': {},
Expand Down
32 changes: 30 additions & 2 deletions fedot/api/api_utils/api_composer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import gc
import time
from copy import deepcopy
from typing import List, Optional, Sequence, Tuple, Union

Expand All @@ -18,7 +19,8 @@
from fedot.core.composer.composer_builder import ComposerBuilder
from fedot.core.composer.gp_composer.gp_composer import GPComposer
from fedot.core.constants import DEFAULT_TUNING_ITERATIONS_NUMBER
from fedot.core.data.data import InputData
from fedot.core.data.data import InputData, InputDataList
from fedot.core.pipelines.pipeline_ensemble import PipelineEnsemble
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
from fedot.core.repository.metrics_repository import MetricIDType
Expand Down Expand Up @@ -112,6 +114,32 @@ def obtain_model(self, train_data: InputData) -> Tuple[Pipeline, Sequence[Pipeli
self.log.message('Model generation finished')
return best_pipeline, best_pipeline_candidates, gp_composer.history

def obtain_ensemble_model(self, train_data_list: InputDataList) -> \
        Tuple[PipelineEnsemble, Sequence[Sequence[Pipeline]], List[OptHistory]]:
    """Compose one pipeline per data chunk and combine them into an ensemble.

    The overall time budget (``self.params.timeout``, in minutes) is
    re-balanced before each chunk: elapsed wall-clock time is subtracted
    and the remainder is split evenly across the chunks still unprocessed,
    so later chunks are not starved by slow earlier ones.

    :param train_data_list: sequence of InputData chunks; one model is
        composed per chunk
    :return: tuple of (ensemble over all chunk pipelines,
        per-chunk best pipeline candidates, per-chunk optimization histories)
    :raises ValueError: if composition yielded no pipeline for some chunk
    """
    pipelines: List[Pipeline] = []
    best_models: List[Sequence[Pipeline]] = []
    histories: List[OptHistory] = []
    initial_timeout = self.params.timeout
    started_at = time.perf_counter()

    try:
        for chunk_index, chunk_data in enumerate(train_data_list):
            if initial_timeout is not None:
                # Divide the remaining budget over the chunks left to process.
                elapsed_minutes = (time.perf_counter() - started_at) / 60.0
                remaining_minutes = max(0.0, initial_timeout - elapsed_minutes)
                remaining_chunks = max(1, len(train_data_list) - chunk_index)
                self.params.timeout = remaining_minutes / remaining_chunks

            pipeline, best_pipeline_candidates, history = self.obtain_model(chunk_data)
            if pipeline is None:
                raise ValueError('No models were found for one of the chunks')
            pipelines.append(pipeline)
            best_models.append(best_pipeline_candidates)
            histories.append(history)
    finally:
        # Restore the caller-visible timeout even if composition of a
        # chunk raised; otherwise self.params would stay corrupted with
        # a per-chunk slice of the budget.
        self.params.timeout = initial_timeout

    ensemble = PipelineEnsemble(pipelines)
    return ensemble, best_models, histories

def propose_and_fit_initial_assumption(self, train_data: InputData) -> Tuple[Sequence[Pipeline], Pipeline]:
""" Method for obtaining and fitting initial assumption"""
available_operations = self.params.get('available_operations')
Expand Down Expand Up @@ -210,4 +238,4 @@ def tune_final_pipeline(self, train_data: InputData,
tuned_pipeline = tuner.tune(pipeline_gp_composed)
self.log.message('Hyperparameters tuning finished')
self.was_tuned = tuner.was_tuned
return tuned_pipeline
return tuned_pipeline
4 changes: 3 additions & 1 deletion fedot/api/api_utils/api_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from fedot.core.data.data import InputData, OutputData, data_type_is_table
from fedot.core.data.data_preprocessing import convert_into_column
from fedot.core.data.multi_modal import MultiModalData
from fedot.core.pipelines.pipeline_ensemble import PipelineEnsemble
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.ts_wrappers import in_sample_ts_forecast, convert_forecast_to_output
from fedot.core.repository.tasks import Task, TaskTypesEnum
Expand Down Expand Up @@ -85,7 +86,8 @@ def define_data(self,
data = self.preprocessor.obligatory_prepare_for_fit(data)
return data

def define_predictions(self, current_pipeline: Pipeline, test_data: Union[InputData, MultiModalData],
def define_predictions(self, current_pipeline: Union[Pipeline, PipelineEnsemble],
test_data: Union[InputData, MultiModalData],
in_sample: bool = False, validation_blocks: int = None) -> OutputData:
""" Prepare predictions """
forecast_length = getattr(test_data.task.task_params, 'forecast_length', None)
Expand Down
4 changes: 2 additions & 2 deletions fedot/api/api_utils/assumptions/assumption_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class PresetSpec:

def default_repository_name_for_data(data) -> str:
    """Return the operation-repository kind name suited to *data*.

    Multi-series time-series data needs the full repository; any other
    data type is served by the model-only repository.
    """
    needs_full_repository = data.data_type == DataTypesEnum.multi_ts
    return RepositoryKind.all.value if needs_full_repository else RepositoryKind.model.value


def required_operations_for_data(data, data_type: DataTypesEnum) -> Tuple[str, ...]:
Expand Down
42 changes: 30 additions & 12 deletions fedot/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
from fedot.api.api_utils.predefined_model import PredefinedModel
from fedot.api.sampling_stage.executor import SamplingStageExecutor
from fedot.core.constants import DEFAULT_API_TIMEOUT_MINUTES, DEFAULT_TUNING_ITERATIONS_NUMBER
from fedot.core.data.data import InputData, OutputData, PathType
from fedot.core.data.data import InputData, InputDataList, OutputData, PathType
from fedot.core.data.multi_modal import MultiModalData
from fedot.core.data.visualisation import plot_biplot, plot_forecast, plot_roc_auc
from fedot.core.optimisers.objective import PipelineObjectiveEvaluate
from fedot.core.optimisers.objective.metrics_objective import MetricsObjective
from fedot.core.pipelines.pipeline_ensemble import PipelineEnsemble
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.ts_wrappers import convert_forecast_to_output, out_of_sample_ts_forecast
from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
Expand Down Expand Up @@ -118,13 +119,13 @@ def __init__(self,
self.target: Optional[TargetType] = None
self.prediction: Optional[OutputData] = None
self._is_in_sample_prediction = True
self.train_data: Optional[InputData] = None
self.train_data: Optional[Union[InputData, InputDataList]] = None
self.test_data: Optional[InputData] = None

# Outputs
self.current_pipeline: Optional[Pipeline] = None
self.best_models: Sequence[Pipeline] = ()
self.history: Optional[OptHistory] = None
self.current_pipeline: Optional[Union[Pipeline, PipelineEnsemble]] = None
self.best_models: Sequence[Union[Pipeline, Sequence[Pipeline]]] = ()
self.history: Optional[Union[OptHistory, Sequence[OptHistory]]] = None
self.sampling_stage_metadata: Optional[dict] = None

fedot_composer_timer.reset_timer()
Expand Down Expand Up @@ -202,7 +203,12 @@ def fit(self,
api_preprocessor=self.data_processor.preprocessor,
).fit()
else:
self.current_pipeline, self.best_models, self.history = self.api_composer.obtain_model(self.train_data)
if isinstance(self.train_data, list) and all(isinstance(x, InputData) for x in self.train_data):
self.current_pipeline, self.best_models, self.history = \
self.api_composer.obtain_ensemble_model(self.train_data)
else:
self.current_pipeline, self.best_models, self.history = \
self.api_composer.obtain_model(self.train_data)

if self.current_pipeline is None:
raise ValueError('No models were found')
Expand All @@ -219,13 +225,20 @@ def fit(self,
self.log.message('Already fitted initial pipeline is used')

# Merge API & pipelines encoders if it is required
self.current_pipeline.preprocessor = BasePreprocessor.merge_preprocessors(
merged_preprocessor = BasePreprocessor.merge_preprocessors(
api_preprocessor=self.data_processor.preprocessor,
pipeline_preprocessor=self.current_pipeline.preprocessor,
use_auto_preprocessing=self.params.get('use_auto_preprocessing')
)
self.current_pipeline.preprocessor = merged_preprocessor
if isinstance(self.current_pipeline, PipelineEnsemble):
for pipeline in self.current_pipeline.pipelines:
pipeline.preprocessor = merged_preprocessor

self.log.message(f'Final pipeline: {graph_structure(self.current_pipeline)}')
if isinstance(self.current_pipeline, Pipeline):
self.log.message(f'Final pipeline: {graph_structure(self.current_pipeline)}')
else:
self.log.message(f'Final pipeline ensemble: {len(self.current_pipeline.pipelines)} pipelines')

return self.current_pipeline
finally:
Expand Down Expand Up @@ -258,6 +271,8 @@ def tune(self,
"""
if self.current_pipeline is None:
raise ValueError(NOT_FITTED_ERR_MSG)
if isinstance(self.current_pipeline, PipelineEnsemble):
raise ValueError('Tuning for pipeline ensembles is not supported yet.')

with fedot_composer_timer.launch_tuning('post'):
tune_plan = build_tune_execution_plan(
Expand Down Expand Up @@ -618,15 +633,18 @@ def _run_sampling_stage_if_necessary(self):
)

def _train_pipeline_on_full_dataset(self, recommendations: Optional[dict],
full_train_not_preprocessed: Union[InputData, MultiModalData]):
full_train_not_preprocessed: Union[InputData, InputDataList, MultiModalData]):
"""Applies training procedure for obtained pipeline if dataset was clipped
"""

if recommendations is not None:
# if data was cut we need to refit pipeline on full data
self.data_processor.accept_and_apply_recommendations(full_train_not_preprocessed,
{k: v for k, v in recommendations.items()
if k != 'cut'})
cleaned_recommendations = {k: v for k, v in recommendations.items() if k != 'cut'}
if isinstance(full_train_not_preprocessed, list):
for chunk_data in full_train_not_preprocessed:
self.data_processor.accept_and_apply_recommendations(chunk_data, cleaned_recommendations)
else:
self.data_processor.accept_and_apply_recommendations(full_train_not_preprocessed, cleaned_recommendations)
self.current_pipeline.fit(
full_train_not_preprocessed,
n_jobs=self.params.n_jobs
Expand Down
21 changes: 19 additions & 2 deletions fedot/api/sampling_stage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
from fedot.api.sampling_stage.config import SamplingConfig, validate_sampling_config
from fedot.api.sampling_stage.config import (
SamplingChunkingConfig,
SamplingConfig,
SamplingConfigBase,
SamplingSubsetConfig,
validate_sampling_config,
)
from fedot.api.sampling_stage.executor import SamplingStageExecutor, SamplingStageOutput
from fedot.api.sampling_stage.providers import SamplingProvider, SamplingProviderResult, SamplingZooProvider
from fedot.api.sampling_stage.providers import (
SamplingProvider,
SamplingProviderResult,
SamplingSubsetResult,
SamplingChunkingResult,
SamplingZooProvider,
)

__all__ = [
'SamplingConfig',
'SamplingConfigBase',
'SamplingChunkingConfig',
'SamplingSubsetConfig',
'SamplingProvider',
'SamplingProviderResult',
'SamplingSubsetResult',
'SamplingChunkingResult',
'SamplingStageExecutor',
'SamplingStageOutput',
'SamplingZooProvider',
Expand Down
Loading
Loading