Skip to content

Commit 6220334

Browse files
committed
Revert "revert"
This reverts commit e602262.
1 parent e602262 commit 6220334

File tree

2 files changed

+19
-7
lines changed

2 files changed

+19
-7
lines changed

temporalio/worker/_replayer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ def on_eviction_hook(
264264
local_activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
265265
1
266266
),
267+
nexus_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
268+
1
269+
),
267270
),
268271
nonsticky_to_sticky_poll_ratio=1,
269272
no_remote_activities=True,

temporalio/worker/_worker.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def __init__(
119119
max_concurrent_workflow_tasks: Optional[int] = None,
120120
max_concurrent_activities: Optional[int] = None,
121121
max_concurrent_local_activities: Optional[int] = None,
122+
max_concurrent_nexus_tasks: Optional[int] = None,
122123
tuner: Optional[WorkerTuner] = None,
123124
max_concurrent_workflow_task_polls: Optional[int] = None,
124125
nonsticky_to_sticky_poll_ratio: float = 0.2,
@@ -214,13 +215,16 @@ def __init__(
214215
max_concurrent_workflow_tasks: Maximum allowed number of workflow
215216
tasks that will ever be given to this worker at one time. Mutually exclusive with
216217
``tuner``. Must be set to at least two if ``max_cached_workflows`` is nonzero.
217-
max_concurrent_activities: Maximum number of activity tasks that
218-
will ever be given to the activity worker concurrently. Mutually exclusive with ``tuner``.
218+
max_concurrent_activities: Maximum number of activity tasks that will ever be given to
219+
the activity worker concurrently. Mutually exclusive with ``tuner``.
219220
max_concurrent_local_activities: Maximum number of local activity
220-
tasks that will ever be given to the activity worker concurrently. Mutually exclusive with ``tuner``.
221+
tasks that will ever be given to the activity worker concurrently. Mutually
222+
exclusive with ``tuner``.
223+
max_concurrent_nexus_tasks: Maximum number of Nexus tasks that will ever be given to
224+
the Nexus worker concurrently. Mutually exclusive with ``tuner``.
221225
tuner: Provide a custom :py:class:`WorkerTuner`. Mutually exclusive with the
222-
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and
223-
``max_concurrent_local_activities`` arguments.
226+
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``,
227+
``max_concurrent_local_activities``, and ``max_concurrent_nexus_tasks`` arguments.
224228
225229
Defaults to fixed-size 100 slots for each slot kind if unset and none of the
226230
max_* arguments are provided.
@@ -337,6 +341,7 @@ def __init__(
337341
max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
338342
max_concurrent_activities=max_concurrent_activities,
339343
max_concurrent_local_activities=max_concurrent_local_activities,
344+
max_concurrent_nexus_tasks=max_concurrent_nexus_tasks,
340345
tuner=tuner,
341346
max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls,
342347
nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio,
@@ -387,7 +392,6 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf
387392
"""
388393
self._config = config
389394

390-
# TODO(nexus-preview): max_concurrent_nexus_tasks / tuner support
391395
if not (
392396
config["activities"]
393397
or config["nexus_service_handlers"]
@@ -462,6 +466,7 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf
462466
)
463467
self._nexus_worker: Optional[_NexusWorker] = None
464468
if config["nexus_service_handlers"]:
469+
# TODO(nexus-preview) analogous calculations to those done for activity?
465470
self._nexus_worker = _NexusWorker(
466471
bridge_worker=lambda: self._bridge_worker,
467472
client=config["client"],
@@ -522,16 +527,19 @@ def check_activity(activity):
522527
config["max_concurrent_workflow_tasks"]
523528
or config["max_concurrent_activities"]
524529
or config["max_concurrent_local_activities"]
530+
or config["max_concurrent_nexus_tasks"]
525531
):
526532
raise ValueError(
527533
"Cannot specify max_concurrent_workflow_tasks, max_concurrent_activities, "
528-
"or max_concurrent_local_activities when also specifying tuner"
534+
"max_concurrent_local_activities, or max_concurrent_nexus_tasks when also "
535+
"specifying tuner"
529536
)
530537
else:
531538
tuner = WorkerTuner.create_fixed(
532539
workflow_slots=config["max_concurrent_workflow_tasks"],
533540
activity_slots=config["max_concurrent_activities"],
534541
local_activity_slots=config["max_concurrent_local_activities"],
542+
nexus_slots=config["max_concurrent_nexus_tasks"],
535543
)
536544

537545
bridge_tuner = tuner._to_bridge_tuner()
@@ -883,6 +891,7 @@ class WorkerConfig(TypedDict, total=False):
883891
max_concurrent_workflow_tasks: Optional[int]
884892
max_concurrent_activities: Optional[int]
885893
max_concurrent_local_activities: Optional[int]
894+
max_concurrent_nexus_tasks: Optional[int]
886895
tuner: Optional[WorkerTuner]
887896
max_concurrent_workflow_task_polls: Optional[int]
888897
nonsticky_to_sticky_poll_ratio: float

0 commit comments

Comments
 (0)