Skip to content

Commit 8b478a7

Browse files
committed
support reuse id policy
Signed-off-by: Fabian Martinez <[email protected]>
1 parent f01e223 commit 8b478a7

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

examples/workflow/monitor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def send_alert(ctx, message: str):
6969
except Exception:
7070
pass
7171
if not status or status.runtime_status.name != 'RUNNING':
72+
# TODO update to use reuse_id_policy
7273
instance_id = wf_client.schedule_new_workflow(
7374
workflow=status_monitor_workflow,
7475
input=JobStatus(job_id=job_id, is_healthy=True),

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515

1616
from __future__ import annotations
1717
from datetime import datetime
18-
from typing import Any, Optional, TypeVar
18+
from typing import Any, Optional, TypeVar, Union
19+
1920

2021
from durabletask import client
22+
import durabletask.internal.orchestrator_service_pb2 as pb
2123

2224
from dapr.ext.workflow.workflow_state import WorkflowState
2325
from dapr.ext.workflow.workflow_context import Workflow
@@ -78,6 +80,7 @@ def schedule_new_workflow(
7880
input: Optional[TInput] = None,
7981
instance_id: Optional[str] = None,
8082
start_at: Optional[datetime] = None,
83+
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
8184
) -> str:
8285
"""Schedules a new workflow instance for execution.
8386
@@ -90,6 +93,8 @@ def schedule_new_workflow(
9093
start_at: The time when the workflow instance should start executing.
9194
If not specified or if a date-time in the past is specified, the workflow instance will
9295
be scheduled immediately.
96+
reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with
97+
an existing workflow instance.
9398
9499
Returns:
95100
The ID of the scheduled workflow instance.
@@ -100,9 +105,10 @@ def schedule_new_workflow(
100105
input=input,
101106
instance_id=instance_id,
102107
start_at=start_at,
108+
reuse_id_policy=Union(reuse_id_policy),
103109
)
104110
return self.__obj.schedule_new_orchestration(
105-
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at
111+
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at, reuse_id_policy=Union(reuse_id_policy),
106112
)
107113

108114
def get_workflow_state(

0 commit comments

Comments
 (0)