Skip to content

Commit 200d1a1

Browse files
committed
Merge branch 'main' into filinto/asyncio
2 parents e27e93b + 06357df commit 200d1a1

File tree

5 files changed

+210
-48
lines changed

5 files changed

+210
-48
lines changed

CODEOWNERS

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# These owners are the maintainers and approvers of this repo
2+
* @dapr/maintainers-python-sdk @dapr/approvers-python-sdk

durabletask/internal/helpers.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,14 +239,15 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
239239
)
240240

241241

242-
def new_schedule_task_action(
243-
id: int, name: str, encoded_input: Optional[str]
244-
) -> pb.OrchestratorAction:
242+
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str], router: Optional[pb.TaskRouter] = None) -> pb.OrchestratorAction:
245243
return pb.OrchestratorAction(
246244
id=id,
247245
scheduleTask=pb.ScheduleTaskAction(
248-
name=name, input=get_string_value(encoded_input)
246+
name=name,
247+
input=get_string_value(encoded_input),
248+
router=router,
249249
),
250+
router=router,
250251
)
251252

252253

@@ -257,13 +258,20 @@ def new_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
257258

258259

259260
def new_create_sub_orchestration_action(
260-
id: int, name: str, instance_id: Optional[str], encoded_input: Optional[str]
261-
) -> pb.OrchestratorAction:
261+
id: int,
262+
name: str,
263+
instance_id: Optional[str],
264+
encoded_input: Optional[str],
265+
router: Optional[pb.TaskRouter] = None) -> pb.OrchestratorAction:
262266
return pb.OrchestratorAction(
263267
id=id,
264268
createSubOrchestration=pb.CreateSubOrchestrationAction(
265-
name=name, instanceId=instance_id, input=get_string_value(encoded_input)
269+
name=name,
270+
instanceId=instance_id,
271+
input=get_string_value(encoded_input),
272+
router=router,
266273
),
274+
router=router,
267275
)
268276

269277

durabletask/task.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,10 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> "Task[Any]":
146146
pass
147147

