@@ -110,7 +110,7 @@ def __check_is_oracle(self, inputs):
110110 raise ValueError ("Sender {} is not an oracle" .format (sender ))
111111 return True
112112
113- def __create_pipeline_on_nodes (self , nodes , inputs , app_id , app_alias , app_type , sender , job_app_type = None ):
113+ def __create_pipeline_on_nodes (self , nodes , inputs , app_id , app_alias , app_type , sender , job_app_type = None , dct_deeploy_specs = None ):
114114 """
115115 Create new pipelines on each node and set CSTORE `response_key` for the "callback" action
116116 """
@@ -124,18 +124,34 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type,
124124 response_keys = self .defaultdict (list )
125125
126126 ts = self .time ()
127- dct_deeploy_specs = {
128- DEEPLOY_KEYS .JOB_ID : job_id ,
129- DEEPLOY_KEYS .PROJECT_ID : project_id ,
130- DEEPLOY_KEYS .PROJECT_NAME : project_name ,
131- DEEPLOY_KEYS .NR_TARGET_NODES : len (nodes ),
132- DEEPLOY_KEYS .CURRENT_TARGET_NODES : nodes ,
133- DEEPLOY_KEYS .JOB_TAGS : job_tags ,
134- DEEPLOY_KEYS .DATE_CREATED : ts ,
135- DEEPLOY_KEYS .DATE_UPDATED : ts ,
136- DEEPLOY_KEYS .SPARE_NODES : spare_nodes ,
137- DEEPLOY_KEYS .ALLOW_REPLICATION_IN_THE_WILD : allow_replication_in_the_wild ,
138- }
127+ if dct_deeploy_specs :
128+ dct_deeploy_specs = self .deepcopy (dct_deeploy_specs )
129+ dct_deeploy_specs [DEEPLOY_KEYS .DATE_UPDATED ] = ts
130+ if DEEPLOY_KEYS .DATE_CREATED not in dct_deeploy_specs :
131+ dct_deeploy_specs [DEEPLOY_KEYS .DATE_CREATED ] = ts
132+ else :
133+ dct_deeploy_specs = {
134+ DEEPLOY_KEYS .NR_TARGET_NODES : len (nodes ),
135+ DEEPLOY_KEYS .CURRENT_TARGET_NODES : nodes ,
136+ DEEPLOY_KEYS .JOB_TAGS : job_tags ,
137+ DEEPLOY_KEYS .DATE_CREATED : ts ,
138+ DEEPLOY_KEYS .DATE_UPDATED : ts ,
139+ DEEPLOY_KEYS .SPARE_NODES : spare_nodes ,
140+ DEEPLOY_KEYS .ALLOW_REPLICATION_IN_THE_WILD : allow_replication_in_the_wild ,
141+ }
142+
143+ if job_id is not None or DEEPLOY_KEYS .JOB_ID not in dct_deeploy_specs :
144+ dct_deeploy_specs [DEEPLOY_KEYS .JOB_ID ] = job_id
145+ if project_id is not None or DEEPLOY_KEYS .PROJECT_ID not in dct_deeploy_specs :
146+ dct_deeploy_specs [DEEPLOY_KEYS .PROJECT_ID ] = project_id
147+ if project_name is not None or DEEPLOY_KEYS .PROJECT_NAME not in dct_deeploy_specs :
148+ dct_deeploy_specs [DEEPLOY_KEYS .PROJECT_NAME ] = project_name
149+ dct_deeploy_specs [DEEPLOY_KEYS .NR_TARGET_NODES ] = len (nodes )
150+ dct_deeploy_specs [DEEPLOY_KEYS .CURRENT_TARGET_NODES ] = nodes
151+ dct_deeploy_specs [DEEPLOY_KEYS .JOB_TAGS ] = job_tags
152+ dct_deeploy_specs [DEEPLOY_KEYS .SPARE_NODES ] = spare_nodes
153+ dct_deeploy_specs [DEEPLOY_KEYS .ALLOW_REPLICATION_IN_THE_WILD ] = allow_replication_in_the_wild
154+
139155 detected_job_app_type = job_app_type or self .deeploy_detect_job_app_type (plugins )
140156 if detected_job_app_type in JOB_APP_TYPES_ALL :
141157 dct_deeploy_specs [DEEPLOY_KEYS .JOB_APP_TYPE ] = detected_job_app_type
@@ -419,6 +435,46 @@ def _prepare_updated_deeploy_specs(self, owner, app_id, job_id, discovered_plugi
419435 refreshed_specs [DEEPLOY_KEYS .DATE_UPDATED ] = self .time ()
420436 return refreshed_specs
421437
def _gather_running_pipeline_context(self, owner, app_id=None, job_id=None):
  """
  Build a snapshot of the currently running pipeline instances for a job/app.

  Used so follow-up operations stay in sync with the active deployment state.

  Parameters
  ----------
  owner : str
    Owner identity used to filter discovered plugin instances.
  app_id : str, optional
    Application id to look up; mutually complementary with `job_id`.
  job_id : str, optional
    Job id to look up; mutually complementary with `app_id`.

  Returns
  -------
  dict
    {
      'discovered_instances': list,
      'nodes': list[str],
      'deeploy_specs': dict | None,
    }

  Raises
  ------
  ValueError
    If no running worker node is found for the given identifiers and owner.
  """
  instances = self._discover_plugin_instances(app_id=app_id, job_id=job_id, owner=owner)

  # Ordered de-duplication of node addresses; falsy/missing entries are dropped.
  unique_nodes = {}
  for instance in instances:
    addr = instance.get(DEEPLOY_PLUGIN_DATA.NODE)
    if addr:
      unique_nodes.setdefault(addr, None)
  nodes = list(unique_nodes)

  if not nodes:
    raise ValueError(
      f"{DEEPLOY_ERRORS.NODES3}: No running workers found for provided "
      f"{f'app_id {app_id}' if app_id else f'job_id {job_id}'} and owner '{owner}'."
    )

  refreshed_specs = self._prepare_updated_deeploy_specs(
    owner=owner,
    app_id=app_id,
    job_id=job_id,
    discovered_plugin_instances=instances,
  )

  return {
    "discovered_instances": instances,
    "nodes": nodes,
    "deeploy_specs": refreshed_specs,
  }
477+
422478 def __prepare_plugins_for_update (self , inputs , discovered_plugin_instances ):
423479 """
424480 Prepare plugins for update using discovered instances instead of creating new ones
@@ -1333,7 +1389,7 @@ def deeploy_prepare_plugins(self, inputs):
13331389 plugins = [plugin ]
13341390 return plugins
13351391
1336- def check_and_deploy_pipelines (self , sender , inputs , app_id , app_alias , app_type , update_nodes , new_nodes , discovered_plugin_instances = [], dct_deeploy_specs = None , job_app_type = None ):
1392+ def check_and_deploy_pipelines (self , sender , inputs , app_id , app_alias , app_type , update_nodes , new_nodes , discovered_plugin_instances = [], dct_deeploy_specs = None , job_app_type = None , dct_deeploy_specs_create = None ):
13371393 """
13381394 Validate the inputs and deploy the pipeline on the target nodes.
13391395 """
@@ -1349,7 +1405,7 @@ def check_and_deploy_pipelines(self, sender, inputs, app_id, app_alias, app_type
13491405 update_response_keys = self .__update_pipeline_on_nodes (update_nodes , inputs , app_id , app_alias , app_type , sender , discovered_plugin_instances , dct_deeploy_specs , job_app_type = job_app_type )
13501406 response_keys .update (update_response_keys )
13511407 if len (new_nodes ) > 0 :
1352- new_response_keys = self .__create_pipeline_on_nodes (new_nodes , inputs , app_id , app_alias , app_type , sender , job_app_type = job_app_type )
1408+ new_response_keys = self .__create_pipeline_on_nodes (new_nodes , inputs , app_id , app_alias , app_type , sender , job_app_type = job_app_type , dct_deeploy_specs = dct_deeploy_specs_create )
13531409 response_keys .update (new_response_keys )
13541410
13551411 # Phase 3: Wait until all the responses are received via CSTORE and compose status response
@@ -1873,8 +1929,9 @@ def check_running_pipelines_and_add_to_r1fs(self):
18731929
18741930 return netmon_job_ids
18751931
1876- def delete_pipeline_from_nodes (self , app_id = None , job_id = None , owner = None , allow_missing = False ):
1877- discovered_instances = self ._discover_plugin_instances (app_id = app_id , job_id = job_id , owner = owner )
1932+ def delete_pipeline_from_nodes (self , app_id = None , job_id = None , owner = None , allow_missing = False , discovered_instances = None ):
1933+ if discovered_instances is None :
1934+ discovered_instances = self ._discover_plugin_instances (app_id = app_id , job_id = job_id , owner = owner )
18781935
18791936 if len (discovered_instances ) == 0 :
18801937 if allow_missing :
0 commit comments