Skip to content

Commit 5834c8c

Browse files
author
Alan Christie
committed
fix: Add get_status_of_all_step_instances_by_name implementation (and fix step replicas)
1 parent f6707c8 commit 5834c8c

File tree

3 files changed

+23
-4
lines changed

3 files changed

+23
-4
lines changed

tests/instance_launcher.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ def launch(self, *, launch_parameters: LaunchParameters) -> LaunchResult:
8989
running_workflow_id=launch_parameters.running_workflow_id,
9090
step=launch_parameters.step_name,
9191
replica=launch_parameters.step_replication_number,
92+
replicas=launch_parameters.total_number_of_replicas,
9293
)
9394
assert "id" in response
9495
rwfs_id: str = response["id"]

tests/wapi_adapter.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,24 @@ def get_running_steps(
113113
return {"count": 0, "steps": []}, 0
114114

115115
def get_status_of_all_step_instances_by_name(
116-
self, *, running_workflow_id: str, step_name: str
116+
self, *, running_workflow_id: str, name: str
117117
) -> tuple[dict[str, Any], int]:
118-
# Need to implement!
119-
return {"count": 0, "status": []}, 0
118+
UnitTestWorkflowAPIAdapter.lock.acquire()
119+
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
120+
running_workflow_step = Unpickler(pickle_file).load()
121+
UnitTestWorkflowAPIAdapter.lock.release()
122+
123+
steps: list[dict[str, Any]] = []
124+
for rwfs_id, record in running_workflow_step.items():
125+
if record["running_workflow"]["id"] != running_workflow_id:
126+
continue
127+
if record["name"] == name:
128+
response = record
129+
response["id"] = rwfs_id
130+
if record["replica"] == 0:
131+
_ = response.pop("replica")
132+
steps.append(response)
133+
return {"count": len(steps), "status": steps}, 0
120134

121135
def set_running_workflow_done(
122136
self,
@@ -146,6 +160,7 @@ def create_running_workflow_step(
146160
running_workflow_id: str,
147161
step: str,
148162
replica: int = 0,
163+
replicas: int = 0,
149164
prior_running_workflow_step_id: str | None = None,
150165
) -> tuple[dict[str, Any], int]:
151166
if replica:
@@ -164,6 +179,7 @@ def create_running_workflow_step(
164179
"done": False,
165180
"success": False,
166181
"replica": replica,
182+
"replicas": replicas,
167183
"variables": {},
168184
"running_workflow": {"id": running_workflow_id},
169185
}

workflow/workflow_abc.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def get_running_steps(
182182

183183
@abstractmethod
184184
def get_status_of_all_step_instances_by_name(
185-
self, *, running_workflow_id: str, step_name: str
185+
self, *, name: str, running_workflow_id: str
186186
) -> tuple[dict[str, Any], int]:
187187
"""Get a list of step execution statuses for the named step."""
188188
# Should return:
@@ -226,6 +226,8 @@ def get_running_workflow_step(
226226
# "success": False,
227227
# "error_num": 0,
228228
# "error_msg": "",
229+
# "replica": 0,
230+
# "replicas": 0,
229231
# "variables": {
230232
# "x": 1,
231233
# "y": 2,

0 commit comments

Comments
 (0)