Skip to content

Commit 00f8d1c

Browse files
committed
autopep8
1 parent 2156df6 commit 00f8d1c

File tree

1 file changed

+69
-69
lines changed

1 file changed

+69
-69
lines changed

durabletask/worker.py

Lines changed: 69 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ class ConcurrencyOptions:
3535
"""
3636

3737
def __init__(
38-
self,
39-
maximum_concurrent_activity_work_items: Optional[int] = None,
40-
maximum_concurrent_orchestration_work_items: Optional[int] = None,
41-
maximum_thread_pool_workers: Optional[int] = None,
38+
self,
39+
maximum_concurrent_activity_work_items: Optional[int] = None,
40+
maximum_concurrent_orchestration_work_items: Optional[int] = None,
41+
maximum_thread_pool_workers: Optional[int] = None,
4242
):
4343
"""Initialize concurrency options.
4444
@@ -167,7 +167,7 @@ class TaskHubGrpcWorker:
167167
Example:
168168
Basic worker setup:
169169
170-
>>> from durabletask import TaskHubGrpcWorker, ConcurrencyOptions
170+
>>> from durabletask.worker import TaskHubGrpcWorker, ConcurrencyOptions
171171
>>>
172172
>>> # Create worker with custom concurrency settings
173173
>>> concurrency = ConcurrencyOptions(
@@ -215,15 +215,15 @@ class TaskHubGrpcWorker:
215215
_interceptors: Optional[list[shared.ClientInterceptor]] = None
216216

217217
def __init__(
218-
self,
219-
*,
220-
host_address: Optional[str] = None,
221-
metadata: Optional[list[tuple[str, str]]] = None,
222-
log_handler=None,
223-
log_formatter: Optional[logging.Formatter] = None,
224-
secure_channel: bool = False,
225-
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
226-
concurrency_options: Optional[ConcurrencyOptions] = None,
218+
self,
219+
*,
220+
host_address: Optional[str] = None,
221+
metadata: Optional[list[tuple[str, str]]] = None,
222+
log_handler=None,
223+
log_formatter: Optional[logging.Formatter] = None,
224+
secure_channel: bool = False,
225+
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
226+
concurrency_options: Optional[ConcurrencyOptions] = None,
227227
):
228228
self._registry = _Registry()
229229
self._host_address = (
@@ -500,10 +500,10 @@ def stop(self):
500500
self._is_running = False
501501

502502
def _execute_orchestrator(
503-
self,
504-
req: pb.OrchestratorRequest,
505-
stub: stubs.TaskHubSidecarServiceStub,
506-
completionToken,
503+
self,
504+
req: pb.OrchestratorRequest,
505+
stub: stubs.TaskHubSidecarServiceStub,
506+
completionToken,
507507
):
508508
try:
509509
executor = _OrchestrationExecutor(self._registry, self._logger)
@@ -538,10 +538,10 @@ def _execute_orchestrator(
538538
)
539539

540540
def _execute_activity(
541-
self,
542-
req: pb.ActivityRequest,
543-
stub: stubs.TaskHubSidecarServiceStub,
544-
completionToken,
541+
self,
542+
req: pb.ActivityRequest,
543+
stub: stubs.TaskHubSidecarServiceStub,
544+
completionToken,
545545
):
546546
instance_id = req.orchestrationInstance.instanceId
547547
try:
@@ -626,10 +626,10 @@ def resume(self):
626626
self._previous_task = next_task
627627

628628
def set_complete(
629-
self,
630-
result: Any,
631-
status: pb.OrchestrationStatus,
632-
is_result_encoded: bool = False,
629+
self,
630+
result: Any,
631+
status: pb.OrchestrationStatus,
632+
is_result_encoded: bool = False,
633633
):
634634
if self._is_complete:
635635
return
@@ -731,9 +731,9 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
731731
return self.create_timer_internal(fire_at)
732732

733733
def create_timer_internal(
734-
self,
735-
fire_at: Union[datetime, timedelta],
736-
retryable_task: Optional[task.RetryableTask] = None,
734+
self,
735+
fire_at: Union[datetime, timedelta],
736+
retryable_task: Optional[task.RetryableTask] = None,
737737
) -> task.Task:
738738
id = self.next_sequence_number()
739739
if isinstance(fire_at, timedelta):
@@ -748,11 +748,11 @@ def create_timer_internal(
748748
return timer_task
749749

750750
def call_activity(
751-
self,
752-
activity: Union[task.Activity[TInput, TOutput], str],
753-
*,
754-
input: Optional[TInput] = None,
755-
retry_policy: Optional[task.RetryPolicy] = None,
751+
self,
752+
activity: Union[task.Activity[TInput, TOutput], str],
753+
*,
754+
input: Optional[TInput] = None,
755+
retry_policy: Optional[task.RetryPolicy] = None,
756756
) -> task.Task[TOutput]:
757757
id = self.next_sequence_number()
758758

@@ -762,12 +762,12 @@ def call_activity(
762762
return self._pending_tasks.get(id, task.CompletableTask())
763763

764764
def call_sub_orchestrator(
765-
self,
766-
orchestrator: task.Orchestrator[TInput, TOutput],
767-
*,
768-
input: Optional[TInput] = None,
769-
instance_id: Optional[str] = None,
770-
retry_policy: Optional[task.RetryPolicy] = None,
765+
self,
766+
orchestrator: task.Orchestrator[TInput, TOutput],
767+
*,
768+
input: Optional[TInput] = None,
769+
instance_id: Optional[str] = None,
770+
retry_policy: Optional[task.RetryPolicy] = None,
771771
) -> task.Task[TOutput]:
772772
id = self.next_sequence_number()
773773
orchestrator_name = task.get_name(orchestrator)
@@ -782,15 +782,15 @@ def call_sub_orchestrator(
782782
return self._pending_tasks.get(id, task.CompletableTask())
783783

784784
def call_activity_function_helper(
785-
self,
786-
id: Optional[int],
787-
activity_function: Union[task.Activity[TInput, TOutput], str],
788-
*,
789-
input: Optional[TInput] = None,
790-
retry_policy: Optional[task.RetryPolicy] = None,
791-
is_sub_orch: bool = False,
792-
instance_id: Optional[str] = None,
793-
fn_task: Optional[task.CompletableTask[TOutput]] = None,
785+
self,
786+
id: Optional[int],
787+
activity_function: Union[task.Activity[TInput, TOutput], str],
788+
*,
789+
input: Optional[TInput] = None,
790+
retry_policy: Optional[task.RetryPolicy] = None,
791+
is_sub_orch: bool = False,
792+
instance_id: Optional[str] = None,
793+
fn_task: Optional[task.CompletableTask[TOutput]] = None,
794794
):
795795
if id is None:
796796
id = self.next_sequence_number()
@@ -865,7 +865,7 @@ class ExecutionResults:
865865
encoded_custom_status: Optional[str]
866866

867867
def __init__(
868-
self, actions: list[pb.OrchestratorAction], encoded_custom_status: Optional[str]
868+
self, actions: list[pb.OrchestratorAction], encoded_custom_status: Optional[str]
869869
):
870870
self.actions = actions
871871
self.encoded_custom_status = encoded_custom_status
@@ -881,10 +881,10 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
881881
self._suspended_events: list[pb.HistoryEvent] = []
882882

883883
def execute(
884-
self,
885-
instance_id: str,
886-
old_events: Sequence[pb.HistoryEvent],
887-
new_events: Sequence[pb.HistoryEvent],
884+
self,
885+
instance_id: str,
886+
old_events: Sequence[pb.HistoryEvent],
887+
new_events: Sequence[pb.HistoryEvent],
888888
) -> ExecutionResults:
889889
if not new_events:
890890
raise task.OrchestrationStateError(
@@ -922,7 +922,7 @@ def execute(
922922
f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding."
923923
)
924924
elif (
925-
ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
925+
ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
926926
):
927927
completion_status_str = pbh.get_orchestration_status_str(
928928
ctx._completion_status
@@ -941,7 +941,7 @@ def execute(
941941
)
942942

943943
def process_event(
944-
self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent
944+
self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent
945945
) -> None:
946946
if self._is_suspended and _is_suspendable(event):
947947
# We are suspended, so we need to buffer this event until we are resumed
@@ -963,7 +963,7 @@ def process_event(
963963
# deserialize the input, if any
964964
input = None
965965
if (
966-
event.executionStarted.input is not None and event.executionStarted.input.value != ""
966+
event.executionStarted.input is not None and event.executionStarted.input.value != ""
967967
):
968968
input = shared.from_json(event.executionStarted.input.value)
969969

@@ -1105,7 +1105,7 @@ def process_event(
11051105
task_id, expected_method_name, action
11061106
)
11071107
elif (
1108-
action.createSubOrchestration.name != event.subOrchestrationInstanceCreated.name
1108+
action.createSubOrchestration.name != event.subOrchestrationInstanceCreated.name
11091109
):
11101110
raise _get_wrong_action_name_error(
11111111
task_id,
@@ -1229,11 +1229,11 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
12291229
self._logger = logger
12301230

12311231
def execute(
1232-
self,
1233-
orchestration_id: str,
1234-
name: str,
1235-
task_id: int,
1236-
encoded_input: Optional[str],
1232+
self,
1233+
orchestration_id: str,
1234+
name: str,
1235+
task_id: int,
1236+
encoded_input: Optional[str],
12371237
) -> Optional[str]:
12381238
"""Executes an activity function and returns the serialized result, if any."""
12391239
self._logger.debug(
@@ -1262,7 +1262,7 @@ def execute(
12621262

12631263

12641264
def _get_non_determinism_error(
1265-
task_id: int, action_name: str
1265+
task_id: int, action_name: str
12661266
) -> task.NonDeterminismError:
12671267
return task.NonDeterminismError(
12681268
f"A previous execution called {action_name} with ID={task_id}, but the current "
@@ -1273,7 +1273,7 @@ def _get_non_determinism_error(
12731273

12741274

12751275
def _get_wrong_action_type_error(
1276-
task_id: int, expected_method_name: str, action: pb.OrchestratorAction
1276+
task_id: int, expected_method_name: str, action: pb.OrchestratorAction
12771277
) -> task.NonDeterminismError:
12781278
unexpected_method_name = _get_method_name_for_action(action)
12791279
return task.NonDeterminismError(
@@ -1286,7 +1286,7 @@ def _get_wrong_action_type_error(
12861286

12871287

12881288
def _get_wrong_action_name_error(
1289-
task_id: int, method_name: str, expected_task_name: str, actual_task_name: str
1289+
task_id: int, method_name: str, expected_task_name: str, actual_task_name: str
12901290
) -> task.NonDeterminismError:
12911291
return task.NonDeterminismError(
12921292
f"Failed to restore orchestration state due to a history mismatch: A previous execution called "
@@ -1471,7 +1471,7 @@ async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphor
14711471
running_tasks.add(task)
14721472

14731473
async def _process_work_item(
1474-
self, semaphore: asyncio.Semaphore, queue: asyncio.Queue, func, args, kwargs
1474+
self, semaphore: asyncio.Semaphore, queue: asyncio.Queue, func, args, kwargs
14751475
):
14761476
async with semaphore:
14771477
try:
@@ -1486,8 +1486,8 @@ async def _run_func(self, func, *args, **kwargs):
14861486
loop = asyncio.get_running_loop()
14871487
# Avoid submitting to executor after shutdown
14881488
if (
1489-
getattr(self, "_shutdown", False) and getattr(self, "thread_pool", None) and getattr(
1490-
self.thread_pool, "_shutdown", False)
1489+
getattr(self, "_shutdown", False) and getattr(self, "thread_pool", None) and getattr(
1490+
self.thread_pool, "_shutdown", False)
14911491
):
14921492
return None
14931493
return await loop.run_in_executor(

0 commit comments

Comments
 (0)