Skip to content

Commit f01e223

Browse files
committed
add purge workflow function
Signed-off-by: Fabian Martinez <[email protected]>
1 parent 8399ac9 commit f01e223

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def raise_workflow_event(
208208
"""
209209
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)
210210

211-
def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
211+
def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True):
212212
"""Terminates a running workflow instance and updates its runtime status to
213213
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
214214
the task hub. When the task hub worker processes this message, it will update the runtime
@@ -226,9 +226,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
226226
Args:
227227
instance_id: The ID of the workflow instance to terminate.
228228
output: The optional output to set for the terminated workflow instance.
229+
recursive: The optional flag to terminate all child workflows.
229230
230231
"""
231-
return self.__obj.terminate_orchestration(instance_id, output=output)
232+
return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive)
232233

233234
def pause_workflow(self, instance_id: str):
234235
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
@@ -246,3 +247,12 @@ def resume_workflow(self, instance_id: str):
246247
instance_id: The instance ID of the workflow to resume.
247248
"""
248249
return self.__obj.resume_orchestration(instance_id)
250+
251+
def purge_workflow(self, instance_id: str, recursive: bool = True):
252+
"""Purge data from a workflow instance.
253+
254+
Args:
255+
instance_id: The instance ID of the workflow to purge.
256+
recursive: The optional flag to also purge data from all child workflows.
257+
"""
258+
return self.__obj.purge_orchestration(instance_id, recursive)

ext/dapr-ext-workflow/tests/test_workflow_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
mock_terminate_result = 'terminate001'
2727
mock_suspend_result = 'suspend001'
2828
mock_resume_result = 'resume001'
29+
mock_purge_result = 'purge001'
2930
mockInstanceId = 'instance001'
3031

3132

@@ -58,6 +59,9 @@ def suspend_orchestration(self, instance_id: str):
5859
def resume_orchestration(self, instance_id: str):
5960
return mock_resume_result
6061

62+
def purge_workflow(self, instance_id: str, recursive: bool = True):
63+
return mock_purge_result
64+
6165
def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus):
6266
return client.OrchestrationState(
6367
instance_id=instance_id,
@@ -119,3 +123,6 @@ def test_client_functions(self):
119123

120124
actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId)
121125
assert actual_resume_result == mock_resume_result
126+
127+
actual_purge_result = wfClient.purge_workflow(instance_id=mockInstanceId)
128+
assert actual_purge_result == mock_purge_result

0 commit comments

Comments
 (0)