Merged

29 commits
f2e7f66  scaffold (andreatgretel, Nov 11, 2025)
992862a  linting (andreatgretel, Nov 11, 2025)
b204a30  tests (andreatgretel, Nov 12, 2025)
7aea832  addressing comments pt1 (andreatgretel, Nov 13, 2025)
e7eb679  advancing in the discussed direction (andreatgretel, Nov 18, 2025)
41c1929  lint plus tests (andreatgretel, Nov 20, 2025)
70ec396  cleaning up and adding preview (andreatgretel, Nov 20, 2025)
d2da10c  name is mandatory (andreatgretel, Nov 20, 2025)
96e974a  checking membership of set instead of list (andreatgretel, Dec 1, 2025)
bd9760b  addressing comments pt1 (andreatgretel, Dec 1, 2025)
f158499  lint (andreatgretel, Dec 1, 2025)
d4f1850  changing example to desired UX (andreatgretel, Dec 3, 2025)
5cff7f1  moving to different UX (andreatgretel, Dec 4, 2025)
63f9b35  fixing tests, linting (andreatgretel, Dec 8, 2025)
bae9884  addressing comment (andreatgretel, Dec 8, 2025)
7fb42c3  fixes, validating Jinja template (andreatgretel, Dec 9, 2025)
16c0686  typo (andreatgretel, Dec 9, 2025)
17891b3  fixing behavior, saving preview to file instead of memory (andreatgretel, Dec 9, 2025)
5addb45  fixing tests (andreatgretel, Dec 9, 2025)
4c81a4d  rolling back count since names are required (andreatgretel, Dec 9, 2025)
c5257b4  addressing comments (andreatgretel, Dec 10, 2025)
22b1031  linting (andreatgretel, Dec 10, 2025)
3c8d188  addressing more comments (andreatgretel, Dec 10, 2025)
48dcea6  more comments (andreatgretel, Dec 10, 2025)
3dfa7bb  lint (andreatgretel, Dec 10, 2025)
a9a6168  renaming (andreatgretel, Dec 10, 2025)
8e6bc66  lint (andreatgretel, Dec 10, 2025)
e836832  comments (andreatgretel, Dec 10, 2025)
48e04fe  Optional instead of None (andreatgretel, Dec 10, 2025)
7 changes: 6 additions & 1 deletion src/data_designer/config/exports.py
@@ -32,7 +32,11 @@
UniformDistribution,
UniformDistributionParams,
)
from data_designer.config.processors import DropColumnsProcessorConfig, ProcessorType
from data_designer.config.processors import (
DropColumnsProcessorConfig,
ProcessorType,
SchemaTransformProcessorConfig,
)
from data_designer.config.sampler_constraints import ColumnInequalityConstraint, ScalarInequalityConstraint
from data_designer.config.sampler_params import (
BernoulliMixtureSamplerParams,
@@ -69,6 +73,7 @@

def get_config_exports() -> list[str]:
return [
SchemaTransformProcessorConfig.__name__,
BernoulliMixtureSamplerParams.__name__,
BernoulliSamplerParams.__name__,
BinomialSamplerParams.__name__,
9 changes: 6 additions & 3 deletions src/data_designer/config/preview_results.py
@@ -3,7 +3,7 @@

from __future__ import annotations

from typing import Optional
from typing import Optional, Union

import pandas as pd

@@ -19,14 +19,17 @@ def __init__(
config_builder: DataDesignerConfigBuilder,
dataset: Optional[pd.DataFrame] = None,
analysis: Optional[DatasetProfilerResults] = None,
processor_artifacts: Optional[dict[str, Union[list[str], str]]] = None,
):
"""Creates a new instance with results from a Data Designer preview run.

Args:
config_builder: Data Designer configuration builder.
dataset: Dataset of the preview run.
analysis: Analysis of the preview run.
processor_artifacts: Artifacts generated by the processors.
"""
self.dataset: pd.DataFrame | None = dataset
self.analysis: DatasetProfilerResults | None = analysis
self.dataset: Optional[pd.DataFrame] = dataset
self.analysis: Optional[DatasetProfilerResults] = analysis
self.processor_artifacts: Optional[dict[str, Union[list[str], str]]] = processor_artifacts
self._config_builder = config_builder
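
A quick orientation for reviewers: the shape the new `processor_artifacts` field carries, as a minimal sketch. The dictionary contents below are illustrative, not taken from a real run.

```python
from typing import Optional, Union

# Illustrative artifacts mapping: a processor name maps either to a list
# (one artifact per preview record) or to a single run-level string.
artifacts: Optional[dict[str, Union[list[str], str]]] = {
    "schema_transform": ["record-0 output", "record-1 output"],
    "drop_columns": "path/to/dropped-columns.parquet",
}

if artifacts is not None:
    for name, value in artifacts.items():
        kind = "per-record" if isinstance(value, list) else "run-level"
        print(f"{name}: {kind}")
```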
48 changes: 46 additions & 2 deletions src/data_designer/config/processors.py
@@ -1,25 +1,32 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import json
from abc import ABC
from enum import Enum
from typing import Literal
from typing import Any, Literal

from pydantic import Field, field_validator

from data_designer.config.base import ConfigBase
from data_designer.config.dataset_builders import BuildStage
from data_designer.config.errors import InvalidConfigError

SUPPORTED_STAGES = [BuildStage.POST_BATCH]


class ProcessorType(str, Enum):
DROP_COLUMNS = "drop_columns"
SCHEMA_TRANSFORM = "schema_transform"


class ProcessorConfig(ConfigBase, ABC):
name: str = Field(
description="The name of the processor, used to identify the processor in the results and to write the artifacts to disk.",
)
build_stage: BuildStage = Field(
..., description=f"The stage at which the processor will run. Supported stages: {', '.join(SUPPORTED_STAGES)}"
default=BuildStage.POST_BATCH,
description=f"The stage at which the processor will run. Supported stages: {', '.join(SUPPORTED_STAGES)}",
)

@field_validator("build_stage")
@@ -34,8 +41,45 @@ def validate_build_stage(cls, v: BuildStage) -> BuildStage:
def get_processor_config_from_kwargs(processor_type: ProcessorType, **kwargs) -> ProcessorConfig:
if processor_type == ProcessorType.DROP_COLUMNS:
return DropColumnsProcessorConfig(**kwargs)
elif processor_type == ProcessorType.SCHEMA_TRANSFORM:
return SchemaTransformProcessorConfig(**kwargs)


class DropColumnsProcessorConfig(ProcessorConfig):
column_names: list[str]
processor_type: Literal[ProcessorType.DROP_COLUMNS] = ProcessorType.DROP_COLUMNS


class SchemaTransformProcessorConfig(ProcessorConfig):
template: dict[str, Any] = Field(
...,
description="""
Dictionary specifying columns and templates to use in the new dataset with transformed schema.

Each key is a new column name, and each value is an object containing Jinja2 templates, such as a string or a list of strings.
Values must be JSON-serializable.

Example:

```python
template = {
"list_of_strings": ["{{ col1 }}", "{{ col2 }}"],
"uppercase_string": "{{ col1 | upper }}",
"lowercase_string": "{{ col2 | lower }}",
}
```

The above templates will create a new dataset with three columns: "list_of_strings", "uppercase_string", and "lowercase_string".
References to columns "col1" and "col2" in the templates will be replaced with the actual values of the columns in the dataset.
""",
)
processor_type: Literal[ProcessorType.SCHEMA_TRANSFORM] = ProcessorType.SCHEMA_TRANSFORM

@field_validator("template")
def validate_template(cls, v: dict[str, Any]) -> dict[str, Any]:
try:
json.dumps(v)
except TypeError as e:
if "not JSON serializable" in str(e):
raise InvalidConfigError("Template must be JSON serializable")
return v
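
A minimal sketch of constructing the new config and exercising its validator, assuming the imports resolve as in this PR. Whether the error surfaces as `InvalidConfigError` or gets re-wrapped in pydantic's `ValidationError` depends on `InvalidConfigError`'s base class, which this diff does not show.

```python
from data_designer.config.processors import SchemaTransformProcessorConfig

config = SchemaTransformProcessorConfig(
    name="schema_transform",  # name is mandatory on every ProcessorConfig
    template={
        "list_of_strings": ["{{ col1 }}", "{{ col2 }}"],
        "uppercase_string": "{{ col1 | upper }}",
    },
)

# Non-JSON-serializable values are rejected by validate_template;
# depending on InvalidConfigError's base class, pydantic may re-wrap it.
try:
    SchemaTransformProcessorConfig(name="bad", template={"x": object()})
except Exception as err:
    print(type(err).__name__, err)
```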
35 changes: 33 additions & 2 deletions src/data_designer/config/utils/validation.py
@@ -18,7 +18,10 @@
from data_designer.config.column_types import ColumnConfigT, DataDesignerColumnType, column_type_is_llm_generated
from data_designer.config.processors import ProcessorConfig, ProcessorType
from data_designer.config.utils.constants import RICH_CONSOLE_THEME
from data_designer.config.utils.misc import can_run_data_designer_locally
from data_designer.config.utils.misc import (
can_run_data_designer_locally,
get_prompt_template_keywords,
)
from data_designer.config.validator_params import ValidatorType


@@ -63,6 +66,7 @@ def validate_data_designer_config(
violations.extend(validate_expression_references(columns=columns, allowed_references=allowed_references))
violations.extend(validate_columns_not_all_dropped(columns=columns))
violations.extend(validate_drop_columns_processor(columns=columns, processor_configs=processor_configs))
violations.extend(validate_schema_transform_processor(columns=columns, processor_configs=processor_configs))
if not can_run_data_designer_locally():
violations.extend(validate_local_only_columns(columns=columns))
return violations
@@ -271,7 +275,7 @@ def validate_drop_columns_processor(
columns: list[ColumnConfigT],
processor_configs: list[ProcessorConfig],
) -> list[Violation]:
all_column_names = set([c.name for c in columns])
all_column_names = {c.name for c in columns}
for processor_config in processor_configs:
if processor_config.processor_type == ProcessorType.DROP_COLUMNS:
invalid_columns = set(processor_config.column_names) - all_column_names
@@ -288,6 +292,33 @@ def validate_drop_columns_processor(
return []


def validate_schema_transform_processor(
columns: list[ColumnConfigT],
processor_configs: list[ProcessorConfig],
) -> list[Violation]:
violations = []

all_column_names = {c.name for c in columns}
for processor_config in processor_configs:
if processor_config.processor_type == ProcessorType.SCHEMA_TRANSFORM:
for col, template in processor_config.template.items():
template_keywords = get_prompt_template_keywords(template)
invalid_keywords = set(template_keywords) - all_column_names
if len(invalid_keywords) > 0:
invalid_keywords = ", ".join([f"'{k}'" for k in invalid_keywords])
message = f"Ancillary dataset processor attempts to reference columns {invalid_keywords} in the template for '{col}', but the columns are not defined in the dataset."
violations.append(
Violation(
column=None,
type=ViolationType.INVALID_REFERENCE,
message=message,
level=ViolationLevel.ERROR,
)
)

return violations


def validate_expression_references(
columns: list[ColumnConfigT],
allowed_references: list[str],
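
The reference check reduces to extracting Jinja2 placeholders and diffing them against the configured column names. A standalone sketch of that logic, using `re` in place of the project's `get_prompt_template_keywords` helper (whose exact behavior this diff does not show):

```python
import re

def template_keywords(template: object) -> set[str]:
    # Approximate placeholder extraction: collect {{ name }} references
    # from a string or any nested structure rendered via str().
    return {m.group(1) for m in re.finditer(r"{{\s*(\w+)", str(template))}

all_column_names = {"col1", "col2"}
template = {"out": ["{{ col1 }}", "{{ col3 | upper }}"]}

for col, tmpl in template.items():
    invalid = template_keywords(tmpl) - all_column_names
    if invalid:
        # Mirrors the Violation the validator emits for invalid references.
        print(f"template for '{col}' references undefined columns: {sorted(invalid)}")
```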
32 changes: 32 additions & 0 deletions src/data_designer/config/utils/visualization.py
@@ -72,13 +72,17 @@ def _record_sampler_dataset(self) -> pd.DataFrame:
else:
raise DatasetSampleDisplayError("No valid dataset found in results object.")

def _has_processor_artifacts(self) -> bool:
return hasattr(self, "processor_artifacts") and self.processor_artifacts is not None

def display_sample_record(
self,
index: Optional[int] = None,
*,
hide_seed_columns: bool = False,
syntax_highlighting_theme: str = "dracula",
background_color: Optional[str] = None,
processors_to_display: Optional[list[str]] = None,
) -> None:
"""Display a sample record from the Data Designer dataset preview.

@@ -90,6 +94,7 @@ def display_sample_record(
documentation from `rich` for information about available themes.
background_color: Background color to use for the record. See the `Syntax`
documentation from `rich` for information about available background colors.
processors_to_display: List of processors to display the artifacts for. If None, all processors will be displayed.
"""
i = index or self._display_cycle_index

@@ -99,8 +104,25 @@
except IndexError:
raise DatasetSampleDisplayError(f"Index {i} is out of bounds for dataset of length {num_records}.")

processor_data_to_display = None
if self._has_processor_artifacts() and len(self.processor_artifacts) > 0:
if processors_to_display is None:
processors_to_display = list(self.processor_artifacts.keys())

if len(processors_to_display) > 0:
processor_data_to_display = {}
for processor in processors_to_display:
if (
isinstance(self.processor_artifacts[processor], list)
and len(self.processor_artifacts[processor]) == num_records
):
processor_data_to_display[processor] = self.processor_artifacts[processor][i]
else:
processor_data_to_display[processor] = self.processor_artifacts[processor]

display_sample_record(
record=record,
processor_data_to_display=processor_data_to_display,
config_builder=self._config_builder,
background_color=background_color,
syntax_highlighting_theme=syntax_highlighting_theme,
@@ -134,6 +156,7 @@ def create_rich_histogram_table(
def display_sample_record(
record: Union[dict, pd.Series, pd.DataFrame],
config_builder: DataDesignerConfigBuilder,
processor_data_to_display: Optional[dict[str, Union[list[str], str]]] = None,
background_color: Optional[str] = None,
syntax_highlighting_theme: str = "dracula",
record_index: Optional[int] = None,
@@ -230,6 +253,15 @@ def display_sample_record(
table.add_row(*row)
render_list.append(pad_console_element(table, (1, 0, 1, 0)))

if processor_data_to_display and len(processor_data_to_display) > 0:
for processor_name, processor_data in processor_data_to_display.items():
table = Table(title=f"Processor Outputs: {processor_name}", **table_kws)
table.add_column("Name")
table.add_column("Value")
for col, value in processor_data.items():
table.add_row(col, convert_to_row_element(value))
render_list.append(pad_console_element(table, (1, 0, 1, 0)))

if record_index is not None:
index_label = Text(f"[index: {record_index}]", justify="center")
render_list.append(index_label)
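
The new display path only has to decide whether an artifact is per-record or run-level. A self-contained sketch of that selection logic, with illustrative values:

```python
from typing import Union

num_records = 2
i = 1  # index of the record being displayed
artifacts: dict[str, Union[list[str], str]] = {
    "schema_transform": ["rec-0 artifact", "rec-1 artifact"],  # per record
    "report": "one summary for the whole preview",             # run level
}

processor_data_to_display = {}
for name, value in artifacts.items():
    if isinstance(value, list) and len(value) == num_records:
        processor_data_to_display[name] = value[i]  # matching record only
    else:
        processor_data_to_display[name] = value     # shown as-is
print(processor_data_to_display)
```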
16 changes: 13 additions & 3 deletions src/data_designer/engine/dataset_builders/artifact_storage.py
@@ -25,6 +25,7 @@ class BatchStage(StrEnum):
PARTIAL_RESULT = "partial_results_path"
FINAL_RESULT = "final_dataset_path"
DROPPED_COLUMNS = "dropped_columns_dataset_path"
PROCESSORS_OUTPUTS = "processors_outputs_path"


class ArtifactStorage(BaseModel):
@@ -33,6 +34,7 @@
final_dataset_folder_name: str = "parquet-files"
partial_results_folder_name: str = "tmp-partial-parquet-files"
dropped_columns_folder_name: str = "dropped-columns-parquet-files"
processors_outputs_folder_name: str = "processors-files"

@property
def artifact_path_exists(self) -> bool:
@@ -70,6 +72,10 @@ def metadata_file_path(self) -> Path:
def partial_results_path(self) -> Path:
return self.base_dataset_path / self.partial_results_folder_name

@property
def processors_outputs_path(self) -> Path:
return self.base_dataset_path / self.processors_outputs_folder_name

@field_validator("artifact_path")
def validate_artifact_path(cls, v: Union[Path, str]) -> Path:
v = Path(v)
@@ -84,6 +90,7 @@ def validate_folder_names(self):
self.final_dataset_folder_name,
self.partial_results_folder_name,
self.dropped_columns_folder_name,
self.processors_outputs_folder_name,
]

for name in folder_names:
@@ -169,19 +176,22 @@ def write_batch_to_parquet_file(
batch_number: int,
dataframe: pd.DataFrame,
batch_stage: BatchStage,
subfolder: str | None = None,
) -> Path:
file_path = self.create_batch_file_path(batch_number, batch_stage=batch_stage)
self.write_parquet_file(file_path.name, dataframe, batch_stage)
self.write_parquet_file(file_path.name, dataframe, batch_stage, subfolder=subfolder)
return file_path

def write_parquet_file(
self,
parquet_file_name: str,
dataframe: pd.DataFrame,
batch_stage: BatchStage,
subfolder: str | None = None,
) -> Path:
self.mkdir_if_needed(self._get_stage_path(batch_stage))
file_path = self._get_stage_path(batch_stage) / parquet_file_name
subfolder = subfolder or ""
self.mkdir_if_needed(self._get_stage_path(batch_stage) / subfolder)
file_path = self._get_stage_path(batch_stage) / subfolder / parquet_file_name
dataframe.to_parquet(file_path, index=False)
return file_path

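
The `subfolder` plumbing reduces to path joining plus an mkdir. A runnable sketch with plain `pathlib` (folder and file names illustrative):

```python
from pathlib import Path

def stage_file_path(stage_path: Path, file_name: str, subfolder: str | None = None) -> Path:
    subfolder = subfolder or ""          # same normalization as the diff
    target_dir = stage_path / subfolder  # joining "" leaves the path unchanged
    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir / file_name

print(stage_file_path(Path("/tmp/processors-files"), "batch-0.parquet", "schema_transform"))
# /tmp/processors-files/schema_transform/batch-0.parquet
```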
@@ -244,6 +244,7 @@ def _initialize_processors(self, processor_configs: list[ProcessorConfig]) -> di
processors[BuildStage.POST_BATCH].append( # as post-batch by default
DropColumnsProcessor(
config=DropColumnsProcessorConfig(
name="default_drop_columns_processor",
column_names=columns_to_drop,
build_stage=BuildStage.POST_BATCH,
),
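
Since `name` became mandatory on `ProcessorConfig` (commit d2da10c), the default processor must now pass one explicitly. A minimal construction sketch, assuming the imports resolve as in this PR:

```python
from data_designer.config.dataset_builders import BuildStage
from data_designer.config.processors import DropColumnsProcessorConfig

config = DropColumnsProcessorConfig(
    name="default_drop_columns_processor",
    column_names=["scratch_column"],     # illustrative column name
    build_stage=BuildStage.POST_BATCH,   # also the new default value
)
print(config.processor_type)  # ProcessorType.DROP_COLUMNS
```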
src/data_designer/engine/processing/processors/drop_columns.py
@@ -17,7 +17,7 @@ class DropColumnsProcessor(Processor[DropColumnsProcessorConfig]):
@staticmethod
def metadata() -> ConfigurableTaskMetadata:
return ConfigurableTaskMetadata(
name="drop_columns",
name="drop_columns_processor",
description="Drop columns from the input dataset.",
required_resources=None,
)
3 changes: 3 additions & 0 deletions src/data_designer/engine/processing/processors/registry.py
@@ -5,9 +5,11 @@
from data_designer.config.processors import (
DropColumnsProcessorConfig,
ProcessorType,
SchemaTransformProcessorConfig,
)
from data_designer.engine.processing.processors.base import Processor
from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor
from data_designer.engine.processing.processors.schema_transform import SchemaTransformProcessor
from data_designer.engine.registry.base import TaskRegistry


@@ -16,5 +18,6 @@ class ProcessorRegistry(TaskRegistry[str, Processor, ConfigBase]): ...

def create_default_processor_registry() -> ProcessorRegistry:
registry = ProcessorRegistry()
registry.register(ProcessorType.SCHEMA_TRANSFORM, SchemaTransformProcessor, SchemaTransformProcessorConfig, False)
registry.register(ProcessorType.DROP_COLUMNS, DropColumnsProcessor, DropColumnsProcessorConfig, False)
return registry