Skip to content

Commit 85b978d

Browse files
committed
More rusty, clarify naming, simplify Runtime args, add Client plugins to list
1 parent 504fc05 commit 85b978d

File tree

6 files changed

+45
-58
lines changed

6 files changed

+45
-58
lines changed

temporalio/client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,11 @@ def api_key(self, value: Optional[str]) -> None:
349349
self.service_client.config.api_key = value
350350
self.service_client.update_api_key(value)
351351

352+
@property
353+
def plugins(self) -> Sequence[Plugin]:
354+
"""Plugins used by this client."""
355+
return self._config["plugins"]
356+
352357
# Overload for no-param workflow
353358
@overload
354359
async def start_workflow(

temporalio/runtime.py

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None:
7373
def __init__(
7474
self,
7575
*,
76-
telemetry: Optional[TelemetryConfig] = None,
77-
runtime_options: Optional["RuntimeOptions"] = None,
76+
telemetry: TelemetryConfig,
77+
worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30),
7878
) -> None:
7979
"""Create a runtime with the provided configuration.
8080
@@ -83,25 +83,29 @@ def __init__(
8383
Args:
8484
telemetry: Telemetry configuration when not supplying
8585
``runtime_options``.
86-
runtime_options: Full runtime configuration including telemetry and
87-
worker heartbeating options.
86+
worker_heartbeat_interval: Interval for worker heartbeats. ``None``
87+
disables heartbeating.
8888
8989
Raises:
90-
ValueError: If both ``telemetry`` and ``runtime_options`` are
91-
provided.
90+
ValueError: If both ```runtime_options`` is a negative value.
9291
"""
93-
if runtime_options and telemetry:
94-
raise ValueError("Cannot supply both telemetry and runtime_options")
95-
96-
if runtime_options is None:
97-
telemetry = telemetry or TelemetryConfig()
98-
runtime_options = RuntimeOptions(telemetry=telemetry)
92+
if worker_heartbeat_interval is None:
93+
heartbeat_millis = None
9994
else:
100-
telemetry = runtime_options.telemetry
95+
if worker_heartbeat_interval <= timedelta(0):
96+
raise ValueError("worker_heartbeat_interval must be positive")
97+
heartbeat_millis = int(worker_heartbeat_interval.total_seconds() * 1000)
98+
if heartbeat_millis == 0:
99+
heartbeat_millis = 1
100+
101+
self._heartbeat_millis = heartbeat_millis
101102

102-
self._core_runtime = temporalio.bridge.runtime.Runtime(
103-
options=runtime_options._to_bridge_config()
103+
runtime_options = temporalio.bridge.runtime.RuntimeOptions(
104+
telemetry=telemetry._to_bridge_config(),
105+
worker_heartbeat_interval_millis=heartbeat_millis,
104106
)
107+
108+
self._core_runtime = temporalio.bridge.runtime.Runtime(options=runtime_options)
105109
if isinstance(telemetry.metrics, MetricBuffer):
106110
telemetry.metrics._runtime = self
107111
core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime)
@@ -415,34 +419,6 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.TelemetryConfig:
415419
)
416420

417421

418-
@dataclass(frozen=True)
419-
class RuntimeOptions:
420-
"""Configuration for runtime initialization."""
421-
422-
telemetry: TelemetryConfig = field(default_factory=TelemetryConfig)
423-
"""Telemetry configuration applied to the runtime."""
424-
425-
worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30)
426-
"""Interval for worker heartbeats. ``None`` disables heartbeating."""
427-
428-
def _to_bridge_config(self) -> temporalio.bridge.runtime.RuntimeOptions:
429-
heartbeat_millis: Optional[int]
430-
if self.worker_heartbeat_interval is None:
431-
heartbeat_millis = None
432-
else:
433-
if self.worker_heartbeat_interval <= timedelta(0):
434-
raise ValueError("worker_heartbeat_interval must be positive")
435-
heartbeat_millis = int(
436-
self.worker_heartbeat_interval.total_seconds() * 1000
437-
)
438-
if heartbeat_millis == 0:
439-
heartbeat_millis = 1
440-
return temporalio.bridge.runtime.RuntimeOptions(
441-
telemetry=self.telemetry._to_bridge_config(),
442-
worker_heartbeat_interval_millis=heartbeat_millis,
443-
)
444-
445-
446422
BufferedMetricKind = NewType("BufferedMetricKind", int)
447423
"""Representation of a buffered metric kind."""
448424

