Commit 29198ed

Authored by praateekmahajan, VibhuJawa, Copilot, ayushdg, and thomasdhc
Allow jsonl/parquet to rewrite the same file (#1068)
* fc
Signed-off-by: Praateek <[email protected]>

* sc
Signed-off-by: Praateek <[email protected]>

* Update tests/stages/text/io/writer/test_jsonl.py
Co-authored-by: Vibhu Jawa <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update tests/stages/text/io/writer/test_jsonl.py
Co-authored-by: Copilot <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update tests/stages/text/io/writer/test_parquet.py
Co-authored-by: Copilot <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

* Update tests/stages/text/io/writer/test_parquet.py
Co-authored-by: Copilot <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>

---------

Signed-off-by: Praateek <[email protected]>
Signed-off-by: Praateek Mahajan <[email protected]>
Co-authored-by: Vibhu Jawa <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Ayush Dattagupta <[email protected]>
Co-authored-by: Dong Hyuk Chang <[email protected]>
1 parent a6764d8 commit 29198ed

3 files changed: +89, -5 lines changed


nemo_curator/stages/text/io/writer/base.py

Lines changed: 4 additions & 5 deletions
@@ -95,12 +95,11 @@ def process(self, task: DocumentBatch) -> FileGroupTask:
         file_extension = self.get_file_extension()
         file_path = self.fs.sep.join([self._fs_path, f"{filename}.{file_extension}"])

-        # Skip if file already exists (idempotent writes)
         if self.fs.exists(file_path):
-            logger.debug(f"File {file_path} already exists, skipping")
-        else:
-            self.write_data(task, file_path)
-            logger.debug(f"Written {task.num_items} records to {file_path}")
+            logger.debug(f"File {file_path} already exists, overwriting it")
+
+        self.write_data(task, file_path)
+        logger.debug(f"Written {task.num_items} records to {file_path}")

         # Create FileGroupTask with written files
         return FileGroupTask(
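
For context, a minimal usage sketch of the change above, assuming the writer API exercised by the tests below; the import path and the pre-built batch are illustrative assumptions, not part of this commit:

from nemo_curator.stages.text.io.writer import JsonlWriter  # assumed import path

def rewrite_same_file(writer: JsonlWriter, batch) -> None:
    # batch: a DocumentBatch whose "source_files" metadata yields a
    # deterministic output filename (see the tests below).
    writer.setup()
    first = writer.process(batch)   # first write creates <stem>.jsonl
    second = writer.process(batch)  # same path; previously skipped, now rewritten
    assert first.data[0] == second.data[0]

Before this commit the second call hit the fs.exists branch and skipped the write; after it, an existing file is only logged and the write always proceeds.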

tests/stages/text/io/writer/test_jsonl.py

Lines changed: 44 additions & 0 deletions
@@ -13,6 +13,7 @@
 # limitations under the License.

 import os
+import time
 import uuid
 from unittest import mock

@@ -222,3 +223,46 @@ def test_jsonl_writer_overwrite_mode(self, pandas_document_batch: DocumentBatch,

         df = pd.read_json(files[0], lines=True)
         pd.testing.assert_frame_equal(df, pandas_document_batch.to_pandas())
+
+    @pytest.mark.parametrize("consistent_filename", [True, False])
+    def test_jsonl_writer_overwrites_existing_file(
+        self,
+        pandas_document_batch: DocumentBatch,
+        consistent_filename: bool,
+        tmpdir: str,
+    ):
+        """Test that JsonlWriter overwrites existing files when writing to the same path."""
+
+        # Create writer with specific output directory for this test
+        output_dir = os.path.join(tmpdir, f"jsonl_{pandas_document_batch.task_id}")
+        writer = JsonlWriter(path=output_dir)
+
+        # Setup
+        writer.setup()
+
+        # Process
+        if consistent_filename:
+            source_files = [f"file_{i}.jsonl" for i in range(len(pandas_document_batch.data))]
+            pandas_document_batch._metadata["source_files"] = source_files
+        # We write once
+        result1 = writer.process(pandas_document_batch)
+        filesize_1, file_modification_time_1 = os.path.getsize(result1.data[0]), os.path.getmtime(result1.data[0])
+        time.sleep(0.01)
+        # Then we overwrite it
+        result2 = writer.process(pandas_document_batch)
+        filesize_2, file_modification_time_2 = os.path.getsize(result2.data[0]), os.path.getmtime(result2.data[0])
+
+        if consistent_filename:
+            assert result1.data[0] == result2.data[0], "File path should be the same, since it'll be a hash"
+        else:
+            assert result1.data[0] != result2.data[0], "File path should be different, since it'll be a uuid"
+            # When using UUIDs, files are different, so no overwrite occurs
+
+        assert filesize_1 == filesize_2, "File size should be the same when written twice"
+        assert file_modification_time_1 < file_modification_time_2, (
+            "File modification time should be newer than the first write"
+        )
+
+        pd.testing.assert_frame_equal(
+            pd.read_json(result1.data[0], lines=True), pd.read_json(result2.data[0], lines=True)
+        )
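
The consistent_filename parametrization leans on the naming behavior these assertions spell out: with source_files metadata the output stem is a deterministic hash, without it a fresh UUID, so only the deterministic case actually hits the same path twice. A minimal sketch of that scheme (the digest below is illustrative; the writer's actual hashing is not shown in this diff):

import hashlib
import uuid

def output_stem(source_files: list[str] | None) -> str:
    # Deterministic stem when source files are known, so reruns collide and
    # now overwrite; a random stem otherwise, so reruns never collide.
    if source_files:
        return hashlib.md5("".join(source_files).encode()).hexdigest()
    return uuid.uuid4().hex

# Same inputs -> same stem -> the second write lands on the first file.
assert output_stem(["file_0.jsonl"]) == output_stem(["file_0.jsonl"])
# No source files -> fresh stem each call -> no collision, no overwrite.
assert output_stem(None) != output_stem(None)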

tests/stages/text/io/writer/test_parquet.py

Lines changed: 41 additions & 0 deletions
@@ -13,6 +13,7 @@
 # limitations under the License.

 import os
+import time
 import uuid
 from unittest import mock

@@ -210,3 +211,43 @@ def test_parquet_writer_with_custom_file_extension(self, pandas_document_batch:
         assert len(result._stage_perf) >= len(pandas_document_batch._stage_perf)
         for original_perf in pandas_document_batch._stage_perf:
             assert original_perf in result._stage_perf, "Original stage performance should be preserved"
+
+    @pytest.mark.parametrize("consistent_filename", [True, False])
+    def test_jsonl_writer_overwrites_existing_file(
+        self,
+        pandas_document_batch: DocumentBatch,
+        consistent_filename: bool,
+        tmpdir: str,
+    ):
+        """Test that ParquetWriter overwrites existing files when writing to the same path."""
+        # Create writer with specific output directory for this test
+        output_dir = os.path.join(tmpdir, f"jsonl_{pandas_document_batch.task_id}")
+        writer = ParquetWriter(path=output_dir)
+
+        # Setup
+        writer.setup()
+
+        # Process
+        if consistent_filename:
+            source_files = [f"file_{i}.jsonl" for i in range(len(pandas_document_batch.data))]
+            pandas_document_batch._metadata["source_files"] = source_files
+        # We write once
+        result1 = writer.process(pandas_document_batch)
+        filesize_1, file_modification_time_1 = os.path.getsize(result1.data[0]), os.path.getmtime(result1.data[0])
+        time.sleep(0.01)
+        # Then we overwrite it
+        result2 = writer.process(pandas_document_batch)
+        filesize_2, file_modification_time_2 = os.path.getsize(result2.data[0]), os.path.getmtime(result2.data[0])
+
+        if consistent_filename:
+            assert result1.data[0] == result2.data[0], "File path should be the same, since it'll be a hash"
+        else:
+            assert result1.data[0] != result2.data[0], "File path should be different, since it'll be a uuid"
+            # When using UUIDs, files are different, so no overwrite occurs
+
+        assert filesize_1 == filesize_2, "File size should be the same when written twice"
+        assert file_modification_time_1 < file_modification_time_2, (
+            "File modification time should be newer than the first write"
+        )
+
+        pd.testing.assert_frame_equal(pd.read_parquet(result1.data[0]), pd.read_parquet(result2.data[0]))
