Skip to content

Commit 18aa733

Browse files
committed
Fill out boilerplate
1 parent 7f228d8 commit 18aa733

File tree

4 files changed

+45
-11
lines changed

4 files changed

+45
-11
lines changed

temporalio/bridge/worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ class TunerHolder:
165165
workflow_slot_supplier: SlotSupplier
166166
activity_slot_supplier: SlotSupplier
167167
local_activity_slot_supplier: SlotSupplier
168+
nexus_slot_supplier: SlotSupplier
168169

169170

170171
class Worker:

temporalio/worker/_replayer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ def on_eviction_hook(
264264
local_activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
265265
1
266266
),
267+
nexus_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
268+
1
269+
),
267270
),
268271
nonsticky_to_sticky_poll_ratio=1,
269272
no_remote_activities=True,

temporalio/worker/_tuning.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import asyncio
24
import logging
35
from abc import ABC, abstractmethod
@@ -303,7 +305,8 @@ def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
303305

304306

305307
def _to_bridge_slot_supplier(
306-
slot_supplier: SlotSupplier, kind: Literal["workflow", "activity", "local_activity"]
308+
slot_supplier: SlotSupplier,
309+
kind: Literal["workflow", "activity", "local_activity", "nexus"],
307310
) -> temporalio.bridge.worker.SlotSupplier:
308311
if isinstance(slot_supplier, FixedSizeSlotSupplier):
309312
return temporalio.bridge.worker.FixedSizeSlotSupplier(slot_supplier.num_slots)
@@ -347,7 +350,8 @@ def create_resource_based(
347350
workflow_config: Optional[ResourceBasedSlotConfig] = None,
348351
activity_config: Optional[ResourceBasedSlotConfig] = None,
349352
local_activity_config: Optional[ResourceBasedSlotConfig] = None,
350-
) -> "WorkerTuner":
353+
nexus_config: Optional[ResourceBasedSlotConfig] = None,
354+
) -> WorkerTuner:
351355
"""Create a resource-based tuner with the provided options."""
352356
resource_cfg = ResourceBasedTunerConfig(target_memory_usage, target_cpu_usage)
353357
wf = ResourceBasedSlotSupplier(
@@ -359,10 +363,14 @@ def create_resource_based(
359363
local_act = ResourceBasedSlotSupplier(
360364
local_activity_config or ResourceBasedSlotConfig(), resource_cfg
361365
)
366+
nexus = ResourceBasedSlotSupplier(
367+
nexus_config or ResourceBasedSlotConfig(), resource_cfg
368+
)
362369
return _CompositeTuner(
363370
wf,
364371
act,
365372
local_act,
373+
nexus,
366374
)
367375

368376
@staticmethod
@@ -371,14 +379,16 @@ def create_fixed(
371379
workflow_slots: Optional[int],
372380
activity_slots: Optional[int],
373381
local_activity_slots: Optional[int],
374-
) -> "WorkerTuner":
382+
nexus_slots: Optional[int],
383+
) -> WorkerTuner:
375384
"""Create a fixed-size tuner with the provided number of slots. Any unspecified slots will default to 100."""
376385
return _CompositeTuner(
377386
FixedSizeSlotSupplier(workflow_slots if workflow_slots else 100),
378387
FixedSizeSlotSupplier(activity_slots if activity_slots else 100),
379388
FixedSizeSlotSupplier(
380389
local_activity_slots if local_activity_slots else 100
381390
),
391+
FixedSizeSlotSupplier(nexus_slots if nexus_slots else 100),
382392
)
383393

384394
@staticmethod
@@ -387,12 +397,14 @@ def create_composite(
387397
workflow_supplier: SlotSupplier,
388398
activity_supplier: SlotSupplier,
389399
local_activity_supplier: SlotSupplier,
390-
) -> "WorkerTuner":
400+
nexus_supplier: SlotSupplier,
401+
) -> WorkerTuner:
391402
"""Create a tuner composed of the provided slot suppliers."""
392403
return _CompositeTuner(
393404
workflow_supplier,
394405
activity_supplier,
395406
local_activity_supplier,
407+
nexus_supplier,
396408
)
397409

398410
@abstractmethod
@@ -407,6 +419,10 @@ def _get_activity_task_slot_supplier(self) -> SlotSupplier:
407419
def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
408420
raise NotImplementedError
409421

