Skip to content

Commit a313c42

Browse files
committed
integrate into worker
1 parent 2c1e5ca commit a313c42

File tree

2 files changed

+8
-13
lines changed

2 files changed

+8
-13
lines changed

cadence/worker/_decision.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
import asyncio
22
from typing import Optional
33

4-
from cadence.api.v1.common_pb2 import Payload
5-
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse, \
6-
RespondDecisionTaskFailedRequest
4+
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse
75
from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind
8-
from cadence.api.v1.workflow_pb2 import DecisionTaskFailedCause
96
from cadence.client import Client
107
from cadence.worker._poller import Poller
118
from cadence.worker._types import WorkerOptions, _LONG_POLL_TIMEOUT
9+
from cadence.worker._decision_task_handler import DecisionTaskHandler
10+
from cadence.worker._registry import Registry
1211

1312

1413
class DecisionWorker:
15-
def __init__(self, client: Client, task_list: str, options: WorkerOptions) -> None:
14+
def __init__(self, client: Client, task_list: str, registry: Registry, options: WorkerOptions) -> None:
1615
self._client = client
1716
self._task_list = task_list
1817
self._identity = options["identity"]
1918
permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"])
19+
self._decision_handler = DecisionTaskHandler(client, task_list, registry, **options)
2020
self._poller = Poller[PollForDecisionTaskResponse](options["decision_task_pollers"], permits, self._poll, self._execute)
2121
# TODO: Sticky poller, actually running workflows, etc.
2222

@@ -30,17 +30,12 @@ async def _poll(self) -> Optional[PollForDecisionTaskResponse]:
3030
identity=self._identity,
3131
), timeout=_LONG_POLL_TIMEOUT)
3232

33-
if task.task_token:
33+
if task and task.task_token:
3434
return task
3535
else:
3636
return None
3737

3838

3939
async def _execute(self, task: PollForDecisionTaskResponse) -> None:
40-
await self._client.worker_stub.RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest(
41-
task_token=task.task_token,
42-
cause=DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION,
43-
identity=self._identity,
44-
details=Payload(data=b'not implemented')
45-
))
40+
await self._decision_handler.handle_task(task)
4641

cadence/worker/_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def __init__(self, client: Client, task_list: str, registry: Registry, **kwargs:
1919
_validate_and_copy_defaults(client, task_list, options)
2020
self._options = options
2121
self._activity_worker = ActivityWorker(client, task_list, registry, options)
22-
self._decision_worker = DecisionWorker(client, task_list, options)
22+
self._decision_worker = DecisionWorker(client, task_list, registry, options)
2323

2424

2525
async def run(self) -> None:

0 commit comments

Comments
 (0)