@@ -208,58 +208,90 @@ def _process_pipeline_request(
208208
209209 # Get nodes based on operation type
210210 discovered_plugin_instances = []
211+ deployment_nodes = []
212+ confirmation_nodes = []
213+ deeploy_specs_for_update = None
211214 if is_create :
212- nodes = self ._check_nodes_availability (inputs )
215+ deployment_nodes = self ._check_nodes_availability (inputs )
216+ confirmation_nodes = list (deployment_nodes )
213217 else :
214- discovered_plugin_instances = self ._discover_plugin_instances (app_id = app_id , job_id = job_id , owner = sender )
215-
218+ # Discover the live deployment so we can validate node affinity and reuse existing specs.
219+ pipeline_context = self ._gather_running_pipeline_context (
220+ owner = sender ,
221+ app_id = app_id ,
222+ job_id = job_id ,
223+ )
224+ discovered_plugin_instances = pipeline_context ["discovered_instances" ]
225+ current_nodes = pipeline_context ["nodes" ]
226+ deeploy_specs_for_update = pipeline_context ["deeploy_specs" ]
216227 self .P (f"Discovered plugin instances: { self .json_dumps (discovered_plugin_instances )} " )
217- deeploy_specs_for_update = None
218- if job_app_type in (JOB_APP_TYPES .NATIVE , JOB_APP_TYPES .GENERIC , JOB_APP_TYPES .SERVICE ):
219- discovered_plugin_instances = self ._ensure_plugin_instance_ids (
220- inputs = inputs ,
221- discovered_plugin_instances = discovered_plugin_instances ,
222- owner = sender ,
223- app_id = app_id ,
224- job_id = job_id ,
225- )
226- deeploy_specs_for_update = self ._prepare_updated_deeploy_specs (
227- owner = sender ,
228- app_id = app_id ,
229- job_id = job_id ,
230- discovered_plugin_instances = discovered_plugin_instances ,
228+
229+ requested_nodes = inputs .get (DEEPLOY_KEYS .TARGET_NODES , None ) or []
230+ normalized_requested_nodes = [
231+ self ._check_and_maybe_convert_address (node ) for node in requested_nodes
232+ ] if requested_nodes else []
233+
234+ if normalized_requested_nodes :
235+ # Reject updates that request a different node set than the one currently running.
236+ if set (normalized_requested_nodes ) != set (current_nodes ):
237+ msg = (
238+ f"{ DEEPLOY_ERRORS .NODES2 } : Update request must target existing nodes { current_nodes } . "
239+ f"Received { normalized_requested_nodes } ."
240+ )
241+ raise ValueError (msg )
242+
243+ requested_nodes_count = inputs .get (DEEPLOY_KEYS .TARGET_NODES_COUNT , 0 )
244+ if requested_nodes_count and requested_nodes_count != len (current_nodes ):
245+ msg = (
246+ f"{ DEEPLOY_ERRORS .NODES2 } : Update request must keep the original number of nodes "
247+ f"({ len (current_nodes )} ). Received { requested_nodes_count } ."
231248 )
232- nodes = [ instance [ DEEPLOY_PLUGIN_DATA . NODE ] for instance in discovered_plugin_instances ]
249+ raise ValueError ( msg )
233250
234- if is_create :
235- dct_status , str_status = self .check_and_deploy_pipelines (
236- sender = sender ,
237- inputs = inputs ,
238- app_id = app_id ,
239- app_alias = app_alias ,
240- app_type = app_type ,
241- new_nodes = nodes ,
242- update_nodes = [],
243- discovered_plugin_instances = discovered_plugin_instances ,
244- job_app_type = job_app_type ,
245- )
246- else :
247- dct_status , str_status = self .check_and_deploy_pipelines (
248- sender = sender ,
249- inputs = inputs ,
251+ inputs [DEEPLOY_KEYS .TARGET_NODES ] = current_nodes
252+ inputs .target_nodes = current_nodes
253+ inputs [DEEPLOY_KEYS .TARGET_NODES_COUNT ] = len (current_nodes )
254+ inputs .target_nodes_count = len (current_nodes )
255+
256+ # TODO: Assess whether removing the running pipeline before redeploying is safe when the new launch fails.
257+ self .delete_pipeline_from_nodes (
250258 app_id = app_id ,
251- app_alias = app_alias ,
252- app_type = app_type ,
253- new_nodes = [],
254- update_nodes = nodes ,
255- discovered_plugin_instances = discovered_plugin_instances ,
256- dct_deeploy_specs = deeploy_specs_for_update ,
257- job_app_type = job_app_type ,
259+ job_id = job_id ,
260+ owner = sender ,
261+ discovered_instances = discovered_plugin_instances ,
258262 )
263+
264+ deployment_nodes = self ._check_nodes_availability (inputs )
265+ if set (deployment_nodes ) != set (current_nodes ):
266+ msg = (
267+ f"{ DEEPLOY_ERRORS .NODES2 } : Failed to validate that update runs on existing nodes. "
268+ f"Expected { current_nodes } , validated { deployment_nodes } ."
269+ )
270+ raise ValueError (msg )
271+ confirmation_nodes = list (deployment_nodes )
272+ discovered_plugin_instances = []
273+
274+ inputs [DEEPLOY_KEYS .TARGET_NODES ] = deployment_nodes
275+ inputs .target_nodes = deployment_nodes
276+ inputs [DEEPLOY_KEYS .TARGET_NODES_COUNT ] = len (deployment_nodes )
277+ inputs .target_nodes_count = len (deployment_nodes )
278+
279+ dct_status , str_status = self .check_and_deploy_pipelines (
280+ sender = sender ,
281+ inputs = inputs ,
282+ app_id = app_id ,
283+ app_alias = app_alias ,
284+ app_type = app_type ,
285+ new_nodes = deployment_nodes ,
286+ update_nodes = [],
287+ discovered_plugin_instances = discovered_plugin_instances ,
288+ dct_deeploy_specs_create = deeploy_specs_for_update ,
289+ job_app_type = job_app_type ,
290+ )
259291
260- if str_status in [DEEPLOY_STATUS .SUCCESS , DEEPLOY_STATUS .COMMAND_DELIVERED ]:
261- if (dct_status is not None and is_confirmable_job and len (nodes ) == len (dct_status )) or not is_confirmable_job :
262- eth_nodes = [self .bc .node_addr_to_eth_addr (node ) for node in nodes ]
292+ if is_create and str_status in [DEEPLOY_STATUS .SUCCESS , DEEPLOY_STATUS .COMMAND_DELIVERED ]:
293+ if (dct_status is not None and is_confirmable_job and len (confirmation_nodes ) == len (dct_status )) or not is_confirmable_job :
294+ eth_nodes = [self .bc .node_addr_to_eth_addr (node ) for node in confirmation_nodes ]
263295 eth_nodes = sorted (eth_nodes )
264296 self .bc .submit_node_update (
265297 job_id = job_id ,
@@ -483,6 +515,7 @@ def update_pipeline(
483515
484516 Notes
485517 -----
518+ - Existing pipelines are stopped and redeployed in place; requests must reference the active node set.
486519 - Updates are applied to existing plugin instances on the same nodes
487520 - For multi-plugin pipelines, all plugins are updated with new configurations
488521 - Resource validation applies the same as create operations
0 commit comments