diff --git a/pyproject.toml b/pyproject.toml index e2149481..afa30d5c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "c2pa-python" -version = "0.11.0" +version = "0.11.1" requires-python = ">=3.10" description = "Python bindings for the C2PA Content Authenticity Initiative (CAI) library" readme = { file = "README.md", content-type = "text/markdown" } diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index d14288ac..40c15316 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -1725,22 +1725,23 @@ def to_archive(self, stream: Any): raise C2paError( self._error_messages['archive_error'].format("Unknown error")) - def sign( + def _sign_internal( self, signer: Signer, format: str, - source: Any, - dest: Any = None) -> Optional[bytes]: - """Sign the builder's content and write to a destination stream. + source_stream: Stream, + dest_stream: Stream) -> int: + """Internal signing logic shared between sign() and sign_file() methods, + to use same native calls but expose different API surface. Args: - format: The MIME type or extension of the content - source: The source stream (any Python stream-like object) - dest: The destination stream (any Python stream-like object) signer: The signer to use + format: The MIME type or extension of the content + source_stream: The source stream + dest_stream: The destination stream Returns: - A tuple of (size of C2PA data, optional manifest bytes) + Size of C2PA data Raises: C2paError: If there was an error during signing @@ -1748,14 +1749,11 @@ def sign( if not self._builder: raise C2paError(self._error_messages['closed_error']) - # Convert Python streams to Stream objects - source_stream = Stream(source) - dest_stream = Stream(dest) - try: format_str = format.encode('utf-8') manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)() + # c2pa_builder_sign uses streams result = _lib.c2pa_builder_sign( self._builder, format_str, @@ -1770,66 +1768,72 @@ def sign( if error: raise C2paError(error) - manifest_bytes = None if manifest_bytes_ptr: - # Convert the manifest bytes to a Python bytes object - size = result - manifest_bytes = bytes(manifest_bytes_ptr[:size]) + # Free the manifest bytes pointer if it was allocated _lib.c2pa_manifest_bytes_free(manifest_bytes_ptr) - return manifest_bytes + return result finally: # Ensure both streams are cleaned up source_stream.close() dest_stream.close() + def sign( + self, + signer: Signer, + format: str, + source: Any, + dest: Any = None) -> None: + """Sign the builder's content and write to a destination stream. + + Args: + format: The MIME type or extension of the content + source: The source stream (any Python stream-like object) + dest: The destination stream (any Python stream-like object) + signer: The signer to use + + Raises: + C2paError: If there was an error during signing + """ + # Convert Python streams to Stream objects + source_stream = Stream(source) + dest_stream = Stream(dest) + + # Use the internal stream-base signing logic + self._sign_internal(signer, format, source_stream, dest_stream) + def sign_file(self, source_path: Union[str, Path], dest_path: Union[str, Path], - signer: Signer) -> tuple[int, - Optional[bytes]]: + signer: Signer) -> int: """Sign a file and write the signed data to an output file. Args: source_path: Path to the source file dest_path: Path to write the signed file to + signer: The signer to use Returns: - A tuple of (size of C2PA data, optional manifest bytes) + Size of C2PA data Raises: C2paError: If there was an error during signing """ - if not self._builder: - raise C2paError(self._error_messages['closed_error']) - - source_path_str = str(source_path).encode('utf-8') - dest_path_str = str(dest_path).encode('utf-8') - manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)() + # Get the MIME type from the file extension + mime_type = mimetypes.guess_type(str(source_path))[0] + if not mime_type: + raise C2paError.NotSupported(f"Could not determine MIME type for file: {source_path}") - result = _lib.c2pa_builder_sign_file( - self._builder, - source_path_str, - dest_path_str, - signer._signer, - ctypes.byref(manifest_bytes_ptr) - ) - - if result < 0: - error = _parse_operation_result_for_error(_lib.c2pa_error()) - if error: - raise C2paError(error) - - manifest_bytes = None - if manifest_bytes_ptr: - # Convert the manifest bytes to a Python bytes object - size = result - manifest_bytes = bytes(manifest_bytes_ptr[:size]) - _lib.c2pa_manifest_bytes_free(manifest_bytes_ptr) + # Open source and destination files + with open(source_path, 'rb') as source_file, open(dest_path, 'wb') as dest_file: + # Convert Python streams to Stream objects + source_stream = Stream(source_file) + dest_stream = Stream(dest_file) - return result, manifest_bytes + # Use the internal stream-base signing logic + return self._sign_internal(signer, mime_type, source_stream, dest_stream) def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]: diff --git a/tests/test_unit_tests.py b/tests/test_unit_tests.py index 46247088..a2a07870 100644 --- a/tests/test_unit_tests.py +++ b/tests/test_unit_tests.py @@ -322,13 +322,13 @@ def test_remote_sign(self): builder = Builder(self.manifestDefinition) builder.set_no_embed() output = io.BytesIO(bytearray()) - manifest_data = builder.sign( - self.signer, "image/jpeg", file, output) + result_data = builder.sign(self.signer, "image/jpeg", file, output) + output.seek(0) - reader = Reader("image/jpeg", output, manifest_data) - json_data = reader.json() - self.assertIn("Python Test", json_data) - self.assertNotIn("validation_status", json_data) + # When set_no_embed() is used, no manifest should be embedded in the file + # So reading from the file should fail + with self.assertRaises(Error): + reader = Reader("image/jpeg", output) output.close() def test_sign_all_files(self): @@ -404,7 +404,7 @@ def test_builder_double_close(self): # Verify builder is closed with self.assertRaises(Error): builder.set_no_embed() - + def test_builder_add_ingredient_on_closed_builder(self): """Test that exception is raised when trying to add ingredient after close.""" builder = Builder(self.manifestDefinition) @@ -709,12 +709,46 @@ def test_builder_set_remote_url_no_embed(self): output.seek(0) with self.assertRaises(Error) as e: Reader("image/jpeg", output) - + self.assertIn("http://this_does_not_exist/foo.jpg", e.exception.message) - + # Return back to default settings load_settings(r'{"verify": { "remote_manifest_fetch": true} }') - + + def test_sign_file(self): + """Test signing a file using the sign_file method.""" + import tempfile + import shutil + + # Create a temporary directory for the test + temp_dir = tempfile.mkdtemp() + try: + # Create a temporary output file path + output_path = os.path.join(temp_dir, "signed_output.jpg") + + # Use the sign_file method + builder = Builder(self.manifestDefinition) + result = builder.sign_file( + source_path=self.testPath, + dest_path=output_path, + signer=self.signer + ) + + # Verify the output file was created + self.assertTrue(os.path.exists(output_path)) + + # Read the signed file and verify the manifest + with open(output_path, "rb") as file: + reader = Reader("image/jpeg", file) + json_data = reader.json() + self.assertIn("Python Test", json_data) + self.assertNotIn("validation_status", json_data) + + finally: + # Clean up the temporary directory + shutil.rmtree(temp_dir) + + class TestStream(unittest.TestCase): def setUp(self): # Create a temporary file for testing diff --git a/tests/test_unit_tests_threaded.py b/tests/test_unit_tests_threaded.py index a9821dcb..f1b61331 100644 --- a/tests/test_unit_tests_threaded.py +++ b/tests/test_unit_tests_threaded.py @@ -1068,109 +1068,6 @@ def read_manifest(reader_id): # Verify all readers completed self.assertEqual(active_readers, 0, "Not all readers completed") - def test_remote_sign_threaded(self): - """Test remote signing with multiple threads in parallel""" - output1 = io.BytesIO(bytearray()) - output2 = io.BytesIO(bytearray()) - sign_errors = [] - sign_complete = threading.Event() - manifest_data1 = None - manifest_data2 = None - - def remote_sign(output_stream, manifest_def, thread_id): - nonlocal manifest_data1, manifest_data2 - try: - with open(self.testPath, "rb") as file: - builder = Builder(manifest_def) - builder.set_no_embed() - manifest_data = builder.sign( - self.signer, "image/jpeg", file, output_stream) - output_stream.seek(0) - - # Store manifest data for final verification - if thread_id == 1: - manifest_data1 = manifest_data - else: - manifest_data2 = manifest_data - - # Verify the signed file - reader = Reader("image/jpeg", output_stream, manifest_data) - json_data = reader.json() - manifest_store = json.loads(json_data) - active_manifest = manifest_store["manifests"][manifest_store["active_manifest"]] - - # Verify the correct manifest was used - if thread_id == 1: - expected_claim_generator = "python_test_1/0.0.1" - expected_author = "Tester One" - else: - expected_claim_generator = "python_test_2/0.0.1" - expected_author = "Tester Two" - - self.assertEqual( - active_manifest["claim_generator"], - expected_claim_generator) - - # Verify the author is correct - assertions = active_manifest["assertions"] - for assertion in assertions: - if assertion["label"] == "com.unit.test": - author_name = assertion["data"]["author"][0]["name"] - self.assertEqual(author_name, expected_author) - break - - except Exception as e: - sign_errors.append(f"Thread {thread_id} error: {str(e)}") - finally: - sign_complete.set() - - # Create and start two threads for concurrent remote signing - thread1 = threading.Thread( - target=remote_sign, - args=(output1, self.manifestDefinition_1, 1) - ) - thread2 = threading.Thread( - target=remote_sign, - args=(output2, self.manifestDefinition_2, 2) - ) - - # Start both threads - thread1.start() - thread2.start() - - # Wait for both threads to complete - thread1.join() - thread2.join() - - # Check for errors - if sign_errors: - self.fail("\n".join(sign_errors)) - - # Verify the outputs are different before closing - output1.seek(0) - output2.seek(0) - reader1 = Reader("image/jpeg", output1, manifest_data1) - reader2 = Reader("image/jpeg", output2, manifest_data2) - - manifest_store1 = json.loads(reader1.json()) - manifest_store2 = json.loads(reader2.json()) - - # Get the active manifests - active_manifest1 = manifest_store1["manifests"][manifest_store1["active_manifest"]] - active_manifest2 = manifest_store2["manifests"][manifest_store2["active_manifest"]] - - # Verify the manifests are different - self.assertNotEqual( - active_manifest1["claim_generator"], - active_manifest2["claim_generator"]) - self.assertNotEqual( - active_manifest1["title"], - active_manifest2["title"]) - - # Clean up after verification - output1.close() - output2.close() - def test_archive_sign_threaded(self): """Test archive signing with multiple threads in parallel""" archive1 = io.BytesIO(bytearray())