Skip to content

Commit 2484ef7

Browse files
committed
Fix and simplify plumbing plugins from WorkerConfig to core, impose runtime client identity requirement
1 parent f684a05 commit 2484ef7

File tree

11 files changed

+130
-328
lines changed

11 files changed

+130
-328
lines changed

temporalio/bridge/Cargo.lock

Lines changed: 93 additions & 252 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ crate-type = ["cdylib"]
1919
anyhow = "1.0"
2020
async-trait = "0.1"
2121
futures = "0.3"
22-
prost = "0.13"
22+
prost = "0.14"
2323
pyo3 = { version = "0.25", features = [
2424
"extension-module",
2525
"abi3-py39",
@@ -38,7 +38,7 @@ temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", featu
3838
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
3939
tokio = "1.26"
4040
tokio-stream = "0.1"
41-
tonic = "0.13"
41+
tonic = "0.14"
4242
tracing = "0.1"
4343
url = "2.2"
4444

temporalio/bridge/runtime.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class RuntimeOptions:
9696
"""Python representation of the Rust struct for runtime options."""
9797

9898
telemetry: TelemetryConfig
99-
worker_heartbeat_interval_millis: Optional[int] = 30000 # 30s
99+
worker_heartbeat_interval_millis: Optional[int] = 30_000 # 30s
100100

101101

102102
# WARNING: This must match Rust runtime::BufferedLogEntry

temporalio/bridge/sdk-core

Submodule sdk-core updated 75 files

temporalio/bridge/src/client.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,19 @@ use std::time::Duration;
66
use temporal_client::{
77
ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder,
88
ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig,
9-
TemporalServiceClientWithMetrics, TlsConfig,
9+
TlsConfig, TemporalServiceClient,
1010
};
11-
use tonic::metadata::{
12-
AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue,
11+
use temporal_client::tonic::{
12+
self,
13+
metadata::{AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue},
1314
};
1415
use url::Url;
1516

1617
use crate::runtime;
1718

1819
pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException);
1920

20-
type Client = RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>;
21+
type Client = RetryClient<ConfiguredClient<TemporalServiceClient>>;
2122

2223
#[pyclass]
2324
pub struct ClientRef {

temporalio/worker/_replayer.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ def __init__(
8080
runtime=runtime,
8181
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
8282
header_codec_behavior=header_codec_behavior,
83-
plugins=[plugin.name() for plugin in plugins],
8483
)
8584

8685
# Apply plugin configuration
@@ -294,7 +293,7 @@ def on_eviction_hook(
294293
nexus_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
295294
1
296295
),
297-
plugins=self._config.get("plugins", []),
296+
plugins=[plugin.name() for plugin in self.plugins],
298297
skip_client_worker_set_check=True,
299298
),
300299
)
@@ -371,7 +370,6 @@ class ReplayerConfig(TypedDict, total=False):
371370
runtime: Optional[temporalio.runtime.Runtime]
372371
disable_safe_workflow_eviction: bool
373372
header_codec_behavior: HeaderCodecBehavior
374-
plugins: Sequence[str]
375373

376374

377375
@dataclass(frozen=True)

temporalio/worker/_worker.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def __init__(
381381
f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior."
382382
)
383383
plugins = plugins_from_client + list(plugins)
384-
config["plugins"] = [plugin.name() for plugin in plugins]
384+
config["plugins"] = plugins
385385

386386
self.plugins = plugins
387387
for plugin in plugins:
@@ -615,7 +615,7 @@ def check_activity(activity):
615615
nexus_task_poller_behavior=config[
616616
"nexus_task_poller_behavior"
617617
]._to_bridge(),
618-
plugins=config.get("plugins", []),
618+
plugins=[plugin.name() for plugin in config.get("plugins", [])],
619619
skip_client_worker_set_check=config["skip_client_worker_set_check"],
620620
),
621621
)
@@ -629,7 +629,6 @@ def config(self) -> WorkerConfig:
629629
config = self._config.copy()
630630
config["activities"] = list(config.get("activities", []))
631631
config["workflows"] = list(config.get("workflows", []))
632-
config["plugins"] = list(config.get("plugins", []))
633632
return config
634633

