Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 49 additions & 38 deletions src/c2pa/c2pa.py
Original file line number Diff line number Diff line change
Expand Up @@ -1725,19 +1725,20 @@ def to_archive(self, stream: Any):
raise C2paError(
self._error_messages['archive_error'].format("Unknown error"))

def sign(
def _sign_internal(
self,
signer: Signer,
format: str,
source: Any,
dest: Any = None) -> Optional[bytes]:
"""Sign the builder's content and write to a destination stream.
source_stream: Stream,
dest_stream: Stream) -> tuple[int, Optional[bytes]]:
"""Core signing logic shared between sign() and sign_file() methods,
to use same native calls but expose different API surface.

Args:
format: The MIME type or extension of the content
source: The source stream (any Python stream-like object)
dest: The destination stream (any Python stream-like object)
signer: The signer to use
format: The MIME type or extension of the content
source_stream: The source stream
dest_stream: The destination stream

Returns:
A tuple of (size of C2PA data, optional manifest bytes)
Expand All @@ -1748,10 +1749,6 @@ def sign(
if not self._builder:
raise C2paError(self._error_messages['closed_error'])

# Convert Python streams to Stream objects
source_stream = Stream(source)
dest_stream = Stream(dest)

try:
format_str = format.encode('utf-8')
manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)()
Expand All @@ -1777,12 +1774,40 @@ def sign(
manifest_bytes = bytes(manifest_bytes_ptr[:size])
_lib.c2pa_manifest_bytes_free(manifest_bytes_ptr)

return manifest_bytes
return result, manifest_bytes
finally:
# Ensure both streams are cleaned up
source_stream.close()
dest_stream.close()

def sign(
self,
signer: Signer,
format: str,
source: Any,
dest: Any = None) -> Optional[bytes]:
"""Sign the builder's content and write to a destination stream.

Args:
format: The MIME type or extension of the content
source: The source stream (any Python stream-like object)
dest: The destination stream (any Python stream-like object)
signer: The signer to use

Returns:
The manifest bytes if available, None otherwise

Raises:
C2paError: If there was an error during signing
"""
# Convert Python streams to Stream objects
source_stream = Stream(source)
dest_stream = Stream(dest)

# Use the core signing logic
_, manifest_bytes = self._sign_internal(signer, format, source_stream, dest_stream)
return manifest_bytes

def sign_file(self,
source_path: Union[str,
Path],
Expand All @@ -1795,41 +1820,27 @@ def sign_file(self,
Args:
source_path: Path to the source file
dest_path: Path to write the signed file to
signer: The signer to use

Returns:
A tuple of (size of C2PA data, optional manifest bytes)

Raises:
C2paError: If there was an error during signing
"""
if not self._builder:
raise C2paError(self._error_messages['closed_error'])

source_path_str = str(source_path).encode('utf-8')
dest_path_str = str(dest_path).encode('utf-8')
manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)()

result = _lib.c2pa_builder_sign_file(
self._builder,
source_path_str,
dest_path_str,
signer._signer,
ctypes.byref(manifest_bytes_ptr)
)
# Get the MIME type from the file extension
mime_type = mimetypes.guess_type(str(source_path))[0]
if not mime_type:
raise C2paError.NotSupported(f"Could not determine MIME type for file: {source_path}")

if result < 0:
error = _parse_operation_result_for_error(_lib.c2pa_error())
if error:
raise C2paError(error)

manifest_bytes = None
if manifest_bytes_ptr:
# Convert the manifest bytes to a Python bytes object
size = result
manifest_bytes = bytes(manifest_bytes_ptr[:size])
_lib.c2pa_manifest_bytes_free(manifest_bytes_ptr)
# Open source and destination files
with open(source_path, 'rb') as source_file, open(dest_path, 'wb') as dest_file:
# Convert Python streams to Stream objects
source_stream = Stream(source_file)
dest_stream = Stream(dest_file)

return result, manifest_bytes
# Use the core signing logic
return self._sign_internal(signer, mime_type, source_stream, dest_stream)


def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]:
Expand Down
42 changes: 38 additions & 4 deletions tests/test_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def test_builder_double_close(self):
# Verify builder is closed
with self.assertRaises(Error):
builder.set_no_embed()

def test_builder_add_ingredient_on_closed_builder(self):
"""Test that exception is raised when trying to add ingredient after close."""
builder = Builder(self.manifestDefinition)
Expand Down Expand Up @@ -709,12 +709,46 @@ def test_builder_set_remote_url_no_embed(self):
output.seek(0)
with self.assertRaises(Error) as e:
Reader("image/jpeg", output)

self.assertIn("http://this_does_not_exist/foo.jpg", e.exception.message)

# Return back to default settings
load_settings(r'{"verify": { "remote_manifest_fetch": true} }')


def test_sign_file(self):
"""Test signing a file using the sign_file method."""
import tempfile
import shutil

# Create a temporary directory for the test
temp_dir = tempfile.mkdtemp()
try:
# Create a temporary output file path
output_path = os.path.join(temp_dir, "signed_output.jpg")

# Use the sign_file method
builder = Builder(self.manifestDefinition)
result, manifest_bytes = builder.sign_file(
source_path=self.testPath,
dest_path=output_path,
signer=self.signer
)

# Verify the output file was created
self.assertTrue(os.path.exists(output_path))

# Read the signed file and verify the manifest
with open(output_path, "rb") as file:
reader = Reader("image/jpeg", file)
json_data = reader.json()
self.assertIn("Python Test", json_data)
self.assertNotIn("validation_status", json_data)

finally:
# Clean up the temporary directory
shutil.rmtree(temp_dir)


class TestStream(unittest.TestCase):
def setUp(self):
# Create a temporary file for testing
Expand Down