Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 71 additions & 74 deletions centralized_metadata/optimize_pdf_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@
"{{ dag_run.logical_date }} {{ ti.log_url }}"
),
)
slackpostonsuccess = send_slack_notification(
channel="infra_alerts",
username="airflow",
text=(
":tada: Task succeeded: {{ dag.dag_id }} {{ ti.task_id }} "
"{{ dag_run.logical_date }} {{ ti.log_url }}"
),
)


def _post_to_teams(title, text, theme_color):
Expand Down Expand Up @@ -115,33 +107,14 @@ def teamspostonsuccess(context):
)


def _run_failure_callbacks(context):
    """Invoke every failure notifier; log (never propagate) any error it raises."""
    notifiers = (slackpostonfail, teamspostonfail)
    for notifier in notifiers:
        try:
            notifier(context)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            # A broken notifier must not mask the original task failure.
            logging.exception(
                "Failure callback %r raised an exception: %s", notifier, exc
            )


def _run_success_callbacks(context):
    """Invoke every success notifier; log (never propagate) any error it raises."""
    notifiers = (slackpostonsuccess, teamspostonsuccess)
    for notifier in notifiers:
        try:
            notifier(context)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            # A broken notifier must not turn a successful run into a failure.
            logging.exception(
                "Success callback %r raised an exception: %s", notifier, exc
            )


