 # SPDX-License-Identifier: Apache-2.0
 
 import json
-from pathlib import Path
-from unittest.mock import Mock, patch
+from unittest.mock import Mock
 
 import pandas as pd
 import pytest
 
 from data_designer.config.dataset_builders import BuildStage
-from data_designer.config.processors import JsonlExportProcessorConfig
-from data_designer.engine.processing.processors.jsonl_export import JsonlExportProcessor
+from data_designer.config.processors import OutputFormatProcessorConfig
+from data_designer.engine.dataset_builders.artifact_storage import BatchStage
+from data_designer.engine.processing.processors.output_format import OutputFormatProcessor
 
 
 @pytest.fixture
-def stub_processor_config() -> JsonlExportProcessorConfig:
-    return JsonlExportProcessorConfig(
+def stub_processor_config() -> OutputFormatProcessorConfig:
+    return OutputFormatProcessorConfig(
         build_stage=BuildStage.POST_BATCH,
         template='{"text": "{{ col1 }}", "value": "{{ col2 }}"}',
-        fraction_per_file={"train.jsonl": 0.75, "validation.jsonl": 0.25},
+        name="test_output_format",
     )
 
 
 @pytest.fixture
-def stub_processor(stub_processor_config: JsonlExportProcessorConfig, tmp_path: Path) -> JsonlExportProcessor:
+def stub_processor(stub_processor_config: OutputFormatProcessorConfig) -> OutputFormatProcessor:
     mock_resource_provider = Mock()
     mock_artifact_storage = Mock()
-    mock_artifact_storage.move_processor_output = Mock()
+    mock_artifact_storage.write_batch_to_parquet_file = Mock()
     mock_resource_provider.artifact_storage = mock_artifact_storage
 
-    processor = JsonlExportProcessor(
+    processor = OutputFormatProcessor(
         config=stub_processor_config,
         resource_provider=mock_resource_provider,
     )
@@ -47,80 +47,103 @@ def stub_simple_dataframe() -> pd.DataFrame:
 
 
 def test_metadata() -> None:
-    metadata = JsonlExportProcessor.metadata()
+    metadata = OutputFormatProcessor.metadata()
 
-    assert metadata.name == "jsonl_export"
-    assert metadata.description == "Save formatted dataset as JSONL files."
+    assert metadata.name == "output_format"
+    assert metadata.description == "Format the dataset using a Jinja2 template."
     assert metadata.required_resources is None
 
 
-def test_template_as_string(stub_processor: JsonlExportProcessor) -> None:
-    template_str = stub_processor.config.template
-    assert isinstance(template_str, str)
-    assert template_str == '{"text": "{{ col1 }}", "value": "{{ col2 }}"}'
+def test_process_returns_original_dataframe(
+    stub_processor: OutputFormatProcessor, stub_simple_dataframe: pd.DataFrame
+) -> None:
+    result = stub_processor.process(stub_simple_dataframe, current_batch_number=0)
+    pd.testing.assert_frame_equal(result, stub_simple_dataframe)
+
 
+def test_process_writes_formatted_output_to_parquet(
+    stub_processor: OutputFormatProcessor, stub_simple_dataframe: pd.DataFrame
+) -> None:
+    # Capture the formatted dataframe that is written to parquet
+    written_dataframe: pd.DataFrame | None = None
 
-def test_get_stop_index_per_file(stub_processor: JsonlExportProcessor) -> None:
-    stub_processor.config.fraction_per_file = {"train.jsonl": 0.8, "val.jsonl": 0.15, "test.jsonl": 0.05}
-    result = stub_processor._get_stop_index_per_file(100)
+    def capture_dataframe(batch_number: int, dataframe: pd.DataFrame, batch_stage: BatchStage, subfolder: str) -> None:
+        nonlocal written_dataframe
+        written_dataframe = dataframe
 
-    assert result == {"train.jsonl": 80, "val.jsonl": 95, "test.jsonl": 100}
+    stub_processor.artifact_storage.write_batch_to_parquet_file.side_effect = capture_dataframe
 
+    # Process the dataframe
+    result = stub_processor.process(stub_simple_dataframe, current_batch_number=0)
 
-def test_process_returns_original_dataframe(
-    stub_processor: JsonlExportProcessor, stub_simple_dataframe: pd.DataFrame
-) -> None:
-    result = stub_processor.process(stub_simple_dataframe)
+    # Verify the original dataframe is returned
     pd.testing.assert_frame_equal(result, stub_simple_dataframe)
 
+    # Verify write_batch_to_parquet_file was called with correct parameters
+    stub_processor.artifact_storage.write_batch_to_parquet_file.assert_called_once()
+    call_args = stub_processor.artifact_storage.write_batch_to_parquet_file.call_args
+
+    assert call_args.kwargs["batch_number"] == 0
+    assert call_args.kwargs["batch_stage"] == BatchStage.PROCESSORS_OUTPUTS
+    assert call_args.kwargs["subfolder"] == "test_output_format"
+
+    # Verify the formatted dataframe has the correct structure
+    assert written_dataframe is not None
+    assert list(written_dataframe.columns) == ["formatted_output"]
+    assert len(written_dataframe) == 4
+
+    # Verify the formatted content
+    expected_formatted_output = [
+        '{"text": "hello", "value": "1"}',
+        '{"text": "world", "value": "2"}',
+        '{"text": "test", "value": "3"}',
+        '{"text": "data", "value": "4"}',
+    ]
 
-def test_process_writes_correct_content_to_files(
-    stub_processor: JsonlExportProcessor, stub_simple_dataframe: pd.DataFrame
-) -> None:
-    stub_processor.config.fraction_per_file = {"train.jsonl": 0.75, "validation.jsonl": 0.25}
-
-    # Capture the content of the files that are written to the outputs folder
-    file_contents: dict[str, str] = {}
+    for i, expected in enumerate(expected_formatted_output):
+        actual = written_dataframe.iloc[i]["formatted_output"]
+        # Parse both as JSON to compare structure (ignoring whitespace differences)
+        assert json.loads(actual) == json.loads(expected), f"Row {i} mismatch: {actual} != {expected}"
 
-    def capture_file_content(from_path: Path, folder_name: str) -> None:
-        with open(from_path, "r") as f:
-            file_contents[from_path.name] = f.read()
 
-    stub_processor.artifact_storage.move_processor_output.side_effect = capture_file_content
+def test_process_without_batch_number_does_not_write(
+    stub_processor: OutputFormatProcessor, stub_simple_dataframe: pd.DataFrame
+) -> None:
+    # Process without batch number (preview mode)
+    result = stub_processor.process(stub_simple_dataframe, current_batch_number=None)
 
-    # Process the dataframe and write the files to the outputs folder
-    with patch("data_designer.engine.processing.processors.jsonl_export.logger"):
-        stub_processor.process(stub_simple_dataframe)
+    # Verify the original dataframe is returned
+    pd.testing.assert_frame_equal(result, stub_simple_dataframe)
 
-    # Check that the files were moved with the correct names
-    assert stub_processor.artifact_storage.move_processor_output.call_count == 2
+    # Verify write_batch_to_parquet_file was NOT called
+    stub_processor.artifact_storage.write_batch_to_parquet_file.assert_not_called()
 
-    assert "train.jsonl" in file_contents
-    assert "validation.jsonl" in file_contents
 
-    # Check that the files contain the correct content
-    train_lines = file_contents["train.jsonl"].strip().split("\n") if file_contents["train.jsonl"].strip() else []
-    validation_lines = (
-        file_contents["validation.jsonl"].strip().split("\n") if file_contents["validation.jsonl"].strip() else []
+def test_process_with_json_serialized_values(stub_processor: OutputFormatProcessor) -> None:
+    # Test with JSON-serialized values in dataframe
+    df_with_json = pd.DataFrame(
+        {
+            "col1": ["hello", "world"],
+            "col2": ['{"nested": "value1"}', '{"nested": "value2"}'],
+        }
     )
 
-    assert len(train_lines) == 3, f"Expected 3 lines in train.jsonl, got {len(train_lines)}"
-    assert len(validation_lines) == 1, f"Expected 1 line in validation.jsonl, got {len(validation_lines)}"
+    written_dataframe: pd.DataFrame | None = None
 
-    expected_train_data = [
-        {"text": "hello", "value": "1"},
-        {"text": "world", "value": "2"},
-        {"text": "test", "value": "3"},
-    ]
+    def capture_dataframe(batch_number: int, dataframe: pd.DataFrame, batch_stage: BatchStage, subfolder: str) -> None:
+        nonlocal written_dataframe
+        written_dataframe = dataframe
+
+    stub_processor.artifact_storage.write_batch_to_parquet_file.side_effect = capture_dataframe
 
-    for i, line in enumerate(train_lines):
-        parsed = json.loads(line)
-        assert parsed == expected_train_data[i], f"Train line {i} mismatch: {parsed} != {expected_train_data[i]}"
+    # Process the dataframe
+    stub_processor.process(df_with_json, current_batch_number=0)
 
-    expected_validation_data = [{"text": "data", "value": "4"}]
+    # Verify the formatted dataframe was written
+    assert written_dataframe is not None
+    assert len(written_dataframe) == 2
 
-    for i, line in enumerate(validation_lines):
-        parsed = json.loads(line)
-        assert parsed == expected_validation_data[i], (
-            f"Validation line {i} mismatch: {parsed} != {expected_validation_data[i]}"
-        )
+    # Verify that nested JSON values are properly deserialized in template rendering
+    first_output = json.loads(written_dataframe.iloc[0]["formatted_output"])
+    assert first_output["text"] == "hello"
+    assert first_output["value"] == "{'nested': 'value1'}"
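
For context, a minimal sketch of the process() behavior these tests pin down. This is an assumption inferred from the mocks and assertions above, not the actual OutputFormatProcessor implementation: the class and helper names (OutputFormatProcessorSketch, _maybe_json_loads) are hypothetical, and the jinja2 rendering and write_batch_to_parquet_file signature are guesses from the test fixtures.

import json

import pandas as pd
from jinja2 import Template

from data_designer.engine.dataset_builders.artifact_storage import BatchStage


def _maybe_json_loads(value):
    # Deserialize JSON-encoded strings so nested structures reach the
    # template as Python objects; leave everything else untouched.
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:
            return value
    return value


class OutputFormatProcessorSketch:  # hypothetical name, for illustration only
    def __init__(self, config, resource_provider):
        self.config = config
        self.artifact_storage = resource_provider.artifact_storage

    def process(self, dataframe: pd.DataFrame, current_batch_number: int | None = None) -> pd.DataFrame:
        template = Template(self.config.template)
        rendered = [
            template.render(**{key: _maybe_json_loads(value) for key, value in record.items()})
            for record in dataframe.to_dict(orient="records")
        ]
        formatted = pd.DataFrame({"formatted_output": rendered})
        # Preview runs pass current_batch_number=None and skip the write.
        if current_batch_number is not None:
            self.artifact_storage.write_batch_to_parquet_file(
                batch_number=current_batch_number,
                dataframe=formatted,
                batch_stage=BatchStage.PROCESSORS_OUTPUTS,
                subfolder=self.config.name,
            )
        # The original dataframe is returned unchanged.
        return dataframe

Note that a deserialized dict renders via its Python repr (single quotes), which is exactly what test_process_with_json_serialized_values asserts for first_output["value"].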