Skip to content

Commit a27aeec

Browse files
author
Andrei Neagu
committed
renamed store proxy interfaces
1 parent 382d04c commit a27aeec

File tree

6 files changed

+107
-85
lines changed

6 files changed

+107
-85
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ async def start_operation(
9797
schedule_data_proxy = ScheduleDataStoreProxy(
9898
store=self._store, schedule_id=schedule_id
9999
)
100-
await schedule_data_proxy.set_multiple(
100+
await schedule_data_proxy.create_or_update_multiple(
101101
{
102102
"operation_name": operation_name,
103103
"group_index": 0,
@@ -110,7 +110,7 @@ async def start_operation(
110110
schedule_id=schedule_id,
111111
operation_name=operation_name,
112112
)
113-
await operation_content_proxy.set_provided_context(initial_operation_context)
113+
await operation_content_proxy.create_or_update(initial_operation_context)
114114

115115
await enqueue_schedule_event(self.app, schedule_id)
116116
return schedule_id
@@ -127,7 +127,7 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
127127
store=self._store, schedule_id=schedule_id
128128
)
129129

130-
is_creating = await schedule_data_proxy.get("is_creating")
130+
is_creating = await schedule_data_proxy.read("is_creating")
131131

132132
if is_creating is False:
133133
_logger.warning(
@@ -136,8 +136,8 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
136136
)
137137
return
138138

139-
operation_name = await schedule_data_proxy.get("operation_name")
140-
group_index = await schedule_data_proxy.get("group_index")
139+
operation_name = await schedule_data_proxy.read("operation_name")
140+
group_index = await schedule_data_proxy.read("group_index")
141141

142142
operation = OperationRegistry.get_operation(operation_name)
143143
group = operation[group_index]
@@ -172,9 +172,9 @@ async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None:
172172
f"Cancelling step {step_name=} of {operation_name=} for {schedule_id=}",
173173
):
174174
with suppress(NoDataFoundError):
175-
deferred_task_uid = await step_proxy.get("deferred_task_uid")
175+
deferred_task_uid = await step_proxy.read("deferred_task_uid")
176176
await DeferredRunner.cancel(deferred_task_uid)
177-
await step_proxy.set("status", StepStatus.CANCELLED)
177+
await step_proxy.create_or_update("status", StepStatus.CANCELLED)
178178

179179
await limited_gather(
180180
*(
@@ -198,9 +198,9 @@ async def restart_operation_step_stuck_in_error(
198198
schedule_data_proxy = ScheduleDataStoreProxy(
199199
store=self._store, schedule_id=schedule_id
200200
)
201-
is_creating = await schedule_data_proxy.get("is_creating")
202-
operation_name = await schedule_data_proxy.get("operation_name")
203-
group_index = await schedule_data_proxy.get("group_index")
201+
is_creating = await schedule_data_proxy.read("is_creating")
202+
operation_name = await schedule_data_proxy.read("operation_name")
203+
group_index = await schedule_data_proxy.read("group_index")
204204

205205
operation = OperationRegistry.get_operation(operation_name)
206206
step_group = operation[group_index]
@@ -225,7 +225,7 @@ async def restart_operation_step_stuck_in_error(
225225
)
226226

227227
try:
228-
await step_proxy.get("error_traceback")
228+
await step_proxy.read("error_traceback")
229229
except NoDataFoundError as exc:
230230
raise StepNotInErrorStateError(step_name=step_name) from exc
231231

@@ -237,7 +237,7 @@ async def restart_operation_step_stuck_in_error(
237237
if in_manual_intervention:
238238
requires_manual_intervention: bool = False
239239
with suppress(NoDataFoundError):
240-
requires_manual_intervention = await step_proxy.get(
240+
requires_manual_intervention = await step_proxy.read(
241241
"requires_manual_intervention"
242242
)
243243

@@ -330,9 +330,9 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
330330
store=self._store, schedule_id=schedule_id
331331
)
332332

333-
operation_name = await schedule_data_proxy.get("operation_name")
334-
is_creating = await schedule_data_proxy.get("is_creating")
335-
group_index = await schedule_data_proxy.get("group_index")
333+
operation_name = await schedule_data_proxy.read("operation_name")
334+
is_creating = await schedule_data_proxy.read("is_creating")
335+
group_index = await schedule_data_proxy.read("group_index")
336336

337337
operation = OperationRegistry.get_operation(operation_name)
338338
step_group = operation[group_index]
@@ -434,7 +434,7 @@ async def _advance_as_repeating(
434434
steps_stauses = await get_steps_statuses(step_proxies)
435435
if any(status == StepStatus.CANCELLED for status in steps_stauses.values()):
436436
# NOTE:
437-
await schedule_data_proxy.set("is_creating", value=False)
437+
await schedule_data_proxy.create_or_update("is_creating", value=False)
438438
await enqueue_schedule_event(self.app, schedule_id)
439439
return
440440

@@ -477,7 +477,9 @@ async def _advance_as_creating(
477477
next_group_index = group_index + 1
478478
# does a next group exist?
479479
_ = operation[next_group_index]
480-
await schedule_data_proxy.set("group_index", value=next_group_index)
480+
await schedule_data_proxy.create_or_update(
481+
"group_index", value=next_group_index
482+
)
481483
await enqueue_schedule_event(self.app, schedule_id)
482484
except IndexError:
483485

@@ -504,7 +506,9 @@ async def _advance_as_creating(
504506
step_name=step.get_step_name(),
505507
is_creating=True,
506508
)
507-
await step_proxy.set("requires_manual_intervention", value=True)
509+
await step_proxy.create_or_update(
510+
"requires_manual_intervention", value=True
511+
)
508512
manual_intervention_step_names.add(step.get_step_name())
509513

510514
if manual_intervention_step_names:
@@ -528,7 +532,7 @@ async def _advance_as_creating(
528532
logging.DEBUG,
529533
f"{operation_name=} was not successfull: {steps_statuses=}, moving to revert",
530534
):
531-
await schedule_data_proxy.set("is_creating", value=False)
535+
await schedule_data_proxy.create_or_update("is_creating", value=False)
532536
await enqueue_schedule_event(self.app, schedule_id)
533537
return
534538

@@ -564,7 +568,9 @@ async def _advance_as_reverting(
564568
return
565569

566570
# 1b) -> move to previous group
567-
await schedule_data_proxy.set("group_index", value=previous_group_index)
571+
await schedule_data_proxy.create_or_update(
572+
"group_index", value=previous_group_index
573+
)
568574
await enqueue_schedule_event(self.app, schedule_id)
569575
return
570576

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core_utils.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def are_any_steps_in_a_progress_status(
5555

5656
async def _get_step_status(step_proxy: StepStoreProxy) -> tuple[StepName, StepStatus]:
5757
try:
58-
status = await step_proxy.get("status")
58+
status = await step_proxy.read("status")
5959
except NoDataFoundError:
6060
status = StepStatus.UNKNOWN
6161

@@ -86,7 +86,7 @@ async def start_and_mark_as_started(
8686
is_creating=is_creating,
8787
expected_steps_count=expected_steps_count,
8888
)
89-
await step_proxy.set_multiple(
89+
await step_proxy.create_or_update_multiple(
9090
{"deferred_created": True, "status": StepStatus.SCHEDULED}
9191
)
9292

@@ -119,7 +119,7 @@ async def get_step_error_traceback(
119119
step_name=step_name,
120120
is_creating=False,
121121
)
122-
return step_name, await step_proxy.get("error_traceback")
122+
return step_name, await step_proxy.read("error_traceback")
123123

124124

125125
def get_group_step_proxies(
@@ -148,7 +148,7 @@ async def _get_was_step_started(
148148
step_proxy: StepStoreProxy,
149149
) -> tuple[bool, StepStoreProxy]:
150150
try:
151-
was_stated = (await step_proxy.get("deferred_created")) is True
151+
was_stated = (await step_proxy.read("deferred_created")) is True
152152
except NoDataFoundError:
153153
was_stated = False
154154

@@ -208,7 +208,7 @@ async def cleanup_after_finishing(
208208

209209
async def get_requires_manual_intervention(step_proxy: StepStoreProxy) -> bool:
210210
try:
211-
return await step_proxy.get("requires_manual_intervention")
211+
return await step_proxy.read("requires_manual_intervention")
212212
except NoDataFoundError:
213213
return False
214214

@@ -220,7 +220,7 @@ async def set_unexpected_opration_state(
220220
message: str,
221221
) -> None:
222222
schedule_data_proxy = ScheduleDataStoreProxy(store=store, schedule_id=schedule_id)
223-
await schedule_data_proxy.set_multiple(
223+
await schedule_data_proxy.create_or_update_multiple(
224224
{
225225
"operation_error_type": operation_error_type,
226226
"operation_error_message": message,

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ async def get_timeout(cls, context: DeferredContext) -> timedelta:
158158

159159
@classmethod
160160
async def on_created(cls, task_uid: TaskUID, context: DeferredContext) -> None:
161-
await get_step_store_proxy(context).set_multiple(
161+
await get_step_store_proxy(context).create_or_update_multiple(
162162
{"deferred_task_uid": task_uid, "status": StepStatus.CREATED}
163163
)
164164

@@ -167,14 +167,16 @@ async def run(cls, context: DeferredContext) -> None:
167167
app = context["app"]
168168
is_creating = context["is_creating"]
169169

170-
await get_step_store_proxy(context).set("status", StepStatus.RUNNING)
170+
await get_step_store_proxy(context).create_or_update(
171+
"status", StepStatus.RUNNING
172+
)
171173

172174
step = _get_step(context)
173175

174176
operation_context_proxy = get_operation_context_proxy(context)
175177

176178
if is_creating:
177-
required_context = await operation_context_proxy.get_required_context(
179+
required_context = await operation_context_proxy.read(
178180
*step.get_create_requires_context_keys()
179181
)
180182
_raise_if_any_context_value_is_none(required_context)
@@ -187,7 +189,7 @@ async def run(cls, context: DeferredContext) -> None:
187189
provided_operation_context, create_provides_keys
188190
)
189191
else:
190-
required_context = await operation_context_proxy.get_required_context(
192+
required_context = await operation_context_proxy.read(
191193
*step.get_revert_requires_context_keys()
192194
)
193195
_raise_if_any_context_value_is_none(required_context)
@@ -200,27 +202,31 @@ async def run(cls, context: DeferredContext) -> None:
200202
provided_operation_context, revert_provides_keys
201203
)
202204

203-
await operation_context_proxy.set_provided_context(provided_operation_context)
205+
await operation_context_proxy.create_or_update(provided_operation_context)
204206

205207
@classmethod
206208
async def on_result(cls, result: None, context: DeferredContext) -> None:
207209
_ = result
208-
await get_step_store_proxy(context).set("status", StepStatus.SUCCESS)
210+
await get_step_store_proxy(context).create_or_update(
211+
"status", StepStatus.SUCCESS
212+
)
209213

210214
await _enqueue_schedule_event_if_group_is_done(context)
211215

212216
@classmethod
213217
async def on_finished_with_error(
214218
cls, error: TaskResultError, context: DeferredContext
215219
) -> None:
216-
await get_step_store_proxy(context).set_multiple(
220+
await get_step_store_proxy(context).create_or_update_multiple(
217221
{"status": StepStatus.FAILED, "error_traceback": error.format_error()}
218222
)
219223

220224
await _enqueue_schedule_event_if_group_is_done(context)
221225

222226
@classmethod
223227
async def on_cancelled(cls, context: DeferredContext) -> None:
224-
await get_step_store_proxy(context).set("status", StepStatus.CANCELLED)
228+
await get_step_store_proxy(context).create_or_update(
229+
"status", StepStatus.CANCELLED
230+
)
225231

226232
await _enqueue_schedule_event_if_group_is_done(context)

0 commit comments

Comments
 (0)