DEFAULT_ARGS = {
"owner": "airflow",
"depends_on_past": False,
"start_date": pendulum.datetime(2024, 1, 1, tz="UTC"),
"email_on_failure": False,
"email_on_retry": False,
"on_failure_callback": _run_failure_callbacks,
"on_failure_callback": [slackpostonfail,teamspostonfail],
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
Expand Down Expand Up @@ -174,6 +147,59 @@ def _resolve_pdf_directory(context):
return configured_directory.resolve()


def run_and_stream(command, prefix=None):
    """
    Execute *command* as a subprocess, forwarding each line of its combined
    stdout/stderr to the log as it is produced.

    :param command: argv-style list passed to subprocess.Popen.
    :param prefix: optional label prepended (in brackets) to each logged line.
    :raises RuntimeError: if the stdout pipe could not be created.
    :raises subprocess.CalledProcessError: if the command exits nonzero.
    """
    popen_options = dict(
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so ordering is preserved
        text=True,
        bufsize=1,  # line-buffered: lines appear in the log as they happen
    )
    with subprocess.Popen(command, **popen_options) as process:
        stream = process.stdout
        if stream is None:
            raise RuntimeError("stdout pipe was not created")

        for raw_line in stream:
            message = raw_line.rstrip()
            if prefix:
                logging.info("[%s] %s", prefix, message)
            else:
                logging.info("%s", message)

        exit_status = process.wait()

    if exit_status != 0:
        raise subprocess.CalledProcessError(exit_status, command)


def _move_processed_pair(original_pdf: Path, optimized_pdf: Path):
"""
Move the optimized and source PDFs into their destination folders.
Returns paths to the moved files.
"""
pdf_directory = original_pdf.parent
optimized_directory = pdf_directory / "Optimized"
originals_directory = pdf_directory / "Originals"
optimized_directory.mkdir(parents=True, exist_ok=True)
originals_directory.mkdir(parents=True, exist_ok=True)

optimized_destination = optimized_directory / optimized_pdf.name
originals_destination = originals_directory / original_pdf.name
logging.info(
"Moving %s to %s and %s to %s",
optimized_pdf,
optimized_destination,
original_pdf,
originals_destination,
)
shutil.move(str(optimized_pdf), str(optimized_destination))
shutil.move(str(original_pdf), str(originals_destination))
return originals_destination, optimized_destination


def process_pdfs(**context):
"""Iterate through the directory and run OCRmyPDF with optimize level 1."""
pdf_directory = _resolve_pdf_directory(context)
Expand All @@ -189,59 +215,38 @@ def process_pdfs(**context):
return []

processed_files = []

for pdf_file in pdf_files:
output_pdf = pdf_file.with_name(f"{pdf_file.stem}_opti.pdf")

if output_pdf.exists():
logging.info("Removing existing optimized PDF: %s", output_pdf)
output_pdf.unlink()

command = [
"ocrmypdf",
"--skip-text",
"--optimize",
"1",
str(pdf_file),
str(output_pdf),
]
logging.info("Running command: %s", " ".join(command))
subprocess.run(command, check=True) # Raises CalledProcessError on failure.
processed_files.append(
{"original": str(pdf_file), "optimized": str(output_pdf)}
)

return processed_files
logging.info("Running command: %s", " ".join(command))

run_and_stream(command, prefix=pdf_file.name)
originals_destination, optimized_destination = _move_processed_pair(
pdf_file, output_pdf
)

def move_processed_pdfs(**context):
"""Move optimized and original PDFs into their destination directories."""
ti = context["ti"]
processed_files = ti.xcom_pull(task_ids="run_ocrmypdf") or []
if not processed_files:
logging.info("No processed files to move")
return "no-files-to-move"

moved_count = 0
for file_info in processed_files:
original_pdf = Path(file_info["original"])
optimized_pdf = Path(file_info["optimized"])
pdf_directory = original_pdf.parent
optimized_directory = pdf_directory / "Optimized"
originals_directory = pdf_directory / "Originals"
optimized_directory.mkdir(parents=True, exist_ok=True)
originals_directory.mkdir(parents=True, exist_ok=True)

optimized_destination = optimized_directory / optimized_pdf.name
originals_destination = originals_directory / original_pdf.name
logging.info(
"Moving %s to %s and %s to %s",
optimized_pdf,
optimized_destination,
original_pdf,
originals_destination,
processed_files.append(
{
"original": str(originals_destination),
"optimized": str(optimized_destination),
}
)
shutil.move(str(optimized_pdf), str(optimized_destination))
shutil.move(str(original_pdf), str(originals_destination))
moved_count += 1

return f"moved-{moved_count}-pdfs"
return processed_files


DAG = airflow.DAG(
Expand All @@ -262,19 +267,11 @@ def move_processed_pdfs(**context):
dag=DAG,
)

MOVE_PROCESSED_FILES = PythonOperator(
task_id="move_processed_pdfs",
python_callable=move_processed_pdfs,
provide_context=True,
dag=DAG,
)

SUCCESS = EmptyOperator(
task_id='success',
on_success_callback=_run_success_callbacks,
on_success_callback=[teamspostonsuccess],
trigger_rule="none_failed_min_one_success",
dag=DAG,
)

RUN_OCR.set_downstream(MOVE_PROCESSED_FILES)
MOVE_PROCESSED_FILES.set_downstream(SUCCESS)
RUN_OCR.set_downstream(SUCCESS)
92 changes: 64 additions & 28 deletions tests/optimize_pdf_dag_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,89 @@ def test_dag_loads(self):
"""Ensure the DAG is registered with the expected ID and task."""
self.assertEqual(DAG.dag_id, "ocrmypdf_batch")
task_ids = [task.task_id for task in DAG.tasks]
self.assertEqual(task_ids, ["run_ocrmypdf", "move_processed_pdfs", "success"])
self.assertEqual(task_ids, ["run_ocrmypdf", "success"])

@mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run")
def test_process_pdfs_runs_command_per_file(self, mock_run):
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream):
"""Touch PDFs and verify we issue an OCR command for each."""
with tempfile.TemporaryDirectory() as tmp_dir:
pdf_one = os.path.realpath(os.path.join(tmp_dir, "file_one.pdf"))
pdf_two = os.path.realpath(os.path.join(tmp_dir, "file_two.pdf"))
open(pdf_one, "wb").close()
open(pdf_two, "wb").close()

def fake_run(command, prefix=None):
Path(command[5]).write_bytes(b"optimized")

mock_run_and_stream.side_effect = fake_run

result = process_pdfs(params={"pdf_directory": tmp_dir})

self.assertEqual(
result,
[
{
"original": pdf_one,
"original": os.path.realpath(
os.path.join(tmp_dir, "Originals", "file_one.pdf")
),
"optimized": os.path.realpath(
os.path.join(tmp_dir, "file_one_opti.pdf")
os.path.join(tmp_dir, "Optimized", "file_one_opti.pdf")
),
},
{
"original": pdf_two,
"original": os.path.realpath(
os.path.join(tmp_dir, "Originals", "file_two.pdf")
),
"optimized": os.path.realpath(
os.path.join(tmp_dir, "file_two_opti.pdf")
os.path.join(tmp_dir, "Optimized", "file_two_opti.pdf")
),
},
],
)
self.assertEqual(mock_run.call_count, 2)
self.assertEqual(mock_run_and_stream.call_count, 2)

expected_files = [Path(pdf_one), Path(pdf_two)]
for call_args, expected_file in zip(mock_run.call_args_list, expected_files):
for call_args, expected_file in zip(
mock_run_and_stream.call_args_list, expected_files
):
command = call_args.args[0]
self.assertEqual(command[:3], ["ocrmypdf", "--optimize", "1"])
self.assertEqual(Path(command[3]).name, expected_file.name)
self.assertEqual(Path(command[4]).name, f"{expected_file.stem}_opti.pdf")
self.assertTrue(call_args.kwargs.get("check"))

@mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run")
def test_process_pdfs_handles_empty_directory(self, mock_run):
self.assertEqual(
command[:4], ["ocrmypdf", "--skip-text", "--optimize", "1"]
)
self.assertEqual(Path(command[4]).name, expected_file.name)
self.assertEqual(
Path(command[5]).name, f"{expected_file.stem}_opti.pdf"
)
self.assertEqual(call_args.kwargs.get("prefix"), expected_file.name)

originals_dir = Path(tmp_dir) / "Originals"
optimized_dir = Path(tmp_dir) / "Optimized"
self.assertTrue((originals_dir / "file_one.pdf").exists())
self.assertTrue((originals_dir / "file_two.pdf").exists())
self.assertTrue((optimized_dir / "file_one_opti.pdf").exists())
self.assertTrue((optimized_dir / "file_two_opti.pdf").exists())

@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
def test_process_pdfs_handles_empty_directory(self, mock_run_and_stream):
"""Confirm we short-circuit gracefully when no PDFs exist."""
with tempfile.TemporaryDirectory() as tmp_dir:
result = process_pdfs(params={"pdf_directory": tmp_dir})

self.assertEqual(result, [])
mock_run.assert_not_called()
mock_run_and_stream.assert_not_called()

@mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run")
def test_process_pdfs_prefers_dag_run_conf(self, mock_run):
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
def test_process_pdfs_prefers_dag_run_conf(self, mock_run_and_stream):
"""dag_run.conf should override params and defaults."""
with tempfile.TemporaryDirectory() as tmp_dir:
pdf_path = os.path.realpath(os.path.join(tmp_dir, "file.pdf"))
open(pdf_path, "wb").close()

def fake_run(command, prefix=None):
Path(command[5]).write_text("optimized")

mock_run_and_stream.side_effect = fake_run

dag_run_mock = mock.Mock()
dag_run_mock.conf = {"pdf_directory": tmp_dir}

Expand All @@ -82,18 +109,22 @@ def test_process_pdfs_prefers_dag_run_conf(self, mock_run):
result,
[
{
"original": pdf_path,
"original": os.path.realpath(
os.path.join(tmp_dir, "Originals", "file.pdf")
),
"optimized": os.path.realpath(
os.path.join(tmp_dir, "file_opti.pdf")
os.path.join(tmp_dir, "Optimized", "file_opti.pdf")
),
}
],
)
mock_run.assert_called_once()
mock_run_and_stream.assert_called_once()

