Skip to content

Commit 84b184f

Browse files
authored
Nexus worker concurrency control (#1066)
1 parent 9d70d44 commit 84b184f

File tree

7 files changed

+229
-54
lines changed

7 files changed

+229
-54
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pub struct TunerHolder {
147147
workflow_slot_supplier: SlotSupplier,
148148
activity_slot_supplier: SlotSupplier,
149149
local_activity_slot_supplier: SlotSupplier,
150+
nexus_slot_supplier: SlotSupplier,
150151
}
151152

152153
#[derive(FromPyObject)]
@@ -745,10 +746,17 @@ fn convert_tuner_holder(
745746
} else {
746747
None
747748
};
749+
let maybe_nexus_resource_opts =
750+
if let SlotSupplier::ResourceBased(ref ss) = holder.nexus_slot_supplier {
751+
Some(&ss.tuner_config)
752+
} else {
753+
None
754+
};
748755
let all_resource_opts = [
749756
maybe_wf_resource_opts,
750757
maybe_act_resource_opts,
751758
maybe_local_act_resource_opts,
759+
maybe_nexus_resource_opts,
752760
];
753761
let mut set_resource_opts = all_resource_opts.iter().flatten();
754762
let first = set_resource_opts.next();
@@ -784,6 +792,10 @@ fn convert_tuner_holder(
784792
)?)
785793
.local_activity_slot_options(convert_slot_supplier(
786794
holder.local_activity_slot_supplier,
795+
task_locals.clone(),
796+
)?)
797+
.nexus_slot_options(convert_slot_supplier(
798+
holder.nexus_slot_supplier,
787799
task_locals,
788800
)?);
789801
Ok(options

temporalio/bridge/worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ class TunerHolder:
168168
workflow_slot_supplier: SlotSupplier
169169
activity_slot_supplier: SlotSupplier
170170
local_activity_slot_supplier: SlotSupplier
171+
nexus_slot_supplier: SlotSupplier
171172

172173

173174
class Worker:

temporalio/worker/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
CustomSlotSupplier,
3434
FixedSizeSlotSupplier,
3535
LocalActivitySlotInfo,
36+
NexusSlotInfo,
3637
ResourceBasedSlotConfig,
3738
ResourceBasedSlotSupplier,
3839
ResourceBasedTunerConfig,
@@ -117,4 +118,5 @@
117118
"SlotReleaseContext",
118119
"SlotReserveContext",
119120
"WorkflowSlotInfo",
121+
"NexusSlotInfo",
120122
]

temporalio/worker/_replayer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ def on_eviction_hook(
264264
local_activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
265265
1
266266
),
267+
nexus_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
268+
1
269+
),
267270
),
268271
nonsticky_to_sticky_poll_ratio=1,
269272
no_remote_activities=True,

temporalio/worker/_tuning.py

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import asyncio
24
import logging
35
from abc import ABC, abstractmethod
@@ -10,7 +12,7 @@
1012
import temporalio.bridge.worker
1113
from temporalio.common import WorkerDeploymentVersion
1214

13-
_DEFAULT_RESOURCE_ACTIVITY_MAX = 500
15+
_DEFAULT_RESOURCE_SLOTS_MAX = 500
1416

1517
logger = logging.getLogger(__name__)
1618

@@ -150,7 +152,22 @@ class LocalActivitySlotInfo(Protocol):
150152
activity_type: str
151153

152154

153-
SlotInfo: TypeAlias = Union[WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo]
155+
# WARNING: This must match Rust worker::NexusSlotInfo
156+
@runtime_checkable
157+
class NexusSlotInfo(Protocol):
158+
"""Info about a nexus task slot usage.
159+
160+
.. warning::
161+
Custom slot suppliers are currently experimental.
162+
"""
163+
164+
service: str
165+
operation: str
166+
167+
168+
SlotInfo: TypeAlias = Union[
169+
WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo
170+
]
154171

155172

156173
# WARNING: This must match Rust worker::SlotMarkUsedCtx
@@ -303,13 +320,14 @@ def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
303320

304321

