Commit 6964ff0

feat: add support for escrow delegates (#330)
1 parent 3225f75 commit 6964ff0

File tree

4 files changed (+40 / -60 lines)


extensions/business/deeploy/deeploy_const.py

Lines changed: 2 additions & 3 deletions
@@ -52,9 +52,8 @@ class DEEPLOY_KEYS:
   PLUGIN_INSTANCE_ID = "instance_id"
   # Auth result keys
   SENDER = "sender"
-  SENDER_ORACLES = "sender_oracles"
-  SENDER_NODES_COUNT = "sender_nodes_count"
-  SENDER_TOTAL_COUNT = "sender_total_count"
+  SENDER_ESCROW = "sender_escrow"
+  ESCROW_OWNER = "escrow_owner"

   # Config keys
   DATE_UPDATED = "date_updated"
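
Note: the auth result assembled by `deeploy_get_auth_result` (see the `deeploy_mixin.py` hunks below) now carries the escrow address and the escrow owner instead of the old per-wallet oracle/node breakdown. A minimal sketch of the new shape, assuming the constants above and using placeholder addresses (the real result also includes the nonce, omitted here):

```python
# Illustrative only: constants mirror the diff above; addresses are placeholders.
class DEEPLOY_KEYS:
  SENDER = "sender"
  SENDER_ESCROW = "sender_escrow"
  ESCROW_OWNER = "escrow_owner"

auth_result = {
  DEEPLOY_KEYS.SENDER: "0xDelegateWallet",         # wallet that signed the request
  DEEPLOY_KEYS.SENDER_ESCROW: "0xEscrowContract",  # escrow the delegate acts through
  DEEPLOY_KEYS.ESCROW_OWNER: "0xEscrowOwner",      # effective owner used by the handlers
}
print(auth_result[DEEPLOY_KEYS.ESCROW_OWNER])      # -> "0xEscrowOwner"
```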

extensions/business/deeploy/deeploy_manager_api.py

Lines changed: 13 additions & 15 deletions
@@ -122,9 +122,7 @@ def get_apps(
     sender, inputs = self.deeploy_verify_and_get_inputs(request)
     auth_result = self.deeploy_get_auth_result(inputs)

-    apps = self._get_online_apps(owner=sender)
-
-    # TODO: (Vitalii) filter apps by the sender address (OWNER)
+    apps = self._get_online_apps(owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER])

     result = {
       DEEPLOY_KEYS.STATUS : DEEPLOY_STATUS.SUCCESS,
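
The pattern in `get_apps` repeats across the handlers below: the effective owner for app discovery, payment checks, updates, deletes, and commands is read from the auth result rather than from the raw `sender`. A hedged sketch of that lookup as a standalone helper (the helper name is hypothetical; the real handlers index `auth_result` inline, exactly as in the hunks):

```python
# Sketch only: `resolve_effective_owner` is a hypothetical helper, not part of the codebase.
def resolve_effective_owner(auth_result: dict) -> str:
  """Return the address ownership checks should use: the escrow owner, not the signer."""
  return auth_result["escrow_owner"]  # i.e. auth_result[DEEPLOY_KEYS.ESCROW_OWNER]

# Usage, following the diff above:
#   owner = resolve_effective_owner(auth_result)
#   apps = self._get_online_apps(owner=owner)
```
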
@@ -206,7 +204,7 @@ def _process_pipeline_request(
       raise ValueError(msg)

     # check payment
-    is_valid = self.deeploy_check_payment_and_job_owner(inputs, sender, is_create=is_create, debug=self.cfg_deeploy_verbose > 1)
+    is_valid = self.deeploy_check_payment_and_job_owner(inputs, auth_result[DEEPLOY_KEYS.ESCROW_OWNER], is_create=is_create, debug=self.cfg_deeploy_verbose > 1)
     if not is_valid:
       msg = f"{DEEPLOY_ERRORS.PAYMENT1}: The request job is not paid, or the job is not sent by the job owner."
       raise ValueError(msg)
@@ -225,7 +223,7 @@ def _process_pipeline_request(
     else:
       # Discover the live deployment so we can validate node affinity and reuse existing specs.
       pipeline_context = self._gather_running_pipeline_context(
-        owner=sender,
+        owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER],
         app_id=app_id,
         job_id=job_id,
       )
@@ -277,7 +275,7 @@ def _process_pipeline_request(
       self._ensure_plugin_instance_ids(
         inputs,
         discovered_plugin_instances=discovered_plugin_instances,
-        owner=sender,
+        owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER],
         app_id=app_id,
         job_id=job_id,
       )
@@ -300,7 +298,7 @@ def _process_pipeline_request(
       self.delete_pipeline_from_nodes(
         app_id=app_id,
         job_id=job_id,
-        owner=sender,
+        owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER],
         discovered_instances=discovered_plugin_instances,
       )

@@ -331,7 +329,7 @@ def _process_pipeline_request(
     )

     dct_status, str_status = self.check_and_deploy_pipelines(
-      sender=sender,
+      owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER],
       inputs=inputs,
       app_id=app_id,
       app_alias=app_alias,
@@ -629,27 +627,27 @@ def scale_up_job_workers(self,
     is_confirmable_job = inputs.chainstore_response

     # check payment
-    is_valid = self.deeploy_check_payment_and_job_owner(inputs, sender, is_create=False, debug=self.cfg_deeploy_verbose > 1)
+    is_valid = self.deeploy_check_payment_and_job_owner(inputs, auth_result[DEEPLOY_KEYS.ESCROW_OWNER], is_create=False, debug=self.cfg_deeploy_verbose > 1)
     if not is_valid:
       msg = f"{DEEPLOY_ERRORS.PAYMENT1}: The request job is not paid, or the job is not sent by the job owner."
       raise ValueError(msg)

-    running_apps_for_job = self._get_online_apps(job_id=job_id, owner=sender)
+    running_apps_for_job = self._get_online_apps(job_id=job_id, owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER])

     # todo: check the count of running workers and compare with the amount of allowed workers count from blockchain.

     self.P(f"Discovered running apps for job: {self.json_dumps(running_apps_for_job)}")

     if not running_apps_for_job or not len(running_apps_for_job):
-      msg = f"{DEEPLOY_ERRORS.NODES3}: No running workers found for provided job_id and owner '{sender}'."
+      msg = f"{DEEPLOY_ERRORS.NODES3}: No running workers found for provided job_id and owner '{auth_result[DEEPLOY_KEYS.ESCROW_OWNER]}'."
       raise ValueError(msg)

     update_nodes = list(running_apps_for_job.keys())
     new_nodes = self._check_nodes_availability(inputs)

     dct_status, str_status = self.scale_up_job(new_nodes=new_nodes,
                                                update_nodes=update_nodes,
-                                               sender=sender,
+                                               owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER],
                                                job_id=job_id,
                                                running_apps_for_job=running_apps_for_job)

@@ -718,7 +716,7 @@ def delete_pipeline(self,
     job_id = inputs.get(DEEPLOY_KEYS.JOB_ID, None)
     app_id = inputs.get(DEEPLOY_KEYS.APP_ID, None)

-    discovered_instances = self.delete_pipeline_from_nodes(app_id=app_id, job_id=job_id, owner=sender)
+    discovered_instances = self.delete_pipeline_from_nodes(app_id=app_id, job_id=job_id, owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER])
     request_payload = {
       DEEPLOY_KEYS.STATUS: DEEPLOY_STATUS.SUCCESS,
       DEEPLOY_KEYS.TARGETS: discovered_instances,
@@ -778,7 +776,7 @@ def send_instance_command(self,
     # Validate the request fields.
     self._validate_send_instance_command_request(inputs)

-    self.send_instance_command_to_nodes(inputs, owner=sender)
+    self.send_instance_command_to_nodes(inputs, owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER])

     result = {
       DEEPLOY_KEYS.REQUEST : {
@@ -831,7 +829,7 @@ def send_app_command(self,
     # Validate the request fields.
     self._validate_send_app_command_request(inputs)

-    discovered_pipelines = self.discover_and_send_instance_command(inputs, owner=sender)
+    discovered_pipelines = self.discover_and_send_instance_command(inputs, owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER])
     targets = []
     for discovered_pipeline in discovered_pipelines:
       targets.append([discovered_pipeline[DEEPLOY_PLUGIN_DATA.NODE],

extensions/business/deeploy/deeploy_mixin.py

Lines changed: 24 additions & 41 deletions
@@ -80,27 +80,11 @@ def _check_plugin_signature(self, signature: str):

   def __check_allowed_wallet(self, inputs):
     sender = inputs.get(BASE_CT.BCctbase.ETH_SENDER)
-    eth_nodes = self.bc.get_wallet_nodes(sender)
-    if len(eth_nodes) == 0:
-      raise ValueError("No nodes found for wallet {}".format(sender))
-    eth_oracles = self.bc.get_eth_oracles()
-    if len(eth_oracles) == 0:
-      raise ValueError("No oracles found - this is a critical issue!")
-    oracle_found = False
-    wallet_oracles = []
-    wallet_nodes = []
-    for node in eth_nodes:
-      if node in eth_oracles:
-        oracle_found = True
-        wallet_oracles.append(node)
-      else:
-        wallet_nodes.append(node)
-      #endif
-    #endfor each node
-    if not oracle_found:
-      raise ValueError("No oracles found for wallet {}".format(sender))
-    inputs.wallet_nodes = wallet_nodes
-    inputs.wallet_oracles = wallet_oracles
+    escrow_details = self.bc.get_user_escrow_details(sender)
+    if not escrow_details.get('isActive', False):
+      raise ValueError("Wallet {} has no active escrow".format(sender))
+    inputs.wallet_escrow = escrow_details['escrowAddress']
+    inputs.wallet_escrow_owner = escrow_details['escrowOwner']
     return inputs

   def __check_is_oracle(self, inputs):
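
The new `__check_allowed_wallet` relies on `self.bc.get_user_escrow_details(sender)` returning at least `isActive`, `escrowAddress`, and `escrowOwner` (field names taken from the hunk above; the full schema is not shown in this diff). A standalone sketch of the check under that assumption:

```python
# Sketch only: mirrors the escrow check outside the mixin; the wrapper and sample data are illustrative.
def check_allowed_wallet(escrow_details: dict, sender: str) -> dict:
  if not escrow_details.get('isActive', False):
    raise ValueError("Wallet {} has no active escrow".format(sender))
  return {
    'wallet_escrow': escrow_details['escrowAddress'],
    'wallet_escrow_owner': escrow_details['escrowOwner'],
  }

# Example with placeholder values:
details = {'isActive': True, 'escrowAddress': '0xEscrow', 'escrowOwner': '0xOwner'}
print(check_allowed_wallet(details, sender='0xDelegate'))
# -> {'wallet_escrow': '0xEscrow', 'wallet_escrow_owner': '0xOwner'}
```
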
@@ -112,7 +96,7 @@ def __check_is_oracle(self, inputs):
       raise ValueError("Sender {} is not an oracle".format(sender))
     return True

-  def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, sender, job_app_type=None, dct_deeploy_specs=None):
+  def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, owner, job_app_type=None, dct_deeploy_specs=None):
     """
     Create new pipelines on each node and set CSTORE `response_key` for the "callback" action
     """
@@ -223,7 +207,7 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type,
         app_alias=app_alias,
         pipeline_type=app_type,
         node_address=addr,
-        owner=sender,
+        owner=owner,
         url=inputs.pipeline_input_uri,
         plugins=node_plugins,
         is_deeployed=True,
@@ -243,7 +227,7 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type,
     cleaned_response_keys = prepared_response_keys if inputs.chainstore_response else {}
     return cleaned_response_keys

-  def __update_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, sender, discovered_plugin_instances, dct_deeploy_specs = None, job_app_type=None):
+  def __update_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, owner, discovered_plugin_instances, dct_deeploy_specs = None, job_app_type=None):
     """
     Create new pipelines on each node and set CSTORE `response_key` for the "callback" action
     """
@@ -447,7 +431,7 @@ def __update_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type,
         app_alias=app_alias,
         pipeline_type=app_type,
         node_address=addr,
-        owner=sender,
+        owner=owner,
         url=inputs.pipeline_input_uri,
         plugins=node_plugins,
         is_deeployed=True,
@@ -694,7 +678,7 @@ def deeploy_verify_and_get_inputs(self, request: dict, require_sender_is_oracle:
     if addr.lower() != sender.lower():
       raise ValueError("Invalid signature: recovered {} != {}".format(addr, sender))

-    # Check if the sender is allowed to create pipelines
+    # Check if the sender is allowed to make the request
     if require_sender_is_oracle:
       self.__check_is_oracle(inputs)
     else:
@@ -1098,9 +1082,8 @@ def deeploy_get_auth_result(self, inputs):
     result = {
       DEEPLOY_KEYS.SENDER: sender,
       DEEPLOY_KEYS.NONCE: self.deeploy_get_nonce(inputs.nonce),
-      DEEPLOY_KEYS.SENDER_ORACLES: inputs.wallet_oracles,
-      DEEPLOY_KEYS.SENDER_NODES_COUNT: len(inputs.wallet_nodes),
-      DEEPLOY_KEYS.SENDER_TOTAL_COUNT: len(inputs.wallet_nodes) + len(inputs.wallet_oracles),
+      DEEPLOY_KEYS.SENDER_ESCROW: inputs.wallet_escrow,
+      DEEPLOY_KEYS.ESCROW_OWNER: inputs.wallet_escrow_owner,
     }
     return result

@@ -1211,12 +1194,12 @@ def _organize_requested_plugins(self, inputs):
     return plugins_by_instance_id, plugins_by_signature, new_plugin_configs
   # TODO: END FIXME

-  def deeploy_check_payment_and_job_owner(self, inputs, sender, is_create, debug=False):
+  def deeploy_check_payment_and_job_owner(self, inputs, owner, is_create, debug=False):
     """
     Check if the payment is valid for the given job.
     """
     self.Pd(f"=== deeploy_check_payment_and_job_owner ===")
-    self.Pd(f"  sender: {sender}")
+    self.Pd(f"  owner: {owner}")
     self.Pd(f"  is_create: {is_create}")
     self.Pd(f"  debug: {debug}")

@@ -1229,7 +1212,7 @@ def deeploy_check_payment_and_job_owner(self, inputs, sender, is_create, debug=F
       return True

     job_id = inputs.get(DEEPLOY_KEYS.JOB_ID, None)
-    self.Pd(f"Checking payment for job {job_id} by sender {sender}{' (debug mode)' if debug else ''}")
+    self.Pd(f"Checking payment for job {job_id} by owner {owner}{' (debug mode)' if debug else ''}")

     if not job_id:
       self.Pd("  No job_id provided - validation failed")
@@ -1251,8 +1234,8 @@ def deeploy_check_payment_and_job_owner(self, inputs, sender, is_create, debug=F
     self.Pd(f"  Job owner: {job_owner}")
     self.Pd(f"  Start timestamp: {start_timestamp}")

-    is_valid = (sender == job_owner) if sender and job_owner else False
-    self.Pd(f"  Owner match: {is_valid} (sender={sender}, owner={job_owner})")
+    is_valid = (owner == job_owner) if owner and job_owner else False
+    self.Pd(f"  Owner match: {is_valid} (owner={owner}, job_owner={job_owner})")

     if is_create and start_timestamp:
       self.Pd(f"  Job already started (timestamp={start_timestamp}) but is_create=True - invalidating")
@@ -1741,7 +1724,7 @@ def deeploy_prepare_plugins(self, inputs):
       plugins = [plugin]
     return plugins

-  def check_and_deploy_pipelines(self, sender, inputs, app_id, app_alias, app_type, update_nodes, new_nodes, discovered_plugin_instances=[], dct_deeploy_specs=None, job_app_type=None, dct_deeploy_specs_create=None):
+  def check_and_deploy_pipelines(self, owner, inputs, app_id, app_alias, app_type, update_nodes, new_nodes, discovered_plugin_instances=[], dct_deeploy_specs=None, job_app_type=None, dct_deeploy_specs_create=None):
     """
     Validate the inputs and deploy the pipeline on the target nodes.
     """
@@ -1754,10 +1737,10 @@ def check_and_deploy_pipelines(self, sender, inputs, app_id, app_alias, app_type
     # Phase 2: Launch the pipeline on each node and set CSTORE `response_key`` for the "callback" action
     response_keys = {}
     if len(update_nodes) > 0:
-      update_response_keys = self.__update_pipeline_on_nodes(update_nodes, inputs, app_id, app_alias, app_type, sender, discovered_plugin_instances, dct_deeploy_specs, job_app_type=job_app_type)
+      update_response_keys = self.__update_pipeline_on_nodes(update_nodes, inputs, app_id, app_alias, app_type, owner, discovered_plugin_instances, dct_deeploy_specs, job_app_type=job_app_type)
       response_keys.update(update_response_keys)
     if len(new_nodes) > 0:
-      new_response_keys = self.__create_pipeline_on_nodes(new_nodes, inputs, app_id, app_alias, app_type, sender, job_app_type=job_app_type, dct_deeploy_specs=dct_deeploy_specs_create)
+      new_response_keys = self.__create_pipeline_on_nodes(new_nodes, inputs, app_id, app_alias, app_type, owner, job_app_type=job_app_type, dct_deeploy_specs=dct_deeploy_specs_create)
       response_keys.update(new_response_keys)

     # Phase 3: Wait until all the responses are received via CSTORE and compose status response
@@ -1770,7 +1753,7 @@ def check_and_deploy_pipelines(self, sender, inputs, app_id, app_alias, app_type

     return dct_status, str_status

-  def scale_up_job(self, new_nodes, update_nodes, job_id, sender, running_apps_for_job):
+  def scale_up_job(self, new_nodes, update_nodes, job_id, owner, running_apps_for_job):
     """
     Scale up the job workers.
     """
@@ -1800,7 +1783,7 @@ def scale_up_job(self, new_nodes, update_nodes, job_id, sender, running_apps_for
     # Start pipelines on nodes.
     self._start_create_update_pipelines(create_pipelines=create_pipelines,
                                         update_pipelines=update_pipelines,
-                                        sender=sender)
+                                        owner=owner)

     dct_status, str_status = self._get_pipeline_responses(chainstore_response_keys, 300)

@@ -2267,7 +2250,7 @@ def prepare_create_update_pipelines(self, base_pipeline, new_nodes, update_nodes

     return create_pipelines, update_pipelines, prepared_response_keys

-  def _start_create_update_pipelines(self, create_pipelines, update_pipelines, sender):
+  def _start_create_update_pipelines(self, create_pipelines, update_pipelines, owner):
     """
     Start the create and update pipelines.
     """
@@ -2283,7 +2266,7 @@ def _start_create_update_pipelines(self, create_pipelines, update_pipelines, sen
         name=pipeline['app_id'],
         pipeline_type=pipeline['pipeline_type'],
         node_address=node,
-        owner=sender,
+        owner=owner,
         url=pipeline.get('url'),
         plugins=pipeline['plugins'],
         is_deeployed=True,
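
Taken together, the three files route every ownership-sensitive operation through the escrow owner. A compact end-to-end sketch of the delegate flow (the function, the request key, and the fake blockchain adapter are illustrative stand-ins for the mixin methods above):

```python
# Sketch only: condensed escrow-delegate flow; names and data are illustrative.
def handle_delegate_request(bc, request):
  sender = request['eth_sender']                   # wallet that signed the request
  escrow = bc.get_user_escrow_details(sender)      # as in __check_allowed_wallet
  if not escrow.get('isActive', False):
    raise ValueError("Wallet {} has no active escrow".format(sender))
  owner = escrow['escrowOwner']                    # effective owner for all later checks
  # Payment/job-owner validation, app discovery, create/update/delete and commands
  # all receive `owner` instead of `sender`, mirroring the handler changes above.
  return {'sender': sender, 'sender_escrow': escrow['escrowAddress'], 'escrow_owner': owner}

class FakeBC:
  def get_user_escrow_details(self, sender):
    return {'isActive': True, 'escrowAddress': '0xEscrow', 'escrowOwner': '0xOwner'}

print(handle_delegate_request(FakeBC(), {'eth_sender': '0xDelegate'}))
```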

ver.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__VER__ = '2.9.940'
+__VER__ = '2.9.941'