148148
@abstractmethod
149-
def call_activity(
150-
self,
151-
activity: Union[Activity[TInput, TOutput], str],
152-
*,
153-
input: Optional[TInput] = None,
154-
retry_policy: Optional[RetryPolicy] = None,
155-
) -> Task[TOutput]:
149+
def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
150+
input: Optional[TInput] = None,
151+
retry_policy: Optional[RetryPolicy] = None,
152+
app_id: Optional[str] = None) -> Task[TOutput]:
156153
"""Schedule an activity for execution.
157154
158155
Parameters
@@ -163,6 +160,8 @@ def call_activity(
163160
The JSON-serializable input (or None) to pass to the activity.
164161
retry_policy: Optional[RetryPolicy]
165162
The retry policy to use for this activity call.
163+
app_id: Optional[str]
164+
The app ID that will execute the activity. If not specified, the activity will be executed by the same app as the orchestrator.
166165
167166
Returns
168167
-------
@@ -172,14 +171,11 @@ def call_activity(
172171
pass
173172

174173
@abstractmethod
175-
def call_sub_orchestrator(
176-
self,
177-
orchestrator: Orchestrator[TInput, TOutput],
178-
*,
179-
input: Optional[TInput] = None,
180-
instance_id: Optional[str] = None,
181-
retry_policy: Optional[RetryPolicy] = None,
182-
) -> Task[TOutput]:
174+
def call_sub_orchestrator(self, orchestrator: Orchestrator[TInput, TOutput], *,
175+
input: Optional[TInput] = None,
176+
instance_id: Optional[str] = None,
177+
retry_policy: Optional[RetryPolicy] = None,
178+
app_id: Optional[str] = None) -> Task[TOutput]:
183179
"""Schedule sub-orchestrator function for execution.
184180
185181
Parameters
@@ -193,6 +189,8 @@ def call_sub_orchestrator(
193189
random UUID will be used.
194190
retry_policy: Optional[RetryPolicy]
195191
The retry policy to use for this sub-orchestrator call.
192+
app_id: Optional[str]
193+
The app ID that will execute the sub-orchestrator. If not specified, the sub-orchestrator will be executed by the same app as the orchestrator.
196194
197195
Returns
198196
-------

durabletask/worker.py

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,7 @@ def __init__(self, instance_id: str):
818818
self._sequence_number = 0
819819
self._current_utc_datetime = datetime(1000, 1, 1)
820820
self._instance_id = instance_id
821+
self._app_id = None
821822
self._completion_status: Optional[pb.OrchestrationStatus] = None
822823
self._received_events: dict[str, list[Any]] = {}
823824
self._pending_events: dict[str, list[task.CompletableTask]] = {}
@@ -970,6 +971,10 @@ def next_sequence_number(self) -> int:
970971
self._sequence_number += 1
971972
return self._sequence_number
972973

974+
@property
975+
def app_id(self) -> str:
976+
return self._app_id
977+
973978
@property
974979
def instance_id(self) -> str:
975980
return self._instance_id
@@ -1042,55 +1047,65 @@ def create_timer_internal(
10421047
return timer_task
10431048

10441049
def call_activity(
1045-
self,
1046-
activity: Union[task.Activity[TInput, TOutput], str],
1047-
*,
1048-
input: Optional[TInput] = None,
1049-
retry_policy: Optional[task.RetryPolicy] = None,
1050+
self,
1051+
activity: Union[task.Activity[TInput, TOutput], str],
1052+
*,
1053+
input: Optional[TInput] = None,
1054+
retry_policy: Optional[task.RetryPolicy] = None,
1055+
app_id: Optional[str] = None,
10501056
) -> task.Task[TOutput]:
10511057
id = self.next_sequence_number()
10521058

10531059
self.call_activity_function_helper(
1054-
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False
1060+
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False, app_id=app_id
10551061
)
10561062
return self._pending_tasks.get(id, task.CompletableTask())
10571063

10581064
def call_sub_orchestrator(
1059-
self,
1060-
orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
1061-
*,
1062-
input: Optional[TInput] = None,
1063-
instance_id: Optional[str] = None,
1064-
retry_policy: Optional[task.RetryPolicy] = None,
1065+
self,
1066+
orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
1067+
*,
1068+
input: Optional[TInput] = None,
1069+
instance_id: Optional[str] = None,
1070+
retry_policy: Optional[task.RetryPolicy] = None,
1071+
app_id: Optional[str] = None,
10651072
) -> task.Task[TOutput]:
10661073
id = self.next_sequence_number()
1067-
orchestrator_name = (
1068-
orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
1069-
)
1074+
if isinstance(orchestrator, str):
1075+
orchestrator_name = orchestrator
1076+
else:
1077+
orchestrator_name = task.get_name(orchestrator)
10701078
self.call_activity_function_helper(
10711079
id,
10721080
orchestrator_name,
10731081
input=input,
10741082
retry_policy=retry_policy,
10751083
is_sub_orch=True,
10761084
instance_id=instance_id,
1085+
app_id=app_id,
10771086
)
10781087
return self._pending_tasks.get(id, task.CompletableTask())
10791088

10801089
def call_activity_function_helper(
1081-
self,
1082-
id: Optional[int],
1083-
activity_function: Union[task.Activity[TInput, TOutput], str],
1084-
*,
1085-
input: Optional[TInput] = None,
1086-
retry_policy: Optional[task.RetryPolicy] = None,
1087-
is_sub_orch: bool = False,
1088-
instance_id: Optional[str] = None,
1089-
fn_task: Optional[task.CompletableTask[TOutput]] = None,
1090+
self,
1091+
id: Optional[int],
1092+
activity_function: Union[task.Activity[TInput, TOutput], str],
1093+
*,
1094+
input: Optional[TInput] = None,
1095+
retry_policy: Optional[task.RetryPolicy] = None,
1096+
is_sub_orch: bool = False,
1097+
instance_id: Optional[str] = None,
1098+
fn_task: Optional[task.CompletableTask[TOutput]] = None,
1099+
app_id: Optional[str] = None,
10901100
):
10911101
if id is None:
10921102
id = self.next_sequence_number()
10931103

1104+
router = pb.TaskRouter()
1105+
router.sourceAppID = self._app_id
1106+
if app_id is not None:
1107+
router.targetAppID = app_id
1108+
10941109
if fn_task is None:
10951110
encoded_input = shared.to_json(input) if input is not None else None
10961111
else:
@@ -1132,15 +1147,15 @@ def call_activity_function_helper(
11321147
if isinstance(activity_function, str)
11331148
else task.get_name(activity_function)
11341149
)
1135-
action = ph.new_schedule_task_action(id, name, encoded_input)
1150+
action = ph.new_schedule_task_action(id, name, encoded_input, router)
11361151
else:
11371152
if instance_id is None:
11381153
# Create a deteministic instance ID based on the parent instance ID
11391154
instance_id = f"{self.instance_id}:{id:04x}"
11401155
if not isinstance(activity_function, str):
11411156
raise ValueError("Orchestrator function name must be a string")
11421157
action = ph.new_create_sub_orchestration_action(
1143-
id, activity_function, instance_id, encoded_input
1158+
id, activity_function, instance_id, encoded_input, router
11441159
)
11451160
self._pending_actions[id] = action
11461161

@@ -1285,6 +1300,11 @@ def process_event(
12851300
if event.HasField("orchestratorStarted"):
12861301
ctx.current_utc_datetime = event.timestamp.ToDatetime()
12871302
elif event.HasField("executionStarted"):
1303+
if event.router.targetAppID:
1304+
ctx._app_id = event.router.targetAppID
1305+
else:
1306+
ctx._app_id = event.router.sourceAppID
1307+
12881308
# TODO: Check if we already started the orchestration
12891309
fn = self._registry.get_orchestrator(event.executionStarted.name)
12901310
if fn is None:
@@ -1398,6 +1418,11 @@ def process_event(
13981418
else:
13991419
cur_task = activity_action.createSubOrchestration
14001420
instance_id = cur_task.instanceId
1421+
if cur_task.router and cur_task.router.targetAppID:
1422+
target_app_id = cur_task.router.targetAppID
1423+
else:
1424+
target_app_id = None
1425+
14011426
ctx.call_activity_function_helper(
14021427
id=activity_action.id,
14031428
activity_function=cur_task.name,
@@ -1406,6 +1431,7 @@ def process_event(
14061431
is_sub_orch=timer_task._retryable_parent._is_sub_orch,
14071432
instance_id=instance_id,
14081433
fn_task=timer_task._retryable_parent,
1434+
app_id=target_app_id,
14091435
)
14101436
else:
14111437
ctx.resume()

0 commit comments

Comments
 (0)