diff --git a/src/data_designer/config/exports.py b/src/data_designer/config/exports.py
index 367710f5..eb976659 100644
--- a/src/data_designer/config/exports.py
+++ b/src/data_designer/config/exports.py
@@ -32,7 +32,11 @@
     UniformDistribution,
     UniformDistributionParams,
 )
-from data_designer.config.processors import DropColumnsProcessorConfig, ProcessorType
+from data_designer.config.processors import (
+    DropColumnsProcessorConfig,
+    ProcessorType,
+    SchemaTransformProcessorConfig,
+)
 from data_designer.config.sampler_constraints import ColumnInequalityConstraint, ScalarInequalityConstraint
 from data_designer.config.sampler_params import (
     BernoulliMixtureSamplerParams,
@@ -69,6 +73,7 @@ def get_config_exports() -> list[str]:
     return [
+        SchemaTransformProcessorConfig.__name__,
         BernoulliMixtureSamplerParams.__name__,
         BernoulliSamplerParams.__name__,
         BinomialSamplerParams.__name__,
diff --git a/src/data_designer/config/preview_results.py b/src/data_designer/config/preview_results.py
index bfccb68d..ba983b5f 100644
--- a/src/data_designer/config/preview_results.py
+++ b/src/data_designer/config/preview_results.py
@@ -3,7 +3,7 @@
 
 from __future__ import annotations
 
-from typing import Optional
+from typing import Optional, Union
 
 import pandas as pd
 
@@ -19,6 +19,7 @@ def __init__(
         config_builder: DataDesignerConfigBuilder,
         dataset: Optional[pd.DataFrame] = None,
         analysis: Optional[DatasetProfilerResults] = None,
+        processor_artifacts: Optional[dict[str, Union[list[str], str]]] = None,
     ):
         """Creates a new instance with results from a Data Designer preview run.
@@ -26,7 +27,9 @@
             config_builder: Data Designer configuration builder.
             dataset: Dataset of the preview run.
             analysis: Analysis of the preview run.
+            processor_artifacts: Artifacts generated by the processors.
         """
-        self.dataset: pd.DataFrame | None = dataset
-        self.analysis: DatasetProfilerResults | None = analysis
+        self.dataset: Optional[pd.DataFrame] = dataset
+        self.analysis: Optional[DatasetProfilerResults] = analysis
+        self.processor_artifacts: Optional[dict[str, Union[list[str], str]]] = processor_artifacts
         self._config_builder = config_builder
diff --git a/src/data_designer/config/processors.py b/src/data_designer/config/processors.py
index 6cd0b995..1b3bfa03 100644
--- a/src/data_designer/config/processors.py
+++ b/src/data_designer/config/processors.py
@@ -1,25 +1,32 @@
 # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
 
+import json
 from abc import ABC
 from enum import Enum
-from typing import Literal
+from typing import Any, Literal
 
 from pydantic import Field, field_validator
 
 from data_designer.config.base import ConfigBase
 from data_designer.config.dataset_builders import BuildStage
+from data_designer.config.errors import InvalidConfigError
 
 SUPPORTED_STAGES = [BuildStage.POST_BATCH]
 
 
 class ProcessorType(str, Enum):
     DROP_COLUMNS = "drop_columns"
+    SCHEMA_TRANSFORM = "schema_transform"
 
 
 class ProcessorConfig(ConfigBase, ABC):
+    name: str = Field(
+        description="The name of the processor, used to identify the processor in the results and to write the artifacts to disk.",
+    )
     build_stage: BuildStage = Field(
-        ..., description=f"The stage at which the processor will run. Supported stages: {', '.join(SUPPORTED_STAGES)}"
+        default=BuildStage.POST_BATCH,
+        description=f"The stage at which the processor will run. Supported stages: {', '.join(SUPPORTED_STAGES)}",
     )
 
     @field_validator("build_stage")
@@ -34,8 +41,45 @@ def validate_build_stage(cls, v: BuildStage) -> BuildStage:
 
 
 def get_processor_config_from_kwargs(processor_type: ProcessorType, **kwargs) -> ProcessorConfig:
     if processor_type == ProcessorType.DROP_COLUMNS:
         return DropColumnsProcessorConfig(**kwargs)
+    elif processor_type == ProcessorType.SCHEMA_TRANSFORM:
+        return SchemaTransformProcessorConfig(**kwargs)
 
 
 class DropColumnsProcessorConfig(ProcessorConfig):
     column_names: list[str]
     processor_type: Literal[ProcessorType.DROP_COLUMNS] = ProcessorType.DROP_COLUMNS
+
+
+class SchemaTransformProcessorConfig(ProcessorConfig):
+    template: dict[str, Any] = Field(
+        ...,
+        description="""
+        Dictionary specifying columns and templates to use in the new dataset with transformed schema.
+
+        Each key is a new column name, and each value is an object containing Jinja2 templates, for instance a string or a list of strings.
+        Values must be JSON-serializable.
+
+        Example:
+
+        ```python
+        template = {
+            "list_of_strings": ["{{ col1 }}", "{{ col2 }}"],
+            "uppercase_string": "{{ col1 | upper }}",
+            "lowercase_string": "{{ col2 | lower }}",
+        }
+        ```
+
+        The above templates will create a new dataset with three columns: "list_of_strings", "uppercase_string", and "lowercase_string".
+        References to columns "col1" and "col2" in the templates will be replaced with the actual values of the columns in the dataset.
+        """,
+    )
+    processor_type: Literal[ProcessorType.SCHEMA_TRANSFORM] = ProcessorType.SCHEMA_TRANSFORM
+
+    @field_validator("template")
+    def validate_template(cls, v: dict[str, Any]) -> dict[str, Any]:
+        try:
+            json.dumps(v)
+        except TypeError as e:
+            if "not JSON serializable" in str(e):
+                raise InvalidConfigError("Template must be JSON serializable")
+        return v
diff --git a/src/data_designer/config/utils/validation.py b/src/data_designer/config/utils/validation.py
index b25ec8b1..d30a067a 100644
--- a/src/data_designer/config/utils/validation.py
+++ b/src/data_designer/config/utils/validation.py
@@ -18,7 +18,10 @@
 from data_designer.config.column_types import ColumnConfigT, DataDesignerColumnType, column_type_is_llm_generated
 from data_designer.config.processors import ProcessorConfig, ProcessorType
 from data_designer.config.utils.constants import RICH_CONSOLE_THEME
-from data_designer.config.utils.misc import can_run_data_designer_locally
+from data_designer.config.utils.misc import (
+    can_run_data_designer_locally,
+    get_prompt_template_keywords,
+)
 from data_designer.config.validator_params import ValidatorType
 
 
@@ -63,6 +66,7 @@ def validate_data_designer_config(
     violations.extend(validate_expression_references(columns=columns, allowed_references=allowed_references))
     violations.extend(validate_columns_not_all_dropped(columns=columns))
     violations.extend(validate_drop_columns_processor(columns=columns, processor_configs=processor_configs))
+    violations.extend(validate_schema_transform_processor(columns=columns, processor_configs=processor_configs))
     if not can_run_data_designer_locally():
         violations.extend(validate_local_only_columns(columns=columns))
     return violations
@@ -271,7 +275,7 @@ def validate_drop_columns_processor(
     columns: list[ColumnConfigT],
     processor_configs: list[ProcessorConfig],
 ) -> list[Violation]:
-    all_column_names = set([c.name for c in columns])
+    all_column_names = {c.name for c in columns}
     for processor_config in processor_configs:
         if processor_config.processor_type == ProcessorType.DROP_COLUMNS:
             invalid_columns = set(processor_config.column_names) - all_column_names
@@ -288,6 +292,33 @@ def validate_drop_columns_processor(
     return []
 
 
+def validate_schema_transform_processor(
+    columns: list[ColumnConfigT],
+    processor_configs: list[ProcessorConfig],
+) -> list[Violation]:
+    violations = []
+
+    all_column_names = {c.name for c in columns}
+    for processor_config in processor_configs:
+        if processor_config.processor_type == ProcessorType.SCHEMA_TRANSFORM:
+            for col, template in processor_config.template.items():
+                template_keywords = get_prompt_template_keywords(template)
+                invalid_keywords = set(template_keywords) - all_column_names
+                if len(invalid_keywords) > 0:
+                    invalid_keywords = ", ".join([f"'{k}'" for k in invalid_keywords])
+                    message = f"Ancillary dataset processor attempts to reference columns {invalid_keywords} in the template for '{col}', but the columns are not defined in the dataset."
+                    violations.append(
+                        Violation(
+                            column=None,
+                            type=ViolationType.INVALID_REFERENCE,
+                            message=message,
+                            level=ViolationLevel.ERROR,
+                        )
+                    )
+
+    return violations
+
+
 def validate_expression_references(
     columns: list[ColumnConfigT],
     allowed_references: list[str],
diff --git a/src/data_designer/config/utils/visualization.py b/src/data_designer/config/utils/visualization.py
index 643c7ff3..1f843c86 100644
--- a/src/data_designer/config/utils/visualization.py
+++ b/src/data_designer/config/utils/visualization.py
@@ -72,6 +72,9 @@ def _record_sampler_dataset(self) -> pd.DataFrame:
         else:
             raise DatasetSampleDisplayError("No valid dataset found in results object.")
 
+    def _has_processor_artifacts(self) -> bool:
+        return hasattr(self, "processor_artifacts") and self.processor_artifacts is not None
+
     def display_sample_record(
         self,
         index: Optional[int] = None,
@@ -79,6 +82,7 @@ def display_sample_record(
         hide_seed_columns: bool = False,
         syntax_highlighting_theme: str = "dracula",
         background_color: Optional[str] = None,
+        processors_to_display: Optional[list[str]] = None,
     ) -> None:
         """Display a sample record from the Data Designer dataset preview.
 
@@ -90,6 +94,7 @@ def display_sample_record(
                 documentation from `rich` for information about available themes.
             background_color: Background color to use for the record. See the `Syntax`
                 documentation from `rich` for information about available background colors.
+            processors_to_display: List of processors to display the artifacts for. If None, all processors will be displayed.
""" i = index or self._display_cycle_index @@ -99,8 +104,25 @@ def display_sample_record( except IndexError: raise DatasetSampleDisplayError(f"Index {i} is out of bounds for dataset of length {num_records}.") + processor_data_to_display = None + if self._has_processor_artifacts() and len(self.processor_artifacts) > 0: + if processors_to_display is None: + processors_to_display = list(self.processor_artifacts.keys()) + + if len(processors_to_display) > 0: + processor_data_to_display = {} + for processor in processors_to_display: + if ( + isinstance(self.processor_artifacts[processor], list) + and len(self.processor_artifacts[processor]) == num_records + ): + processor_data_to_display[processor] = self.processor_artifacts[processor][i] + else: + processor_data_to_display[processor] = self.processor_artifacts[processor] + display_sample_record( record=record, + processor_data_to_display=processor_data_to_display, config_builder=self._config_builder, background_color=background_color, syntax_highlighting_theme=syntax_highlighting_theme, @@ -134,6 +156,7 @@ def create_rich_histogram_table( def display_sample_record( record: Union[dict, pd.Series, pd.DataFrame], config_builder: DataDesignerConfigBuilder, + processor_data_to_display: Optional[dict[str, Union[list[str], str]]] = None, background_color: Optional[str] = None, syntax_highlighting_theme: str = "dracula", record_index: Optional[int] = None, @@ -230,6 +253,15 @@ def display_sample_record( table.add_row(*row) render_list.append(pad_console_element(table, (1, 0, 1, 0))) + if processor_data_to_display and len(processor_data_to_display) > 0: + for processor_name, processor_data in processor_data_to_display.items(): + table = Table(title=f"Processor Outputs: {processor_name}", **table_kws) + table.add_column("Name") + table.add_column("Value") + for col, value in processor_data.items(): + table.add_row(col, convert_to_row_element(value)) + render_list.append(pad_console_element(table, (1, 0, 1, 0))) + if record_index is not None: index_label = Text(f"[index: {record_index}]", justify="center") render_list.append(index_label) diff --git a/src/data_designer/engine/dataset_builders/artifact_storage.py b/src/data_designer/engine/dataset_builders/artifact_storage.py index f8e675bd..152ac13d 100644 --- a/src/data_designer/engine/dataset_builders/artifact_storage.py +++ b/src/data_designer/engine/dataset_builders/artifact_storage.py @@ -25,6 +25,7 @@ class BatchStage(StrEnum): PARTIAL_RESULT = "partial_results_path" FINAL_RESULT = "final_dataset_path" DROPPED_COLUMNS = "dropped_columns_dataset_path" + PROCESSORS_OUTPUTS = "processors_outputs_path" class ArtifactStorage(BaseModel): @@ -33,6 +34,7 @@ class ArtifactStorage(BaseModel): final_dataset_folder_name: str = "parquet-files" partial_results_folder_name: str = "tmp-partial-parquet-files" dropped_columns_folder_name: str = "dropped-columns-parquet-files" + processors_outputs_folder_name: str = "processors-files" @property def artifact_path_exists(self) -> bool: @@ -70,6 +72,10 @@ def metadata_file_path(self) -> Path: def partial_results_path(self) -> Path: return self.base_dataset_path / self.partial_results_folder_name + @property + def processors_outputs_path(self) -> Path: + return self.base_dataset_path / self.processors_outputs_folder_name + @field_validator("artifact_path") def validate_artifact_path(cls, v: Union[Path, str]) -> Path: v = Path(v) @@ -84,6 +90,7 @@ def validate_folder_names(self): self.final_dataset_folder_name, self.partial_results_folder_name, 
self.dropped_columns_folder_name, + self.processors_outputs_folder_name, ] for name in folder_names: @@ -169,9 +176,10 @@ def write_batch_to_parquet_file( batch_number: int, dataframe: pd.DataFrame, batch_stage: BatchStage, + subfolder: str | None = None, ) -> Path: file_path = self.create_batch_file_path(batch_number, batch_stage=batch_stage) - self.write_parquet_file(file_path.name, dataframe, batch_stage) + self.write_parquet_file(file_path.name, dataframe, batch_stage, subfolder=subfolder) return file_path def write_parquet_file( @@ -179,9 +187,11 @@ def write_parquet_file( parquet_file_name: str, dataframe: pd.DataFrame, batch_stage: BatchStage, + subfolder: str | None = None, ) -> Path: - self.mkdir_if_needed(self._get_stage_path(batch_stage)) - file_path = self._get_stage_path(batch_stage) / parquet_file_name + subfolder = subfolder or "" + self.mkdir_if_needed(self._get_stage_path(batch_stage) / subfolder) + file_path = self._get_stage_path(batch_stage) / subfolder / parquet_file_name dataframe.to_parquet(file_path, index=False) return file_path diff --git a/src/data_designer/engine/dataset_builders/column_wise_builder.py b/src/data_designer/engine/dataset_builders/column_wise_builder.py index 822f87c8..beeedd6f 100644 --- a/src/data_designer/engine/dataset_builders/column_wise_builder.py +++ b/src/data_designer/engine/dataset_builders/column_wise_builder.py @@ -244,6 +244,7 @@ def _initialize_processors(self, processor_configs: list[ProcessorConfig]) -> di processors[BuildStage.POST_BATCH].append( # as post-batch by default DropColumnsProcessor( config=DropColumnsProcessorConfig( + name="default_drop_columns_processor", column_names=columns_to_drop, build_stage=BuildStage.POST_BATCH, ), diff --git a/src/data_designer/engine/processing/processors/drop_columns.py b/src/data_designer/engine/processing/processors/drop_columns.py index bc996a9e..d5a137fe 100644 --- a/src/data_designer/engine/processing/processors/drop_columns.py +++ b/src/data_designer/engine/processing/processors/drop_columns.py @@ -17,7 +17,7 @@ class DropColumnsProcessor(Processor[DropColumnsProcessorConfig]): @staticmethod def metadata() -> ConfigurableTaskMetadata: return ConfigurableTaskMetadata( - name="drop_columns", + name="drop_columns_processor", description="Drop columns from the input dataset.", required_resources=None, ) diff --git a/src/data_designer/engine/processing/processors/registry.py b/src/data_designer/engine/processing/processors/registry.py index dadcbc33..575b6a3c 100644 --- a/src/data_designer/engine/processing/processors/registry.py +++ b/src/data_designer/engine/processing/processors/registry.py @@ -5,9 +5,11 @@ from data_designer.config.processors import ( DropColumnsProcessorConfig, ProcessorType, + SchemaTransformProcessorConfig, ) from data_designer.engine.processing.processors.base import Processor from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor +from data_designer.engine.processing.processors.schema_transform import SchemaTransformProcessor from data_designer.engine.registry.base import TaskRegistry @@ -16,5 +18,6 @@ class ProcessorRegistry(TaskRegistry[str, Processor, ConfigBase]): ... 
def create_default_processor_registry() -> ProcessorRegistry: registry = ProcessorRegistry() + registry.register(ProcessorType.SCHEMA_TRANSFORM, SchemaTransformProcessor, SchemaTransformProcessorConfig, False) registry.register(ProcessorType.DROP_COLUMNS, DropColumnsProcessor, DropColumnsProcessorConfig, False) return registry diff --git a/src/data_designer/engine/processing/processors/schema_transform.py b/src/data_designer/engine/processing/processors/schema_transform.py new file mode 100644 index 00000000..2927177f --- /dev/null +++ b/src/data_designer/engine/processing/processors/schema_transform.py @@ -0,0 +1,53 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import json +import logging + +import pandas as pd + +from data_designer.config.processors import SchemaTransformProcessorConfig +from data_designer.engine.configurable_task import ConfigurableTaskMetadata +from data_designer.engine.dataset_builders.artifact_storage import BatchStage +from data_designer.engine.processing.ginja.environment import WithJinja2UserTemplateRendering +from data_designer.engine.processing.processors.base import Processor +from data_designer.engine.processing.utils import deserialize_json_values + +logger = logging.getLogger(__name__) + + +class SchemaTransformProcessor(WithJinja2UserTemplateRendering, Processor[SchemaTransformProcessorConfig]): + @staticmethod + def metadata() -> ConfigurableTaskMetadata: + return ConfigurableTaskMetadata( + name="schema_transform_processor", + description="Generate dataset with transformed schema using a Jinja2 template.", + required_resources=None, + ) + + @property + def template_as_str(self) -> str: + return json.dumps(self.config.template) + + def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None) -> pd.DataFrame: + self.prepare_jinja2_template_renderer(self.template_as_str, data.columns.to_list()) + formatted_records = [ + json.loads(self.render_template(deserialize_json_values(record)).replace("\n", "\\n")) + for record in data.to_dict(orient="records") + ] + formatted_data = pd.DataFrame(formatted_records) + if current_batch_number is not None: + self.artifact_storage.write_batch_to_parquet_file( + batch_number=current_batch_number, + dataframe=formatted_data, + batch_stage=BatchStage.PROCESSORS_OUTPUTS, + subfolder=self.config.name, + ) + else: + self.artifact_storage.write_parquet_file( + parquet_file_name=f"{self.config.name}.parquet", + dataframe=formatted_data, + batch_stage=BatchStage.PROCESSORS_OUTPUTS, + ) + + return data diff --git a/src/data_designer/interface/data_designer.py b/src/data_designer/interface/data_designer.py index 44eed160..0458dc60 100644 --- a/src/data_designer/interface/data_designer.py +++ b/src/data_designer/interface/data_designer.py @@ -249,6 +249,17 @@ def preview( except Exception as e: raise DataDesignerProfilingError(f"🛑 Error profiling preview dataset: {e}") + if builder.artifact_storage.processors_outputs_path.exists(): + processor_artifacts = { + processor_config.name: pd.read_parquet( + builder.artifact_storage.processors_outputs_path / f"{processor_config.name}.parquet", + dtype_backend="pyarrow", + ).to_dict(orient="records") + for processor_config in config_builder.get_processor_configs() + } + else: + processor_artifacts = {} + if ( len(processed_dataset) > 0 and isinstance(analysis, DatasetProfilerResults) @@ -259,6 +270,7 @@ def preview( return PreviewResults( dataset=processed_dataset, 
analysis=analysis, + processor_artifacts=processor_artifacts, config_builder=config_builder, ) diff --git a/src/data_designer/interface/results.py b/src/data_designer/interface/results.py index 5c768904..263173a7 100644 --- a/src/data_designer/interface/results.py +++ b/src/data_designer/interface/results.py @@ -3,12 +3,15 @@ from __future__ import annotations +from pathlib import Path + import pandas as pd from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults from data_designer.config.config_builder import DataDesignerConfigBuilder from data_designer.config.utils.visualization import WithRecordSamplerMixin from data_designer.engine.dataset_builders.artifact_storage import ArtifactStorage +from data_designer.engine.dataset_builders.errors import ArtifactStorageError class DatasetCreationResults(WithRecordSamplerMixin): @@ -53,3 +56,36 @@ def load_dataset(self) -> pd.DataFrame: A pandas DataFrame containing the full generated dataset. """ return self.artifact_storage.load_dataset() + + def load_processor_dataset(self, processor_name: str) -> pd.DataFrame: + """Load the dataset generated by a processor. + + This only works for processors that write their artifacts in Parquet format. + + Args: + processor_name: The name of the processor to load the dataset from. + + Returns: + A pandas DataFrame containing the dataset generated by the processor. + """ + try: + dataset = self.artifact_storage.read_parquet_files( + self.artifact_storage.processors_outputs_path / processor_name + ) + except Exception as e: + raise ArtifactStorageError(f"Failed to load dataset for processor {processor_name}: {e}") + + return dataset + + def get_path_to_processor_artifacts(self, processor_name: str) -> Path: + """Get the path to the artifacts generated by a processor. + + Args: + processor_name: The name of the processor to load the artifact from. + + Returns: + The path to the artifacts. 
+ """ + if not self.artifact_storage.processors_outputs_path.exists(): + raise ArtifactStorageError(f"Processor {processor_name} has no artifacts.") + return self.artifact_storage.processors_outputs_path / processor_name diff --git a/tests/config/test_processors.py b/tests/config/test_processors.py index 7282758c..4dfa0514 100644 --- a/tests/config/test_processors.py +++ b/tests/config/test_processors.py @@ -5,16 +5,20 @@ from pydantic import ValidationError from data_designer.config.dataset_builders import BuildStage +from data_designer.config.errors import InvalidConfigError from data_designer.config.processors import ( DropColumnsProcessorConfig, ProcessorConfig, ProcessorType, + SchemaTransformProcessorConfig, get_processor_config_from_kwargs, ) def test_drop_columns_processor_config_creation(): - config = DropColumnsProcessorConfig(build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"]) + config = DropColumnsProcessorConfig( + name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"] + ) assert config.build_stage == BuildStage.POST_BATCH assert config.column_names == ["col1", "col2"] @@ -25,15 +29,19 @@ def test_drop_columns_processor_config_creation(): def test_drop_columns_processor_config_validation(): # Test unsupported stage raises error with pytest.raises(ValidationError, match="Invalid dataset builder stage"): - DropColumnsProcessorConfig(build_stage=BuildStage.PRE_BATCH, column_names=["col1"]) + DropColumnsProcessorConfig( + name="drop_columns_processor", build_stage=BuildStage.PRE_BATCH, column_names=["col1"] + ) # Test missing required field raises error with pytest.raises(ValidationError, match="Field required"): - DropColumnsProcessorConfig(build_stage=BuildStage.POST_BATCH) + DropColumnsProcessorConfig(name="drop_columns_processor", build_stage=BuildStage.POST_BATCH) def test_drop_columns_processor_config_serialization(): - config = DropColumnsProcessorConfig(build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"]) + config = DropColumnsProcessorConfig( + name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"] + ) # Serialize to dict config_dict = config.model_dump() @@ -46,13 +54,78 @@ def test_drop_columns_processor_config_serialization(): assert config_restored.column_names == config.column_names +def test_schema_transform_processor_config_creation(): + config = SchemaTransformProcessorConfig( + name="output_format_processor", + build_stage=BuildStage.POST_BATCH, + template={"text": "{{ col1 }}"}, + ) + + assert config.build_stage == BuildStage.POST_BATCH + assert config.template == {"text": "{{ col1 }}"} + assert config.processor_type == ProcessorType.SCHEMA_TRANSFORM + assert isinstance(config, ProcessorConfig) + + +def test_schema_transform_processor_config_validation(): + # Test unsupported stage raises error + with pytest.raises(ValidationError, match="Invalid dataset builder stage"): + SchemaTransformProcessorConfig( + name="schema_transform_processor", + build_stage=BuildStage.PRE_BATCH, + template={"text": "{{ col1 }}"}, + ) + + # Test missing required field raises error + with pytest.raises(ValidationError, match="Field required"): + SchemaTransformProcessorConfig(name="schema_transform_processor", build_stage=BuildStage.POST_BATCH) + + # Test invalid template raises error + with pytest.raises(InvalidConfigError, match="Template must be JSON serializable"): + SchemaTransformProcessorConfig( + name="schema_transform_processor", build_stage=BuildStage.POST_BATCH, 
template={"text": {1, 2, 3}} + ) + + +def test_schema_transform_processor_config_serialization(): + config = SchemaTransformProcessorConfig( + name="schema_transform_processor", + build_stage=BuildStage.POST_BATCH, + template={"text": "{{ col1 }}"}, + ) + + # Serialize to dict + config_dict = config.model_dump() + assert config_dict["build_stage"] == "post_batch" + assert config_dict["template"] == {"text": "{{ col1 }}"} + + # Deserialize from dict + config_restored = SchemaTransformProcessorConfig.model_validate(config_dict) + assert config_restored.build_stage == config.build_stage + assert config_restored.template == config.template + + def test_get_processor_config_from_kwargs(): # Test successful creation - config = get_processor_config_from_kwargs( - ProcessorType.DROP_COLUMNS, build_stage=BuildStage.POST_BATCH, column_names=["col1"] + config_drop_columns = get_processor_config_from_kwargs( + ProcessorType.DROP_COLUMNS, + name="drop_columns_processor", + build_stage=BuildStage.POST_BATCH, + column_names=["col1"], + ) + assert isinstance(config_drop_columns, DropColumnsProcessorConfig) + assert config_drop_columns.column_names == ["col1"] + assert config_drop_columns.processor_type == ProcessorType.DROP_COLUMNS + + config_schema_transform = get_processor_config_from_kwargs( + ProcessorType.SCHEMA_TRANSFORM, + name="output_format_processor", + build_stage=BuildStage.POST_BATCH, + template={"text": "{{ col1 }}"}, ) - assert isinstance(config, DropColumnsProcessorConfig) - assert config.column_names == ["col1"] + assert isinstance(config_schema_transform, SchemaTransformProcessorConfig) + assert config_schema_transform.template == {"text": "{{ col1 }}"} + assert config_schema_transform.processor_type == ProcessorType.SCHEMA_TRANSFORM # Test with unknown processor type returns None from enum import Enum @@ -61,6 +134,6 @@ class UnknownProcessorType(str, Enum): UNKNOWN = "unknown" result = get_processor_config_from_kwargs( - UnknownProcessorType.UNKNOWN, build_stage=BuildStage.POST_BATCH, column_names=["col1"] + UnknownProcessorType.UNKNOWN, name="unknown_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1"] ) assert result is None diff --git a/tests/config/utils/test_validation.py b/tests/config/utils/test_validation.py index 0b5c5684..2045e650 100644 --- a/tests/config/utils/test_validation.py +++ b/tests/config/utils/test_validation.py @@ -14,7 +14,10 @@ ) from data_designer.config.dataset_builders import BuildStage from data_designer.config.models import ImageContext, ModalityDataType -from data_designer.config.processors import DropColumnsProcessorConfig +from data_designer.config.processors import ( + DropColumnsProcessorConfig, + SchemaTransformProcessorConfig, +) from data_designer.config.utils.code_lang import CodeLang from data_designer.config.utils.validation import ( Violation, @@ -26,6 +29,7 @@ validate_data_designer_config, validate_expression_references, validate_prompt_templates, + validate_schema_transform_processor, ) from data_designer.config.validator_params import CodeValidatorParams @@ -98,9 +102,15 @@ COLUMNS = VALID_COLUMNS + INVALID_COLUMNS PROCESSOR_CONFIGS = [ DropColumnsProcessorConfig( + name="drop_columns_processor", column_names=["inexistent_column"], build_stage=BuildStage.POST_BATCH, - ) + ), + SchemaTransformProcessorConfig( + name="schema_transform_processor_invalid_reference", + template={"text": "{{ invalid_reference }}"}, + build_stage=BuildStage.POST_BATCH, + ), ] ALLOWED_REFERENCE = [c.name for c in COLUMNS] @@ -110,12 +120,14 @@ 
@patch("data_designer.config.utils.validation.validate_expression_references") @patch("data_designer.config.utils.validation.validate_columns_not_all_dropped") @patch("data_designer.config.utils.validation.validate_drop_columns_processor") +@patch("data_designer.config.utils.validation.validate_schema_transform_processor") def test_validate_data_designer_config( mock_validate_columns_not_all_dropped, mock_validate_expression_references, mock_validate_code_validation, mock_validate_prompt_templates, mock_validate_drop_columns_processor, + mock_validate_schema_transform_processor, ): mock_validate_columns_not_all_dropped.return_value = [ Violation( @@ -157,13 +169,23 @@ def test_validate_data_designer_config( level=ViolationLevel.ERROR, ) ] + mock_validate_schema_transform_processor.return_value = [ + Violation( + column="text", + type=ViolationType.INVALID_REFERENCE, + message="Ancillary dataset processor attempts to reference columns 'invalid_reference' in the template for 'text', but the columns are not defined in the dataset.", + level=ViolationLevel.ERROR, + ) + ] + violations = validate_data_designer_config(COLUMNS, PROCESSOR_CONFIGS, ALLOWED_REFERENCE) - assert len(violations) == 5 + assert len(violations) == 6 mock_validate_columns_not_all_dropped.assert_called_once() mock_validate_expression_references.assert_called_once() mock_validate_code_validation.assert_called_once() mock_validate_prompt_templates.assert_called_once() mock_validate_drop_columns_processor.assert_called_once() + mock_validate_schema_transform_processor.assert_called_once() def test_validate_prompt_templates(): @@ -248,6 +270,18 @@ def test_validate_expression_references(): assert violations[0].type == ViolationType.EXPRESSION_REFERENCE_MISSING +def test_validate_schema_transform_processor(): + violations = validate_schema_transform_processor(COLUMNS, PROCESSOR_CONFIGS) + assert len(violations) == 1 + assert violations[0].type == ViolationType.INVALID_REFERENCE + assert violations[0].column is None + assert ( + violations[0].message + == "Ancillary dataset processor attempts to reference columns 'invalid_reference' in the template for 'text', but the columns are not defined in the dataset." 
+ ) + assert violations[0].level == ViolationLevel.ERROR + + @patch("data_designer.config.utils.validation.Console.print") def test_rich_print_violations(mock_console_print): rich_print_violations([]) diff --git a/tests/engine/dataset_builders/test_column_wise_builder.py b/tests/engine/dataset_builders/test_column_wise_builder.py index 5572af44..4e62c478 100644 --- a/tests/engine/dataset_builders/test_column_wise_builder.py +++ b/tests/engine/dataset_builders/test_column_wise_builder.py @@ -28,7 +28,11 @@ def stub_test_column_configs(): @pytest.fixture def stub_test_processor_configs(): - return [DropColumnsProcessorConfig(build_stage=BuildStage.POST_BATCH, column_names=["column_to_drop"])] + return [ + DropColumnsProcessorConfig( + name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["column_to_drop"] + ) + ] @pytest.fixture diff --git a/tests/engine/processing/processors/test_drop_columns.py b/tests/engine/processing/processors/test_drop_columns.py index d1fda06c..c571ec96 100644 --- a/tests/engine/processing/processors/test_drop_columns.py +++ b/tests/engine/processing/processors/test_drop_columns.py @@ -14,7 +14,9 @@ @pytest.fixture def stub_processor_config(): - return DropColumnsProcessorConfig(build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"]) + return DropColumnsProcessorConfig( + name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"] + ) @pytest.fixture @@ -38,7 +40,7 @@ def stub_empty_dataframe(): def test_metadata(): metadata = DropColumnsProcessor.metadata() - assert metadata.name == "drop_columns" + assert metadata.name == "drop_columns_processor" assert metadata.description == "Drop columns from the input dataset." assert metadata.required_resources is None diff --git a/tests/engine/processing/processors/test_schema_transform.py b/tests/engine/processing/processors/test_schema_transform.py new file mode 100644 index 00000000..4649e875 --- /dev/null +++ b/tests/engine/processing/processors/test_schema_transform.py @@ -0,0 +1,137 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+from unittest.mock import Mock
+
+import pandas as pd
+import pytest
+
+from data_designer.config.dataset_builders import BuildStage
+from data_designer.config.processors import SchemaTransformProcessorConfig
+from data_designer.engine.dataset_builders.artifact_storage import BatchStage
+from data_designer.engine.processing.processors.schema_transform import SchemaTransformProcessor
+from data_designer.engine.resources.resource_provider import ResourceProvider
+
+
+@pytest.fixture
+def stub_processor_config() -> SchemaTransformProcessorConfig:
+    return SchemaTransformProcessorConfig(
+        build_stage=BuildStage.POST_BATCH,
+        template={"text": "{{ col1 }}", "value": "{{ col2 }}"},
+        name="test_schema_transform",
+    )
+
+
+@pytest.fixture
+def stub_processor(
+    stub_processor_config: SchemaTransformProcessorConfig, stub_resource_provider: ResourceProvider
+) -> SchemaTransformProcessor:
+    stub_resource_provider.artifact_storage = Mock()
+    stub_resource_provider.artifact_storage.write_batch_to_parquet_file = Mock()
+
+    processor = SchemaTransformProcessor(
+        config=stub_processor_config,
+        resource_provider=stub_resource_provider,
+    )
+    return processor
+
+
+@pytest.fixture
+def stub_simple_dataframe() -> pd.DataFrame:
+    return pd.DataFrame(
+        {
+            "col1": ["hello", "world", "test", "data"],
+            "col2": [1, 2, 3, 4],
+        }
+    )
+
+
+def test_metadata() -> None:
+    metadata = SchemaTransformProcessor.metadata()
+
+    assert metadata.name == "schema_transform_processor"
+    assert metadata.description == "Generate dataset with transformed schema using a Jinja2 template."
+    assert metadata.required_resources is None
+
+
+def test_process_returns_original_dataframe(
+    stub_processor: SchemaTransformProcessor, stub_sample_dataframe: pd.DataFrame
+) -> None:
+    result = stub_processor.process(stub_sample_dataframe, current_batch_number=0)
+    pd.testing.assert_frame_equal(result, stub_sample_dataframe)
+
+
+def test_process_writes_formatted_output_to_parquet(
+    stub_processor: SchemaTransformProcessor, stub_sample_dataframe: pd.DataFrame
+) -> None:
+    # Process the dataframe
+    result = stub_processor.process(stub_sample_dataframe, current_batch_number=0)
+
+    # Verify the original dataframe is returned
+    pd.testing.assert_frame_equal(result, stub_sample_dataframe)
+
+    # Verify write_batch_to_parquet_file was called with correct parameters
+    stub_processor.artifact_storage.write_batch_to_parquet_file.assert_called_once()
+    call_args = stub_processor.artifact_storage.write_batch_to_parquet_file.call_args
+
+    assert call_args.kwargs["batch_number"] == 0
+    assert call_args.kwargs["batch_stage"] == BatchStage.PROCESSORS_OUTPUTS
+    assert call_args.kwargs["subfolder"] == "test_schema_transform"
+
+    # Verify the formatted dataframe has the correct structure
+    written_dataframe: pd.DataFrame = call_args.kwargs["dataframe"]
+
+    assert written_dataframe is not None
+    assert len(written_dataframe) == 4
+    assert len(written_dataframe.columns) == 2
+    assert list(written_dataframe.columns) == ["text", "value"]
+
+    # Verify the formatted content
+    expected_formatted_output = [
+        f'{{"text": "{stub_sample_dataframe.iloc[i]["col1"]}", "value": "{stub_sample_dataframe.iloc[i]["col2"]}"}}'
+        for i in range(len(stub_sample_dataframe))
+    ]
+
+    for i, expected in enumerate(expected_formatted_output):
+        actual = json.dumps(written_dataframe.iloc[i].to_dict())
+        # Parse both as JSON to compare structure (ignoring whitespace differences)
+        assert json.loads(actual) == json.loads(expected), f"Row {i} mismatch: {actual} != {expected}"
+
+
+def test_process_without_batch_number_does_not_write(
+    stub_processor: SchemaTransformProcessor, stub_sample_dataframe: pd.DataFrame
+) -> None:
+    # Process without batch number (preview mode)
+    result = stub_processor.process(stub_sample_dataframe, current_batch_number=None)
+
+    # Verify the original dataframe is returned
+    pd.testing.assert_frame_equal(result, stub_sample_dataframe)
+
+    # Verify write_batch_to_parquet_file was NOT called
+    stub_processor.artifact_storage.write_batch_to_parquet_file.assert_not_called()
+
+
+def test_process_with_json_serialized_values(stub_processor: SchemaTransformProcessor) -> None:
+    # Test with JSON-serialized values in dataframe
+    df_with_json = pd.DataFrame(
+        {
+            "col1": ["hello", "world"],
+            "col2": ['{"nested": "value1"}', '{"nested": "value2"}'],
+        }
+    )
+
+    # Process the dataframe
+    stub_processor.process(df_with_json, current_batch_number=0)
+    written_dataframe: pd.DataFrame = stub_processor.artifact_storage.write_batch_to_parquet_file.call_args.kwargs[
+        "dataframe"
+    ]
+
+    # Verify the formatted dataframe was written
+    assert written_dataframe is not None
+    assert len(written_dataframe) == 2
+
+    # Verify that nested JSON values are properly deserialized in template rendering
+    first_output = written_dataframe.iloc[0].to_dict()
+    assert first_output["text"] == "hello"
+    assert first_output["value"] == "{'nested': 'value1'}"
diff --git a/tests/engine/test_configurable_task.py b/tests/engine/test_configurable_task.py
index 1210b448..b3306a13 100644
--- a/tests/engine/test_configurable_task.py
+++ b/tests/engine/test_configurable_task.py
@@ -68,6 +68,7 @@ def _initialize(self) -> None:
     mock_artifact_storage.final_dataset_folder_name = "final_dataset"
     mock_artifact_storage.partial_results_folder_name = "partial_results"
     mock_artifact_storage.dropped_columns_folder_name = "dropped_columns"
+    mock_artifact_storage.processors_outputs_folder_name = "processors_outputs"
 
     resource_provider = ResourceProvider(artifact_storage=mock_artifact_storage)
     task = TestTask(config=config, resource_provider=resource_provider)
@@ -99,6 +100,7 @@ def _validate(self) -> None:
     mock_artifact_storage.final_dataset_folder_name = "final_dataset"
     mock_artifact_storage.partial_results_folder_name = "partial_results"
     mock_artifact_storage.dropped_columns_folder_name = "dropped_columns"
+    mock_artifact_storage.processors_outputs_folder_name = "processors_outputs"
 
     resource_provider = ResourceProvider(artifact_storage=mock_artifact_storage)
     task = TestTask(config=config, resource_provider=resource_provider)
@@ -137,6 +139,7 @@ def _initialize(self) -> None:
     mock_artifact_storage.final_dataset_folder_name = "final_dataset"
     mock_artifact_storage.partial_results_folder_name = "partial_results"
     mock_artifact_storage.dropped_columns_folder_name = "dropped_columns"
+    mock_artifact_storage.processors_outputs_folder_name = "processors_outputs"
     mock_model_registry = Mock(spec=ModelRegistry)
     resource_provider = ResourceProvider(artifact_storage=mock_artifact_storage, model_registry=mock_model_registry)
     task = TestTask(config=config, resource_provider=resource_provider)
diff --git a/tests/interface/test_data_designer.py b/tests/interface/test_data_designer.py
index 42274ff5..c8e94000 100644
--- a/tests/interface/test_data_designer.py
+++ b/tests/interface/test_data_designer.py
@@ -443,7 +443,9 @@ def test_preview_with_dropped_columns(
     )
 
     config_builder.add_processor(
-        DropColumnsProcessorConfig(build_stage=BuildStage.POST_BATCH, column_names=["category"])
+        DropColumnsProcessorConfig(
+            name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["category"]
+        )
     )
 
     data_designer = DataDesigner(