
Commit 457a583

moving to different UX

1 parent c891b7f

File tree

11 files changed: +61, -91 lines


examples/.gitignore

Lines changed: 2 additions & 0 deletions
@@ -1,2 +1,4 @@
 artifacts
 processor_outputs
+*.jsonl
+*.csv

examples/example.py

Lines changed: 6 additions & 4 deletions
@@ -5,7 +5,7 @@
     DataDesigner,
     DataDesignerConfigBuilder,
     LLMTextColumnConfig,
-    OutputFormatProcessorConfig,
+    AncillaryDatasetProcessorConfig,
     PersonSamplerParams,
     SamplerColumnConfig,
     Score,
@@ -180,7 +180,7 @@
 }
 
 config_builder.add_processor(
-    AncillaryDatasetProcessor(
+    AncillaryDatasetProcessorConfig(
         name="jsonl_output",
        template=jsonl_entry_template,
     )
@@ -194,5 +194,7 @@
 preview.display_sample_record()
 
 results = dd.create(config_builder, num_records=20)
-jsonl_output = results.load_processor_artifact("jsonl_output")
-pd.read_parquet(jsonl_output.path_to_parquet_files).to_jsonl(desired_path, lines=True)
+path_to_processor_artifacts = results.get_path_to_processor_artifacts("jsonl_output")
+
+import pandas as pd
+pd.read_parquet(path_to_processor_artifacts).to_json("./output.jsonl", orient="records", lines=True)
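
Note: the definition of jsonl_entry_template sits outside these hunks. Given the new template: dict[str, Any] field in src/data_designer/config/processors.py below, it is presumably a mapping from ancillary-column names to Jinja2 templates. A hypothetical sketch (the keys and source columns here are invented for illustration, not taken from the repository):

    # Hypothetical stand-in for jsonl_entry_template: keys become columns of the
    # ancillary dataset; values are Jinja2 templates rendered against each record.
    jsonl_entry_template = {
        "text": "{{ document_text }}",
        "label": "{{ category }}",
    }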

src/data_designer/config/processors.py

Lines changed: 8 additions & 9 deletions
@@ -3,7 +3,7 @@
 
 from abc import ABC
 from enum import Enum
-from typing import Literal
+from typing import Any, Literal
 
 from pydantic import Field, field_validator
 
@@ -15,7 +15,7 @@
 
 class ProcessorType(str, Enum):
     DROP_COLUMNS = "drop_columns"
-    OUTPUT_FORMAT = "output_format"
+    ANCILLARY_DATASET = "ancillary_dataset"
 
 
 class ProcessorConfig(ConfigBase, ABC):
@@ -39,18 +39,17 @@ def validate_build_stage(cls, v: BuildStage) -> BuildStage:
 def get_processor_config_from_kwargs(processor_type: ProcessorType, **kwargs) -> ProcessorConfig:
     if processor_type == ProcessorType.DROP_COLUMNS:
         return DropColumnsProcessorConfig(**kwargs)
-    elif processor_type == ProcessorType.OUTPUT_FORMAT:
-        return OutputFormatProcessorConfig(**kwargs)
+    elif processor_type == ProcessorType.ANCILLARY_DATASET:
+        return AncillaryDatasetProcessorConfig(**kwargs)
 
 
 class DropColumnsProcessorConfig(ProcessorConfig):
     column_names: list[str]
     processor_type: Literal[ProcessorType.DROP_COLUMNS] = ProcessorType.DROP_COLUMNS
 
 
-class OutputFormatProcessorConfig(ProcessorConfig):
-    template: str = Field(
-        ..., description="The Jinja template to use for each entry in the dataset, as a single string."
+class AncillaryDatasetProcessorConfig(ProcessorConfig):
+    template: dict[str, Any] = Field(
+        ..., description="Jinja2 template to use for each column of the ancillary dataset. Keys are the column names, values are the Jinja2 templates."
     )
-    extension: str = Field(default="jsonl", description="The extension of the output files, e.g. 'jsonl' or 'csv'.")
-    processor_type: Literal[ProcessorType.OUTPUT_FORMAT] = ProcessorType.OUTPUT_FORMAT
+    processor_type: Literal[ProcessorType.ANCILLARY_DATASET] = ProcessorType.ANCILLARY_DATASET
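
For orientation, a minimal sketch of constructing the renamed config, using only fields visible in this diff (passing build_stage mirrors the updated tests and is an assumption; it may have a default):

    from data_designer.config.dataset_builders import BuildStage
    from data_designer.config.processors import AncillaryDatasetProcessorConfig

    config = AncillaryDatasetProcessorConfig(
        name="jsonl_output",
        build_stage=BuildStage.POST_BATCH,  # assumption: copied from the updated tests
        # One Jinja2 template per ancillary column, keyed by column name.
        template={"text": "{{ col1 }}", "label": "{{ col2 }}"},
    )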

src/data_designer/engine/processing/processors/output_format.py renamed to src/data_designer/engine/processing/processors/ancillary_dataset.py

Lines changed: 12 additions & 19 deletions
@@ -2,11 +2,12 @@
 # SPDX-License-Identifier: Apache-2.0
 
 import logging
-from pathlib import Path
+import json
+from typing import Any
 
 import pandas as pd
 
-from data_designer.config.processors import OutputFormatProcessorConfig
+from data_designer.config.processors import AncillaryDatasetProcessorConfig
 from data_designer.engine.configurable_task import ConfigurableTaskMetadata
 from data_designer.engine.dataset_builders.artifact_storage import BatchStage
 from data_designer.engine.processing.ginja.environment import WithJinja2UserTemplateRendering
@@ -16,17 +17,21 @@
 logger = logging.getLogger(__name__)
 
 
-class OutputFormatProcessor(WithJinja2UserTemplateRendering, Processor[OutputFormatProcessorConfig]):
+class AncillaryDatasetProcessor(WithJinja2UserTemplateRendering, Processor[AncillaryDatasetProcessorConfig]):
     @staticmethod
     def metadata() -> ConfigurableTaskMetadata:
         return ConfigurableTaskMetadata(
-            name="output_format",
-            description="Format the dataset using a Jinja2 template.",
+            name="ancillary_dataset",
+            description="Generate an ancillary dataset using a Jinja2 template.",
             required_resources=None,
         )
 
+    @property
+    def template_as_str(self) -> str:
+        return json.dumps(self.config.template)
+
     def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None) -> pd.DataFrame:
-        self.prepare_jinja2_template_renderer(self.config.template, data.columns.to_list())
+        self.prepare_jinja2_template_renderer(self.template_as_str, data.columns.to_list())
         formatted_records = [
             self.render_template(deserialize_json_values(record)).replace("\n", "\\n")
             for record in data.to_dict(orient="records")
@@ -43,16 +48,4 @@ def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None
         # Just preview the first record for now
         self.artifact_storage.processor_artifact_preview[self.config.name] = formatted_records[0]
 
-        return data
-
-    @staticmethod
-    def write_outputs_to_disk(
-        processor_config: OutputFormatProcessorConfig, artifacts_path: Path, output_path: Path
-    ) -> None:
-        output_path.mkdir(parents=True, exist_ok=True)
-        with open(output_path / f"formatted_output.{processor_config.extension}", "w") as f:
-            for file_path in artifacts_path.glob("*.parquet"):
-                # TODO: faster way to convert than reading and writing row by row?
-                dataframe = pd.read_parquet(file_path)
-                for _, row in dataframe.iterrows():
-                    f.write(row["formatted_output"] + "\n")
+        return data
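
The new template_as_str property serializes the dict template to a single JSON string, which is then rendered once per record, so every rendered record is itself a JSON object. A self-contained sketch of that mechanism, substituting plain jinja2 for the engine's own renderer (an assumption for illustration only):

    import json
    from jinja2 import Template

    template = {"text": "{{ col1 }}", "label": "{{ col2 }}"}
    record = {"col1": "hello world", "col2": "greeting"}

    # Serialize the dict template into one Jinja2 string, as template_as_str does...
    template_as_str = json.dumps(template)
    # ...then render it against a record, producing one JSON object per row:
    print(Template(template_as_str).render(**record))
    # {"text": "hello world", "label": "greeting"}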

src/data_designer/engine/processing/processors/base.py

Lines changed: 1 addition & 5 deletions
@@ -13,8 +13,4 @@ class Processor(ConfigurableTask[TaskConfigT], ABC):
     def metadata() -> ConfigurableTaskMetadata: ...
 
     @abstractmethod
-    def process(self, data: DataT, *, current_batch_number: int | None = None) -> DataT: ...
-
-    @staticmethod
-    @abstractmethod
-    def write_outputs_to_disk(processor_config: TaskConfigT, artifacts_path: Path, output_path: Path) -> None: ...
+    def process(self, data: DataT, *, current_batch_number: int | None = None) -> DataT: ...

src/data_designer/engine/processing/processors/drop_columns.py

Lines changed: 0 additions & 6 deletions
@@ -34,12 +34,6 @@ def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None
             logger.warning(f"⚠️ Cannot drop column: `{column}` not found in the dataset.")
         return data
 
-    @staticmethod
-    def write_outputs_to_disk(
-        processor_config: DropColumnsProcessorConfig, artifacts_path: Path, output_path: Path
-    ) -> None:
-        pass
-
     def _save_dropped_columns_if_needed(self, data: pd.DataFrame, current_batch_number: int) -> None:
         logger.debug("📦 Saving dropped columns to dropped-columns directory")
         dropped_column_parquet_file_name = self.artifact_storage.create_batch_file_path(

src/data_designer/engine/processing/processors/registry.py

Lines changed: 3 additions & 3 deletions
@@ -3,13 +3,13 @@
 
 from data_designer.config.base import ConfigBase
 from data_designer.config.processors import (
+    AncillaryDatasetProcessorConfig,
     DropColumnsProcessorConfig,
-    OutputFormatProcessorConfig,
     ProcessorType,
 )
 from data_designer.engine.processing.processors.base import Processor
+from data_designer.engine.processing.processors.ancillary_dataset import AncillaryDatasetProcessor
 from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor
-from data_designer.engine.processing.processors.output_format import OutputFormatProcessor
 from data_designer.engine.registry.base import TaskRegistry
 
 
@@ -18,6 +18,6 @@ class ProcessorRegistry(TaskRegistry[str, Processor, ConfigBase]): ...
 
 def create_default_processor_registry() -> ProcessorRegistry:
     registry = ProcessorRegistry()
+    registry.register(ProcessorType.ANCILLARY_DATASET, AncillaryDatasetProcessor, AncillaryDatasetProcessorConfig, False)
     registry.register(ProcessorType.DROP_COLUMNS, DropColumnsProcessor, DropColumnsProcessorConfig, False)
-    registry.register(ProcessorType.OUTPUT_FORMAT, OutputFormatProcessor, OutputFormatProcessorConfig, False)
     return registry

src/data_designer/essentials/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@
     UniformDistribution,
     UniformDistributionParams,
 )
-from ..config.processors import DropColumnsProcessorConfig, OutputFormatProcessorConfig, ProcessorType
+from ..config.processors import AncillaryDatasetProcessorConfig, DropColumnsProcessorConfig, ProcessorType
 from ..config.sampler_constraints import ColumnInequalityConstraint, ScalarInequalityConstraint
 from ..config.sampler_params import (
     BernoulliMixtureSamplerParams,
@@ -75,6 +75,7 @@
     pass
 
 __all__ = [
+    "AncillaryDatasetProcessorConfig",
     "BernoulliMixtureSamplerParams",
     "BernoulliSamplerParams",
     "BinomialSamplerParams",
@@ -110,7 +111,6 @@
     "ModalityContext",
     "ModalityDataType",
     "ModelConfig",
-    "OutputFormatProcessorConfig",
     "PartitionBlock",
     "PersonSamplerParams",
     "PersonFromFakerSamplerParams",

src/data_designer/interface/results.py

Lines changed: 8 additions & 24 deletions
@@ -11,7 +11,7 @@
 from data_designer.config.config_builder import DataDesignerConfigBuilder
 from data_designer.config.utils.visualization import WithRecordSamplerMixin
 from data_designer.engine.dataset_builders.artifact_storage import ArtifactStorage
-from data_designer.engine.processing.processors.registry import ProcessorRegistry
+from data_designer.engine.dataset_builders.errors import ArtifactStorageError
 
 
 class DatasetCreationResults(WithRecordSamplerMixin):
@@ -57,31 +57,15 @@ def load_dataset(self) -> pd.DataFrame:
         """
         return self.artifact_storage.load_dataset()
 
-    def write_processors_outputs_to_disk(
-        self,
-        processors: list[str],
-        output_folder: Path | str,
-    ) -> None:
-        """Write collected artifacts from each processor to disk.
+    def get_path_to_processor_artifacts(self, processor_name: str) -> Path:
+        """Get the path to the artifacts generated by a processor.
 
         Args:
-            processors (list[str]): List of processor names to collect artifacts from.
-            output_folder (Path | str): Path to the output folder.
+            processor_name: The name of the processor to load the artifact from.
 
         Returns:
-            None
+            The path to the artifacts.
         """
-        output_folder = Path(output_folder)
-        output_folder.mkdir(parents=True, exist_ok=True)
-
-        processors = set(processors)
-        for processor_config in self._config_builder.get_processor_configs():
-            if processor_config.name not in processors:
-                continue
-
-            ProcessorClass = ProcessorRegistry.get_for_config_type(type(processor_config))
-            ProcessorClass.write_outputs_to_disk(
-                processor_config=processor_config,
-                artifacts_path=self.artifact_storage.processors_outputs_path / processor_config.name,
-                output_path=output_folder / processor_config.name,
-            )
+        if not self.artifact_storage.processors_outputs_path.exists():
+            raise ArtifactStorageError(f"Processor {processor_name} has no artifacts.")
+        return self.artifact_storage.processors_outputs_path / processor_name
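
A short usage sketch of the new accessor, consistent with the updated examples/example.py above (the output filename is illustrative, and the artifact directory is assumed to hold the processor's parquet batch files):

    import pandas as pd

    artifacts_path = results.get_path_to_processor_artifacts("jsonl_output")
    # Converting artifacts to a final format is now the caller's responsibility,
    # e.g. parquet -> JSONL:
    pd.read_parquet(artifacts_path).to_json("output.jsonl", orient="records", lines=True)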

tests/config/test_processors.py

Lines changed: 7 additions & 7 deletions
@@ -6,8 +6,8 @@
 
 from data_designer.config.dataset_builders import BuildStage
 from data_designer.config.processors import (
+    AncillaryDatasetProcessorConfig,
     DropColumnsProcessorConfig,
-    OutputFormatProcessorConfig,
     ProcessorConfig,
     ProcessorType,
     get_processor_config_from_kwargs,
@@ -54,7 +54,7 @@ def test_drop_columns_processor_config_serialization():
 
 
 def test_output_format_processor_config_creation():
-    config = OutputFormatProcessorConfig(
+    config = AncillaryDatasetProcessorConfig(
         name="output_format_processor",
         build_stage=BuildStage.POST_BATCH,
         template='{"text": "{{ col1 }}"}',
@@ -69,19 +69,19 @@ def test_output_format_processor_config_creation():
 def test_output_format_processor_config_validation():
     # Test unsupported stage raises error
     with pytest.raises(ValidationError, match="Invalid dataset builder stage"):
-        OutputFormatProcessorConfig(
+        AncillaryDatasetProcessorConfig(
             name="output_format_processor",
             build_stage=BuildStage.PRE_BATCH,
             template='{"text": "{{ col1 }}"}',
         )
 
     # Test missing required field raises error
     with pytest.raises(ValidationError, match="Field required"):
-        OutputFormatProcessorConfig(name="output_format_processor", build_stage=BuildStage.POST_BATCH)
+        AncillaryDatasetProcessorConfig(name="output_format_processor", build_stage=BuildStage.POST_BATCH)
 
 
 def test_output_format_processor_config_serialization():
-    config = OutputFormatProcessorConfig(
+    config = AncillaryDatasetProcessorConfig(
         name="output_format_processor",
         build_stage=BuildStage.POST_BATCH,
         template='{"text": "{{ col1 }}"}',
@@ -93,7 +93,7 @@ def test_output_format_processor_config_serialization():
     assert config_dict["template"] == '{"text": "{{ col1 }}"}'
 
     # Deserialize from dict
-    config_restored = OutputFormatProcessorConfig.model_validate(config_dict)
+    config_restored = AncillaryDatasetProcessorConfig.model_validate(config_dict)
     assert config_restored.build_stage == config.build_stage
     assert config_restored.template == config.template
 
@@ -116,7 +116,7 @@ def test_get_processor_config_from_kwargs():
         build_stage=BuildStage.POST_BATCH,
         template='{"text": "{{ col1 }}"}',
     )
-    assert isinstance(config_output_format, OutputFormatProcessorConfig)
+    assert isinstance(config_output_format, AncillaryDatasetProcessorConfig)
     assert config_output_format.template == '{"text": "{{ col1 }}"}'
     assert config_output_format.processor_type == ProcessorType.OUTPUT_FORMAT
