Commit 82e8641

advancing in the discussed direction
1 parent a5b8897 commit 82e8641

10 files changed: +84 -117 lines changed

examples/.gitignore

Lines changed: 1 addition & 0 deletions
@@ -1 +1,2 @@
 artifacts
+processor_outputs

examples/example.py

Lines changed: 7 additions & 28 deletions
@@ -1,37 +1,20 @@
 import json
 
 from data_designer.essentials import (
-    BuildStage,
     CategorySamplerParams,
     DataDesigner,
     DataDesignerConfigBuilder,
-    InferenceParameters,
-    JsonlExportProcessorConfig,
     LLMTextColumnConfig,
-    ModelConfig,
+    OutputFormatProcessorConfig,
     PersonSamplerParams,
-    ProcessorType,
     SamplerColumnConfig,
     Score,
     SubcategorySamplerParams,
 )
 
 # define model aliases
-model_alias_generator = "content_generator"
-model_configs = [
-    ModelConfig(
-        alias=model_alias_generator,
-        provider="nvidia",
-        model="deepseek-ai/deepseek-r1-distill-qwen-14b",
-        inference_parameters=InferenceParameters(
-            max_tokens=8000,
-            temperature=0.7,
-            top_p=0.95,
-        ),
-    )
-]
-
-config_builder = DataDesignerConfigBuilder(model_configs=model_configs)
+model_alias_generator = "nvidia-text"
+config_builder = DataDesignerConfigBuilder()
 
 # ESI levels
 ESI_LEVELS = [
@@ -198,14 +181,9 @@
 
 template_as_str = json.dumps(jsonl_entry_template)
 config_builder.add_processor(
-    JsonlExportProcessorConfig(
-        processor_type=ProcessorType.JSONL_EXPORT,
-        build_stage=BuildStage.POST_BATCH,
+    OutputFormatProcessorConfig(
+        name="jsonl_output",
         template=template_as_str,
-        fraction_per_file={
-            "train.jsonl": 0.8,
-            "validation.jsonl": 0.2,
-        },
     )
 )
 
@@ -214,4 +192,5 @@
 )
 preview = dd.preview(config_builder, num_records=10)
 
-dd.create(config_builder, num_records=20)
+results = dd.create(config_builder, num_records=20)
+results.write_processor_outputs_to_disk("./processor_outputs", "jsonl")

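Taken together, these hunks simplify the user-facing flow: the model-config boilerplate is gone, the processor is configured by a name and a template only, and processor outputs are written to disk explicitly from the results object. A minimal sketch of the new flow, condensed from the diff above; the template fields are illustrative, and the DataDesigner construction is an assumption since the hunks elide it:

import json

from data_designer.essentials import DataDesigner, DataDesignerConfigBuilder, OutputFormatProcessorConfig

config_builder = DataDesignerConfigBuilder()  # no model_configs required up front anymore

# ... sampler and LLM column configuration elided, as in the full example ...

# The template is a JSON object whose string values may contain Jinja2 placeholders.
jsonl_entry_template = {"esi": "{{ esi_level }}"}  # illustrative fields
config_builder.add_processor(
    OutputFormatProcessorConfig(
        name="jsonl_output",
        template=json.dumps(jsonl_entry_template),
    )
)

dd = DataDesigner()  # assumption: constructed as in the elided part of the example
results = dd.create(config_builder, num_records=20)
results.write_processor_outputs_to_disk("./processor_outputs", "jsonl")
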
src/data_designer/config/processors.py

Lines changed: 5 additions & 15 deletions
@@ -15,7 +15,7 @@
 
 class ProcessorType(str, Enum):
     DROP_COLUMNS = "drop_columns"
-    JSONL_EXPORT = "jsonl_export"
+    OUTPUT_FORMAT = "output_format"
 
 
 class ProcessorConfig(ConfigBase, ABC):
@@ -39,25 +39,15 @@ def validate_build_stage(cls, v: BuildStage) -> BuildStage:
 def get_processor_config_from_kwargs(processor_type: ProcessorType, **kwargs) -> ProcessorConfig:
     if processor_type == ProcessorType.DROP_COLUMNS:
         return DropColumnsProcessorConfig(**kwargs)
-    elif processor_type == ProcessorType.JSONL_EXPORT:
-        return JsonlExportProcessorConfig(**kwargs)
+    elif processor_type == ProcessorType.OUTPUT_FORMAT:
+        return OutputFormatProcessorConfig(**kwargs)
 
 
 class DropColumnsProcessorConfig(ProcessorConfig):
     column_names: list[str]
     processor_type: Literal[ProcessorType.DROP_COLUMNS] = ProcessorType.DROP_COLUMNS
 
 
-class JsonlExportProcessorConfig(ProcessorConfig):
+class OutputFormatProcessorConfig(ProcessorConfig):
     template: str = Field(..., description="The template to use for each entry in the dataset, as a single string.")
-    fraction_per_file: dict[str, float] = Field(
-        default={"train.jsonl": 0.8, "validation.jsonl": 0.2},
-        description="Fraction of the dataset to save in each file. The keys are the filenames and the values are the fractions.",
-    )
-    processor_type: Literal[ProcessorType.JSONL_EXPORT] = ProcessorType.JSONL_EXPORT
-
-    @field_validator("fraction_per_file")
-    def validate_fraction_per_file(cls, v: dict[str, float]) -> dict[str, float]:
-        if sum(v.values()) != 1:
-            raise ValueError("The fractions must sum to 1.")
-        return v
+    processor_type: Literal[ProcessorType.OUTPUT_FORMAT] = ProcessorType.OUTPUT_FORMAT

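With fraction_per_file and its sum-to-one validator removed, the config reduces to a template plus the type tag; train/validation splitting is no longer this processor's concern. A quick sketch of the two construction paths this diff leaves in place (the name field is shown in the example diff above and is assumed to be defined on the ProcessorConfig base; the template value is illustrative):

from data_designer.config.processors import (
    OutputFormatProcessorConfig,
    ProcessorType,
    get_processor_config_from_kwargs,
)

# Direct construction, as in examples/example.py:
config = OutputFormatProcessorConfig(name="jsonl_output", template='{"esi": "{{ esi_level }}"}')

# Equivalent construction through the kwargs factory:
config_from_kwargs = get_processor_config_from_kwargs(
    ProcessorType.OUTPUT_FORMAT,
    name="jsonl_output",
    template='{"esi": "{{ esi_level }}"}',
)
assert config_from_kwargs.processor_type is ProcessorType.OUTPUT_FORMAT
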
src/data_designer/engine/dataset_builders/artifact_storage.py

Lines changed: 7 additions & 8 deletions
@@ -23,6 +23,7 @@ class BatchStage(StrEnum):
     PARTIAL_RESULT = "partial_results_path"
     FINAL_RESULT = "final_dataset_path"
     DROPPED_COLUMNS = "dropped_columns_dataset_path"
+    PROCESSORS_OUTPUTS = "processors_outputs_path"
 
 
 class ArtifactStorage(BaseModel):
@@ -75,6 +76,7 @@ def validate_folder_names(self):
             self.final_dataset_folder_name,
             self.partial_results_folder_name,
             self.dropped_columns_folder_name,
+            self.processors_outputs_folder_name,
         ]
 
         for name in folder_names:
@@ -160,19 +162,21 @@ def write_batch_to_parquet_file(
         batch_number: int,
         dataframe: pd.DataFrame,
         batch_stage: BatchStage,
+        subfolder: str = "",
     ) -> Path:
         file_path = self.create_batch_file_path(batch_number, batch_stage=batch_stage)
-        self.write_parquet_file(file_path.name, dataframe, batch_stage)
+        self.write_parquet_file(file_path.name, dataframe, batch_stage, subfolder=subfolder)
         return file_path
 
     def write_parquet_file(
         self,
         parquet_file_name: str,
         dataframe: pd.DataFrame,
         batch_stage: BatchStage,
+        subfolder: str = "",
    ) -> Path:
-        self.mkdir_if_needed(self._get_stage_path(batch_stage))
-        file_path = self._get_stage_path(batch_stage) / parquet_file_name
+        self.mkdir_if_needed(self._get_stage_path(batch_stage) / subfolder)
+        file_path = self._get_stage_path(batch_stage) / subfolder / parquet_file_name
         dataframe.to_parquet(file_path, index=False)
         return file_path
 
@@ -182,10 +186,5 @@ def write_metadata(self, metadata: dict) -> Path:
             json.dump(metadata, file)
         return self.metadata_file_path
 
-    def move_processor_output(self, from_path: Path, folder_name: str) -> Path:
-        self.mkdir_if_needed(self.processors_outputs_path / folder_name)
-        shutil.move(from_path, self.processors_outputs_path / folder_name / from_path.name)
-        return self.processors_outputs_path / folder_name / from_path.name
-
     def _get_stage_path(self, stage: BatchStage) -> Path:
         return getattr(self, resolve_string_enum(stage, BatchStage).value)

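The new subfolder parameter only changes path composition, and the empty-string default keeps existing call sites on their old layout, since joining a Path with "" is a no-op. A sketch of the arithmetic, with hypothetical folder and file names:

from pathlib import Path

stage_path = Path("artifacts/processors_outputs")  # hypothetical stage folder
file_name = "batch_00000.parquet"                  # hypothetical batch file name

# With the default subfolder="", nothing changes:
assert stage_path / "" / file_name == stage_path / file_name

# With a subfolder (the processor's config.name), outputs are namespaced per processor:
print(stage_path / "jsonl_output" / file_name)
# artifacts/processors_outputs/jsonl_output/batch_00000.parquet
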
src/data_designer/engine/processing/processors/jsonl_export.py

Lines changed: 0 additions & 62 deletions
This file was deleted.
src/data_designer/engine/processing/processors/output_format.py

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+
+import pandas as pd
+
+from data_designer.config.processors import OutputFormatProcessorConfig
+from data_designer.engine.configurable_task import ConfigurableTaskMetadata
+from data_designer.engine.dataset_builders.artifact_storage import BatchStage
+from data_designer.engine.processing.ginja.environment import WithJinja2UserTemplateRendering
+from data_designer.engine.processing.processors.base import Processor
+from data_designer.engine.processing.utils import deserialize_json_values
+
+logger = logging.getLogger(__name__)
+
+
+class OutputFormatProcessor(WithJinja2UserTemplateRendering, Processor[OutputFormatProcessorConfig]):
+    @staticmethod
+    def metadata() -> ConfigurableTaskMetadata:
+        return ConfigurableTaskMetadata(
+            name="output_format",
+            description="Format the dataset using a Jinja2 template.",
+            required_resources=None,
+        )
+
+    def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None) -> pd.DataFrame:
+        self.prepare_jinja2_template_renderer(self.config.template, data.columns.to_list())
+        formatted_records = [self.render_template(deserialize_json_values(record)) for record in data.to_dict(orient="records")]
+        formatted_data = pd.DataFrame(formatted_records, columns=["formatted_output"])
+        if current_batch_number is not None:
+            self.artifact_storage.write_batch_to_parquet_file(
+                batch_number=current_batch_number,
+                dataframe=formatted_data,
+                batch_stage=BatchStage.PROCESSORS_OUTPUTS,
+                subfolder=self.config.name,
+            )
+        else:
+            logger.warning("⚠️ Cannot write processor outputs to disk in preview mode.")
+
+        return data

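The WithJinja2UserTemplateRendering mixin is not part of this commit, so as a rough standalone approximation of what process() does to each record before the parquet write, here is a sketch using jinja2 directly (records and template are illustrative):

import jinja2
import pandas as pd

data = pd.DataFrame([{"symptoms": "chest pain", "esi_level": 2}])  # illustrative records
template = jinja2.Template('{"prompt": {{ symptoms | tojson }}, "esi": {{ esi_level }}}')

# One rendered string per record, collected into a single-column frame,
# mirroring the formatted_records/formatted_data lines above:
formatted_records = [template.render(**record) for record in data.to_dict(orient="records")]
formatted_data = pd.DataFrame(formatted_records, columns=["formatted_output"])
print(formatted_data.iloc[0, 0])  # {"prompt": "chest pain", "esi": 2}

Note that process() returns its input unchanged; the formatted frame goes only to artifact storage.
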
src/data_designer/engine/processing/processors/registry.py

Lines changed: 3 additions & 3 deletions
@@ -4,12 +4,12 @@
 from data_designer.config.base import ConfigBase
 from data_designer.config.processors import (
     DropColumnsProcessorConfig,
-    JsonlExportProcessorConfig,
+    OutputFormatProcessorConfig,
     ProcessorType,
 )
 from data_designer.engine.processing.processors.base import Processor
 from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor
-from data_designer.engine.processing.processors.jsonl_export import JsonlExportProcessor
+from data_designer.engine.processing.processors.output_format import OutputFormatProcessor
 from data_designer.engine.registry.base import TaskRegistry
 
 
@@ -19,5 +19,5 @@ class ProcessorRegistry(TaskRegistry[str, Processor, ConfigBase]): ...
 def create_default_processor_registry() -> ProcessorRegistry:
     registry = ProcessorRegistry()
     registry.register(ProcessorType.DROP_COLUMNS, DropColumnsProcessor, DropColumnsProcessorConfig, False)
-    registry.register(ProcessorType.JSONL_EXPORT, JsonlExportProcessor, JsonlExportProcessorConfig, False)
+    registry.register(ProcessorType.OUTPUT_FORMAT, OutputFormatProcessor, OutputFormatProcessorConfig, False)
     return registry

src/data_designer/essentials/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@
     UniformDistribution,
     UniformDistributionParams,
 )
-from ..config.processors import DropColumnsProcessorConfig, JsonlExportProcessorConfig, ProcessorType
+from ..config.processors import DropColumnsProcessorConfig, OutputFormatProcessorConfig, ProcessorType
 from ..config.sampler_constraints import ColumnInequalityConstraint, ScalarInequalityConstraint
 from ..config.sampler_params import (
     BernoulliMixtureSamplerParams,

src/data_designer/interface/results.py

Lines changed: 19 additions & 0 deletions
@@ -2,6 +2,8 @@
 # SPDX-License-Identifier: Apache-2.0
 
 from __future__ import annotations
+from pathlib import Path
+from typing import Literal
 
 import pandas as pd
 
@@ -53,3 +55,20 @@ def load_dataset(self) -> pd.DataFrame:
         A pandas DataFrame containing the full generated dataset.
         """
         return self.artifact_storage.load_dataset()
+
+    def write_processor_outputs_to_disk(self, output_folder: Path | str, extension: Literal["jsonl", "csv"]) -> None:
+        """Write the processor outputs to disk.
+
+        Returns:
+            None
+        """
+        output_folder = Path(output_folder)
+        output_folder.mkdir(parents=True, exist_ok=True)
+        for subfolder in self.artifact_storage.processors_outputs_path.iterdir():
+            output_file_path = output_folder / f"{subfolder.name}.{extension}"
+            with open(output_file_path, "w") as f:
+                for file_path in subfolder.glob("*.parquet"):
+                    # TODO: faster way to convert than reading and writing row by row?
+                    dataframe = pd.read_parquet(file_path)
+                    for _, row in dataframe.iterrows():
+                        f.write(row["formatted_output"].replace("\n", "\\n") + "\n")

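Each formatted record is written as a single line with embedded newlines escaped as the two-character sequence \n, so a caller reading the files back has to reverse that escaping. A hedged sketch of a reader, assuming the output file name produced by the example above (the unescaping is the caller's responsibility, not an API in this commit):

from pathlib import Path

# Hypothetical reader for files produced by write_processor_outputs_to_disk.
for line in Path("processor_outputs/jsonl_output.jsonl").read_text().splitlines():
    record = line.replace("\\n", "\n")  # reverse the escaping applied on write
    print(record)

The escaping is not fully reversible when a record legitimately contains a literal backslash-n, which may be worth folding into the TODO above.
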
tests/engine/processing/processors/test_jsonl_export.py renamed to tests/engine/processing/processors/test_output_format.py

File renamed without changes.
