diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 71d804e5a..bc8895419 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,8 +58,8 @@ jobs: - run: uv sync --all-extras - run: poe bridge-lint if: ${{ matrix.clippyLinter }} - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test ${{matrix.pytestExtraArgs}} -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml timeout-minutes: 15 @@ -149,8 +149,8 @@ jobs: - run: uv tool install poethepoet - run: uv lock --upgrade - run: uv sync --all-extras - - run: poe lint - run: poe build-develop + - run: poe lint - run: mkdir junit-xml - run: poe test -s --junit-xml=junit-xml/latest-deps.xml timeout-minutes: 10 diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index b40d5fc72..b9046647e 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -412,6 +412,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -1328,9 +1334,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "opentelemetry" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" dependencies = [ "futures-core", "futures-sink", @@ -1342,9 +1348,9 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", @@ -1355,9 +1361,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" dependencies = [ "http", "opentelemetry", @@ -1374,21 +1380,22 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", "tonic", + "tonic-prost", ] [[package]] name = "opentelemetry_sdk" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" dependencies = [ "futures-channel", "futures-executor", @@ -1396,7 +1403,6 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand 0.9.2", - "serde_json", "thiserror 2.0.15", "tokio", "tokio-stream", @@ -1589,9 +1595,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" dependencies = [ "bytes", "prost-derive", @@ -1599,9 +1605,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck", "itertools", @@ -1612,6 +1618,8 @@ dependencies = [ "prettyplease", "prost", "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", "regex", "syn", "tempfile", @@ -1619,9 +1627,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", "itertools", @@ -1632,18 +1640,18 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.13.5" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" dependencies = [ "prost", ] [[package]] name = "prost-wkt" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497e1e938f0c09ef9cabe1d49437b4016e03e8f82fbbe5d1c62a9b61b9decae1" +checksum = "655944d0ce015e71b3ec21279437e6a09e58433e50c7b0677901f3d5235e74f5" dependencies = [ "chrono", "inventory", @@ -1656,9 +1664,9 @@ dependencies = [ [[package]] name = "prost-wkt-build" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b8bf115b70a7aa5af1fd5d6e9418492e9ccb6e4785e858c938e28d132a884b" +checksum = "f869f1443fee474b785e935d92e1007f57443e485f51668ed41943fc01a321a2" dependencies = [ "heck", "prost", @@ -1669,9 +1677,9 @@ dependencies = [ [[package]] name = "prost-wkt-types" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8cdde6df0a98311c839392ca2f2f0bcecd545f86a62b4e3c6a49c336e970fe5" +checksum = "eeeffd6b9becd4600dd461399f3f71aeda2ff0848802a9ed526cf12e8f42902a" dependencies = [ "chrono", "prost", @@ -1705,6 +1713,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pulldown-cmark" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "21.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "pyo3" version = "0.25.1" @@ -2450,6 +2478,7 @@ dependencies = [ "bytes", "derive_builder", "derive_more", + "dyn-clone", "futures-retry", "futures-util", "http", @@ -2569,6 +2598,7 @@ dependencies = [ "tracing", "tracing-core", "url", + "uuid", ] [[package]] @@ -2579,16 +2609,15 @@ dependencies = [ "base64", "derive_more", "prost", - "prost-build", "prost-wkt", - "prost-wkt-build", "prost-wkt-types", "rand 0.9.2", "serde", "serde_json", "thiserror 2.0.15", "tonic", - "tonic-build", + "tonic-prost", + "tonic-prost-build", "uuid", ] @@ -2778,9 +2807,9 @@ checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" [[package]] name = "tonic" -version = "0.13.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" dependencies = [ "async-trait", "axum", @@ -2795,9 +2824,9 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", "rustls-native-certs", - "socket2 0.5.10", + "socket2 0.6.0", + "sync_wrapper", "tokio", "tokio-rustls", "tokio-stream", @@ -2809,9 +2838,32 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.13.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" +checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2" dependencies = [ "prettyplease", "proc-macro2", @@ -2819,6 +2871,8 @@ dependencies = [ "prost-types", "quote", "syn", + "tempfile", + "tonic-build", ] [[package]] @@ -2955,6 +3009,12 @@ dependencies = [ "syn", ] +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -2998,9 +3058,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.18.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ "getrandom 0.3.3", "js-sys", diff --git a/temporalio/bridge/Cargo.toml b/temporalio/bridge/Cargo.toml index ad839c19b..1da8d5d2e 100644 --- a/temporalio/bridge/Cargo.toml +++ b/temporalio/bridge/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["cdylib"] anyhow = "1.0" async-trait = "0.1" futures = "0.3" -prost = "0.13" +prost = "0.14" pyo3 = { version = "0.25", features = [ "extension-module", "abi3-py310", @@ -38,7 +38,7 @@ temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", featu temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" } tokio = "1.26" tokio-stream = "0.1" -tonic = "0.13" +tonic = "0.14" tracing = "0.1" url = "2.2" diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index afc79f0f5..b08b97584 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -23,9 +23,9 @@ def _raise_in_thread(thread_id: int, exc_type: Type[BaseException]) -> bool: thread_id, exc_type ) - def __init__(self, *, telemetry: TelemetryConfig) -> None: + def __init__(self, *, options: RuntimeOptions) -> None: """Create SDK Core runtime.""" - self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry) + self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(options) def retrieve_buffered_metrics(self, durations_as_seconds: bool) -> Sequence[Any]: """Get buffered metrics.""" @@ -91,6 +91,14 @@ class TelemetryConfig: metrics: Optional[MetricsConfig] +@dataclass(frozen=True) +class RuntimeOptions: + """Python representation of the Rust struct for runtime options.""" + + telemetry: TelemetryConfig + worker_heartbeat_interval_millis: Optional[int] = 30_000 # 30s + + # WARNING: This must match Rust runtime::BufferedLogEntry class BufferedLogEntry(Protocol): """A buffered log entry.""" diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 682d441dd..9e9a46191 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 682d441dd3b830e1477af3edb7c2330b403c4c33 +Subproject commit 9e9a46191656fc9ccd95589dac3552410561d620 diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index dfbd432a1..68ebb1cb4 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -6,10 +6,11 @@ use std::time::Duration; use temporal_client::{ ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder, ConfiguredClient, HttpConnectProxyOptions, RetryClient, RetryConfig, - TemporalServiceClientWithMetrics, TlsConfig, + TlsConfig, TemporalServiceClient, }; -use tonic::metadata::{ - AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue, +use temporal_client::tonic::{ + self, + metadata::{AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue}, }; use url::Url; @@ -17,7 +18,7 @@ use crate::runtime; pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException); -type Client = RetryClient>; +type Client = RetryClient>; #[pyclass] pub struct ClientRef { diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index cbd5be10e..ee157fb18 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -82,8 +82,8 @@ fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option PyResult { - runtime::init_runtime(telemetry_config) +fn init_runtime(options: runtime::RuntimeOptions) -> PyResult { + runtime::init_runtime(options) } #[pyfunction] diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index 72cc905ae..be77a1676 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -13,7 +13,7 @@ use temporal_sdk_core::telemetry::{ build_otlp_metric_exporter, start_prometheus_metric_exporter, CoreLogStreamConsumer, MetricsCallBuffer, }; -use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder}; +use temporal_sdk_core::{CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder}; use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer}; use temporal_sdk_core_api::telemetry::{ CoreLog, Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol, @@ -86,16 +86,27 @@ pub struct PrometheusConfig { histogram_bucket_overrides: Option>>, } +#[derive(FromPyObject)] +pub struct RuntimeOptions { + telemetry: TelemetryConfig, + worker_heartbeat_interval_millis: Option, +} + const FORWARD_LOG_BUFFER_SIZE: usize = 2048; const FORWARD_LOG_MAX_FREQ_MS: u64 = 10; -pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { +pub fn init_runtime(options: RuntimeOptions) -> PyResult { + let RuntimeOptions { + telemetry: TelemetryConfig { logging, metrics }, + worker_heartbeat_interval_millis, + } = options; + // Have to build/start telemetry config pieces let mut telemetry_build = TelemetryOptionsBuilder::default(); // Build logging config, capturing forwarding info to start later let mut log_forwarding: Option<(Receiver, PyObject)> = None; - if let Some(logging_conf) = telemetry_config.logging { + if let Some(logging_conf) = logging { telemetry_build.logging(if let Some(forward_to) = logging_conf.forward_to { // Note, actual log forwarding is started later let (consumer, stream) = CoreLogStreamConsumer::new(FORWARD_LOG_BUFFER_SIZE); @@ -113,26 +124,31 @@ pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { // Build metric config, but actual metrics instance is late-bound after // CoreRuntime is created since it needs Tokio runtime - if let Some(metrics_conf) = telemetry_config.metrics.as_ref() { + if let Some(metrics_conf) = metrics.as_ref() { telemetry_build.attach_service_name(metrics_conf.attach_service_name); if let Some(prefix) = &metrics_conf.metric_prefix { telemetry_build.metric_prefix(prefix.to_string()); } } + let runtime_options = RuntimeOptionsBuilder::default() + .telemetry_options( + telemetry_build + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, + ) + .heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis)) + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid runtime options: {err}")))?; + // Create core runtime which starts tokio multi-thread runtime - let mut core = CoreRuntime::new( - telemetry_build - .build() - .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {err}")))?, - TokioRuntimeBuilder::default(), - ) - .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; + let mut core = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) + .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; // We late-bind the metrics after core runtime is created since it needs // the Tokio handle let mut metrics_call_buffer: Option>> = None; - if let Some(metrics_conf) = telemetry_config.metrics { + if let Some(metrics_conf) = metrics { let _guard = core.tokio_handle().enter(); // If they want buffered, cannot have Prom/OTel and we make buffered if metrics_conf.buffered_with_size > 0 { diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs index 92b43f356..0a5deb928 100644 --- a/temporalio/bridge/src/worker.rs +++ b/temporalio/bridge/src/worker.rs @@ -23,6 +23,7 @@ use temporal_sdk_core_protos::coresdk::{ nexus::NexusTaskCompletion, ActivityHeartbeat, ActivityTaskCompletion, }; use temporal_sdk_core_protos::temporal::api::history::v1::History; +use temporal_sdk_core_protos::temporal::api::worker::v1::PluginInfo; use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; use tracing::error; @@ -63,6 +64,8 @@ pub struct WorkerConfig { nondeterminism_as_workflow_fail: bool, nondeterminism_as_workflow_fail_for_types: HashSet, nexus_task_poller_behavior: PollerBehavior, + plugins: Vec, + skip_client_worker_set_check: bool, } #[derive(FromPyObject)] @@ -644,11 +647,12 @@ impl WorkerRef { Ok(()) } - fn replace_client(&self, client: &client::ClientRef) { + fn replace_client(&self, client: &client::ClientRef) -> PyResult<()> { self.worker .as_ref() .expect("missing worker") - .replace_client(client.retry_client.clone().into_inner()); + .replace_client(client.retry_client.clone().into_inner()) + .map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}"))) } fn initiate_shutdown(&self) -> PyResult<()> { @@ -722,6 +726,16 @@ fn convert_worker_config( .collect::>>(), ) .nexus_task_poller_behavior(conf.nexus_task_poller_behavior) + .plugins( + conf.plugins + .into_iter() + .map(|name| PluginInfo { + name, + version: String::new(), + }) + .collect::>(), + ) + .skip_client_worker_set_check(conf.skip_client_worker_set_check) .build() .map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}"))) } diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index 8e20b670a..d82f07b14 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -64,6 +64,8 @@ class WorkerConfig: nondeterminism_as_workflow_fail: bool nondeterminism_as_workflow_fail_for_types: Set[str] nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[str] + skip_client_worker_set_check: bool @dataclass diff --git a/temporalio/client.py b/temporalio/client.py index 71f43d6d9..540b80de6 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -349,6 +349,11 @@ def api_key(self, value: Optional[str]) -> None: self.service_client.config.api_key = value self.service_client.update_api_key(value) + @property + def plugins(self) -> Sequence[Plugin]: + """Plugins used by this client.""" + return self._config["plugins"] + # Overload for no-param workflow @overload async def start_workflow( diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 84b683941..04e83ed50 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -70,14 +70,42 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: raise RuntimeError("Runtime default already set") _default_runtime = runtime - def __init__(self, *, telemetry: TelemetryConfig) -> None: - """Create a default runtime with the given telemetry config. + def __init__( + self, + *, + telemetry: TelemetryConfig, + worker_heartbeat_interval: Optional[timedelta] = timedelta(seconds=30), + ) -> None: + """Create a runtime with the provided configuration. Each new runtime creates a new internal thread pool, so use sparingly. + + Args: + telemetry: Telemetry configuration when not supplying + ``runtime_options``. + worker_heartbeat_interval: Interval for worker heartbeats. ``None`` + disables heartbeating. + + Raises: + ValueError: If both ```runtime_options`` is a negative value. """ - self._core_runtime = temporalio.bridge.runtime.Runtime( - telemetry=telemetry._to_bridge_config() + if worker_heartbeat_interval is None: + heartbeat_millis = None + else: + if worker_heartbeat_interval <= timedelta(0): + raise ValueError("worker_heartbeat_interval must be positive") + heartbeat_millis = int(worker_heartbeat_interval.total_seconds() * 1000) + if heartbeat_millis == 0: + heartbeat_millis = 1 + + self._heartbeat_millis = heartbeat_millis + + runtime_options = temporalio.bridge.runtime.RuntimeOptions( + telemetry=telemetry._to_bridge_config(), + worker_heartbeat_interval_millis=heartbeat_millis, ) + + self._core_runtime = temporalio.bridge.runtime.Runtime(options=runtime_options) if isinstance(telemetry.metrics, MetricBuffer): telemetry.metrics._runtime = self core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 04a32be17..fae1e1c05 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -293,6 +293,8 @@ def on_eviction_hook( nexus_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum( 1 ), + plugins=[plugin.name() for plugin in self.plugins], + skip_client_worker_set_check=True, ), ) # Start worker diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 9850d32a7..a968398a6 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -148,6 +148,7 @@ def __init__( nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( maximum=5 ), + skip_client_worker_set_check: bool = False, ) -> None: """Create a worker to process workflows and/or activities. @@ -320,6 +321,9 @@ def __init__( Defaults to a 5-poller maximum. nexus_task_poller_behavior: Specify the behavior of Nexus task polling. Defaults to a 5-poller maximum. + skip_client_worker_set_check: Skip the runtime validation that ensures + the client is registered with the worker set. This should only be + used in tests. """ config = WorkerConfig( client=client, @@ -362,6 +366,7 @@ def __init__( workflow_task_poller_behavior=workflow_task_poller_behavior, activity_task_poller_behavior=activity_task_poller_behavior, nexus_task_poller_behavior=nexus_task_poller_behavior, + skip_client_worker_set_check=skip_client_worker_set_check, ) plugins_from_client = cast( @@ -374,6 +379,7 @@ def __init__( f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior." ) plugins = plugins_from_client + list(plugins) + config["plugins"] = plugins self.plugins = plugins for plugin in plugins: @@ -553,6 +559,10 @@ def check_activity(activity): maximum=config["max_concurrent_activity_task_polls"] ) + worker_plugins = [plugin.name() for plugin in config.get("plugins", [])] + client_plugins = [plugin.name() for plugin in config["client"].plugins] + plugins = list(set(worker_plugins + client_plugins)) + # Create bridge worker last. We have empirically observed that if it is # created before an error is raised from the activity worker # constructor, a deadlock/hang will occur presumably while trying to @@ -607,6 +617,8 @@ def check_activity(activity): nexus_task_poller_behavior=config[ "nexus_task_poller_behavior" ]._to_bridge(), + plugins=plugins, + skip_client_worker_set_check=config["skip_client_worker_set_check"], ), ) @@ -900,6 +912,8 @@ class WorkerConfig(TypedDict, total=False): workflow_task_poller_behavior: PollerBehavior activity_task_poller_behavior: PollerBehavior nexus_task_poller_behavior: PollerBehavior + plugins: Sequence[Plugin] + skip_client_worker_set_check: bool def _warn_if_activity_executor_max_workers_is_inconsistent( diff --git a/tests/conftest.py b/tests/conftest.py index 8ffd3a456..0e2156f86 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,8 @@ import pytest import pytest_asyncio +import temporalio.worker + from . import DEV_SERVER_DOWNLOAD_VERSION # If there is an integration test environment variable set, we must remove the @@ -57,6 +59,18 @@ def pytest_addoption(parser): ) +@pytest.fixture(autouse=True) +def _skip_client_worker_set_check(monkeypatch): + original_init = temporalio.worker.Worker.__init__ + + def patched_init(self, *args, **kwargs): + kwargs.setdefault("skip_client_worker_set_check", True) + return original_init(self, *args, **kwargs) + + monkeypatch.setattr(temporalio.worker.Worker, "__init__", patched_init) + yield + + @pytest.fixture(scope="session") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 5571841b4..f4b98397a 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,11 +1,13 @@ import dataclasses import uuid import warnings +from collections import Counter from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import AsyncIterator, Awaitable, Callable, Optional, cast import pytest +import temporalio.bridge.temporal_sdk_bridge import temporalio.client import temporalio.converter import temporalio.worker @@ -149,6 +151,9 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyWorkerPlugin().name() + ] # Test client plugin propagation to worker plugins new_config = client.config() @@ -156,6 +161,9 @@ async def test_worker_plugin_basic_config(client: Client) -> None: client = Client(**new_config) worker = Worker(client, task_queue="queue", activities=[never_run_activity]) assert worker.config().get("task_queue") == "combined" + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyCombinedPlugin().name() + ] # Test both. Client propagated plugins are called first, so the worker plugin overrides in this case worker = Worker( @@ -165,6 +173,40 @@ async def test_worker_plugin_basic_config(client: Client) -> None: plugins=[MyWorkerPlugin()], ) assert worker.config().get("task_queue") == "replaced_queue" + assert [p.name() for p in worker.config().get("plugins", [])] == [ + MyCombinedPlugin().name(), + MyWorkerPlugin().name(), + ] + + +async def test_worker_plugin_names_forwarded_to_core( + client: Client, monkeypatch: pytest.MonkeyPatch +) -> None: + captured_plugins: list[str] = [] + + original_new_worker = temporalio.bridge.temporal_sdk_bridge.new_worker + + def new_worker_wrapper(runtime_ref, client_ref, config): + nonlocal captured_plugins + captured_plugins = list(config.plugins) + return original_new_worker(runtime_ref, client_ref, config) + + monkeypatch.setattr( + temporalio.bridge.temporal_sdk_bridge, + "new_worker", + new_worker_wrapper, + ) + + plugin1 = SimplePlugin("test-worker-plugin1") + plugin2 = SimplePlugin("test-worker-plugin2") + worker = Worker( + client, + task_queue="queue", + activities=[never_run_activity], + plugins=[plugin1, plugin2], + ) + # Use counter to compare unordered lists + assert Counter(captured_plugins) == Counter([plugin1.name(), plugin2.name()]) async def test_worker_duplicated_plugin(client: Client) -> None: diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 4505ebfcf..5677eb040 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -7,6 +7,8 @@ from typing import List, cast from urllib.request import urlopen +import pytest + from temporalio import workflow from temporalio.client import Client from temporalio.runtime import ( @@ -18,7 +20,12 @@ TelemetryFilter, ) from temporalio.worker import Worker -from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port +from tests.helpers import ( + assert_eq_eventually, + assert_eventually, + find_free_port, + worker_versioning_enabled, +) @workflow.defn @@ -254,3 +261,27 @@ async def check_metrics() -> None: # Wait for metrics to appear and match the expected buckets await assert_eventually(check_metrics) + + +def test_runtime_options_to_bridge_config() -> None: + runtime = Runtime(telemetry=TelemetryConfig()) + assert runtime._heartbeat_millis == 30_000 + + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=timedelta(seconds=60), + ) + assert runtime._heartbeat_millis == 60_000 + + runtime = Runtime( + telemetry=TelemetryConfig(), + worker_heartbeat_interval=None, + ) + assert runtime._heartbeat_millis is None + + +def test_runtime_options_invalid_heartbeat() -> None: + with pytest.raises(ValueError): + Runtime( + telemetry=TelemetryConfig(), worker_heartbeat_interval=timedelta(seconds=-5) + ) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 203b89a5a..bf5956fed 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1370,6 +1370,7 @@ async def _execute_workflow_with_activity( worker_config["task_queue"] = str(uuid.uuid4()) worker_config["activities"] = [fn] + additional_activities worker_config["shared_state_manager"] = _default_shared_state_manager + worker_config["skip_client_worker_set_check"] = True if not worker_config.get("max_concurrent_activities"): worker_config["max_concurrent_activities"] = default_max_concurrent_activities async with Worker(**worker_config): diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 32f27f631..116fe4336 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -17,6 +17,7 @@ from temporalio.api.workflowservice.v1 import ( DescribeWorkerDeploymentRequest, DescribeWorkerDeploymentResponse, + ListWorkersRequest, SetWorkerDeploymentCurrentVersionRequest, SetWorkerDeploymentCurrentVersionResponse, SetWorkerDeploymentRampingVersionRequest, @@ -28,7 +29,11 @@ TaskReachabilityType, ) from temporalio.common import PinnedVersioningOverride, RawValue, VersioningBehavior -from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporalio.runtime import ( + PrometheusConfig, + Runtime, + TelemetryConfig, +) from temporalio.service import RPCError from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( @@ -1133,7 +1138,9 @@ async def test_can_run_autoscaling_polling_worker( activity_pollers = [l for l in matches if "activity_task" in l] assert len(activity_pollers) == 1 assert activity_pollers[0].endswith("2") - workflow_pollers = [l for l in matches if "workflow_task" in l] + workflow_pollers = [ + l for l in matches if "workflow_task" in l and w.task_queue in l + ] assert len(workflow_pollers) == 2 # There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on # initialization timing. diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index da335635b..2d8f3a605 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5301,57 +5301,19 @@ async def run(self) -> None: await asyncio.sleep(0.1) -async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment): - if env.supports_time_skipping: - pytest.skip("Only testing against two real servers") - # We are going to start a second ephemeral server and then replace the - # client. So we will start a no-cache ticking workflow with the current - # client and confirm it has accomplished at least one task. Then we will - # start another on the other client, and confirm it gets started too. Then - # we will terminate both. We have to use a ticking workflow with only one - # poller to force a quick re-poll to recognize our client change quickly (as - # opposed to just waiting the minute for poll timeout). - async with await WorkflowEnvironment.start_local( - dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION - ) as other_env: - # Start both workflows on different servers - task_queue = f"tq-{uuid.uuid4()}" - handle1 = await client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - handle2 = await other_env.client.start_workflow( - TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue - ) - - async def any_task_completed(handle: WorkflowHandle) -> bool: - async for e in handle.fetch_history_events(): - if e.HasField("workflow_task_completed_event_attributes"): - return True - return False - - # Now start the worker on the first env - async with Worker( - client, - task_queue=task_queue, - workflows=[TickingWorkflow], - max_cached_workflows=0, - max_concurrent_workflow_task_polls=1, - ) as worker: - # Confirm the first ticking workflow has completed a task but not - # the second - await assert_eq_eventually(True, lambda: any_task_completed(handle1)) - assert not await any_task_completed(handle2) - - # Now replace the client, which should be used fairly quickly - # because we should have timer-done poll completions every 100ms - worker.client = other_env.client - - # Now confirm the other workflow has started - await assert_eq_eventually(True, lambda: any_task_completed(handle2)) - - # Terminate both - await handle1.terminate() - await handle2.terminate() +async def test_workflow_replace_worker_client(client: Client): + other_runtime = Runtime(telemetry=TelemetryConfig()) + other_client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=other_runtime, + ) + async with new_worker(client, HelloWorkflow) as worker: + with pytest.raises( + ValueError, + match="New client is not on the same runtime as the existing client", + ): + worker.client = other_client @activity.defn(dynamic=True)