|
21 | 21 | from cryptography.hazmat.primitives import hashes, serialization |
22 | 22 | from cryptography.hazmat.primitives.asymmetric import padding |
23 | 23 | from cryptography.hazmat.backends import default_backend |
| 24 | +import tempfile |
| 25 | +import shutil |
24 | 26 |
|
25 | 27 | from c2pa import Builder, C2paError as Error, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version |
26 | 28 | from c2pa.c2pa import Stream, read_ingredient_file, read_file, sign_file, load_settings, create_signer |
@@ -720,9 +722,6 @@ def test_builder_set_remote_url_no_embed(self): |
720 | 722 |
|
721 | 723 | def test_sign_file(self): |
722 | 724 | """Test signing a file using the sign_file method.""" |
723 | | - import tempfile |
724 | | - import shutil |
725 | | - |
726 | 725 | # Create a temporary directory for the test |
727 | 726 | temp_dir = tempfile.mkdtemp() |
728 | 727 | try: |
@@ -753,9 +752,6 @@ def test_sign_file(self): |
753 | 752 |
|
754 | 753 | def test_sign_file_callback_signer(self): |
755 | 754 | """Test signing a file using the sign_file method.""" |
756 | | - import tempfile |
757 | | - import shutil |
758 | | - |
759 | 755 | # Create a temporary directory for the test |
760 | 756 | temp_dir = tempfile.mkdtemp() |
761 | 757 | try: |
@@ -819,6 +815,70 @@ def sign_callback(data: bytes) -> bytes: |
819 | 815 | # Clean up the temporary directory |
820 | 816 | shutil.rmtree(temp_dir) |
821 | 817 |
|
| 818 | + def test_sign_file_callback_signer_from_callback(self): |
| 819 | + """Test signing a file using the sign_file method with Signer.from_callback.""" |
| 820 | + # Create a temporary directory for the test |
| 821 | + temp_dir = tempfile.mkdtemp() |
| 822 | + try: |
| 823 | + # Create a temporary output file path |
| 824 | + output_path = os.path.join(temp_dir, "signed_output_from_callback.jpg") |
| 825 | + |
| 826 | + # Use the sign_file method |
| 827 | + builder = Builder(self.manifestDefinition) |
| 828 | + |
| 829 | + # Create a real ES256 signing callback |
| 830 | + def sign_callback(data: bytes) -> bytes: |
| 831 | + """Real ES256 signing callback that creates actual signatures.""" |
| 832 | + # Load the private key from the test fixtures |
| 833 | + with open(os.path.join(self.data_dir, "es256_private.key"), "rb") as key_file: |
| 834 | + private_key_data = key_file.read() |
| 835 | + |
| 836 | + # Load the private key using cryptography |
| 837 | + private_key = serialization.load_pem_private_key( |
| 838 | + private_key_data, |
| 839 | + password=None, |
| 840 | + backend=default_backend() |
| 841 | + ) |
| 842 | + |
| 843 | + # Create the signature using ES256 (ECDSA with SHA-256) |
| 844 | + # For ECDSA, we use the signature_algorithm_constructor |
| 845 | + from cryptography.hazmat.primitives import hashes |
| 846 | + from cryptography.hazmat.primitives.asymmetric import ec |
| 847 | + |
| 848 | + signature = private_key.sign( |
| 849 | + data, |
| 850 | + ec.ECDSA(hashes.SHA256()) |
| 851 | + ) |
| 852 | + |
| 853 | + return signature |
| 854 | + |
| 855 | + # Create signer with callback using Signer.from_callback |
| 856 | + signer = Signer.from_callback( |
| 857 | + callback=sign_callback, |
| 858 | + alg=SigningAlg.ES256, |
| 859 | + certs=self.certs.decode('utf-8'), |
| 860 | + tsa_url="http://timestamp.digicert.com" |
| 861 | + ) |
| 862 | + |
| 863 | + result = builder.sign_file( |
| 864 | + source_path=self.testPath, |
| 865 | + dest_path=output_path, |
| 866 | + signer=signer |
| 867 | + ) |
| 868 | + |
| 869 | + # Verify the output file was created |
| 870 | + self.assertTrue(os.path.exists(output_path)) |
| 871 | + |
| 872 | + # Read the signed file and verify the manifest |
| 873 | + with open(output_path, "rb") as file: |
| 874 | + reader = Reader("image/jpeg", file) |
| 875 | + json_data = reader.json() |
| 876 | + self.assertIn("Python Test", json_data) |
| 877 | + self.assertNotIn("validation_status", json_data) |
| 878 | + |
| 879 | + finally: |
| 880 | + # Clean up the temporary directory |
| 881 | + shutil.rmtree(temp_dir) |
822 | 882 |
|
823 | 883 | class TestStream(unittest.TestCase): |
824 | 884 | def setUp(self): |
|
0 commit comments