@mock.patch("centralized_metadata.optimize_pdf_dag.Variable.get")
@mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run")
def test_process_pdfs_uses_share_root_and_relative_path(self, mock_run, mock_variable_get):
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
def test_process_pdfs_uses_share_root_and_relative_path(
self, mock_run_and_stream, mock_variable_get
):
"""Variable-based root + relative path should resolve to final directory."""
with tempfile.TemporaryDirectory() as tmp_dir:
share_root = os.path.realpath(tmp_dir)
Expand All @@ -103,6 +134,11 @@ def test_process_pdfs_uses_share_root_and_relative_path(self, mock_run, mock_var
pdf_path = target_dir / "variable.pdf"
pdf_path.touch()

def fake_run(command, prefix=None):
Path(command[5]).write_bytes(b"optimized")

mock_run_and_stream.side_effect = fake_run

def fake_variable_get(key, default_var=None):
if key == "OCR_PDF_SHARE_ROOT":
return share_root
Expand All @@ -118,13 +154,13 @@ def fake_variable_get(key, default_var=None):
result,
[
{
"original": str(pdf_path.resolve()),
"optimized": str((target_dir / "variable_opti.pdf").resolve()),
"original": str((target_dir / "Originals" / "variable.pdf").resolve()),
"optimized": str((target_dir / "Optimized" / "variable_opti.pdf").resolve()),
}
],
)
command = mock_run.call_args.args[0]
self.assertEqual(Path(command[3]).resolve(), pdf_path.resolve())
command = mock_run_and_stream.call_args.args[0]
self.assertEqual(Path(command[4]).resolve(), pdf_path.resolve())

if __name__ == "__main__":
unittest.main()
Loading