635634
@property
@@ -911,7 +910,7 @@ class WorkerConfig(TypedDict, total=False):
911910
workflow_task_poller_behavior: PollerBehavior
912911
activity_task_poller_behavior: PollerBehavior
913912
nexus_task_poller_behavior: PollerBehavior
914-
plugins: Sequence[str]
913+
plugins: Sequence[Plugin]
915914
skip_client_worker_set_check: bool
916915

917916

tests/test_plugins.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,15 @@ async def test_worker_plugin_basic_config(client: Client) -> None:
150150
plugins=[MyWorkerPlugin()],
151151
)
152152
assert worker.config().get("task_queue") == "replaced_queue"
153-
assert worker.config().get("plugins") == [MyWorkerPlugin().name()]
153+
assert [p.name() for p in worker.config().get("plugins")] == [MyWorkerPlugin().name()]
154154

155155
# Test client plugin propagation to worker plugins
156156
new_config = client.config()
157157
new_config["plugins"] = [MyCombinedPlugin()]
158158
client = Client(**new_config)
159159
worker = Worker(client, task_queue="queue", activities=[never_run_activity])
160160
assert worker.config().get("task_queue") == "combined"
161-
assert worker.config().get("plugins") == [MyCombinedPlugin().name()]
161+
assert [p.name() for p in worker.config().get("plugins")] == [MyCombinedPlugin().name()]
162162

163163
# Test both. Client propagated plugins are called first, so the worker plugin overrides in this case
164164
worker = Worker(
@@ -168,7 +168,7 @@ async def test_worker_plugin_basic_config(client: Client) -> None:
168168
plugins=[MyWorkerPlugin()],
169169
)
170170
assert worker.config().get("task_queue") == "replaced_queue"
171-
assert worker.config().get("plugins") == [
171+
assert [p.name() for p in worker.config().get("plugins")] == [
172172
MyCombinedPlugin().name(),
173173
MyWorkerPlugin().name(),
174174
]
@@ -307,7 +307,6 @@ async def test_replay(client: Client) -> None:
307307
replayer = Replayer(workflows=[], plugins=[plugin])
308308
assert len(replayer.config().get("workflows") or []) == 1
309309
assert replayer.config().get("data_converter") == pydantic_data_converter
310-
assert replayer.config().get("plugins") == [plugin.name()]
311310

312311
await replayer.replay_workflow(await handle.fetch_history())
313312

tests/test_runtime.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
TelemetryFilter,
2222
)
2323
from temporalio.worker import Worker
24-
from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port
24+
from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port, worker_versioning_enabled
2525

2626

2727
@workflow.defn
@@ -260,10 +260,12 @@ async def check_metrics() -> None:
260260

261261

262262
def test_runtime_options_to_bridge_config() -> None:
263-
assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis is None
264-
options = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=30))
265-
bridge_config = options._to_bridge_config()
266-
assert bridge_config.worker_heartbeat_interval_millis == 30_000
263+
assert RuntimeOptions()._to_bridge_config().worker_heartbeat_interval_millis == 30_000
264+
bridge_config = RuntimeOptions(worker_heartbeat_interval=timedelta(seconds=60))._to_bridge_config()
265+
assert bridge_config.worker_heartbeat_interval_millis == 60_000
266+
267+
bridge_config1 = RuntimeOptions(worker_heartbeat_interval=None)._to_bridge_config()
268+
assert bridge_config1.worker_heartbeat_interval_millis is None
267269

268270

269271
def test_runtime_options_invalid_heartbeat() -> None:

tests/worker/test_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1143,7 +1143,7 @@ async def test_can_run_autoscaling_polling_worker(
11431143
activity_pollers = [l for l in matches if "activity_task" in l]
11441144
assert len(activity_pollers) == 1
11451145
assert activity_pollers[0].endswith("2")
1146-
workflow_pollers = [l for l in matches if "workflow_task" in l]
1146+
workflow_pollers = [l for l in matches if "workflow_task" in l and w.task_queue in l]
11471147
assert len(workflow_pollers) == 2
11481148
# There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on
11491149
# initialization timing.

0 commit comments

Comments
 (0)