Skip to content

Commit 49c377b

Browse files
committed
Update to move files as they get processed.
1 parent a7ecd5e commit 49c377b

File tree

2 files changed

+70
-53
lines changed

2 files changed

+70
-53
lines changed

centralized_metadata/optimize_pdf_dag.py

Lines changed: 33 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -175,6 +175,31 @@ def run_and_stream(command, prefix=None):
175175
raise subprocess.CalledProcessError(return_code, command)
176176

177177

178+
def _move_processed_pair(original_pdf: Path, optimized_pdf: Path):
    """
    Move an optimized PDF and its source PDF into sibling archive folders.

    The optimized file is moved to ``<source dir>/Optimized`` and the source
    file to ``<source dir>/Originals``, where ``<source dir>`` is the parent
    directory of ``original_pdf``. Both folders are created when missing and
    the file names are kept unchanged.

    Args:
        original_pdf: Path of the source PDF that was processed.
        optimized_pdf: Path of the optimized output produced for it.

    Returns:
        A ``(originals_destination, optimized_destination)`` tuple with the
        new locations of the source and optimized files, in that order.

    Raises:
        FileNotFoundError: If either input path does not exist. The check
            runs before any folder is created or any file is moved, so a
            missing input never leaves the pair half-moved.
    """
    # Validate both inputs up front: moving the optimized file and then
    # failing on the original would strand one half of the pair.
    for source in (optimized_pdf, original_pdf):
        if not source.exists():
            raise FileNotFoundError(f"Cannot move missing file: {source}")

    pdf_directory = original_pdf.parent
    optimized_directory = pdf_directory / "Optimized"
    originals_directory = pdf_directory / "Originals"
    optimized_directory.mkdir(parents=True, exist_ok=True)
    originals_directory.mkdir(parents=True, exist_ok=True)

    optimized_destination = optimized_directory / optimized_pdf.name
    originals_destination = originals_directory / original_pdf.name
    logging.info(
        "Moving %s to %s and %s to %s",
        optimized_pdf,
        optimized_destination,
        original_pdf,
        originals_destination,
    )
    shutil.move(str(optimized_pdf), str(optimized_destination))
    shutil.move(str(original_pdf), str(originals_destination))
    return originals_destination, optimized_destination
201+
202+
178203
def process_pdfs(**context):
179204
"""Iterate through the directory and run OCRmyPDF with optimize level 1."""
180205
pdf_directory = _resolve_pdf_directory(context)
@@ -210,48 +235,20 @@ def process_pdfs(**context):
210235
logging.info("Running command: %s", " ".join(command))
211236

212237
run_and_stream(command, prefix=pdf_file.name)
238+
originals_destination, optimized_destination = _move_processed_pair(
239+
pdf_file, output_pdf
240+
)
213241

214242
processed_files.append(
215-
{"original": str(pdf_file), "optimized": str(output_pdf)}
243+
{
244+
"original": str(originals_destination),
245+
"optimized": str(optimized_destination),
246+
}
216247
)
217248

218249
return processed_files
219250

220251

221-
def move_processed_pdfs(**context):
    """Move optimized and original PDFs into their destination directories.

    Pulls the list produced by the ``run_ocrmypdf`` task from XCom. Each entry
    is a dict with ``"original"`` and ``"optimized"`` path strings. For every
    entry the optimized file is moved to ``<source dir>/Optimized`` and the
    original to ``<source dir>/Originals``; both folders are created when
    missing.

    The task is safe to retry: a file whose source is gone and whose
    destination already exists is treated as moved by an earlier attempt and
    skipped, instead of aborting the remaining moves.

    Args:
        **context: Task context; only ``context["ti"]`` is read.

    Returns:
        ``"no-files-to-move"`` when the upstream list is empty or absent,
        otherwise ``"moved-<n>-pdfs"`` where ``<n>`` is the number of pairs
        for which at least one file was moved during this run.
    """
    processed_files = context["ti"].xcom_pull(task_ids="run_ocrmypdf") or []
    if not processed_files:
        logging.info("No processed files to move")
        return "no-files-to-move"

    moved_count = 0
    for file_info in processed_files:
        original_pdf = Path(file_info["original"])
        optimized_pdf = Path(file_info["optimized"])
        pdf_directory = original_pdf.parent
        optimized_directory = pdf_directory / "Optimized"
        originals_directory = pdf_directory / "Originals"
        optimized_directory.mkdir(parents=True, exist_ok=True)
        originals_directory.mkdir(parents=True, exist_ok=True)

        optimized_destination = optimized_directory / optimized_pdf.name
        originals_destination = originals_directory / original_pdf.name
        logging.info(
            "Moving %s to %s and %s to %s",
            optimized_pdf,
            optimized_destination,
            original_pdf,
            originals_destination,
        )
        # Evaluate both moves (no short-circuit) so a half-moved pair left by
        # a failed attempt is completed on retry.
        moved_optimized = _move_if_pending(optimized_pdf, optimized_destination)
        moved_original = _move_if_pending(original_pdf, originals_destination)
        if moved_optimized or moved_original:
            moved_count += 1

    return f"moved-{moved_count}-pdfs"


def _move_if_pending(source: Path, destination: Path) -> bool:
    """Move ``source`` to ``destination`` unless a prior attempt already did.

    Returns True when the file was moved now, False when it was skipped
    because the source is gone and the destination already exists. A source
    that is missing with no destination present still raises from
    ``shutil.move``.
    """
    if not source.exists() and destination.exists():
        logging.info("Skipping %s; already present at %s", source, destination)
        return False
    shutil.move(str(source), str(destination))
    return True