305322
def _to_bridge_slot_supplier(
306-
slot_supplier: SlotSupplier, kind: Literal["workflow", "activity", "local_activity"]
323+
slot_supplier: SlotSupplier,
324+
kind: Literal["workflow", "activity", "local_activity", "nexus"],
307325
) -> temporalio.bridge.worker.SlotSupplier:
308326
if isinstance(slot_supplier, FixedSizeSlotSupplier):
309327
return temporalio.bridge.worker.FixedSizeSlotSupplier(slot_supplier.num_slots)
310328
elif isinstance(slot_supplier, ResourceBasedSlotSupplier):
311329
min_slots = 5 if kind == "workflow" else 1
312-
max_slots = _DEFAULT_RESOURCE_ACTIVITY_MAX
330+
max_slots = _DEFAULT_RESOURCE_SLOTS_MAX
313331
ramp_throttle = (
314332
timedelta(seconds=0) if kind == "workflow" else timedelta(milliseconds=50)
315333
)
@@ -347,7 +365,8 @@ def create_resource_based(
347365
workflow_config: Optional[ResourceBasedSlotConfig] = None,
348366
activity_config: Optional[ResourceBasedSlotConfig] = None,
349367
local_activity_config: Optional[ResourceBasedSlotConfig] = None,
350-
) -> "WorkerTuner":
368+
nexus_config: Optional[ResourceBasedSlotConfig] = None,
369+
) -> WorkerTuner:
351370
"""Create a resource-based tuner with the provided options."""
352371
resource_cfg = ResourceBasedTunerConfig(target_memory_usage, target_cpu_usage)
353372
wf = ResourceBasedSlotSupplier(
@@ -359,26 +378,35 @@ def create_resource_based(
359378
local_act = ResourceBasedSlotSupplier(
360379
local_activity_config or ResourceBasedSlotConfig(), resource_cfg
361380
)
381+
nexus = ResourceBasedSlotSupplier(
382+
nexus_config or ResourceBasedSlotConfig(), resource_cfg
383+
)
362384
return _CompositeTuner(
363385
wf,
364386
act,
365387
local_act,
388+
nexus,
366389
)
367390

368391
@staticmethod
369392
def create_fixed(
370393
*,
371-
workflow_slots: Optional[int],
372-
activity_slots: Optional[int],
373-
local_activity_slots: Optional[int],
374-
) -> "WorkerTuner":
375-
"""Create a fixed-size tuner with the provided number of slots. Any unspecified slots will default to 100."""
394+
workflow_slots: Optional[int] = None,
395+
activity_slots: Optional[int] = None,
396+
local_activity_slots: Optional[int] = None,
397+
nexus_slots: Optional[int] = None,
398+
) -> WorkerTuner:
399+
"""Create a fixed-size tuner with the provided number of slots.
400+
401+
Any unspecified slot numbers will default to 100.
402+
"""
376403
return _CompositeTuner(
377404
FixedSizeSlotSupplier(workflow_slots if workflow_slots else 100),
378405
FixedSizeSlotSupplier(activity_slots if activity_slots else 100),
379406
FixedSizeSlotSupplier(
380407
local_activity_slots if local_activity_slots else 100
381408
),
409+
FixedSizeSlotSupplier(nexus_slots if nexus_slots else 100),
382410
)
383411

384412
@staticmethod
@@ -387,12 +415,14 @@ def create_composite(
387415
workflow_supplier: SlotSupplier,
388416
activity_supplier: SlotSupplier,
389417
local_activity_supplier: SlotSupplier,
390-
) -> "WorkerTuner":
418+
nexus_supplier: SlotSupplier,
419+
) -> WorkerTuner:
391420
"""Create a tuner composed of the provided slot suppliers."""
392421
return _CompositeTuner(
393422
workflow_supplier,
394423
activity_supplier,
395424
local_activity_supplier,
425+
nexus_supplier,
396426
)
397427

398428
@abstractmethod
@@ -407,6 +437,10 @@ def _get_activity_task_slot_supplier(self) -> SlotSupplier:
407437
def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
408438
raise NotImplementedError
409439

440+
@abstractmethod
441+
def _get_nexus_slot_supplier(self) -> SlotSupplier:
442+
raise NotImplementedError
443+
410444
def _to_bridge_tuner(self) -> temporalio.bridge.worker.TunerHolder:
411445
return temporalio.bridge.worker.TunerHolder(
412446
_to_bridge_slot_supplier(
@@ -418,14 +452,25 @@ def _to_bridge_tuner(self) -> temporalio.bridge.worker.TunerHolder:
418452
_to_bridge_slot_supplier(
419453
self._get_local_activity_task_slot_supplier(), "local_activity"
420454
),
455+
_to_bridge_slot_supplier(self._get_nexus_slot_supplier(), "nexus"),
421456
)
422457

423458
def _get_activities_max(self) -> Optional[int]:
424-
ss = self._get_activity_task_slot_supplier()
425-
if isinstance(ss, FixedSizeSlotSupplier):
426-
return ss.num_slots
427-
elif isinstance(ss, ResourceBasedSlotSupplier):
428-
return ss.slot_config.maximum_slots or _DEFAULT_RESOURCE_ACTIVITY_MAX
459+
return WorkerTuner._get_slot_supplier_max(
460+
self._get_activity_task_slot_supplier()
461+
)
462+
463+
def _get_nexus_tasks_max(self) -> Optional[int]:
464+
return WorkerTuner._get_slot_supplier_max(self._get_nexus_slot_supplier())
465+
466+
@staticmethod
467+
def _get_slot_supplier_max(slot_supplier: SlotSupplier) -> Optional[int]:
468+
if isinstance(slot_supplier, FixedSizeSlotSupplier):
469+
return slot_supplier.num_slots
470+
elif isinstance(slot_supplier, ResourceBasedSlotSupplier):
471+
return (
472+
slot_supplier.slot_config.maximum_slots or _DEFAULT_RESOURCE_SLOTS_MAX
473+
)
429474
return None
430475

431476

@@ -436,6 +481,7 @@ class _CompositeTuner(WorkerTuner):
436481
workflow_slot_supplier: SlotSupplier
437482
activity_slot_supplier: SlotSupplier
438483
local_activity_slot_supplier: SlotSupplier
484+
nexus_slot_supplier: SlotSupplier
439485

440486
def _get_workflow_task_slot_supplier(self) -> SlotSupplier:
441487
return self.workflow_slot_supplier
@@ -445,3 +491,6 @@ def _get_activity_task_slot_supplier(self) -> SlotSupplier:
445491

446492
def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
447493
return self.local_activity_slot_supplier
494+
495+
def _get_nexus_slot_supplier(self) -> SlotSupplier:
496+
return self.nexus_slot_supplier

0 commit comments

Comments
 (0)