
Commit b7d6f2f

refactor: improve and simplify sampling integration
1 parent 66be6fa commit b7d6f2f

10 files changed: +129 -349 lines changed

fedot/api/api_utils/api_composer.py

Lines changed: 10 additions & 0 deletions
@@ -1,5 +1,6 @@
 import datetime
 import gc
+import time
 from copy import deepcopy
 from typing import List, Optional, Sequence, Tuple, Union
 
@@ -118,15 +119,24 @@ def obtain_ensemble_model(self, train_data_list: InputDataList) -> \
         pipelines: List[Pipeline] = []
         best_models: List[Sequence[Pipeline]] = []
         histories: List[OptHistory] = []
+        initial_timeout = self.params.timeout
+        started_at = time.perf_counter()
 
         for chunk_data in train_data_list:
+            if initial_timeout is not None:
+                elapsed_minutes = (time.perf_counter() - started_at) / 60.0
+                remaining_minutes = max(0.0, initial_timeout - elapsed_minutes)
+                remaining_chunks = max(1, len(train_data_list) - len(pipelines))
+                self.params.timeout = remaining_minutes / remaining_chunks
+
             pipeline, best_pipeline_candidates, history = self.obtain_model(chunk_data)
             if pipeline is None:
                 raise ValueError('No models were found for one of the chunks')
             pipelines.append(pipeline)
             best_models.append(best_pipeline_candidates)
             histories.append(history)
 
+        self.params.timeout = initial_timeout
         ensemble = PipelineEnsemble(pipelines)
         return ensemble, best_models, histories
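
The new loop re-splits the remaining global timeout evenly across the chunks that have not been composed yet, so time saved on early chunks is handed on to later ones, and the original value is restored once the loop finishes. A minimal standalone sketch of the same budgeting pattern (compose_chunk and the chunk list are illustrative placeholders, not FEDOT API):

import time
from typing import Callable, List, Optional

def compose_with_shared_budget(chunks: List[object],
                               total_timeout_minutes: Optional[float],
                               compose_chunk: Callable[[object, Optional[float]], object]) -> List[object]:
    """Spread one global time budget over sequential per-chunk runs."""
    results: List[object] = []
    started_at = time.perf_counter()
    for chunk in chunks:
        per_chunk_timeout = None
        if total_timeout_minutes is not None:
            elapsed_minutes = (time.perf_counter() - started_at) / 60.0
            remaining_minutes = max(0.0, total_timeout_minutes - elapsed_minutes)
            # Chunks not yet processed share whatever budget is left.
            remaining_chunks = max(1, len(chunks) - len(results))
            per_chunk_timeout = remaining_minutes / remaining_chunks
        results.append(compose_chunk(chunk, per_chunk_timeout))
    return results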

fedot/api/api_utils/assumptions/assumption_rules.py

Lines changed: 2 additions & 2 deletions
@@ -32,8 +32,8 @@ class PresetSpec:
 
 def default_repository_name_for_data(data) -> str:
     if data.data_type == DataTypesEnum.multi_ts:
-        return RepositoryKind.ALL.value
-    return RepositoryKind.MODEL.value
+        return RepositoryKind.all.value
+    return RepositoryKind.model.value
 
 
 def required_operations_for_data(data, data_type: DataTypesEnum) -> Tuple[str, ...]:
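
The fix swaps uppercase member names for lowercase ones, which implies RepositoryKind declares its members in lowercase. The enum itself is not part of this commit, so the following definition is only an assumed sketch consistent with the call sites (the values are placeholders):

from enum import Enum

class RepositoryKind(Enum):
    # Hypothetical members inferred from the diff above.
    model = 'model'
    all = 'all'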

fedot/api/main.py

Lines changed: 1 addition & 1 deletion
@@ -203,7 +203,7 @@ def fit(self,
                 api_preprocessor=self.data_processor.preprocessor,
             ).fit()
         else:
-            if isinstance(self.train_data, InputDataList):
+            if isinstance(self.train_data, list) and all(isinstance(x, InputData) for x in self.train_data):
                 self.current_pipeline, self.best_models, self.history = \
                     self.api_composer.obtain_ensemble_model(self.train_data)
             else:
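
A likely reason for this change: if InputDataList is a typing alias such as List[InputData] (an assumption; the alias definition is not shown in this diff), isinstance against it raises TypeError at runtime, because subscripted generics cannot be used in instance checks. A small illustration of the failure mode and the element-wise check adopted instead:

from typing import List

class InputData:
    pass

InputDataList = List[InputData]  # assumed shape of the alias

data = [InputData(), InputData()]

# isinstance(data, InputDataList) raises:
#   TypeError: Subscripted generics cannot be used with class and instance checks

# The runtime-safe check from the commit inspects the elements instead:
print(isinstance(data, list) and all(isinstance(x, InputData) for x in data))  # True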

fedot/api/sampling_stage/executor.py

Lines changed: 25 additions & 44 deletions
@@ -29,41 +29,38 @@ def __init__(self,
                  sampling_config: Dict[str, Any],
                  task_type: TaskTypesEnum,
                  total_timeout_minutes: Optional[float],
-                 log: Optional[LoggerAdapter] = None,
-                 provider: Optional[SamplingProvider] = None):
+                 log: Optional[LoggerAdapter] = None):
         self.config: SamplingConfig = validate_sampling_config(sampling_config)
         if self.config is None:
             raise ValueError('Sampling stage config must not be None when executor is created.')
 
         self.task_type = task_type
         self.total_timeout_minutes = total_timeout_minutes
         self.log = log or default_log(self)
-        self.provider = provider
+        self.provider = self._create_provider(self.config.provider)
 
     def execute(self, train_data: InputData) -> SamplingStageOutput:
         self._validate_task_compatibility(train_data)
 
         started_at = time.perf_counter()
         budget_seconds = self._compute_budget_seconds()
 
-        provider = self.provider or self._create_provider(self.config.provider)
         if self.config.strategy_kind == 'chunking':
-            return self._execute_chunking(train_data, provider, started_at, budget_seconds)
+            return self._execute_chunking(train_data, started_at, budget_seconds)
         elif self.config.strategy_kind == 'subset':
-            return self.execute_subset(train_data, provider, started_at, budget_seconds)
+            return self._execute_subset(train_data, started_at, budget_seconds)
         else:
             raise ValueError(f'Unknown strategy_kind: {self.config.strategy_kind}')
 
-    def execute_subset(self,
+    def _execute_subset(self,
                        train_data: InputData,
-                       provider: SamplingProvider,
                        started_at: float,
                        budget_seconds: float) -> SamplingStageOutput:
-        effective_size_result = self._select_effective_ratio(train_data, provider, started_at, budget_seconds)
+        effective_size_result = self._select_effective_ratio(train_data, started_at, budget_seconds)
 
         self._raise_if_budget_exceeded(started_at, budget_seconds)
         remaining_budget = self._remaining_budget(started_at, budget_seconds)
-        final_provider_result = provider.sample(
+        final_provider_result = self.provider.sample(
             features=np.asarray(train_data.features),
             target=self._flatten_target(train_data.target),
             strategy=self.config.strategy,
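
After this hunk the executor resolves its provider exactly once, in __init__, from the already-validated config, rather than threading an optional provider argument through every call. _create_provider is outside the hunk; a sketch of the kind of name-to-constructor factory it might wrap (the registry and its helpers are assumptions, not FEDOT code):

from typing import Any, Callable, Dict

# Hypothetical registry keyed by the provider name found in the sampling config.
PROVIDER_REGISTRY: Dict[str, Callable[[], Any]] = {}

def register_provider(name: str) -> Callable[[type], type]:
    def decorator(cls: type) -> type:
        PROVIDER_REGISTRY[name] = cls
        return cls
    return decorator

def create_provider(name: str) -> Any:
    try:
        return PROVIDER_REGISTRY[name]()
    except KeyError:
        raise ValueError(f'Unknown sampling provider: {name}') from None
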
@@ -106,13 +103,12 @@ def execute_subset,
 
     def _execute_chunking(self,
                           train_data: InputData,
-                          provider: SamplingProvider,
                           started_at: float,
                           budget_seconds: float) -> SamplingStageOutput:
         self._raise_if_budget_exceeded(started_at, budget_seconds)
         remaining_budget = self._remaining_budget(started_at, budget_seconds)
 
-        provider_result = provider.sample(
+        provider_result = self.provider.sample(
             features=np.asarray(train_data.features),
             target=self._flatten_target(train_data.target),
             strategy=self.config.strategy,
@@ -124,8 +120,6 @@
         )
 
         partitions = provider_result.partitions
-        if not isinstance(partitions, dict) or len(partitions) == 0:
-            raise ValueError('Chunking strategy did not return partitions.')
 
         chunked_data = self._partitions_to_input_data_list(partitions, train_data)
         rows_after = sum(len(chunk.idx) for chunk in chunked_data)
@@ -137,8 +131,8 @@
             'status': 'applied',
             'provider': self.config.provider,
             'strategy': self.config.strategy,
-            'rows_before': int(len(train_data.idx)),
-            'rows_after': int(rows_after),
+            'rows_before': len(train_data.idx),
+            'rows_after': rows_after,
             'elapsed_seconds': elapsed_seconds,
             'budget_seconds': budget_seconds,
             'artifact_mode': self.config.artifact_mode,
@@ -169,7 +163,6 @@ def _validate_task_compatibility(self, train_data: InputData) -> None:
 
     def _select_effective_ratio(self,
                                 train_data: InputData,
-                                provider: SamplingProvider,
                                 started_at: float,
                                 budget_seconds: float) -> Dict[str, Any]:
         train_split, valid_split = self._split_for_protocol(train_data)
@@ -183,7 +176,7 @@
 
         for ratio in sorted_ratios:
             self._raise_if_budget_exceeded(started_at, budget_seconds)
-            provider_result = provider.sample(
+            provider_result = self.provider.sample(
                 features=np.asarray(train_split.features),
                 target=self._flatten_target(train_split.target),
                 strategy=self.config.strategy,
@@ -371,15 +364,6 @@ def _subset_by_positions(data: InputData, positions: np.ndarray) -> InputData:
             features_names=data.features_names,
         )
 
-    @staticmethod
-    def _take_feature_slice(features: Any, indices: Any) -> Any:
-        if isinstance(features, pd.DataFrame):
-            try:
-                return features.loc[indices]
-            except Exception:
-                return features.iloc[indices]
-        return features[indices]
-
     @staticmethod
     def _take_target_slice(target: Any, indices: Any) -> Any:
         if target is None:
@@ -397,36 +381,33 @@ def _partitions_to_input_data_list(partitions: Dict[str, Any],
         input_data_list: InputDataList = []
 
         for partition_name, partition_data in partitions.items():
-            del partition_name
             if isinstance(partition_data, dict):
                 X_partition = partition_data['feature']
                 y_partition = partition_data['target']
 
                 if isinstance(X_partition, pd.DataFrame):
                     indices = X_partition.index.values
+                    X_values = X_partition.to_numpy()
                 else:
                     indices = np.arange(len(X_partition))
+                    X_values = np.asarray(X_partition)
 
-                if isinstance(X_partition, pd.DataFrame):
-                    X_values = X_partition.values
-                else:
-                    X_values = X_partition
+                if isinstance(y_partition, (pd.Series, pd.DataFrame)):
+                    y_partition = y_partition.to_numpy()
             else:
                 indices = partition_data
-                X_values = SamplingStageExecutor._take_feature_slice(original_input_data.features, indices)
-                y_partition = SamplingStageExecutor._take_target_slice(original_input_data.target, indices)
-
-                if isinstance(indices, list):
-                    indices = np.asarray(indices)
+                if isinstance(original_input_data.features, pd.DataFrame):
+                    X_values = original_input_data.features.loc[indices].to_numpy()
+                else:
+                    X_values = np.asarray(original_input_data.features)[indices]
+                if isinstance(original_input_data.target, (pd.Series, pd.DataFrame)):
+                    y_partition = original_input_data.target.to_numpy()[indices]
+                else:
+                    y_partition = original_input_data.target[indices]
 
             categorical_features = None
-            if original_input_data.categorical_features is not None and indices is not None:
-                try:
-                    categorical_features = SamplingStageExecutor._take_feature_slice(
-                        original_input_data.categorical_features, indices
-                    )
-                except Exception:
-                    categorical_features = None
+            if original_input_data.categorical_idx is not None and len(original_input_data.categorical_idx) > 0:
+                categorical_features = X_values[:, original_input_data.categorical_idx]
 
             partition_input_data = InputData(
                 idx=np.asarray(indices),
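
The rewritten converter accepts two partition payload shapes: a dict carrying pre-materialized 'feature'/'target' arrays, or a bare index collection to slice out of the original data. A toy walk-through of both branches (the data here is made up):

import numpy as np
import pandas as pd

features = pd.DataFrame({'a': range(6), 'b': range(6)})
target = pd.Series(range(6))

partitions = {
    'chunk_0': {'feature': features.iloc[:3], 'target': target.iloc[:3]},  # materialized pair
    'chunk_1': np.array([3, 4, 5]),                                        # bare positions
}

for name, payload in partitions.items():
    if isinstance(payload, dict):
        X = payload['feature']
        indices = X.index.values if isinstance(X, pd.DataFrame) else np.arange(len(X))
        X_values = X.to_numpy() if isinstance(X, pd.DataFrame) else np.asarray(X)
        y = payload['target']
        y = y.to_numpy() if isinstance(y, (pd.Series, pd.DataFrame)) else y
    else:
        indices = payload
        X_values = features.loc[indices].to_numpy()
        y = target.to_numpy()[indices]
    print(name, X_values.shape, np.asarray(y).shape)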
