@@ -169,6 +169,9 @@ def _process_pipeline_request(
169169 job_id = inputs .get (DEEPLOY_KEYS .JOB_ID , None )
170170 is_confirmable_job = inputs .chainstore_response
171171
172+ pipeline_params = self ._extract_pipeline_params (inputs )
173+ inputs [DEEPLOY_KEYS .PIPELINE_PARAMS ] = pipeline_params
174+
172175 # Validate plugins array structure and required fields for each plugin
173176 plugins_array = inputs .get (DEEPLOY_KEYS .PLUGINS )
174177 if plugins_array :
@@ -276,6 +279,22 @@ def _process_pipeline_request(
276279 inputs [DEEPLOY_KEYS .TARGET_NODES_COUNT ] = len (deployment_nodes )
277280 inputs .target_nodes_count = len (deployment_nodes )
278281
282+ if deeploy_specs_for_update is not None and not isinstance (deeploy_specs_for_update , dict ):
283+ msg = (
284+ f"{ DEEPLOY_ERRORS .REQUEST3 } . Unexpected 'deeploy_specs' payload type "
285+ f"{ type (deeploy_specs_for_update ).__name__ } ."
286+ )
287+ raise ValueError (msg )
288+ deeploy_specs_payload = (
289+ self .deepcopy (deeploy_specs_for_update )
290+ if isinstance (deeploy_specs_for_update , dict )
291+ else {}
292+ )
293+ deeploy_specs_payload = self ._ensure_deeploy_specs_job_config (
294+ deeploy_specs_payload ,
295+ pipeline_params = pipeline_params ,
296+ )
297+
279298 dct_status , str_status = self .check_and_deploy_pipelines (
280299 sender = sender ,
281300 inputs = inputs ,
@@ -285,7 +304,7 @@ def _process_pipeline_request(
285304 new_nodes = deployment_nodes ,
286305 update_nodes = [],
287306 discovered_plugin_instances = discovered_plugin_instances ,
288- dct_deeploy_specs_create = deeploy_specs_for_update ,
307+ dct_deeploy_specs_create = deeploy_specs_payload ,
289308 job_app_type = job_app_type ,
290309 )
291310
@@ -317,6 +336,8 @@ def _process_pipeline_request(
317336 plugins_array = inputs .get (DEEPLOY_KEYS .PLUGINS )
318337 if plugins_array :
319338 dct_request ['plugins_count' ] = len (plugins_array )
339+ # if pipeline_params:
340+ # dct_request[DEEPLOY_KEYS.PIPELINE_PARAMS] = pipeline_params
320341
321342 result = {
322343 DEEPLOY_KEYS .STATUS : str_status ,
@@ -370,6 +391,9 @@ def create_pipeline(
370391 job_tags : list
371392 Tags for filtering target nodes
372393 Example: ["KYB", "DC:HOSTINGER", "CT:FR|IT|RO", "REG:EU"]
394+ pipeline_params : dict, optional
395+ Additional pipeline-level parameters forwarded to the data capture thread. `null` falls back to `{}`.
396+ The provided keys are merged into the pipeline configuration at the top level.
373397
374398 nonce : str
375399 The nonce used for signing the request
@@ -486,6 +510,10 @@ def update_pipeline(
486510 job_id : int
487511 The job ID from blockchain
488512
513+ pipeline_params : dict, optional
514+ Additional pipeline-level parameters forwarded to the data capture thread. `null` falls back to `{}`.
515+ The provided keys are merged into the pipeline configuration at the top level.
516+
489517 nonce : str
490518 The nonce used for signing the request
491519
0 commit comments