Skip to content

Commit a7ecd5e

Browse files
committed
Fix command failure when the file to optimize has already been OCRed.
* Also adds better logging.
* Also cleans up notifications.
Depends on:
- [ ] https://git.temple.edu/tulibraries/charts/-/merge_requests/513
1 parent acbf532 commit a7ecd5e

File tree

2 files changed

+65
-49
lines changed

2 files changed

+65
-49
lines changed

centralized_metadata/optimize_pdf_dag.py

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,6 @@
3232
"{{ dag_run.logical_date }} {{ ti.log_url }}"
3333
),
3434
)
35-
slackpostonsuccess = send_slack_notification(
36-
channel="infra_alerts",
37-
username="airflow",
38-
text=(
39-
":tada: Task succeeded: {{ dag.dag_id }} {{ ti.task_id }} "
40-
"{{ dag_run.logical_date }} {{ ti.log_url }}"
41-
),
42-
)
4335

4436

4537
def _post_to_teams(title, text, theme_color):
@@ -115,33 +107,14 @@ def teamspostonsuccess(context):
115107
)
116108

117109

118-
def _run_failure_callbacks(context):
119-
for callback in (slackpostonfail, teamspostonfail):
120-
try:
121-
callback(context)
122-
except Exception as exc: # pylint: disable=broad-exception-caught
123-
logging.exception(
124-
"Failure callback %r raised an exception: %s", callback, exc
125-
)
126-
127-
128-
def _run_success_callbacks(context):
129-
for callback in (slackpostonsuccess, teamspostonsuccess):
130-
try:
131-
callback(context)
132-
except Exception as exc: # pylint: disable=broad-exception-caught
133-
logging.exception(
134-
"Success callback %r raised an exception: %s", callback, exc
135-
)
136-
137110

138111
DEFAULT_ARGS = {
139112
"owner": "airflow",
140113
"depends_on_past": False,
141114
"start_date": pendulum.datetime(2024, 1, 1, tz="UTC"),
142115
"email_on_failure": False,
143116
"email_on_retry": False,
144-
"on_failure_callback": _run_failure_callbacks,
117+
"on_failure_callback": [slackpostonfail,teamspostonfail],
145118
"retries": 1,
146119
"retry_delay": timedelta(minutes=5),
147120
}
@@ -174,6 +147,34 @@ def _resolve_pdf_directory(context):
174147
return configured_directory.resolve()
175148

176149

150+
def run_and_stream(command, prefix=None):
    """
    Run a subprocess command, stream output to logs in real time,
    and raise CalledProcessError if the command fails.

    Args:
        command: Argument list (program followed by its arguments) handed to
            ``subprocess.Popen`` without a shell.
        prefix: Optional label. When truthy, every logged line is written as
            ``[prefix] line``; otherwise the line is logged as-is.

    Returns:
        None. The command's output is only written to the log.

    Raises:
        RuntimeError: If the stdout pipe was not created.
        subprocess.CalledProcessError: If the command exits with a non-zero
            return code.
    """
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        # Fold stderr into stdout so a single stream carries all output.
        stderr=subprocess.STDOUT,
        text=True,
        # Undecodable bytes in the child's output must not abort the run with
        # UnicodeDecodeError; substitute U+FFFD and keep streaming.
        errors="replace",
        # Line-buffered so each line can be logged as soon as it is produced.
        bufsize=1,
    ) as process:
        if process.stdout is None:
            raise RuntimeError("stdout pipe was not created")

        for line in process.stdout:
            line = line.rstrip()
            if prefix:
                logging.info("[%s] %s", prefix, line)
            else:
                logging.info("%s", line)

        # stdout hit EOF; collect the exit status.
        return_code = process.wait()

    if return_code != 0:
        raise subprocess.CalledProcessError(return_code, command)
176+
177+
177178
def process_pdfs(**context):
178179
"""Iterate through the directory and run OCRmyPDF with optimize level 1."""
179180
pdf_directory = _resolve_pdf_directory(context)
@@ -189,20 +190,27 @@ def process_pdfs(**context):
189190
return []
190191

191192
processed_files = []
193+
192194
for pdf_file in pdf_files:
193195
output_pdf = pdf_file.with_name(f"{pdf_file.stem}_opti.pdf")
196+
194197
if output_pdf.exists():
195198
logging.info("Removing existing optimized PDF: %s", output_pdf)
196199
output_pdf.unlink()
200+
197201
command = [
198202
"ocrmypdf",
203+
"--skip-text",
199204
"--optimize",
200205
"1",
201206
str(pdf_file),
202207
str(output_pdf),
203208
]
209+
204210
logging.info("Running command: %s", " ".join(command))
205-
subprocess.run(command, check=True) # Raises CalledProcessError on failure.
211+
212+
run_and_stream(command, prefix=pdf_file.name)
213+
206214
processed_files.append(
207215
{"original": str(pdf_file), "optimized": str(output_pdf)}
208216
)
@@ -271,7 +279,7 @@ def move_processed_pdfs(**context):
271279

