Skip to content

Commit a4d2b09

Browse files
Merge pull request #296 from Ratio1/develop
Develop
2 parents c6f31b7 + d94c194 commit a4d2b09

File tree

11 files changed: +828 additions, −54 deletions

extensions/business/deeploy/deeploy_const.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class DEEPLOY_KEYS:
3939
CHAINSTORE_RESPONSE = "chainstore_response"
4040
PIPELINE_INPUT_TYPE = "pipeline_input_type"
4141
PIPELINE_INPUT_URI = "pipeline_input_uri"
42+
PIPELINE_PARAMS = "pipeline_params"
43+
JOB_CONFIG = "job_config"
4244
# App params keys
4345
APP_PARAMS = "app_params"
4446
APP_PARAMS_IMAGE = "IMAGE"
@@ -543,6 +545,7 @@ class JOB_APP_TYPES:
543545
# Pipeline data source
544546
"pipeline_input_type" : "void", # Pipeline TYPE
545547
"pipeline_input_uri" : None,
548+
"pipeline_params": {},
546549
"chainstore_response" : True,
547550

548551
# Plugins array (NEW FORMAT - each object is a plugin instance)

extensions/business/deeploy/deeploy_job_mixin.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ def extract_invariable_data_from_pipeline(self, pipeline: dict):
6767
}
6868
"""
6969

70+
if pipeline is None:
71+
return None
72+
if not isinstance(pipeline, dict):
73+
return None
74+
7075
# Create a copy of the pipeline and remove the TIME field
7176
extracted_data = pipeline.copy()
7277
extracted_data.pop("TIME", None)
@@ -88,9 +93,20 @@ def save_job_pipeline_in_cstore(self, pipeline: dict, job_id: int):
8893
"""
8994
result = False
9095
try:
96+
if pipeline is None:
97+
self.P(f"Skipping CSTORE save for job {job_id}: pipeline is None", color='y')
98+
return False
99+
if not isinstance(pipeline, dict):
100+
self.P(f"Skipping CSTORE save for job {job_id}: pipeline is of type {type(pipeline).__name__}", color='y')
101+
return False
102+
91103
self.P("Saving pipeline to CSTORE...")
92104

93105
sanitized_pipeline = self.extract_invariable_data_from_pipeline(pipeline)
106+
if sanitized_pipeline is None:
107+
self.P(f"Skipping CSTORE save for job {job_id}: unable to sanitize pipeline payload", color='y')
108+
return False
109+
94110
sorted_pipeline = self._recursively_sort_pipeline_data(sanitized_pipeline)
95111
cid = self._save_pipeline_to_r1fs(sorted_pipeline)
96112

extensions/business/deeploy/deeploy_manager_api.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ def _process_pipeline_request(
169169
job_id = inputs.get(DEEPLOY_KEYS.JOB_ID, None)
170170
is_confirmable_job = inputs.chainstore_response
171171

172+
pipeline_params = self._extract_pipeline_params(inputs)
173+
inputs[DEEPLOY_KEYS.PIPELINE_PARAMS] = pipeline_params
174+
172175
# Validate plugins array structure and required fields for each plugin
173176
plugins_array = inputs.get(DEEPLOY_KEYS.PLUGINS)
174177
if plugins_array:
@@ -276,6 +279,22 @@ def _process_pipeline_request(
276279
inputs[DEEPLOY_KEYS.TARGET_NODES_COUNT] = len(deployment_nodes)
277280
inputs.target_nodes_count = len(deployment_nodes)
278281

282+
if deeploy_specs_for_update is not None and not isinstance(deeploy_specs_for_update, dict):
283+
msg = (
284+
f"{DEEPLOY_ERRORS.REQUEST3}. Unexpected 'deeploy_specs' payload type "
285+
f"{type(deeploy_specs_for_update).__name__}."
286+
)
287+
raise ValueError(msg)
288+
deeploy_specs_payload = (
289+
self.deepcopy(deeploy_specs_for_update)
290+
if isinstance(deeploy_specs_for_update, dict)
291+
else {}
292+
)
293+
deeploy_specs_payload = self._ensure_deeploy_specs_job_config(
294+
deeploy_specs_payload,
295+
pipeline_params=pipeline_params,
296+
)
297+
279298
dct_status, str_status = self.check_and_deploy_pipelines(
280299
sender=sender,
281300
inputs=inputs,
@@ -285,7 +304,7 @@ def _process_pipeline_request(
285304
new_nodes=deployment_nodes,
286305
update_nodes=[],
287306
discovered_plugin_instances=discovered_plugin_instances,
288-
dct_deeploy_specs_create=deeploy_specs_for_update,
307+
dct_deeploy_specs_create=deeploy_specs_payload,
289308
job_app_type=job_app_type,
290309
)
291310

@@ -317,6 +336,8 @@ def _process_pipeline_request(
317336
plugins_array = inputs.get(DEEPLOY_KEYS.PLUGINS)
318337
if plugins_array:
319338
dct_request['plugins_count'] = len(plugins_array)
339+
# if pipeline_params:
340+
# dct_request[DEEPLOY_KEYS.PIPELINE_PARAMS] = pipeline_params
320341

321342
result = {
322343
DEEPLOY_KEYS.STATUS: str_status,
@@ -370,6 +391,9 @@ def create_pipeline(
370391
job_tags : list
371392
Tags for filtering target nodes
372393
Example: ["KYB", "DC:HOSTINGER", "CT:FR|IT|RO", "REG:EU"]
394+
pipeline_params : dict, optional
395+
Additional pipeline-level parameters forwarded to the data capture thread. `null` falls back to `{}`.
396+
The provided keys are merged into the pipeline configuration at the top level.
373397
374398
nonce : str
375399
The nonce used for signing the request
@@ -486,6 +510,10 @@ def update_pipeline(
486510
job_id : int
487511
The job ID from blockchain
488512
513+
pipeline_params : dict, optional
514+
Additional pipeline-level parameters forwarded to the data capture thread. `null` falls back to `{}`.
515+
The provided keys are merged into the pipeline configuration at the top level.
516+
489517
nonce : str
490518
The nonce used for signing the request
491519

0 commit comments

Comments
 (0)