Skip to content

Commit 27c5a83

Browse files
author
Alan Christie
committed
feat: Add replica to step creation (and step-by-name query)
1 parent b4be311 commit 27c5a83

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

tests/wapi_adapter.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,12 @@ def create_running_workflow_step(
136136
*,
137137
running_workflow_id: str,
138138
step: str,
139+
replica: int = 0,
139140
prior_running_workflow_step_id: str | None = None,
140141
) -> dict[str, Any]:
142+
if replica:
143+
assert replica > 0
144+
141145
UnitTestWorkflowAPIAdapter.lock.acquire()
142146
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
143147
running_workflow_step = Unpickler(pickle_file).load()
@@ -150,6 +154,7 @@ def create_running_workflow_step(
150154
"name": step,
151155
"done": False,
152156
"success": False,
157+
"replica": replica,
153158
"variables": {},
154159
"running_workflow": {"id": running_workflow_id},
155160
}
@@ -177,24 +182,28 @@ def get_running_workflow_step(
177182
return {}, 0
178183
response = running_workflow_step[running_workflow_step_id]
179184
response["id"] = running_workflow_step_id
185+
if response["replica"] == 0:
186+
_ = response.pop("replica")
180187
return response, 0
181188

182189
def get_running_workflow_step_by_name(
183-
self, *, name: str, running_workflow_id: str
190+
self, *, name: str, running_workflow_id: str, replica: int = 0
184191
) -> dict[str, Any]:
192+
if replica:
193+
assert replica > 0
185194
UnitTestWorkflowAPIAdapter.lock.acquire()
186195
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
187196
running_workflow_step = Unpickler(pickle_file).load()
188197
UnitTestWorkflowAPIAdapter.lock.release()
189198

190-
print(f"name={name} running_workflow_id={running_workflow_id}")
191199
for rwfs_id, record in running_workflow_step.items():
192-
print(f"rwfs_id={rwfs_id} record={record}")
193200
if record["running_workflow"]["id"] != running_workflow_id:
194201
continue
195-
if record["name"] == name:
202+
if record["name"] == name and record["replica"] == replica:
196203
response = record
197204
response["id"] = rwfs_id
205+
if record["replica"] == 0:
206+
_ = response.pop("replica")
198207
return response, 0
199208
return {}, 0
200209

workflow/workflow_abc.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,14 @@ def create_running_workflow_step(
205205
*,
206206
running_workflow_id: str,
207207
step: str,
208+
replica: int = 0,
208209
prior_running_workflow_step_id: str | None = None,
209210
) -> tuple[dict[str, Any], int]:
210-
"""Create a RunningWorkflowStep Record (from a RunningWorkflow)"""
211+
"""Create a RunningWorkflowStep Record (from a RunningWorkflow).
212+
If this is a replica (concurrent execution) of a step the replica
213+
value must be set to a value greater than 0. The replica is unique
214+
for a given step and is used to distinguish between running workflow steps
215+
generated from the same step name."""
211216
# Should return:
212217
# {
213218
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
@@ -244,10 +249,12 @@ def get_running_workflow_step(
244249

245250
@abstractmethod
246251
def get_running_workflow_step_by_name(
247-
self, *, name: str, running_workflow_id: str
252+
self, *, name: str, running_workflow_id: str, replica: int = 0
248253
) -> tuple[dict[str, Any], int]:
249254
"""Get a RunningWorkflowStep Record given a step name
250-
(and its RUnningWorkflow ID)"""
255+
(and its RunningWorkflow ID). For steps that may be replicated
256+
the replica, a value of 1 or higher, is used to identify the specific replica.
257+
"""
251258
# Should return:
252259
# {
253260
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",

0 commit comments

Comments
 (0)