Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "c2pa-python"
version = "0.11.0"
version = "0.11.1"
requires-python = ">=3.10"
description = "Python bindings for the C2PA Content Authenticity Initiative (CAI) library"
readme = { file = "README.md", content-type = "text/markdown" }
Expand Down
96 changes: 50 additions & 46 deletions src/c2pa/c2pa.py
Original file line number Diff line number Diff line change
Expand Up @@ -1725,37 +1725,35 @@ def to_archive(self, stream: Any):
raise C2paError(
self._error_messages['archive_error'].format("Unknown error"))

def sign(
def _sign_internal(
self,
signer: Signer,
format: str,
source: Any,
dest: Any = None) -> Optional[bytes]:
"""Sign the builder's content and write to a destination stream.
source_stream: Stream,
dest_stream: Stream) -> int:
"""Internal signing logic shared between sign() and sign_file() methods,
to use same native calls but expose different API surface.

Args:
format: The MIME type or extension of the content
source: The source stream (any Python stream-like object)
dest: The destination stream (any Python stream-like object)
signer: The signer to use
format: The MIME type or extension of the content
source_stream: The source stream
dest_stream: The destination stream

Returns:
A tuple of (size of C2PA data, optional manifest bytes)
Size of C2PA data

Raises:
C2paError: If there was an error during signing
"""
if not self._builder:
raise C2paError(self._error_messages['closed_error'])

# Convert Python streams to Stream objects
source_stream = Stream(source)
dest_stream = Stream(dest)

try:
format_str = format.encode('utf-8')
manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)()

# c2pa_builder_sign uses streams
result = _lib.c2pa_builder_sign(
self._builder,
format_str,
Expand All @@ -1770,66 +1768,72 @@ def sign(
if error:
raise C2paError(error)

manifest_bytes = None
if manifest_bytes_ptr:
# Convert the manifest bytes to a Python bytes object
size = result
manifest_bytes = bytes(manifest_bytes_ptr[:size])
# Free the manifest bytes pointer if it was allocated
_lib.c2pa_manifest_bytes_free(manifest_bytes_ptr)

return manifest_bytes
return result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are we going to return the manifest bytes if we throw them away here?

finally:
# Ensure both streams are cleaned up
source_stream.close()
dest_stream.close()

def sign(
self,
signer: Signer,
format: str,
source: Any,
dest: Any = None) -> None:
"""Sign the builder's content and write to a destination stream.

Args:
format: The MIME type or extension of the content
source: The source stream (any Python stream-like object)
dest: The destination stream (any Python stream-like object)
signer: The signer to use

Raises:
C2paError: If there was an error during signing
"""
# Convert Python streams to Stream objects
source_stream = Stream(source)
dest_stream = Stream(dest)

# Use the internal stream-base signing logic
self._sign_internal(signer, format, source_stream, dest_stream)

def sign_file(self,
source_path: Union[str,
Path],
dest_path: Union[str,
Path],
signer: Signer) -> tuple[int,
Optional[bytes]]:
signer: Signer) -> int:
"""Sign a file and write the signed data to an output file.

Args:
source_path: Path to the source file
dest_path: Path to write the signed file to
signer: The signer to use

Returns:
A tuple of (size of C2PA data, optional manifest bytes)
Size of C2PA data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign_file needs to return manifest_bytes so that they can be used for sidecar or uploading remote manifests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of returning the size of the manifest_bytes here?


Raises:
C2paError: If there was an error during signing
"""
if not self._builder:
raise C2paError(self._error_messages['closed_error'])

source_path_str = str(source_path).encode('utf-8')
dest_path_str = str(dest_path).encode('utf-8')
manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)()
# Get the MIME type from the file extension
mime_type = mimetypes.guess_type(str(source_path))[0]
if not mime_type:
raise C2paError.NotSupported(f"Could not determine MIME type for file: {source_path}")

result = _lib.c2pa_builder_sign_file(
self._builder,
source_path_str,
dest_path_str,
signer._signer,
ctypes.byref(manifest_bytes_ptr)
)

if result < 0:
error = _parse_operation_result_for_error(_lib.c2pa_error())
if error:
raise C2paError(error)

manifest_bytes = None
if manifest_bytes_ptr:
# Convert the manifest bytes to a Python bytes object
size = result
manifest_bytes = bytes(manifest_bytes_ptr[:size])
_lib.c2pa_manifest_bytes_free(manifest_bytes_ptr)
# Open source and destination files
with open(source_path, 'rb') as source_file, open(dest_path, 'wb') as dest_file:
# Convert Python streams to Stream objects
source_stream = Stream(source_file)
dest_stream = Stream(dest_file)

return result, manifest_bytes
# Use the internal stream-base signing logic
return self._sign_internal(signer, mime_type, source_stream, dest_stream)


def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]:
Expand Down
54 changes: 44 additions & 10 deletions tests/test_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,13 @@ def test_remote_sign(self):
builder = Builder(self.manifestDefinition)
builder.set_no_embed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typically remote signing would also set_remote_url() If remote fetch is disabled the reader will return an error indicating the RemoteUrl that should be read from. Setting no_embed on its own is mainly there for writing sidecars.

output = io.BytesIO(bytearray())
manifest_data = builder.sign(
self.signer, "image/jpeg", file, output)
result_data = builder.sign(self.signer, "image/jpeg", file, output)

output.seek(0)
reader = Reader("image/jpeg", output, manifest_data)
json_data = reader.json()
self.assertIn("Python Test", json_data)
self.assertNotIn("validation_status", json_data)
# When set_no_embed() is used, no manifest should be embedded in the file
# So reading from the file should fail
with self.assertRaises(Error):
reader = Reader("image/jpeg", output)
output.close()

def test_sign_all_files(self):
Expand Down Expand Up @@ -404,7 +404,7 @@ def test_builder_double_close(self):
# Verify builder is closed
with self.assertRaises(Error):
builder.set_no_embed()

def test_builder_add_ingredient_on_closed_builder(self):
"""Test that exception is raised when trying to add ingredient after close."""
builder = Builder(self.manifestDefinition)
Expand Down Expand Up @@ -709,12 +709,46 @@ def test_builder_set_remote_url_no_embed(self):
output.seek(0)
with self.assertRaises(Error) as e:
Reader("image/jpeg", output)

self.assertIn("http://this_does_not_exist/foo.jpg", e.exception.message)

# Return back to default settings
load_settings(r'{"verify": { "remote_manifest_fetch": true} }')


def test_sign_file(self):
"""Test signing a file using the sign_file method."""
import tempfile
import shutil

# Create a temporary directory for the test
temp_dir = tempfile.mkdtemp()
try:
# Create a temporary output file path
output_path = os.path.join(temp_dir, "signed_output.jpg")

# Use the sign_file method
builder = Builder(self.manifestDefinition)
result = builder.sign_file(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This result should be manifest_bytes. Errors should throw.

source_path=self.testPath,
dest_path=output_path,
signer=self.signer
)

# Verify the output file was created
self.assertTrue(os.path.exists(output_path))

# Read the signed file and verify the manifest
with open(output_path, "rb") as file:
reader = Reader("image/jpeg", file)
json_data = reader.json()
self.assertIn("Python Test", json_data)
self.assertNotIn("validation_status", json_data)

finally:
# Clean up the temporary directory
shutil.rmtree(temp_dir)


class TestStream(unittest.TestCase):
def setUp(self):
# Create a temporary file for testing
Expand Down
103 changes: 0 additions & 103 deletions tests/test_unit_tests_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,109 +1068,6 @@ def read_manifest(reader_id):
# Verify all readers completed
self.assertEqual(active_readers, 0, "Not all readers completed")

def test_remote_sign_threaded(self):
"""Test remote signing with multiple threads in parallel"""
output1 = io.BytesIO(bytearray())
output2 = io.BytesIO(bytearray())
sign_errors = []
sign_complete = threading.Event()
manifest_data1 = None
manifest_data2 = None

def remote_sign(output_stream, manifest_def, thread_id):
nonlocal manifest_data1, manifest_data2
try:
with open(self.testPath, "rb") as file:
builder = Builder(manifest_def)
builder.set_no_embed()
manifest_data = builder.sign(
self.signer, "image/jpeg", file, output_stream)
output_stream.seek(0)

# Store manifest data for final verification
if thread_id == 1:
manifest_data1 = manifest_data
else:
manifest_data2 = manifest_data

# Verify the signed file
reader = Reader("image/jpeg", output_stream, manifest_data)
json_data = reader.json()
manifest_store = json.loads(json_data)
active_manifest = manifest_store["manifests"][manifest_store["active_manifest"]]

# Verify the correct manifest was used
if thread_id == 1:
expected_claim_generator = "python_test_1/0.0.1"
expected_author = "Tester One"
else:
expected_claim_generator = "python_test_2/0.0.1"
expected_author = "Tester Two"

self.assertEqual(
active_manifest["claim_generator"],
expected_claim_generator)

# Verify the author is correct
assertions = active_manifest["assertions"]
for assertion in assertions:
if assertion["label"] == "com.unit.test":
author_name = assertion["data"]["author"][0]["name"]
self.assertEqual(author_name, expected_author)
break

except Exception as e:
sign_errors.append(f"Thread {thread_id} error: {str(e)}")
finally:
sign_complete.set()

# Create and start two threads for concurrent remote signing
thread1 = threading.Thread(
target=remote_sign,
args=(output1, self.manifestDefinition_1, 1)
)
thread2 = threading.Thread(
target=remote_sign,
args=(output2, self.manifestDefinition_2, 2)
)

# Start both threads
thread1.start()
thread2.start()

# Wait for both threads to complete
thread1.join()
thread2.join()

# Check for errors
if sign_errors:
self.fail("\n".join(sign_errors))

# Verify the outputs are different before closing
output1.seek(0)
output2.seek(0)
reader1 = Reader("image/jpeg", output1, manifest_data1)
reader2 = Reader("image/jpeg", output2, manifest_data2)

manifest_store1 = json.loads(reader1.json())
manifest_store2 = json.loads(reader2.json())

# Get the active manifests
active_manifest1 = manifest_store1["manifests"][manifest_store1["active_manifest"]]
active_manifest2 = manifest_store2["manifests"][manifest_store2["active_manifest"]]

# Verify the manifests are different
self.assertNotEqual(
active_manifest1["claim_generator"],
active_manifest2["claim_generator"])
self.assertNotEqual(
active_manifest1["title"],
active_manifest2["title"])

# Clean up after verification
output1.close()
output2.close()

def test_archive_sign_threaded(self):
"""Test archive signing with multiple threads in parallel"""
archive1 = io.BytesIO(bytearray())
Expand Down