pypdf/_writer.py: 43 additions & 1 deletion
@@ -59,6 +59,7 @@
StreamType,
_get_max_pdf_version_header,
deprecation_no_replacement,
logger_error,
logger_warning,
)
from .constants import AnnotationDictionaryAttributes as AA
@@ -1417,6 +1418,41 @@ def _resolve_links(self) -> None:
continue
new_link.patch_reference(self, new_page)

def remove_page(self, page: Union[int, PageObject, IndirectObject], clean: bool = False) -> None:
"""
Override PdfDocCommon.remove_page to perform extra cleanup when requested.

Calls the base implementation and then, if clean=True and the writer has already been written out,
rebuilds the internal structure to remove orphan objects left over from previous writes.
"""
super().remove_page(page, clean=clean)

if clean and getattr(self, "_has_written", False):
try:
self._rebuild_via_io()
except Exception as exc:
logger_warning(f"Failed to rebuild PdfWriter after remove_page with clean=True: {exc}", __name__)
# No re-raise: keep remove_page robust; tests/CI should surface the error


def _rebuild_via_io(self) -> None:
"""
Rebuild the writer by serializing to an in-memory buffer and re-reading.

This removes orphan objects that persist after intermediate writes.
"""
buf = BytesIO()
self.write(buf)
buf.seek(0)

reader = PdfReader(buf)
new_writer = PdfWriter()
new_writer.clone_document_from_reader(reader)

# Replace internal state of this instance with the new clean writer's state
self.__dict__.clear()
self.__dict__.update(new_writer.__dict__)

def write_stream(self, stream: StreamType) -> None:
if hasattr(stream, "mode") and "b" not in stream.mode:
logger_warning(
@@ -1461,7 +1497,13 @@ def write(self, stream: Union[Path, StrByteType]) -> tuple[bool, IO[Any]]:
stream = FileIO(stream, "wb")
my_file = True

self.write_stream(stream)
try:
self.write_stream(stream)
except Exception as err:
logger_error(f"Failed to write to {stream}", __name__)
raise err
else:
self._has_written = True

if my_file:
stream.close()
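For users on released pypdf versions without this override, a similar cleanup can be approximated from the outside using only public API. The sketch below mirrors the round-trip idea behind _rebuild_via_io; vacuum is a hypothetical helper name (not part of this PR), and it returns a fresh writer instead of swapping instance state in place.

from io import BytesIO

from pypdf import PdfReader, PdfWriter


def vacuum(writer: PdfWriter) -> PdfWriter:
    # Hypothetical helper mirroring the _rebuild_via_io round-trip:
    # serialize the current state, re-read it, and clone into a new writer.
    buf = BytesIO()
    writer.write(buf)  # intermediate, in-memory write
    buf.seek(0)
    clean = PdfWriter()
    clean.clone_document_from_reader(PdfReader(buf))  # drops orphan objects
    return clean
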
tests/test_writer.py: 55 additions & 0 deletions
@@ -2485,6 +2485,61 @@ def test_compress_identical_objects():
assert len(out2.getvalue()) > len(out3.getvalue())


@pytest.mark.enable_socket
def test_issue_3418_remove_after_writes_leaves_orphans():
"""
Regression test for issue #3418.

Reproduces the original sequence: write after the first append, write after the second append,
then remove the last page and compress. The expectation is that after remove + compress the
output is no larger than the original single-page output. If the bug is present,
the final size may be larger (i.e. orphan objects were not cleaned up).
"""
url = (
"https://github.com/py-pdf/sample-files/blob/main/001-trivial/minimal-document.pdf?raw=true"
)
name = "minimal-document.pdf"
data = BytesIO(get_data_from_url(url, name=name))
reader1 = PdfReader(data)

writer = PdfWriter()

# Append the document and write it out (important: the intermediate write mutates writer state)
writer.append(reader1)
out1 = BytesIO()
writer.write(out1)
size_one_page = out1.tell()

# Append a second (identical) page and write again,
# using a fresh reader to match the original issue's steps
reader2 = PdfReader(BytesIO(get_data_from_url(url, name=name)))
writer.append(reader2)
out2 = BytesIO()
writer.write(out2)
size_two_pages = out2.tell()

assert size_two_pages > size_one_page # sanity check

# Now remove the last page and attempt to compress/clean
writer.remove_page(len(writer.pages) - 1, clean=True)
writer.compress_identical_objects(remove_orphans=True, remove_identicals=True)

out3 = BytesIO()
writer.write(out3)
size_after = out3.tell()

# Expected behavior: after removing + compressing, the size should be <= the original single-page PDF.
# This assertion should FAIL on the buggy implementation (showing the regression).
assert size_after <= size_one_page, (
"After remove + compress the PDF should be no larger than the single-page output. "
f"size_one_page={size_one_page}, size_after={size_after}"
)

# The output must contain exactly one page
result_pdf = PdfReader(out3)
assert len(result_pdf.pages) == 1


def test_set_need_appearances_writer():
"""Minimal test for coverage"""
writer = PdfWriter()
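A minimal end-to-end sketch of the intended usage, following the same steps as the regression test above. input.pdf and output.pdf are placeholder paths; the rebuild on remove_page(clean=True) only kicks in once the writer has been written at least once.

from io import BytesIO

from pypdf import PdfReader, PdfWriter

writer = PdfWriter()
writer.append(PdfReader("input.pdf"))  # placeholder input path

intermediate = BytesIO()
writer.write(intermediate)  # intermediate write marks the writer as written

writer.remove_page(len(writer.pages) - 1, clean=True)  # triggers the in-memory rebuild
writer.compress_identical_objects(remove_orphans=True, remove_identicals=True)

with open("output.pdf", "wb") as fh:  # placeholder output path
    writer.write(fh)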