Skip to content

Commit b7eb0d5

Browse files
Merge pull request #918 from tulibraries/ocrmypdf-file-updates-devo1302-devo-1339-devo-1340-devo-1301
Ocrmypdf file updates devo1302 devo 1339 devo 1340 devo 1301
2 parents ca7604c + 4ea477d commit b7eb0d5

File tree

2 files changed

+105
-19
lines changed

2 files changed

+105
-19
lines changed

centralized_metadata/ocrmypdf_processing_dag.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Airflow DAG to run OCRmyPDF across a target directory."""
22
import logging
3+
import shutil
34
import subprocess
45
from datetime import timedelta
56
from pathlib import Path
@@ -14,6 +15,8 @@
1415
DEFAULT_PDF_DIRECTORY = str(Path(DEFAULT_SHARE_ROOT) / DEFAULT_RELATIVE_PATH)
1516
SHARE_ROOT_VARIABLE = "OCR_PDF_SHARE_ROOT"
1617
RELATIVE_PATH_VARIABLE = "OCR_PDF_RELATIVE_PATH"
18+
DEFAULT_SCHEDULE_INTERVAL = "@weekly"
19+
SCHEDULE_INTERVAL_VARIABLE = "OPTIMIZE_PDF_SCHEDULE_INTERVAL"
1720

1821
DEFAULT_ARGS = {
1922
"owner": "airflow",
@@ -65,10 +68,11 @@ def process_pdfs(**context):
6568

6669
if not pdf_files:
6770
logging.info("No PDF files found in %s", pdf_directory)
68-
return "no-pdfs-found"
71+
return []
6972

73+
processed_files = []
7074
for pdf_file in pdf_files:
71-
output_pdf = pdf_file.with_name(f"{pdf_file.stem}_ocr.pdf")
75+
output_pdf = pdf_file.with_name(f"{pdf_file.stem}_opti.pdf")
7276
command = [
7377
"ocrmypdf",
7478
"--optimize",
@@ -78,16 +82,55 @@ def process_pdfs(**context):
7882
]
7983
logging.info("Running command: %s", " ".join(command))
8084
subprocess.run(command, check=True) # Raises CalledProcessError on failure.
81-
82-
return f"processed-{len(pdf_files)}-pdfs"
85+
processed_files.append(
86+
{"original": str(pdf_file), "optimized": str(output_pdf)}
87+
)
88+
89+
return processed_files
90+
91+
92+
def move_processed_pdfs(**context):
93+
"""Move optimized and original PDFs into their destination directories."""
94+
ti = context["ti"]
95+
processed_files = ti.xcom_pull(task_ids="run_ocrmypdf") or []
96+
if not processed_files:
97+
logging.info("No processed files to move")
98+
return "no-files-to-move"
99+
100+
moved_count = 0
101+
for file_info in processed_files:
102+
original_pdf = Path(file_info["original"])
103+
optimized_pdf = Path(file_info["optimized"])
104+
pdf_directory = original_pdf.parent
105+
optimized_directory = pdf_directory / "Optimized"
106+
originals_directory = pdf_directory / "Originals"
107+
optimized_directory.mkdir(parents=True, exist_ok=True)
108+
originals_directory.mkdir(parents=True, exist_ok=True)
109+
110+
optimized_destination = optimized_directory / optimized_pdf.name
111+
originals_destination = originals_directory / original_pdf.name
112+
logging.info(
113+
"Moving %s to %s and %s to %s",
114+
optimized_pdf,
115+
optimized_destination,
116+
original_pdf,
117+
originals_destination,
118+
)
119+
shutil.move(str(optimized_pdf), str(optimized_destination))
120+
shutil.move(str(original_pdf), str(originals_destination))
121+
moved_count += 1
122+
123+
return f"moved-{moved_count}-pdfs"
83124

84125

85126
DAG = airflow.DAG(
86127
"ocrmypdf_batch",
87128
default_args=DEFAULT_ARGS,
88129
catchup=False,
89130
max_active_runs=1,
90-
schedule_interval=None,
131+
schedule_interval=Variable.get(
132+
SCHEDULE_INTERVAL_VARIABLE, default_var=DEFAULT_SCHEDULE_INTERVAL
133+
),
91134
)
92135

93136
RUN_OCR = PythonOperator(
@@ -97,3 +140,12 @@ def process_pdfs(**context):
97140
params={"pdf_directory": DEFAULT_PDF_DIRECTORY},
98141
dag=DAG,
99142
)
143+
144+
MOVE_PROCESSED_FILES = PythonOperator(
145+
task_id="move_processed_pdfs",
146+
python_callable=move_processed_pdfs,
147+
provide_context=True,
148+
dag=DAG,
149+
)
150+
151+
RUN_OCR.set_downstream(MOVE_PROCESSED_FILES)

tests/ocrmypdf_processing_dag_test.py

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,44 @@ def test_dag_loads(self):
1515
"""Ensure the DAG is registered with the expected ID and task."""
1616
self.assertEqual(DAG.dag_id, "ocrmypdf_batch")
1717
task_ids = [task.task_id for task in DAG.tasks]
18-
self.assertEqual(task_ids, ["run_ocrmypdf"])
18+
self.assertEqual(task_ids, ["run_ocrmypdf", "move_processed_pdfs"])
1919

2020
@mock.patch("centralized_metadata.ocrmypdf_processing_dag.subprocess.run")
2121
def test_process_pdfs_runs_command_per_file(self, mock_run):
2222
"""Touch PDFs and verify we issue an OCR command for each."""
2323
with tempfile.TemporaryDirectory() as tmp_dir:
24-
pdf_one = os.path.join(tmp_dir, "file_one.pdf")
25-
pdf_two = os.path.join(tmp_dir, "file_two.pdf")
24+
pdf_one = os.path.realpath(os.path.join(tmp_dir, "file_one.pdf"))
25+
pdf_two = os.path.realpath(os.path.join(tmp_dir, "file_two.pdf"))
2626
open(pdf_one, "wb").close()
2727
open(pdf_two, "wb").close()
2828

2929
result = process_pdfs(params={"pdf_directory": tmp_dir})
3030

31-
self.assertEqual(result, "processed-2-pdfs")
31+
self.assertEqual(
32+
result,
33+
[
34+
{
35+
"original": pdf_one,
36+
"optimized": os.path.realpath(
37+
os.path.join(tmp_dir, "file_one_opti.pdf")
38+
),
39+
},
40+
{
41+
"original": pdf_two,
42+
"optimized": os.path.realpath(
43+
os.path.join(tmp_dir, "file_two_opti.pdf")
44+
),
45+
},
46+
],
47+
)
3248
self.assertEqual(mock_run.call_count, 2)
3349

3450
expected_files = [Path(pdf_one), Path(pdf_two)]
3551
for call_args, expected_file in zip(mock_run.call_args_list, expected_files):
3652
command = call_args.args[0]
3753
self.assertEqual(command[:3], ["ocrmypdf", "--optimize", "1"])
3854
self.assertEqual(Path(command[3]).name, expected_file.name)
39-
self.assertEqual(
40-
Path(command[4]).name, f"{expected_file.stem}_ocr.pdf"
41-
)
55+
self.assertEqual(Path(command[4]).name, f"{expected_file.stem}_opti.pdf")
4256
self.assertTrue(call_args.kwargs.get("check"))
4357

4458
@mock.patch("centralized_metadata.ocrmypdf_processing_dag.subprocess.run")
@@ -47,30 +61,42 @@ def test_process_pdfs_handles_empty_directory(self, mock_run):
4761
with tempfile.TemporaryDirectory() as tmp_dir:
4862
result = process_pdfs(params={"pdf_directory": tmp_dir})
4963

50-
self.assertEqual(result, "no-pdfs-found")
64+
self.assertEqual(result, [])
5165
mock_run.assert_not_called()
5266

5367
@mock.patch("centralized_metadata.ocrmypdf_processing_dag.subprocess.run")
5468
def test_process_pdfs_prefers_dag_run_conf(self, mock_run):
5569
"""dag_run.conf should override params and defaults."""
5670
with tempfile.TemporaryDirectory() as tmp_dir:
57-
pdf_path = os.path.join(tmp_dir, "file.pdf")
71+
pdf_path = os.path.realpath(os.path.join(tmp_dir, "file.pdf"))
5872
open(pdf_path, "wb").close()
5973

6074
dag_run_mock = mock.Mock()
6175
dag_run_mock.conf = {"pdf_directory": tmp_dir}
6276

63-
result = process_pdfs(dag_run=dag_run_mock, params={"pdf_directory": "/unused"})
64-
65-
self.assertEqual(result, "processed-1-pdfs")
77+
result = process_pdfs(
78+
dag_run=dag_run_mock, params={"pdf_directory": "/unused"}
79+
)
80+
81+
self.assertEqual(
82+
result,
83+
[
84+
{
85+
"original": pdf_path,
86+
"optimized": os.path.realpath(
87+
os.path.join(tmp_dir, "file_opti.pdf")
88+
),
89+
}
90+
],
91+
)
6692
mock_run.assert_called_once()
6793

6894
@mock.patch("centralized_metadata.ocrmypdf_processing_dag.Variable.get")
6995
@mock.patch("centralized_metadata.ocrmypdf_processing_dag.subprocess.run")
7096
def test_process_pdfs_uses_share_root_and_relative_path(self, mock_run, mock_variable_get):
7197
"""Variable-based root + relative path should resolve to final directory."""
7298
with tempfile.TemporaryDirectory() as tmp_dir:
73-
share_root = tmp_dir
99+
share_root = os.path.realpath(tmp_dir)
74100
relative_path = "incoming"
75101
target_dir = Path(share_root) / relative_path
76102
target_dir.mkdir()
@@ -88,7 +114,15 @@ def fake_variable_get(key, default_var=None):
88114

89115
result = process_pdfs()
90116

91-
self.assertEqual(result, "processed-1-pdfs")
117+
self.assertEqual(
118+
result,
119+
[
120+
{
121+
"original": str(pdf_path.resolve()),
122+
"optimized": str((target_dir / "variable_opti.pdf").resolve()),
123+
}
124+
],
125+
)
92126
command = mock_run.call_args.args[0]
93127
self.assertEqual(Path(command[3]).resolve(), pdf_path.resolve())
94128

0 commit comments

Comments
 (0)