253-
254-
255252
DAG = airflow.DAG(
256253
"ocrmypdf_batch",
257254
default_args=DEFAULT_ARGS,
@@ -270,19 +267,11 @@ def move_processed_pdfs(**context):
270267
dag=DAG,
271268
)
272269

273-
MOVE_PROCESSED_FILES = PythonOperator(
274-
task_id="move_processed_pdfs",
275-
python_callable=move_processed_pdfs,
276-
provide_context=True,
277-
dag=DAG,
278-
)
279-
280270
SUCCESS = EmptyOperator(
281271
task_id='success',
282272
on_success_callback=[teamspostonsuccess],
283273
trigger_rule="none_failed_min_one_success",
284274
dag=DAG,
285275
)
286276

287-
RUN_OCR.set_downstream(MOVE_PROCESSED_FILES)
288-
MOVE_PROCESSED_FILES.set_downstream(SUCCESS)
277+
RUN_OCR.set_downstream(SUCCESS)

tests/optimize_pdf_dag_test.py

Lines changed: 37 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ def test_dag_loads(self):
1515
"""Ensure the DAG is registered with the expected ID and task."""
1616
self.assertEqual(DAG.dag_id, "ocrmypdf_batch")
1717
task_ids = [task.task_id for task in DAG.tasks]
18-
self.assertEqual(task_ids, ["run_ocrmypdf", "move_processed_pdfs", "success"])
18+
self.assertEqual(task_ids, ["run_ocrmypdf", "success"])
1919

2020
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
2121
def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream):
@@ -26,21 +26,30 @@ def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream):
2626
open(pdf_one, "wb").close()
2727
open(pdf_two, "wb").close()
2828

29+
def fake_run(command, prefix=None):
30+
Path(command[5]).write_bytes(b"optimized")
31+
32+
mock_run_and_stream.side_effect = fake_run
33+
2934
result = process_pdfs(params={"pdf_directory": tmp_dir})
3035

3136
self.assertEqual(
3237
result,
3338
[
3439
{
35-
"original": pdf_one,
40+
"original": os.path.realpath(
41+
os.path.join(tmp_dir, "Originals", "file_one.pdf")
42+
),
3643
"optimized": os.path.realpath(
37-
os.path.join(tmp_dir, "file_one_opti.pdf")
44+
os.path.join(tmp_dir, "Optimized", "file_one_opti.pdf")
3845
),
3946
},
4047
{
41-
"original": pdf_two,
48+
"original": os.path.realpath(
49+
os.path.join(tmp_dir, "Originals", "file_two.pdf")
50+
),
4251
"optimized": os.path.realpath(
43-
os.path.join(tmp_dir, "file_two_opti.pdf")
52+
os.path.join(tmp_dir, "Optimized", "file_two_opti.pdf")
4453
),
4554
},
4655
],
@@ -61,6 +70,13 @@ def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream):
6170
)
6271
self.assertEqual(call_args.kwargs.get("prefix"), expected_file.name)
6372

73+
originals_dir = Path(tmp_dir) / "Originals"
74+
optimized_dir = Path(tmp_dir) / "Optimized"
75+
self.assertTrue((originals_dir / "file_one.pdf").exists())
76+
self.assertTrue((originals_dir / "file_two.pdf").exists())
77+
self.assertTrue((optimized_dir / "file_one_opti.pdf").exists())
78+
self.assertTrue((optimized_dir / "file_two_opti.pdf").exists())
79+
6480
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
6581
def test_process_pdfs_handles_empty_directory(self, mock_run_and_stream):
6682
"""Confirm we short-circuit gracefully when no PDFs exist."""
@@ -77,6 +93,11 @@ def test_process_pdfs_prefers_dag_run_conf(self, mock_run_and_stream):
7793
pdf_path = os.path.realpath(os.path.join(tmp_dir, "file.pdf"))
7894
open(pdf_path, "wb").close()
7995

96+
def fake_run(command, prefix=None):
97+
Path(command[5]).write_text("optimized")
98+
99+
mock_run_and_stream.side_effect = fake_run
100+
80101
dag_run_mock = mock.Mock()
81102
dag_run_mock.conf = {"pdf_directory": tmp_dir}
82103

@@ -88,9 +109,11 @@ def test_process_pdfs_prefers_dag_run_conf(self, mock_run_and_stream):
88109
result,
89110
[
90111
{
91-
"original": pdf_path,
112+
"original": os.path.realpath(
113+
os.path.join(tmp_dir, "Originals", "file.pdf")
114+
),
92115
"optimized": os.path.realpath(
93-
os.path.join(tmp_dir, "file_opti.pdf")
116+
os.path.join(tmp_dir, "Optimized", "file_opti.pdf")
94117
),
95118
}
96119
],
@@ -111,6 +134,11 @@ def test_process_pdfs_uses_share_root_and_relative_path(
111134
pdf_path = target_dir / "variable.pdf"
112135
pdf_path.touch()
113136

137+
def fake_run(command, prefix=None):
138+
Path(command[5]).write_bytes(b"optimized")
139+
140+
mock_run_and_stream.side_effect = fake_run
141+
114142
def fake_variable_get(key, default_var=None):
115143
if key == "OCR_PDF_SHARE_ROOT":
116144
return share_root
@@ -126,8 +154,8 @@ def fake_variable_get(key, default_var=None):
126154
result,
127155
[
128156
{
129-
"original": str(pdf_path.resolve()),
130-
"optimized": str((target_dir / "variable_opti.pdf").resolve()),
157+
"original": str((target_dir / "Originals" / "variable.pdf").resolve()),
158+
"optimized": str((target_dir / "Optimized" / "variable_opti.pdf").resolve()),
131159
}
132160
],
133161
)

0 commit comments

Comments
 (0)