diff --git a/tests/test_unit_tests.py b/tests/test_unit_tests.py index d0505b4..e9514ed 100644 --- a/tests/test_unit_tests.py +++ b/tests/test_unit_tests.py @@ -1124,7 +1124,7 @@ def test_builder_sign_with_setting_no_thumbnail_and_ingredient(self): builder.close() - # Settings are global, so we reset to the default "true" here + # Settings are thread-local, so we reset to the default "true" here load_settings('{"builder": { "thumbnail": {"enabled": true}}}') def test_builder_sign_with_duplicate_ingredient(self): diff --git a/tests/test_unit_tests_threaded.py b/tests/test_unit_tests_threaded.py index 3d3b6f1..dbe0f13 100644 --- a/tests/test_unit_tests_threaded.py +++ b/tests/test_unit_tests_threaded.py @@ -22,7 +22,7 @@ import random from c2pa import Builder, C2paError as Error, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version -from c2pa.c2pa import Stream +from c2pa.c2pa import Stream, load_settings PROJECT_PATH = os.getcwd() FIXTURES_FOLDER = os.path.join(os.path.dirname(__file__), "fixtures") @@ -62,6 +62,41 @@ def read_metadata(): thread1.join() thread2.join() + def test_stream_read_async(self): + """Test reading C2PA metadata from a file using async tasks""" + async def read_metadata_async(): + with open(self.testPath, "rb") as file: + reader = Reader("image/jpeg", file) + json_data = reader.json() + self.assertIn("C.jpg", json_data) + return json_data + + async def run_async_tests(): + # Create multiple async tasks + tasks = [] + num_tasks = 2 + for i in range(num_tasks): + task = asyncio.create_task(read_metadata_async()) + tasks.append(task) + + # Wait for all tasks to complete and collect results + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results + errors = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + errors.append(f"Async task {i} failed with exception: {str(result)}") + elif result is None: # No result indicates an error + errors.append(f"Async task {i} returned None") + + # If any errors occurred, fail the test with all error messages + if errors: + self.fail("\n".join(errors)) + + # Run the async tests + asyncio.run(run_async_tests()) + def test_stream_read_and_parse(self): def read_and_parse(): with open(self.testPath, "rb") as file: @@ -273,6 +308,33 @@ def setUp(self): ] } + # Define a V2 manifest as a dictionary + self.manifestDefinitionV2_1 = { + "claim_generator": "python_test", + "claim_generator_info": [{ + "name": "python_test", + "version": "0.0.1", + }], + # claim version 2 is the default + # "claim_version": 2, + "format": "image/jpeg", + "title": "Python Test Image V2", + "ingredients": [], + "assertions": [ + { + "label": "c2pa.actions", + "data": { + "actions": [ + { + "action": "c2pa.created", + "digitalSourceType": "http://cv.iptc.org/newscodes/digitalsourcetype/digitalCreation" + } + ] + } + } + ] + } + def test_sign_all_files(self): """Test signing all files in both fixtures directories using a thread pool""" signing_dir = os.path.join(self.data_dir, "files-for-signing-tests") @@ -1837,5 +1899,443 @@ def thread_work(thread_id): other_manifest["active_manifest"], f"Thread {thread_id} and {other_thread_id} share the same active manifest ID") + def test_builder_sign_with_setting_no_thumbnail_and_ingredient(self): + """Test Builder class operations with thumbnail disabled and ingredient added using multiple threads.""" + # Thread synchronization + thread_results = {} + completed_threads = 0 + thread_lock = threading.Lock() + settings_lock = threading.Lock() # Lock for settings changes + + def thread_work(thread_id): + nonlocal completed_threads + try: + # Create a new builder for this thread + builder = Builder.from_json(self.manifestDefinition) + assert builder._builder is not None + + # Thread-safe settings change (settings are thread-local) + with settings_lock: + # The following removes the manifest's thumbnail + load_settings('{"builder": { "thumbnail": {"enabled": false}}}') + + # Test adding ingredient + ingredient_json = f'{{ "title": "Test Ingredient Thread {thread_id}" }}' + with open(self.testPath3, 'rb') as f: + builder.add_ingredient(ingredient_json, "image/jpeg", f) + + with open(self.testPath2, "rb") as file: + output = io.BytesIO(bytearray()) + builder.sign(self.signer, "image/jpeg", file, output) + output.seek(0) + reader = Reader("image/jpeg", output) + json_data = reader.json() + manifest_data = json.loads(json_data) + + # Store results for verification + with thread_lock: + thread_results[thread_id] = { + 'manifest': manifest_data, + 'thread_id': thread_id + } + + builder.close() + + except Exception as e: + with thread_lock: + thread_results[thread_id] = { + 'error': str(e), + 'thread_id': thread_id + } + finally: + with thread_lock: + completed_threads += 1 + + # Create and start multiple threads + threads = [] + num_threads = 3 + for i in range(1, num_threads + 1): + thread = threading.Thread(target=thread_work, args=(i,)) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Verify all threads completed + self.assertEqual(completed_threads, num_threads, f"All {num_threads} threads should have completed") + self.assertEqual(len(thread_results), num_threads, f"Should have results from all {num_threads} threads") + + # Verify results for each thread + for thread_id in range(1, num_threads + 1): + result = thread_results[thread_id] + + # Check if thread encountered an error + if 'error' in result: + self.fail(f"Thread {thread_id} failed with error: {result['error']}") + + manifest_data = result['manifest'] + + # Verify active manifest exists + self.assertIn("active_manifest", manifest_data) + active_manifest_id = manifest_data["active_manifest"] + + # Verify active manifest object exists + self.assertIn("manifests", manifest_data) + self.assertIn(active_manifest_id, manifest_data["manifests"]) + active_manifest = manifest_data["manifests"][active_manifest_id] + + # There should be no thumbnail anymore here + self.assertNotIn("thumbnail", active_manifest) + + # Verify ingredients array exists in active manifest + self.assertIn("ingredients", active_manifest) + self.assertIsInstance(active_manifest["ingredients"], list) + self.assertTrue(len(active_manifest["ingredients"]) > 0) + + # Verify the first ingredient's title matches what we set for this thread + first_ingredient = active_manifest["ingredients"][0] + expected_title = f"Test Ingredient Thread {thread_id}" + self.assertEqual(first_ingredient["title"], expected_title) + self.assertNotIn("thumbnail", first_ingredient) + + # Settings are thread-local, so we reset to the default "true" here + with settings_lock: + load_settings('{"builder": { "thumbnail": {"enabled": true}}}') + + def test_builder_sign_with_setting_no_thumbnail_and_ingredient_async(self): + """Test Builder class operations with thumbnail disabled and ingredient added using async tasks.""" + async def async_thread_work(task_id): + try: + # Create a new builder for this task + builder = Builder.from_json(self.manifestDefinition) + assert builder._builder is not None + + # The following removes the manifest's thumbnail + load_settings('{"builder": { "thumbnail": {"enabled": false}}}') + + # Test adding ingredient + ingredient_json = f'{{ "title": "Test Ingredient Async Task {task_id}" }}' + with open(self.testPath3, 'rb') as f: + builder.add_ingredient(ingredient_json, "image/jpeg", f) + + with open(self.testPath2, "rb") as file: + output = io.BytesIO(bytearray()) + builder.sign(self.signer, "image/jpeg", file, output) + output.seek(0) + reader = Reader("image/jpeg", output) + json_data = reader.json() + manifest_data = json.loads(json_data) + + # Verify active manifest exists + self.assertIn("active_manifest", manifest_data) + active_manifest_id = manifest_data["active_manifest"] + + # Verify active manifest object exists + self.assertIn("manifests", manifest_data) + self.assertIn(active_manifest_id, manifest_data["manifests"]) + active_manifest = manifest_data["manifests"][active_manifest_id] + + # There should be no thumbnail anymore here + self.assertNotIn("thumbnail", active_manifest) + + # Verify ingredients array exists in active manifest + self.assertIn("ingredients", active_manifest) + self.assertIsInstance(active_manifest["ingredients"], list) + self.assertTrue(len(active_manifest["ingredients"]) > 0) + + # Verify the first ingredient's title matches what we set for this task + first_ingredient = active_manifest["ingredients"][0] + expected_title = f"Test Ingredient Async Task {task_id}" + self.assertEqual(first_ingredient["title"], expected_title) + self.assertNotIn("thumbnail", first_ingredient) + + builder.close() + return None # Success case + + except Exception as e: + return f"Async task {task_id} error: {str(e)}" + + async def run_async_tests(): + # Create multiple async tasks + tasks = [] + num_tasks = 3 + for i in range(1, num_tasks + 1): + task = asyncio.create_task(async_thread_work(i)) + tasks.append(task) + + # Wait for all tasks to complete and collect results + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results + errors = [] + for i, result in enumerate(results, 1): + if isinstance(result, Exception): + errors.append(f"Async task {i} failed with exception: {str(result)}") + elif result: # Non-None result indicates an error + errors.append(result) + + # If any errors occurred, fail the test with all error messages + if errors: + self.fail("\n".join(errors)) + + # Run the async tests + asyncio.run(run_async_tests()) + + # Settings are thread-local, so we reset to the default "true" here + load_settings('{"builder": { "thumbnail": {"enabled": true}}}') + + def test_streams_sign_with_thumbnail_resource(self): + """Test Builder class operations with thumbnail resource using multiple threads.""" + # Thread synchronization + thread_results = {} + completed_threads = 0 + thread_lock = threading.Lock() + + def thread_work(thread_id): + nonlocal completed_threads + try: + with open(self.testPath2, "rb") as file: + builder = Builder(self.manifestDefinitionV2_1) + output = io.BytesIO(bytearray()) + + with open(self.testPath2, "rb") as thumbnail_file: + builder.add_resource("thumbnail", thumbnail_file) + + builder.sign(self.signer, "image/jpeg", file, output) + output.seek(0) + reader = Reader("image/jpeg", output) + json_data = reader.json() + + # Store results for verification + with thread_lock: + thread_results[thread_id] = { + 'json_data': json_data, + 'thread_id': thread_id + } + + # Verify the JSON data contains expected content + self.assertIn("Python Test", json_data) + self.assertNotIn("validation_status", json_data) + + output.close() + + except Exception as e: + with thread_lock: + thread_results[thread_id] = { + 'error': str(e), + 'thread_id': thread_id + } + finally: + with thread_lock: + completed_threads += 1 + + # Create and start multiple threads + threads = [] + num_threads = 3 + for i in range(1, num_threads + 1): + thread = threading.Thread(target=thread_work, args=(i,)) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Verify all threads completed + self.assertEqual(completed_threads, num_threads, f"All {num_threads} threads should have completed") + self.assertEqual(len(thread_results), num_threads, f"Should have results from all {num_threads} threads") + + # Verify results for each thread + for thread_id in range(1, num_threads + 1): + result = thread_results[thread_id] + + # Check if thread encountered an error + if 'error' in result: + self.fail(f"Thread {thread_id} failed with error: {result['error']}") + + json_data = result['json_data'] + + # Verify the JSON data contains expected content + self.assertIn("Python Test", json_data) + self.assertNotIn("validation_status", json_data) + + def test_streams_sign_with_thumbnail_resource_async(self): + """Test Builder class operations with thumbnail resource using async tasks.""" + async def async_thread_work(task_id): + try: + with open(self.testPath2, "rb") as file: + builder = Builder(self.manifestDefinitionV2_1) + output = io.BytesIO(bytearray()) + + with open(self.testPath2, "rb") as thumbnail_file: + builder.add_resource("thumbnail", thumbnail_file) + + builder.sign(self.signer, "image/jpeg", file, output) + output.seek(0) + reader = Reader("image/jpeg", output) + json_data = reader.json() + + # Verify the JSON data contains expected content + self.assertIn("Python Test", json_data) + self.assertNotIn("validation_status", json_data) + + output.close() + return None # Success case + + except Exception as e: + return f"Async task {task_id} error: {str(e)}" + + async def run_async_tests(): + # Create multiple async tasks + tasks = [] + num_tasks = 3 + for i in range(1, num_tasks + 1): + task = asyncio.create_task(async_thread_work(i)) + tasks.append(task) + + # Wait for all tasks to complete and collect results + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results + errors = [] + for i, result in enumerate(results, 1): + if isinstance(result, Exception): + errors.append(f"Async task {i} failed with exception: {str(result)}") + elif result: # Non-None result indicates an error + errors.append(result) + + # If any errors occurred, fail the test with all error messages + if errors: + self.fail("\n".join(errors)) + + # Run the async tests + asyncio.run(run_async_tests()) + + def test_remote_sign_using_returned_bytes_V2(self): + """Test Builder class operations with remote signing using returned bytes and multiple threads.""" + # Thread synchronization + thread_results = {} + completed_threads = 0 + thread_lock = threading.Lock() + + def thread_work(thread_id): + nonlocal completed_threads + try: + with open(self.testPath, "rb") as file: + builder = Builder(self.manifestDefinitionV2_1) + builder.set_no_embed() + with io.BytesIO() as output_buffer: + manifest_data = builder.sign( + self.signer, "image/jpeg", file, output_buffer) + output_buffer.seek(0) + read_buffer = io.BytesIO(output_buffer.getvalue()) + + with Reader("image/jpeg", read_buffer, manifest_data) as reader: + manifest_data = reader.json() + + # Store results for verification + with thread_lock: + thread_results[thread_id] = { + 'manifest_data': manifest_data, + 'thread_id': thread_id + } + + # Verify the manifest data contains expected content + self.assertIn("Python Test", manifest_data) + self.assertNotIn("validation_status", manifest_data) + + except Exception as e: + with thread_lock: + thread_results[thread_id] = { + 'error': str(e), + 'thread_id': thread_id + } + finally: + with thread_lock: + completed_threads += 1 + + # Create and start multiple threads + threads = [] + num_threads = 3 + for i in range(1, num_threads + 1): + thread = threading.Thread(target=thread_work, args=(i,)) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Verify all threads completed + self.assertEqual(completed_threads, num_threads, f"All {num_threads} threads should have completed") + self.assertEqual(len(thread_results), num_threads, f"Should have results from all {num_threads} threads") + + # Verify results for each thread + for thread_id in range(1, num_threads + 1): + result = thread_results[thread_id] + + # Check if thread encountered an error + if 'error' in result: + self.fail(f"Thread {thread_id} failed with error: {result['error']}") + + manifest_data = result['manifest_data'] + + # Verify the manifest data contains expected content + self.assertIn("Python Test", manifest_data) + self.assertNotIn("validation_status", manifest_data) + + def test_remote_sign_using_returned_bytes_V2_async(self): + """Test Builder class operations with remote signing using returned bytes and async tasks.""" + async def async_thread_work(task_id): + try: + with open(self.testPath, "rb") as file: + builder = Builder(self.manifestDefinitionV2_1) + builder.set_no_embed() + with io.BytesIO() as output_buffer: + manifest_data = builder.sign( + self.signer, "image/jpeg", file, output_buffer) + output_buffer.seek(0) + read_buffer = io.BytesIO(output_buffer.getvalue()) + + with Reader("image/jpeg", read_buffer, manifest_data) as reader: + manifest_data = reader.json() + + # Verify the manifest data contains expected content + self.assertIn("Python Test", manifest_data) + self.assertNotIn("validation_status", manifest_data) + + return None # Success case + + except Exception as e: + return f"Async task {task_id} error: {str(e)}" + + async def run_async_tests(): + # Create multiple async tasks + tasks = [] + num_tasks = 3 + for i in range(1, num_tasks + 1): + task = asyncio.create_task(async_thread_work(i)) + tasks.append(task) + + # Wait for all tasks to complete and collect results + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results + errors = [] + for i, result in enumerate(results, 1): + if isinstance(result, Exception): + errors.append(f"Async task {i} failed with exception: {str(result)}") + elif result: # Non-None result indicates an error + errors.append(result) + + # If any errors occurred, fail the test with all error messages + if errors: + self.fail("\n".join(errors)) + + # Run the async tests + asyncio.run(run_async_tests()) + if __name__ == '__main__': unittest.main()