Skip to content

Commit 4edb96d

Browse files
committed
Expose shutdown features to create(...)
1 parent e8fdf9e commit 4edb96d

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

src/data_designer/engine/dataset_builders/column_wise_builder.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,19 @@ def __init__(
6060
processor_configs: list[ProcessorConfig],
6161
resource_provider: ResourceProvider,
6262
registry: DataDesignerRegistry | None = None,
63+
enable_early_shutdown: bool = True,
64+
shutdown_error_rate: float = 0.5,
65+
shutdown_error_window: int = 10,
6366
):
6467
self.batch_manager = DatasetBatchManager(resource_provider.artifact_storage)
6568
self._resource_provider = resource_provider
6669
self._records_to_drop: set[int] = set()
6770
self._registry = registry or DataDesignerRegistry()
6871
self._column_configs = column_configs
6972
self._processors: dict[BuildStage, list[Processor]] = self._initialize_processors(processor_configs)
73+
self._enable_early_shutdown = enable_early_shutdown
74+
self._shutdown_error_rate = shutdown_error_rate
75+
self._shutdown_error_window = shutdown_error_window
7076
self._validate_column_configs()
7177

7278
@property
@@ -222,6 +228,8 @@ def _fan_out_with_threads(self, generator: WithModelGeneration, max_workers: int
222228
column_name=generator.config.name,
223229
result_callback=self._worker_result_callback,
224230
error_callback=self._worker_error_callback,
231+
shutdown_error_rate=self._shutdown_error_rate if self._enable_early_shutdown else 1.0,
232+
shutdown_error_window=self._shutdown_error_window,
225233
) as executor:
226234
for i, record in self.batch_manager.iter_current_batch():
227235
executor.submit(lambda record: generator.generate(record), record, context={"index": i})

src/data_designer/interface/data_designer.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ def create(
154154
*,
155155
num_records: int = DEFAULT_NUM_RECORDS,
156156
dataset_name: str = "dataset",
157+
enable_early_shutdown: bool = True,
158+
shutdown_error_rate: float = 0.5,
159+
shutdown_error_window: int = 10,
157160
) -> DatasetCreationResults:
158161
"""Create dataset and save results to the local artifact storage.
159162
@@ -171,6 +174,15 @@ def create(
171174
a datetime stamp. For example, if the dataset name is "awesome_dataset" and a directory
172175
with the same name already exists, the dataset will be saved to a new directory
173176
with the name "awesome_dataset_2025-01-01_12-00-00".
177+
enable_early_shutdown: If True (default), dataset generation will terminate
178+
early if the error rate exceeds `shutdown_error_rate` after
179+
`shutdown_error_window` tasks complete. Set to False to disable
180+
early shutdown entirely (ignores `shutdown_error_rate` and
181+
`shutdown_error_window`).
182+
shutdown_error_rate: Error rate threshold (0.0-1.0) that triggers early
183+
shutdown. Only used when `enable_early_shutdown=True`. Default is 0.5 (50%).
184+
shutdown_error_window: Minimum number of completed tasks before error rate
185+
monitoring begins. Only used when `enable_early_shutdown=True`. Default is 10.
174186
175187
Returns:
176188
DatasetCreationResults object with methods for loading the generated dataset,
@@ -184,7 +196,13 @@ def create(
184196

185197
resource_provider = self._create_resource_provider(dataset_name, config_builder)
186198

187-
builder = self._create_dataset_builder(config_builder, resource_provider)
199+
builder = self._create_dataset_builder(
200+
config_builder,
201+
resource_provider,
202+
enable_early_shutdown=enable_early_shutdown,
203+
shutdown_error_rate=shutdown_error_rate,
204+
shutdown_error_window=shutdown_error_window,
205+
)
188206

189207
try:
190208
builder.build(num_records=num_records, buffer_size=self._buffer_size)
@@ -334,12 +352,20 @@ def _resolve_model_providers(self, model_providers: list[ModelProvider] | None)
334352
return model_providers or []
335353

336354
def _create_dataset_builder(
337-
self, config_builder: DataDesignerConfigBuilder, resource_provider: ResourceProvider
355+
self,
356+
config_builder: DataDesignerConfigBuilder,
357+
resource_provider: ResourceProvider,
358+
enable_early_shutdown: bool = True,
359+
shutdown_error_rate: float = 0.5,
360+
shutdown_error_window: int = 10,
338361
) -> ColumnWiseDatasetBuilder:
339362
return ColumnWiseDatasetBuilder(
340363
column_configs=compile_dataset_builder_column_configs(config_builder.build(raise_exceptions=True)),
341364
processor_configs=config_builder.get_processor_configs(),
342365
resource_provider=resource_provider,
366+
enable_early_shutdown=enable_early_shutdown,
367+
shutdown_error_rate=shutdown_error_rate,
368+
shutdown_error_window=shutdown_error_window,
343369
)
344370

345371
def _create_dataset_profiler(

0 commit comments

Comments
 (0)