|
25 | 25 | import shutil |
26 | 26 |
|
27 | 27 | from c2pa import Builder, C2paError as Error, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version |
28 | | -from c2pa.c2pa import Stream, read_ingredient_file, read_file, sign_file, load_settings, create_signer |
| 28 | +from c2pa.c2pa import Stream, read_ingredient_file, read_file, sign_file, load_settings, create_signer, sign_file_with_callback_signer |
29 | 29 |
|
30 | 30 | # Suppress deprecation warnings |
31 | 31 | warnings.filterwarnings("ignore", category=DeprecationWarning) |
@@ -541,16 +541,16 @@ def test_builder_sign_with_duplicate_ingredient(self): |
541 | 541 | # Verify the first ingredient's title matches what we set |
542 | 542 | first_ingredient = active_manifest["ingredients"][0] |
543 | 543 | self.assertEqual(first_ingredient["title"], "Test Ingredient") |
544 | | - |
| 544 | + |
545 | 545 | # Verify subsequent labels are unique and have a double underscore with a monotonically inc. index |
546 | 546 | second_ingredient = active_manifest["ingredients"][1] |
547 | 547 | self.assertTrue(second_ingredient["label"].endswith("__1")) |
548 | 548 |
|
549 | 549 | third_ingredient = active_manifest["ingredients"][2] |
550 | 550 | self.assertTrue(third_ingredient["label"].endswith("__2")) |
551 | | - |
| 551 | + |
552 | 552 | builder.close() |
553 | | - |
| 553 | + |
554 | 554 | def test_builder_sign_with_ingredient_from_stream(self): |
555 | 555 | """Test Builder class operations with a real file using stream for ingredient.""" |
556 | 556 | # Test creating builder from JSON |
@@ -880,6 +880,80 @@ def sign_callback(data: bytes) -> bytes: |
880 | 880 | # Clean up the temporary directory |
881 | 881 | shutil.rmtree(temp_dir) |
882 | 882 |
|
| 883 | + def test_sign_file_with_callback_signer(self): |
| 884 | + """Test signing a file using the sign_file_with_callback_signer function.""" |
| 885 | + # Create a temporary directory for the test |
| 886 | + temp_dir = tempfile.mkdtemp() |
| 887 | + try: |
| 888 | + # Create a temporary output file path |
| 889 | + output_path = os.path.join(temp_dir, "signed_output_callback.jpg") |
| 890 | + |
| 891 | + # Create a real ES256 signing callback |
| 892 | + def sign_callback(data: bytes) -> bytes: |
| 893 | + """Real ES256 signing callback that creates actual signatures.""" |
| 894 | + # Load the private key from the test fixtures |
| 895 | + with open(os.path.join(self.data_dir, "es256_private.key"), "rb") as key_file: |
| 896 | + private_key_data = key_file.read() |
| 897 | + |
| 898 | + # Load the private key using cryptography |
| 899 | + private_key = serialization.load_pem_private_key( |
| 900 | + private_key_data, |
| 901 | + password=None, |
| 902 | + backend=default_backend() |
| 903 | + ) |
| 904 | + |
| 905 | + # Create the signature using ES256 (ECDSA with SHA-256) |
| 906 | + from cryptography.hazmat.primitives import hashes |
| 907 | + from cryptography.hazmat.primitives.asymmetric import ec |
| 908 | + |
| 909 | + signature = private_key.sign( |
| 910 | + data, |
| 911 | + ec.ECDSA(hashes.SHA256()) |
| 912 | + ) |
| 913 | + |
| 914 | + return signature |
| 915 | + |
| 916 | + # Import the new function |
| 917 | + from c2pa.c2pa import sign_file_with_callback_signer |
| 918 | + |
| 919 | + # Use the sign_file_with_callback_signer function |
| 920 | + manifest_bytes = sign_file_with_callback_signer( |
| 921 | + source_path=self.testPath, |
| 922 | + dest_path=output_path, |
| 923 | + manifest=self.manifestDefinition, |
| 924 | + callback=sign_callback, |
| 925 | + alg=SigningAlg.ES256, |
| 926 | + certs=self.certs.decode('utf-8'), |
| 927 | + tsa_url="http://timestamp.digicert.com" |
| 928 | + ) |
| 929 | + |
| 930 | + # Verify the output file was created |
| 931 | + self.assertTrue(os.path.exists(output_path)) |
| 932 | + |
| 933 | + # Verify the manifest bytes are binary data (not JSON text) |
| 934 | + self.assertIsInstance(manifest_bytes, bytes) |
| 935 | + self.assertGreater(len(manifest_bytes), 0) |
| 936 | + |
| 937 | + # Try to decode as UTF-8 to see if it's text-based (it shouldn't be) |
| 938 | + try: |
| 939 | + manifest_bytes.decode('utf-8') |
| 940 | + # If we get here, it's text-based, which is unexpected |
| 941 | + self.fail("Manifest bytes should be binary data, not UTF-8 text") |
| 942 | + except UnicodeDecodeError: |
| 943 | + # This is expected - manifest bytes should be binary |
| 944 | + pass |
| 945 | + |
| 946 | + # Read the signed file and verify the manifest contains expected content |
| 947 | + with open(output_path, "rb") as file: |
| 948 | + reader = Reader("image/jpeg", file) |
| 949 | + file_manifest_json = reader.json() |
| 950 | + self.assertIn("Python Test", file_manifest_json) |
| 951 | + self.assertNotIn("validation_status", file_manifest_json) |
| 952 | + |
| 953 | + finally: |
| 954 | + # Clean up the temporary directory |
| 955 | + shutil.rmtree(temp_dir) |
| 956 | + |
883 | 957 | class TestStream(unittest.TestCase): |
884 | 958 | def setUp(self): |
885 | 959 | # Create a temporary file for testing |
|
0 commit comments