422+
@abstractmethod
423+
def _get_nexus_slot_supplier(self) -> SlotSupplier:
424+
raise NotImplementedError
425+
410426
def _to_bridge_tuner(self) -> temporalio.bridge.worker.TunerHolder:
411427
return temporalio.bridge.worker.TunerHolder(
412428
_to_bridge_slot_supplier(
@@ -418,6 +434,7 @@ def _to_bridge_tuner(self) -> temporalio.bridge.worker.TunerHolder:
418434
_to_bridge_slot_supplier(
419435
self._get_local_activity_task_slot_supplier(), "local_activity"
420436
),
437+
_to_bridge_slot_supplier(self._get_nexus_slot_supplier(), "nexus"),
421438
)
422439

423440
def _get_activities_max(self) -> Optional[int]:
@@ -436,6 +453,7 @@ class _CompositeTuner(WorkerTuner):
436453
workflow_slot_supplier: SlotSupplier
437454
activity_slot_supplier: SlotSupplier
438455
local_activity_slot_supplier: SlotSupplier
456+
nexus_slot_supplier: SlotSupplier
439457

440458
def _get_workflow_task_slot_supplier(self) -> SlotSupplier:
441459
return self.workflow_slot_supplier
@@ -445,3 +463,6 @@ def _get_activity_task_slot_supplier(self) -> SlotSupplier:
445463

446464
def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
447465
return self.local_activity_slot_supplier
466+
467+
def _get_nexus_slot_supplier(self) -> SlotSupplier:
468+
return self.nexus_slot_supplier

temporalio/worker/_worker.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def __init__(
119119
max_concurrent_workflow_tasks: Optional[int] = None,
120120
max_concurrent_activities: Optional[int] = None,
121121
max_concurrent_local_activities: Optional[int] = None,
122+
max_concurrent_nexus_tasks: Optional[int] = None,
122123
tuner: Optional[WorkerTuner] = None,
123124
max_concurrent_workflow_task_polls: Optional[int] = None,
124125
nonsticky_to_sticky_poll_ratio: float = 0.2,
@@ -214,13 +215,16 @@ def __init__(
214215
max_concurrent_workflow_tasks: Maximum allowed number of workflow
215216
tasks that will ever be given to this worker at one time. Mutually exclusive with
216217
``tuner``. Must be set to at least two if ``max_cached_workflows`` is nonzero.
217-
max_concurrent_activities: Maximum number of activity tasks that
218-
will ever be given to the activity worker concurrently. Mutually exclusive with ``tuner``.
218+
max_concurrent_activities: Maximum number of activity tasks that will ever be given to
219+
the activity worker concurrently. Mutually exclusive with ``tuner``.
219220
max_concurrent_local_activities: Maximum number of local activity
220-
tasks that will ever be given to the activity worker concurrently. Mutually exclusive with ``tuner``.
221+
tasks that will ever be given to the activity worker concurrently. Mutually
222+
exclusive with ``tuner``.
223+
max_concurrent_nexus_tasks: Maximum number of Nexus tasks that will ever be given to
224+
the Nexus worker concurrently. Mutually exclusive with ``tuner``.
221225
tuner: Provide a custom :py:class:`WorkerTuner`. Mutually exclusive with the
222-
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and
223-
``max_concurrent_local_activities`` arguments.
226+
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``,
227+
``max_concurrent_local_activities``, and ``max_concurrent_nexus_tasks`` arguments.
224228
225229
Defaults to fixed-size 100 slots for each slot kind if unset and none of the
226230
max_* arguments are provided.
@@ -337,6 +341,7 @@ def __init__(
337341
max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
338342
max_concurrent_activities=max_concurrent_activities,
339343
max_concurrent_local_activities=max_concurrent_local_activities,
344+
max_concurrent_nexus_tasks=max_concurrent_nexus_tasks,
340345
tuner=tuner,
341346
max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls,
342347
nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio,
@@ -387,7 +392,6 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf
387392
"""
388393
self._config = config
389394

390-
# TODO(nexus-preview): max_concurrent_nexus_tasks / tuner support
391395
if not (
392396
config["activities"]
393397
or config["nexus_service_handlers"]
@@ -462,6 +466,7 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf
462466
)
463467
self._nexus_worker: Optional[_NexusWorker] = None
464468
if config["nexus_service_handlers"]:
469+
# TODO(nexus-preview) analogous calculations to those done for activity?
465470
self._nexus_worker = _NexusWorker(
466471
bridge_worker=lambda: self._bridge_worker,
467472
client=config["client"],
@@ -522,16 +527,19 @@ def check_activity(activity):
522527
config["max_concurrent_workflow_tasks"]
523528
or config["max_concurrent_activities"]
524529
or config["max_concurrent_local_activities"]
530+
or config["max_concurrent_nexus_tasks"]
525531
):
526532
raise ValueError(
527533
"Cannot specify max_concurrent_workflow_tasks, max_concurrent_activities, "
528-
"or max_concurrent_local_activities when also specifying tuner"
534+
"max_concurrent_local_activities, or max_concurrent_nexus_tasks when also "
535+
"specifying tuner"
529536
)
530537
else:
531538
tuner = WorkerTuner.create_fixed(
532539
workflow_slots=config["max_concurrent_workflow_tasks"],
533540
activity_slots=config["max_concurrent_activities"],
534541
local_activity_slots=config["max_concurrent_local_activities"],
542+
nexus_slots=config["max_concurrent_nexus_tasks"],
535543
)
536544

537545
bridge_tuner = tuner._to_bridge_tuner()
@@ -883,6 +891,7 @@ class WorkerConfig(TypedDict, total=False):
883891
max_concurrent_workflow_tasks: Optional[int]
884892
max_concurrent_activities: Optional[int]
885893
max_concurrent_local_activities: Optional[int]
894+
max_concurrent_nexus_tasks: Optional[int]
886895
tuner: Optional[WorkerTuner]
887896
max_concurrent_workflow_task_polls: Optional[int]
888897
nonsticky_to_sticky_poll_ratio: float

0 commit comments

Comments
 (0)