272280
SUCCESS = EmptyOperator(
273281
task_id='success',
274-
on_success_callback=_run_success_callbacks,
282+
on_success_callback=[teamspostonsuccess],
275283
trigger_rule="none_failed_min_one_success",
276284
dag=DAG,
277285
)

tests/optimize_pdf_dag_test.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ def test_dag_loads(self):
1717
task_ids = [task.task_id for task in DAG.tasks]
1818
self.assertEqual(task_ids, ["run_ocrmypdf", "move_processed_pdfs", "success"])
1919

20-
@mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run")
21-
def test_process_pdfs_runs_command_per_file(self, mock_run):
20+
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
21+
def test_process_pdfs_runs_command_per_file(self, mock_run_and_stream):
2222
"""Touch PDFs and verify we issue an OCR command for each."""
2323
with tempfile.TemporaryDirectory() as tmp_dir:
2424
pdf_one = os.path.realpath(os.path.join(tmp_dir, "file_one.pdf"))
@@ -45,27 +45,33 @@ def test_process_pdfs_runs_command_per_file(self, mock_run):
4545
},
4646
],
4747
)
48-
self.assertEqual(mock_run.call_count, 2)
48+
self.assertEqual(mock_run_and_stream.call_count, 2)
4949

5050
expected_files = [Path(pdf_one), Path(pdf_two)]
51-
for call_args, expected_file in zip(mock_run.call_args_list, expected_files):
51+
for call_args, expected_file in zip(
52+
mock_run_and_stream.call_args_list, expected_files
53+
):
5254
command = call_args.args[0]
53-
self.assertEqual(command[:3], ["ocrmypdf", "--optimize", "1"])
54-
self.assertEqual(Path(command[3]).name, expected_file.name)
55-
self.assertEqual(Path(command[4]).name, f"{expected_file.stem}_opti.pdf")
56-
self.assertTrue(call_args.kwargs.get("check"))
57-
58-
@mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run")
59-
def test_process_pdfs_handles_empty_directory(self, mock_run):
55+
self.assertEqual(
56+
command[:4], ["ocrmypdf", "--skip-text", "--optimize", "1"]
57+
)
58+
self.assertEqual(Path(command[4]).name, expected_file.name)
59+
self.assertEqual(
60+
Path(command[5]).name, f"{expected_file.stem}_opti.pdf"
61+
)
62+
self.assertEqual(call_args.kwargs.get("prefix"), expected_file.name)
63+
64+
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
65+
def test_process_pdfs_handles_empty_directory(self, mock_run_and_stream):
6066
"""Confirm we short-circuit gracefully when no PDFs exist."""
6167
with tempfile.TemporaryDirectory() as tmp_dir:
6268
result = process_pdfs(params={"pdf_directory": tmp_dir})
6369

6470
self.assertEqual(result, [])
65-
mock_run.assert_not_called()
71+
mock_run_and_stream.assert_not_called()
6672

67-
@mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run")
68-
def test_process_pdfs_prefers_dag_run_conf(self, mock_run):
73+
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
74+
def test_process_pdfs_prefers_dag_run_conf(self, mock_run_and_stream):
6975
"""dag_run.conf should override params and defaults."""
7076
with tempfile.TemporaryDirectory() as tmp_dir:
7177
pdf_path = os.path.realpath(os.path.join(tmp_dir, "file.pdf"))
@@ -89,11 +95,13 @@ def test_process_pdfs_prefers_dag_run_conf(self, mock_run):
8995
}
9096
],
9197
)
92-
mock_run.assert_called_once()
98+
mock_run_and_stream.assert_called_once()
9399

94100
@mock.patch("centralized_metadata.optimize_pdf_dag.Variable.get")
95-
@mock.patch("centralized_metadata.optimize_pdf_dag.subprocess.run")
96-
def test_process_pdfs_uses_share_root_and_relative_path(self, mock_run, mock_variable_get):
101+
@mock.patch("centralized_metadata.optimize_pdf_dag.run_and_stream")
102+
def test_process_pdfs_uses_share_root_and_relative_path(
103+
self, mock_run_and_stream, mock_variable_get
104+
):
97105
"""Variable-based root + relative path should resolve to final directory."""
98106
with tempfile.TemporaryDirectory() as tmp_dir:
99107
share_root = os.path.realpath(tmp_dir)
@@ -123,8 +131,8 @@ def fake_variable_get(key, default_var=None):
123131
}
124132
],
125133
)
126-
command = mock_run.call_args.args[0]
127-
self.assertEqual(Path(command[3]).resolve(), pdf_path.resolve())
134+
command = mock_run_and_stream.call_args.args[0]
135+
self.assertEqual(Path(command[4]).resolve(), pdf_path.resolve())
128136

129137
if __name__ == "__main__":
130138
unittest.main()

0 commit comments

Comments
 (0)