temporalio/worker/_worker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,10 @@ def check_activity(activity):
559559
maximum=config["max_concurrent_activity_task_polls"]
560560
)
561561

562+
worker_plugins = [plugin.name() for plugin in config.get("plugins", [])]
563+
client_plugins = [plugin.name() for plugin in config["client"].plugins]
564+
plugins = list(set(worker_plugins + client_plugins))
565+
562566
# Create bridge worker last. We have empirically observed that if it is
563567
# created before an error is raised from the activity worker
564568
# constructor, a deadlock/hang will occur presumably while trying to
@@ -613,7 +617,7 @@ def check_activity(activity):
613617
nexus_task_poller_behavior=config[
614618
"nexus_task_poller_behavior"
615619
]._to_bridge(),
616-
plugins=[plugin.name() for plugin in config.get("plugins", [])],
620+
plugins=plugins,
617621
skip_client_worker_set_check=config["skip_client_worker_set_check"],
618622
),
619623
)

tests/test_runtime.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
LoggingConfig,
1717
PrometheusConfig,
1818
Runtime,
19-
RuntimeOptions,
2019
TelemetryConfig,
2120
TelemetryFilter,
2221
)
@@ -265,20 +264,24 @@ async def check_metrics() -> None:
265264

266265

267266
def test_runtime_options_to_bridge_config() -> None:
268-
assert (
269-
RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000
267+
runtime = Runtime(telemetry=TelemetryConfig())
268+
assert runtime._heartbeat_millis == 30_000
269+
270+
runtime = Runtime(
271+
telemetry=TelemetryConfig(),
272+
worker_heartbeat_interval=timedelta(seconds=60),
270273
)
271-
bridge_config = RuntimeOptions(
272-
worker_heartbeat_interval=timedelta(seconds=60)
273-
)._to_bridge_config()
274-
assert bridge_config.worker_heartbeat_interval_millis == 60_000
274+
assert runtime._heartbeat_millis == 60_000
275275

276-
bridge_config1 = RuntimeOptions(worker_heartbeat_interval=None)._to_bridge_config()
277-
assert bridge_config1.worker_heartbeat_interval_millis is None
276+
runtime = Runtime(
277+
telemetry=TelemetryConfig(),
278+
worker_heartbeat_interval=None,
279+
)
280+
assert runtime._heartbeat_millis is None
278281

279282

280283
def test_runtime_options_invalid_heartbeat() -> None:
281284
with pytest.raises(ValueError):
282-
RuntimeOptions(
283-
worker_heartbeat_interval=timedelta(seconds=-5)
284-
)._to_bridge_config()
285+
Runtime(
286+
telemetry=TelemetryConfig(), worker_heartbeat_interval=timedelta(seconds=-5)
287+
)

tests/worker/test_worker.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
from temporalio.runtime import (
3333
PrometheusConfig,
3434
Runtime,
35-
RuntimeOptions,
3635
TelemetryConfig,
3736
)
3837
from temporalio.service import RPCError

tests/worker/test_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5302,7 +5302,7 @@ async def run(self) -> None:
53025302

53035303

53045304
async def test_workflow_replace_worker_client(client: Client):
5305-
other_runtime = Runtime()
5305+
other_runtime = Runtime(telemetry=TelemetryConfig())
53065306
other_client = await Client.connect(
53075307
client.service_client.config.target_host,
53085308
namespace=client.namespace,

0 commit comments

Comments
 (0)