@@ -89,73 +89,72 @@ async def remove_orphaned_services(
8989 # otherwise if some time goes it could very well be that there are new projects opened
9090 # in between and the GC would remove services that actually should be running.
9191
92- with log_catch (_logger , reraise = False ):
93- running_services = await dynamic_scheduler_service .list_dynamic_services (app )
94- if not running_services :
95- # nothing to do
96- return
97- _logger .debug (
98- "Actual running dynamic services: %s" ,
99- [(x .node_uuid , x .host ) for x in running_services ],
100- )
101- running_services_by_id : dict [NodeID , DynamicServiceGet ] = {
102- service .node_uuid : service for service in running_services
103- }
104-
105- known_opened_project_ids = await _list_opened_project_ids (registry )
106-
107- # NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error.
108- # Why? If a service is running but the nodes form the correspondign project cannot be listed,
109- # the service will be considered as orphaned and closed.
110- potentially_running_service_ids : list [set [NodeID ]] = []
111- async for project_nodes_future in limited_as_completed (
112- (
113- list_node_ids_in_project (app , project_id )
114- for project_id in known_opened_project_ids
115- ),
116- limit = _MAX_CONCURRENT_CALLS ,
117- ):
118- try :
119- project_nodes = await project_nodes_future
120- potentially_running_service_ids .append (project_nodes )
121- except BaseException as e : # pylint:disable=broad-exception-caught
122- _logger .warning (
123- create_troubleshotting_log_kwargs (
124- (
125- "Skipping orpahn services removal, call to "
126- "`list_node_ids_in_project` raised"
127- ),
128- error = e ,
129- error_context = {
130- "running_services" : running_services ,
131- "running_services_by_id" : running_services_by_id ,
132- "known_opened_project_ids" : known_opened_project_ids ,
133- },
92+ running_services = await dynamic_scheduler_service .list_dynamic_services (app )
93+ if not running_services :
94+ # nothing to do
95+ return
96+ _logger .debug (
97+ "Actual running dynamic services: %s" ,
98+ [(x .node_uuid , x .host ) for x in running_services ],
99+ )
100+ running_services_by_id : dict [NodeID , DynamicServiceGet ] = {
101+ service .node_uuid : service for service in running_services
102+ }
103+
104+ known_opened_project_ids = await _list_opened_project_ids (registry )
105+
106+ # NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error.
107+ # Why? If a service is running but the nodes form the correspondign project cannot be listed,
108+ # the service will be considered as orphaned and closed.
109+ potentially_running_service_ids : list [set [NodeID ]] = []
110+ async for project_nodes_future in limited_as_completed (
111+ (
112+ list_node_ids_in_project (app , project_id )
113+ for project_id in known_opened_project_ids
114+ ),
115+ limit = _MAX_CONCURRENT_CALLS ,
116+ ):
117+ try :
118+ project_nodes = await project_nodes_future
119+ potentially_running_service_ids .append (project_nodes )
120+ except BaseException as e : # pylint:disable=broad-exception-caught
121+ _logger .warning (
122+ create_troubleshotting_log_kwargs (
123+ (
124+ "Skipping orpahn services removal, call to "
125+ "`list_node_ids_in_project` raised"
134126 ),
135- exc_info = True ,
136- )
137- continue
138-
139- potentially_running_service_ids_set : set [NodeID ] = set ().union (
140- * (node_id for node_id in potentially_running_service_ids )
141- )
142- _logger .debug (
143- "Allowed service UUIDs from known opened projects: %s" ,
144- potentially_running_service_ids_set ,
145- )
146-
147- # compute the difference to find the orphaned services
148- orphaned_running_service_ids = (
149- set (running_services_by_id ) - potentially_running_service_ids_set
150- )
151- _logger .debug ("Found orphaned services: %s" , orphaned_running_service_ids )
152- # NOTE: no need to not reraise here, since we catch everything above
153- # and logged_gather first runs everything
154- await logged_gather (
155- * (
156- _remove_service (app , node_id , running_services_by_id [node_id ])
157- for node_id in orphaned_running_service_ids
158- ),
159- log = _logger ,
160- max_concurrency = _MAX_CONCURRENT_CALLS ,
161- )
127+ error = e ,
128+ error_context = {
129+ "running_services" : running_services ,
130+ "running_services_by_id" : running_services_by_id ,
131+ "known_opened_project_ids" : known_opened_project_ids ,
132+ },
133+ ),
134+ exc_info = True ,
135+ )
136+ continue
137+
138+ potentially_running_service_ids_set : set [NodeID ] = set ().union (
139+ * (node_id for node_id in potentially_running_service_ids )
140+ )
141+ _logger .debug (
142+ "Allowed service UUIDs from known opened projects: %s" ,
143+ potentially_running_service_ids_set ,
144+ )
145+
146+ # compute the difference to find the orphaned services
147+ orphaned_running_service_ids = (
148+ set (running_services_by_id ) - potentially_running_service_ids_set
149+ )
150+ _logger .debug ("Found orphaned services: %s" , orphaned_running_service_ids )
151+ # NOTE: no need to not reraise here, since we catch everything above
152+ # and logged_gather first runs everything
153+ await logged_gather (
154+ * (
155+ _remove_service (app , node_id , running_services_by_id [node_id ])
156+ for node_id in orphaned_running_service_ids
157+ ),
158+ log = _logger ,
159+ max_concurrency = _MAX_CONCURRENT_CALLS ,
160+ )
0 commit comments