11import asyncio
22from typing import Optional
33
4- from cadence .api .v1 .common_pb2 import Payload
5- from cadence .api .v1 .service_worker_pb2 import PollForDecisionTaskRequest , PollForDecisionTaskResponse , \
6- RespondDecisionTaskFailedRequest
4+ from cadence .api .v1 .service_worker_pb2 import PollForDecisionTaskRequest , PollForDecisionTaskResponse
75from cadence .api .v1 .tasklist_pb2 import TaskList , TaskListKind
8- from cadence .api .v1 .workflow_pb2 import DecisionTaskFailedCause
96from cadence .client import Client
107from cadence .worker ._poller import Poller
118from cadence .worker ._types import WorkerOptions , _LONG_POLL_TIMEOUT
9+ from cadence .worker ._decision_task_handler import DecisionTaskHandler
10+ from cadence .worker ._registry import Registry
1211
1312
1413class DecisionWorker :
15- def __init__ (self , client : Client , task_list : str , options : WorkerOptions ) -> None :
14+ def __init__ (self , client : Client , task_list : str , registry : Registry , options : WorkerOptions ) -> None :
1615 self ._client = client
1716 self ._task_list = task_list
1817 self ._identity = options ["identity" ]
1918 permits = asyncio .Semaphore (options ["max_concurrent_decision_task_execution_size" ])
19+ self ._decision_handler = DecisionTaskHandler (client , task_list , registry , ** options )
2020 self ._poller = Poller [PollForDecisionTaskResponse ](options ["decision_task_pollers" ], permits , self ._poll , self ._execute )
2121 # TODO: Sticky poller, actually running workflows, etc.
2222
@@ -30,17 +30,12 @@ async def _poll(self) -> Optional[PollForDecisionTaskResponse]:
3030 identity = self ._identity ,
3131 ), timeout = _LONG_POLL_TIMEOUT )
3232
33- if task .task_token :
33+ if task and task .task_token :
3434 return task
3535 else :
3636 return None
3737
3838
3939 async def _execute (self , task : PollForDecisionTaskResponse ) -> None :
40- await self ._client .worker_stub .RespondDecisionTaskFailed (RespondDecisionTaskFailedRequest (
41- task_token = task .task_token ,
42- cause = DecisionTaskFailedCause .DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION ,
43- identity = self ._identity ,
44- details = Payload (data = b'not implemented' )
45- ))
40+ await self ._decision_handler .handle_task (task )
4641
0 commit comments