@@ -582,3 +582,152 @@ async def test_rpc_list_my_service_history_paginated(
582582 assert len (release_history ) == 2
583583 assert release_history [0 ].version == service_version_2 , "expected newest first"
584584 assert release_history [1 ].version == service_version_1
585+
586+
587+ async def test_rpc_get_service_ports_successful_retrieval (
588+ background_sync_task_mocked : None ,
589+ mocked_director_rest_api : MockRouter ,
590+ rpc_client : RabbitMQRPCClient ,
591+ product_name : ProductName ,
592+ user_id : UserID ,
593+ app : FastAPI ,
594+ create_fake_service_data : Callable ,
595+ services_db_tables_injector : Callable ,
596+ ):
597+ """Tests successful retrieval of service ports for a specific service version"""
598+ assert app
599+
600+ # Create a service with known ports
601+ service_key = "simcore/services/comp/test-service-ports"
602+ service_version = "1.0.0"
603+
604+ # Create and inject the service
605+ fake_service = create_fake_service_data (
606+ service_key ,
607+ service_version ,
608+ team_access = None ,
609+ everyone_access = None ,
610+ product = product_name ,
611+ )
612+ await services_db_tables_injector ([fake_service ])
613+
614+ # Import the get_service_ports function from the client
615+ from servicelib .rabbitmq .rpc_interfaces .catalog .services import get_service_ports
616+
617+ # Call the RPC function to get service ports
618+ ports = await get_service_ports (
619+ rpc_client ,
620+ product_name = product_name ,
621+ user_id = user_id ,
622+ service_key = service_key ,
623+ service_version = service_version ,
624+ )
625+
626+ # Validate the response
627+ assert isinstance (ports , list )
628+ # Each port should have expected fields
629+ for port in ports :
630+ assert hasattr (port , "kind" )
631+ assert hasattr (port , "key" )
632+ assert hasattr (port , "port" )
633+
634+
635+ async def test_rpc_get_service_ports_not_found (
636+ background_sync_task_mocked : None ,
637+ mocked_director_rest_api : MockRouter ,
638+ rpc_client : RabbitMQRPCClient ,
639+ product_name : ProductName ,
640+ user_id : UserID ,
641+ app : FastAPI ,
642+ ):
643+ """Tests that appropriate error is raised when service does not exist"""
644+ assert app
645+
646+ # Import the get_service_ports function from the client
647+ from servicelib .rabbitmq .rpc_interfaces .catalog .services import get_service_ports
648+
649+ service_version = "1.0.0"
650+ non_existent_key = "simcore/services/comp/non-existent-service"
651+
652+ # Test service not found scenario
653+ with pytest .raises (CatalogItemNotFoundError , match = "non-existent-service" ):
654+ await get_service_ports (
655+ rpc_client ,
656+ product_name = product_name ,
657+ user_id = user_id ,
658+ service_key = non_existent_key ,
659+ service_version = service_version ,
660+ )
661+
662+
663+ async def test_rpc_get_service_ports_permission_denied (
664+ background_sync_task_mocked : None ,
665+ mocked_director_rest_api : MockRouter ,
666+ rpc_client : RabbitMQRPCClient ,
667+ product_name : ProductName ,
668+ user : dict [str , Any ],
669+ user_id : UserID ,
670+ app : FastAPI ,
671+ create_fake_service_data : Callable ,
672+ services_db_tables_injector : Callable ,
673+ ):
674+ """Tests that appropriate error is raised when user doesn't have permission"""
675+ assert app
676+
677+ # Import the get_service_ports function from the client
678+ from servicelib .rabbitmq .rpc_interfaces .catalog .services import get_service_ports
679+
680+ # Create a service with restricted access
681+ restricted_service_key = "simcore/services/comp/restricted-service"
682+ service_version = "1.0.0"
683+
684+ fake_restricted_service = create_fake_service_data (
685+ restricted_service_key ,
686+ service_version ,
687+ team_access = None ,
688+ everyone_access = None ,
689+ product = product_name ,
690+ )
691+
692+ # Modify access rights to restrict access
693+ if "access_rights" in fake_restricted_service :
694+ # Remove user's access if present
695+ if user ["primary_gid" ] in fake_restricted_service ["access_rights" ]:
696+ fake_restricted_service ["access_rights" ].pop (user ["primary_gid" ])
697+
698+ await services_db_tables_injector ([fake_restricted_service ])
699+
700+ # Attempt to access without permission
701+ with pytest .raises (CatalogForbiddenError ):
702+ await get_service_ports (
703+ rpc_client ,
704+ product_name = product_name ,
705+ user_id = UserID ("different-user" ), # Use a different user ID
706+ service_key = restricted_service_key ,
707+ service_version = service_version ,
708+ )
709+
710+
711+ async def test_rpc_get_service_ports_validation_error (
712+ background_sync_task_mocked : None ,
713+ mocked_director_rest_api : MockRouter ,
714+ rpc_client : RabbitMQRPCClient ,
715+ product_name : ProductName ,
716+ user_id : UserID ,
717+ app : FastAPI ,
718+ ):
719+ """Tests validation error handling for invalid service key format"""
720+ assert app
721+
722+ # Import the get_service_ports function from the client
723+ from servicelib .rabbitmq .rpc_interfaces .catalog .services import get_service_ports
724+
725+ # Test with invalid service key format
726+ with pytest .raises (ValidationError , match = "service_key" ):
727+ await get_service_ports (
728+ rpc_client ,
729+ product_name = product_name ,
730+ user_id = user_id ,
731+ service_key = "invalid-service-key-format" ,
732+ service_version = "1.0.0" ,
733+ )
0 commit comments