From a7ecd5e937e302b74a8962666dd42be6bb49c124 Mon Sep 17 00:00:00 2001 From: David Kinzer Date: Wed, 11 Mar 2026 21:41:46 -0400 Subject: [PATCH 1/2] Fix command fails when file to optimize has been OCRed. * Also adds better logging. * Also cleans up notifications. Depends on: - [ ] https://git.temple.edu/tulibraries/charts/-/merge_requests/513 --- centralized_metadata/optimize_pdf_dag.py | 68 +++++++++++++----------- tests/optimize_pdf_dag_test.py | 46 +++++++++------- 2 files changed, 65 insertions(+), 49 deletions(-) diff --git a/centralized_metadata/optimize_pdf_dag.py b/centralized_metadata/optimize_pdf_dag.py index 4d8eb17..a0f5be3 100644 --- a/centralized_metadata/optimize_pdf_dag.py +++ b/centralized_metadata/optimize_pdf_dag.py @@ -32,14 +32,6 @@ "{{ dag_run.logical_date }} {{ ti.log_url }}" ), ) -slackpostonsuccess = send_slack_notification( - channel="infra_alerts", - username="airflow", - text=( - ":tada: Task succeeded: {{ dag.dag_id }} {{ ti.task_id }} " - "{{ dag_run.logical_date }} {{ ti.log_url }}" - ), -) def _post_to_teams(title, text, theme_color): @@ -115,25 +107,6 @@ def teamspostonsuccess(context): ) -def _run_failure_callbacks(context): - for callback in (slackpostonfail, teamspostonfail): - try: - callback(context) - except Exception as exc: # pylint: disable=broad-exception-caught - logging.exception( - "Failure callback %r raised an exception: %s", callback, exc - ) - - -def _run_success_callbacks(context): - for callback in (slackpostonsuccess, teamspostonsuccess): - try: - callback(context) - except Exception as exc: # pylint: disable=broad-exception-caught - logging.exception( - "Success callback %r raised an exception: %s", callback, exc - ) - DEFAULT_ARGS = { "owner": "airflow", @@ -141,7 +114,7 @@ def _run_success_callbacks(context): "start_date": pendulum.datetime(2024, 1, 1, tz="UTC"), "email_on_failure": False, "email_on_retry": False, - "on_failure_callback": _run_failure_callbacks, + "on_failure_callback": [slackpostonfail,teamspostonfail], "retries": 1, "retry_delay": timedelta(minutes=5), } @@ -174,6 +147,34 @@ def _resolve_pdf_directory(context): return configured_directory.resolve() +def run_and_stream(command, prefix=None): + """ + Run a subprocess command, stream output to logs in real time, + and raise CalledProcessError if the command fails. + """ + with subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) as process: + if process.stdout is None: + raise RuntimeError("stdout pipe was not created") + + for line in process.stdout: + line = line.rstrip() + if prefix: + logging.info("[%s] %s", prefix, line) + else: + logging.info("%s", line) + + return_code = process.wait() + + if return_code != 0: + raise subprocess.CalledProcessError(return_code, command) + + def process_pdfs(**context): """Iterate through the directory and run OCRmyPDF with optimize level 1.""" pdf_directory = _resolve_pdf_directory(context) @@ -189,20 +190,27 @@ def process_pdfs(**context): return [] processed_files = [] + for pdf_file in pdf_files: output_pdf = pdf_file.with_name(f"{pdf_file.stem}_opti.pdf") + if output_pdf.exists(): logging.info("Removing existing optimized PDF: %s", output_pdf) output_pdf.unlink() + command = [ "ocrmypdf", + "--skip-text", "--optimize", "1", str(pdf_file), str(output_pdf), ] + logging.info("Running command: %s", " ".join(command)) - subprocess.run(command, check=True) # Raises CalledProcessError on failure. + + run_and_stream(command, prefix=pdf_file.name) + processed_files.append( {"original": str(pdf_file), "optimized": str(output_pdf)} ) @@ -271,7 +279,7 @@ def move_processed_pdfs(**context): SUCCESS = EmptyOperator( task_id='success', - on_success_callback=_run_success_callbacks, + on_success_callback=[teamspostonsuccess], trigger_rule="none_failed_min_one_success", dag=DAG, ) diff --git a/tests/optimize_pdf_dag_test.py b/tests/optimize_pdf_dag_test.py index 9b85f6b..63b1654 100644 --- a/tests/optimize_pdf_dag_test.py +++ b/tests/optimize_pdf_dag_test.py @@ -17,8 +17,8 @@ def test_dag_loads(self): task_ids = [task.task_id for task in DAG.tasks] self.assertEqual(task_ids, ["run_ocrmypdf", "move_processed_pdfs", "success"]) - @mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run") - def test_process_pdfs_runs_command_per_file(self, mock_run): + @mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream") + def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream): """Touch PDFs and verify we issue an OCR command for each.""" with tempfile.TemporaryDirectory() as tmp_dir: pdf_one = os.path.realpath(os.path.join(tmp_dir, "file_one.pdf")) @@ -45,27 +45,33 @@ def test_process_pdfs_runs_command_per_file(self, mock_run): }, ], ) - self.assertEqual(mock_run.call_count, 2) + self.assertEqual(mock_run_and_stream.call_count, 2) expected_files = [Path(pdf_one), Path(pdf_two)] - for call_args, expected_file in zip(mock_run.call_args_list, expected_files): + for call_args, expected_file in zip( + mock_run_and_stream.call_args_list, expected_files + ): command = call_args.args[0] - self.assertEqual(command[:3], ["ocrmypdf", "--optimize", "1"]) - self.assertEqual(Path(command[3]).name, expected_file.name) - self.assertEqual(Path(command[4]).name, f"{expected_file.stem}_opti.pdf") - self.assertTrue(call_args.kwargs.get("check")) - - @mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run") - def test_process_pdfs_handles_empty_directory(self, mock_run): + self.assertEqual( + command[:4], ["ocrmypdf", "--skip-text", "--optimize", "1"] + ) + self.assertEqual(Path(command[4]).name, expected_file.name) + self.assertEqual( + Path(command[5]).name, f"{expected_file.stem}_opti.pdf" + ) + self.assertEqual(call_args.kwargs.get("prefix"), expected_file.name) + + @mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream") + def test_process_pdfs_handles_empty_directory(self, mock_run_and_stream): """Confirm we short-circuit gracefully when no PDFs exist.""" with tempfile.TemporaryDirectory() as tmp_dir: result = process_pdfs(params={"pdf_directory": tmp_dir}) self.assertEqual(result, []) - mock_run.assert_not_called() + mock_run_and_stream.assert_not_called() - @mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run") - def test_process_pdfs_prefers_dag_run_conf(self, mock_run): + @mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream") + def test_process_pdfs_prefers_dag_run_conf(self, mock_run_and_stream): """dag_run.conf should override params and defaults.""" with tempfile.TemporaryDirectory() as tmp_dir: pdf_path = os.path.realpath(os.path.join(tmp_dir, "file.pdf")) @@ -89,11 +95,13 @@ def test_process_pdfs_prefers_dag_run_conf(self, mock_run): } ], ) - mock_run.assert_called_once() + mock_run_and_stream.assert_called_once() @mock.patch("centralized_metadata.optimize_pdf_dag.Variable.get") - @mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run") - def test_process_pdfs_uses_share_root_and_relative_path(self, mock_run, mock_variable_get): + @mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream") + def test_process_pdfs_uses_share_root_and_relative_path( + self, mock_run_and_stream, mock_variable_get + ): """Variable-based root + relative path should resolve to final directory.""" with tempfile.TemporaryDirectory() as tmp_dir: share_root = os.path.realpath(tmp_dir) @@ -123,8 +131,8 @@ def fake_variable_get(key, default_var=None): } ], ) - command = mock_run.call_args.args[0] - self.assertEqual(Path(command[3]).resolve(), pdf_path.resolve()) + command = mock_run_and_stream.call_args.args[0] + self.assertEqual(Path(command[4]).resolve(), pdf_path.resolve()) if __name__ == "__main__": unittest.main() From 49c377b5233ca2bc7fab013e81bbbe6a07b40b93 Mon Sep 17 00:00:00 2001 From: David Kinzer Date: Wed, 11 Mar 2026 22:11:43 -0400 Subject: [PATCH 2/2] Update to move files as they get processed. --- centralized_metadata/optimize_pdf_dag.py | 77 ++++++++++-------------- tests/optimize_pdf_dag_test.py | 46 +++++++++++--- 2 files changed, 70 insertions(+), 53 deletions(-) diff --git a/centralized_metadata/optimize_pdf_dag.py b/centralized_metadata/optimize_pdf_dag.py index a0f5be3..506fc45 100644 --- a/centralized_metadata/optimize_pdf_dag.py +++ b/centralized_metadata/optimize_pdf_dag.py @@ -175,6 +175,31 @@ def run_and_stream(command, prefix=None): raise subprocess.CalledProcessError(return_code, command) +def _move_processed_pair(original_pdf: Path, optimized_pdf: Path): + """ + Move the optimized and source PDFs into their destination folders. + Returns paths to the moved files. + """ + pdf_directory = original_pdf.parent + optimized_directory = pdf_directory / "Optimized" + originals_directory = pdf_directory / "Originals" + optimized_directory.mkdir(parents=True, exist_ok=True) + originals_directory.mkdir(parents=True, exist_ok=True) + + optimized_destination = optimized_directory / optimized_pdf.name + originals_destination = originals_directory / original_pdf.name + logging.info( + "Moving %s to %s and %s to %s", + optimized_pdf, + optimized_destination, + original_pdf, + originals_destination, + ) + shutil.move(str(optimized_pdf), str(optimized_destination)) + shutil.move(str(original_pdf), str(originals_destination)) + return originals_destination, optimized_destination + + def process_pdfs(**context): """Iterate through the directory and run OCRmyPDF with optimize level 1.""" pdf_directory = _resolve_pdf_directory(context) @@ -210,48 +235,20 @@ def process_pdfs(**context): logging.info("Running command: %s", " ".join(command)) run_and_stream(command, prefix=pdf_file.name) + originals_destination, optimized_destination = _move_processed_pair( + pdf_file, output_pdf + ) processed_files.append( - {"original": str(pdf_file), "optimized": str(output_pdf)} + { + "original": str(originals_destination), + "optimized": str(optimized_destination), + } ) return processed_files -def move_processed_pdfs(**context): - """Move optimized and original PDFs into their destination directories.""" - ti = context["ti"] - processed_files = ti.xcom_pull(task_ids="run_ocrmypdf") or [] - if not processed_files: - logging.info("No processed files to move") - return "no-files-to-move" - - moved_count = 0 - for file_info in processed_files: - original_pdf = Path(file_info["original"]) - optimized_pdf = Path(file_info["optimized"]) - pdf_directory = original_pdf.parent - optimized_directory = pdf_directory / "Optimized" - originals_directory = pdf_directory / "Originals" - optimized_directory.mkdir(parents=True, exist_ok=True) - originals_directory.mkdir(parents=True, exist_ok=True) - - optimized_destination = optimized_directory / optimized_pdf.name - originals_destination = originals_directory / original_pdf.name - logging.info( - "Moving %s to %s and %s to %s", - optimized_pdf, - optimized_destination, - original_pdf, - originals_destination, - ) - shutil.move(str(optimized_pdf), str(optimized_destination)) - shutil.move(str(original_pdf), str(originals_destination)) - moved_count += 1 - - return f"moved-{moved_count}-pdfs" - - DAG = airflow.DAG( "ocrmypdf_batch", default_args=DEFAULT_ARGS, @@ -270,13 +267,6 @@ def move_processed_pdfs(**context): dag=DAG, ) -MOVE_PROCESSED_FILES = PythonOperator( - task_id="move_processed_pdfs", - python_callable=move_processed_pdfs, - provide_context=True, - dag=DAG, -) - SUCCESS = EmptyOperator( task_id='success', on_success_callback=[teamspostonsuccess], @@ -284,5 +274,4 @@ def move_processed_pdfs(**context): dag=DAG, ) -RUN_OCR.set_downstream(MOVE_PROCESSED_FILES) -MOVE_PROCESSED_FILES.set_downstream(SUCCESS) +RUN_OCR.set_downstream(SUCCESS) diff --git a/tests/optimize_pdf_dag_test.py b/tests/optimize_pdf_dag_test.py index 63b1654..322f105 100644 --- a/tests/optimize_pdf_dag_test.py +++ b/tests/optimize_pdf_dag_test.py @@ -15,7 +15,7 @@ def test_dag_loads(self): """Ensure the DAG is registered with the expected ID and task.""" self.assertEqual(DAG.dag_id, "ocrmypdf_batch") task_ids = [task.task_id for task in DAG.tasks] - self.assertEqual(task_ids, ["run_ocrmypdf", "move_processed_pdfs", "success"]) + self.assertEqual(task_ids, ["run_ocrmypdf", "success"]) @mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream") def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream): @@ -26,21 +26,30 @@ def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream): open(pdf_one, "wb").close() open(pdf_two, "wb").close() + def fake_run(command, prefix=None): + Path(command[5]).write_bytes(b"optimized") + + mock_run_and_stream.side_effect = fake_run + result = process_pdfs(params={"pdf_directory": tmp_dir}) self.assertEqual( result, [ { - "original": pdf_one, + "original": os.path.realpath( + os.path.join(tmp_dir, "Originals", "file_one.pdf") + ), "optimized": os.path.realpath( - os.path.join(tmp_dir, "file_one_opti.pdf") + os.path.join(tmp_dir, "Optimized", "file_one_opti.pdf") ), }, { - "original": pdf_two, + "original": os.path.realpath( + os.path.join(tmp_dir, "Originals", "file_two.pdf") + ), "optimized": os.path.realpath( - os.path.join(tmp_dir, "file_two_opti.pdf") + os.path.join(tmp_dir, "Optimized", "file_two_opti.pdf") ), }, ], @@ -61,6 +70,13 @@ def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream): ) self.assertEqual(call_args.kwargs.get("prefix"), expected_file.name) + originals_dir = Path(tmp_dir) / "Originals" + optimized_dir = Path(tmp_dir) / "Optimized" + self.assertTrue((originals_dir / "file_one.pdf").exists()) + self.assertTrue((originals_dir / "file_two.pdf").exists()) + self.assertTrue((optimized_dir / "file_one_opti.pdf").exists()) + self.assertTrue((optimized_dir / "file_two_opti.pdf").exists()) + @mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream") def test_process_pdfs_handles_empty_directory(self, mock_run_and_stream): """Confirm we short-circuit gracefully when no PDFs exist.""" @@ -77,6 +93,11 @@ def test_process_pdfs_prefers_dag_run_conf(self, mock_run_and_stream): pdf_path = os.path.realpath(os.path.join(tmp_dir, "file.pdf")) open(pdf_path, "wb").close() + def fake_run(command, prefix=None): + Path(command[5]).write_text("optimized") + + mock_run_and_stream.side_effect = fake_run + dag_run_mock = mock.Mock() dag_run_mock.conf = {"pdf_directory": tmp_dir} @@ -88,9 +109,11 @@ def test_process_pdfs_prefers_dag_run_conf(self, mock_run_and_stream): result, [ { - "original": pdf_path, + "original": os.path.realpath( + os.path.join(tmp_dir, "Originals", "file.pdf") + ), "optimized": os.path.realpath( - os.path.join(tmp_dir, "file_opti.pdf") + os.path.join(tmp_dir, "Optimized", "file_opti.pdf") ), } ], @@ -111,6 +134,11 @@ def test_process_pdfs_uses_share_root_and_relative_path( pdf_path = target_dir / "variable.pdf" pdf_path.touch() + def fake_run(command, prefix=None): + Path(command[5]).write_bytes(b"optimized") + + mock_run_and_stream.side_effect = fake_run + def fake_variable_get(key, default_var=None): if key == "OCR_PDF_SHARE_ROOT": return share_root @@ -126,8 +154,8 @@ def fake_variable_get(key, default_var=None): result, [ { - "original": str(pdf_path.resolve()), - "optimized": str((target_dir / "variable_opti.pdf").resolve()), + "original": str((target_dir / "Originals" / "variable.pdf").resolve()), + "optimized": str((target_dir / "Optimized" / "variable_opti.pdf").resolve()), } ], )