|
21 | 21 | # import simcore_service_webserver.functions._functions_controller_rpc as functions_rpc |
22 | 22 | from models_library.functions_errors import ( |
23 | 23 | FunctionIDNotFoundError, |
| 24 | + FunctionJobCollectionReadAccessDeniedError, |
24 | 25 | FunctionJobIDNotFoundError, |
25 | 26 | FunctionJobReadAccessDeniedError, |
26 | 27 | FunctionReadAccessDeniedError, |
@@ -153,6 +154,15 @@ async def test_register_get_delete_function( |
153 | 154 | product_name=osparc_product_name, |
154 | 155 | ) |
155 | 156 |
|
| 157 | + with pytest.raises(FunctionReadAccessDeniedError): |
| 158 | + # Attempt to delete the function by another user |
| 159 | + await functions_rpc.delete_function( |
| 160 | + rabbitmq_rpc_client=rpc_client, |
| 161 | + function_id=registered_function.uid, |
| 162 | + user_id=other_logged_user["id"], |
| 163 | + product_name="this_is_not_osparc", |
| 164 | + ) |
| 165 | + |
156 | 166 | # Delete the function using its ID |
157 | 167 | await functions_rpc.delete_function( |
158 | 168 | rabbitmq_rpc_client=rpc_client, |
@@ -812,6 +822,7 @@ async def test_function_job_collection( |
812 | 822 | mock_function: ProjectFunction, |
813 | 823 | rpc_client: RabbitMQRPCClient, |
814 | 824 | logged_user: UserInfoDict, |
| 825 | + other_logged_user: UserInfoDict, |
815 | 826 | osparc_product_name: ProductName, |
816 | 827 | ): |
817 | 828 | # Register the function first |
@@ -867,9 +878,43 @@ async def test_function_job_collection( |
867 | 878 | ) |
868 | 879 | assert registered_collection.uid is not None |
869 | 880 |
|
870 | | - # Assert the registered collection matches the input collection |
| 881 | + # Get the function job collection |
| 882 | + retrieved_collection = await functions_rpc.get_function_job_collection( |
| 883 | + rabbitmq_rpc_client=rpc_client, |
| 884 | + function_job_collection_id=registered_collection.uid, |
| 885 | + user_id=logged_user["id"], |
| 886 | + product_name=osparc_product_name, |
| 887 | + ) |
| 888 | + assert retrieved_collection.uid == registered_collection.uid |
871 | 889 | assert registered_collection.job_ids == function_job_ids |
872 | 890 |
|
| 891 | + # Test denied access for another user |
| 892 | + with pytest.raises(FunctionJobCollectionReadAccessDeniedError): |
| 893 | + await functions_rpc.get_function_job_collection( |
| 894 | + rabbitmq_rpc_client=rpc_client, |
| 895 | + function_job_collection_id=registered_collection.uid, |
| 896 | + user_id=other_logged_user["id"], |
| 897 | + product_name=osparc_product_name, |
| 898 | + ) |
| 899 | + |
| 900 | + # Test denied access for another product |
| 901 | + with pytest.raises(FunctionJobCollectionReadAccessDeniedError): |
| 902 | + await functions_rpc.get_function_job_collection( |
| 903 | + rabbitmq_rpc_client=rpc_client, |
| 904 | + function_job_collection_id=registered_collection.uid, |
| 905 | + user_id=other_logged_user["id"], |
| 906 | + product_name="this_is_not_osparc", |
| 907 | + ) |
| 908 | + |
| 909 | + # Attempt to delete the function job collection by another user |
| 910 | + with pytest.raises(FunctionJobCollectionReadAccessDeniedError): |
| 911 | + await functions_rpc.delete_function_job_collection( |
| 912 | + rabbitmq_rpc_client=rpc_client, |
| 913 | + function_job_collection_id=registered_collection.uid, |
| 914 | + user_id=other_logged_user["id"], |
| 915 | + product_name=osparc_product_name, |
| 916 | + ) |
| 917 | + |
873 | 918 | await functions_rpc.delete_function_job_collection( |
874 | 919 | rabbitmq_rpc_client=rpc_client, |
875 | 920 | function_job_collection_id=registered_collection.uid, |
|
0 commit comments