Skip to content

Commit 9f4b04e

Browse files
committed
Integ tests
1 parent 766de2e commit 9f4b04e

32 files changed

+423
-556
lines changed

crates/sdk-core/tests/common/mod.rs

Lines changed: 60 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ use temporalio_common::{
4646
},
4747
},
4848
telemetry::{
49-
Logger, OtelCollectorOptionsBuilder, PrometheusExporterOptions,
50-
PrometheusExporterOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder,
49+
Logger, OtelCollectorOptions, PrometheusExporterOptions,
50+
TelemetryOptions,
5151
metrics::CoreMeter,
5252
},
5353
worker::{WorkerTaskTypes, WorkerVersioningStrategy},
@@ -62,8 +62,8 @@ use temporalio_sdk::{
6262
#[cfg(any(feature = "test-utilities", test))]
6363
pub(crate) use temporalio_sdk_core::test_help::NAMESPACE;
6464
use temporalio_sdk_core::{
65-
ClientOptions, CoreRuntime, RuntimeOptions, RuntimeOptionsBuilder, WorkerConfig,
66-
WorkerConfigBuilder, init_replay_worker, init_worker,
65+
ClientOptions, CoreRuntime, RuntimeOptions, WorkerConfig,
66+
init_replay_worker, init_worker,
6767
replay::{HistoryForReplay, ReplayWorkerInput},
6868
telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
6969
test_help::{MockPollCfg, build_mock_pollers, mock_worker},
@@ -101,9 +101,9 @@ pub(crate) async fn init_core_and_create_wf(test_name: &str) -> CoreWfStarter {
101101
starter
102102
}
103103

104-
pub(crate) fn integ_worker_config(tq: &str) -> WorkerConfigBuilder {
105-
let mut b = WorkerConfig::builder();
106-
b.namespace(NAMESPACE)
104+
pub(crate) fn integ_worker_config(tq: &str) -> WorkerConfig {
105+
WorkerConfig::builder()
106+
.namespace(NAMESPACE)
107107
.task_queue(tq)
108108
.max_outstanding_activities(100_usize)
109109
.max_outstanding_local_activities(100_usize)
@@ -112,8 +112,9 @@ pub(crate) fn integ_worker_config(tq: &str) -> WorkerConfigBuilder {
112112
build_id: "test_build_id".to_owned(),
113113
})
114114
.task_types(WorkerTaskTypes::all())
115-
.skip_client_worker_set_check(true);
116-
b
115+
.skip_client_worker_set_check(true)
116+
.build()
117+
.expect("Configuration options construct properly")
117118
}
118119

119120
/// Create a worker replay instance preloaded with provided histories. Returns the worker impl.
@@ -129,9 +130,7 @@ where
129130
I: Stream<Item = HistoryForReplay> + Send + 'static,
130131
{
131132
init_integ_telem();
132-
let worker_cfg = integ_worker_config(test_name)
133-
.build()
134-
.expect("Configuration options construct properly");
133+
let worker_cfg = integ_worker_config(test_name);
135134
let worker = init_replay_worker(ReplayWorkerInput::new(worker_cfg, histories))
136135
.expect("Replay worker must init properly");
137136
Arc::new(worker)
@@ -177,7 +176,7 @@ pub(crate) fn init_integ_telem() -> Option<&'static CoreRuntime> {
177176
}
178177
Some(INTEG_TESTS_RT.get_or_init(|| {
179178
let telemetry_options = get_integ_telem_options();
180-
let runtime_options = RuntimeOptionsBuilder::default()
179+
let runtime_options = RuntimeOptions::builder()
181180
.telemetry_options(telemetry_options)
182181
.build()
183182
.expect("Runtime options build cleanly");
@@ -224,7 +223,7 @@ pub(crate) async fn get_cloud_client() -> RetryClient<Client> {
224223
pub(crate) struct CoreWfStarter {
225224
/// Used for both the task queue and workflow id
226225
task_queue_name: String,
227-
pub worker_config: WorkerConfigBuilder,
226+
pub worker_config: WorkerConfig,
228227
/// Options to use when starting workflow(s)
229228
pub workflow_options: WorkflowOptions,
230229
initted_worker: OnceCell<InitializedWorker>,
@@ -298,10 +297,20 @@ impl CoreWfStarter {
298297
) -> Self {
299298
let task_q_salt = rand_6_chars();
300299
let task_queue = format!("{test_name}_{task_q_salt}");
301-
let mut worker_config = integ_worker_config(&task_queue);
302-
worker_config
300+
let worker_config = WorkerConfig::builder()
303301
.namespace(env::var(INTEG_NAMESPACE_ENV_VAR).unwrap_or(NAMESPACE.to_string()))
304-
.max_cached_workflows(1000_usize);
302+
.task_queue(&task_queue)
303+
.max_outstanding_activities(100_usize)
304+
.max_outstanding_local_activities(100_usize)
305+
.max_outstanding_workflow_tasks(100_usize)
306+
.versioning_strategy(WorkerVersioningStrategy::None {
307+
build_id: "test_build_id".to_owned(),
308+
})
309+
.task_types(WorkerTaskTypes::all())
310+
.skip_client_worker_set_check(true)
311+
.max_cached_workflows(1000_usize)
312+
.build()
313+
.expect("Configuration options construct properly");
305314
Self {
306315
task_queue_name: task_queue,
307316
worker_config,
@@ -452,10 +461,7 @@ impl CoreWfStarter {
452461
} else {
453462
init_integ_telem().unwrap()
454463
};
455-
let cfg = self
456-
.worker_config
457-
.build()
458-
.expect("Worker config must be valid");
464+
let cfg = self.worker_config.clone();
459465
let client = if let Some(client) = self.client_override.take() {
460466
client
461467
} else {
@@ -794,41 +800,49 @@ pub(crate) fn get_integ_tls_config() -> Option<TlsOptions> {
794800
}
795801

796802
pub(crate) fn get_integ_telem_options() -> TelemetryOptions {
797-
let mut ob = TelemetryOptions::builder();
798803
let filter_string =
799804
env::var("RUST_LOG").unwrap_or_else(|_| "INFO,temporalio_sdk_core=INFO".to_string());
805+
800806
if let Some(url) = env::var(OTEL_URL_ENV_VAR)
801807
.ok()
802808
.map(|x| x.parse::<Url>().unwrap())
803809
{
804-
let opts = OtelCollectorOptionsBuilder::default()
810+
let opts = OtelCollectorOptions::builder()
805811
.url(url)
812+
.build();
813+
TelemetryOptions::builder()
814+
.metrics(Arc::new(build_otlp_metric_exporter(opts).unwrap()) as Arc<dyn CoreMeter>)
815+
.logging(Logger::Console {
816+
filter: filter_string,
817+
})
806818
.build()
807-
.unwrap();
808-
ob.metrics(Arc::new(build_otlp_metric_exporter(opts).unwrap()) as Arc<dyn CoreMeter>);
809-
}
810-
if let Some(addr) = env::var(PROM_ENABLE_ENV_VAR)
819+
} else if let Some(addr) = env::var(PROM_ENABLE_ENV_VAR)
811820
.ok()
812821
.map(|x| SocketAddr::new([127, 0, 0, 1].into(), x.parse().unwrap()))
813822
{
814823
let prom_info = start_prometheus_metric_exporter(
815-
PrometheusExporterOptionsBuilder::default()
824+
PrometheusExporterOptions::builder()
816825
.socket_addr(addr)
817-
.build()
818-
.unwrap(),
826+
.build(),
819827
)
820828
.unwrap();
821-
ob.metrics(prom_info.meter as Arc<dyn CoreMeter>);
829+
TelemetryOptions::builder()
830+
.metrics(prom_info.meter as Arc<dyn CoreMeter>)
831+
.logging(Logger::Console {
832+
filter: filter_string,
833+
})
834+
.build()
835+
} else {
836+
TelemetryOptions::builder()
837+
.logging(Logger::Console {
838+
filter: filter_string,
839+
})
840+
.build()
822841
}
823-
ob.logging(Logger::Console {
824-
filter: filter_string,
825-
})
826-
.build()
827-
.unwrap()
828842
}
829843

830844
pub(crate) fn get_integ_runtime_options(telemopts: TelemetryOptions) -> RuntimeOptions {
831-
RuntimeOptionsBuilder::default()
845+
RuntimeOptions::builder()
832846
.telemetry_options(telemopts)
833847
.build()
834848
.unwrap()
@@ -886,10 +900,9 @@ pub(crate) fn prom_metrics(
886900
options_override: Option<PrometheusExporterOptions>,
887901
) -> (TelemetryOptions, SocketAddr, AbortOnDrop) {
888902
let prom_exp_opts = options_override.unwrap_or_else(|| {
889-
PrometheusExporterOptionsBuilder::default()
903+
PrometheusExporterOptions::builder()
890904
.socket_addr(ANY_PORT.parse().unwrap())
891905
.build()
892-
.unwrap()
893906
});
894907
let mut telemopts = get_integ_telem_options();
895908
let prom_info = start_prometheus_metric_exporter(prom_exp_opts).unwrap();
@@ -998,13 +1011,14 @@ impl Drop for ActivationAssertionsInterceptor {
9981011

9991012
#[cfg(feature = "ephemeral-server")]
10001013
use temporalio_sdk_core::ephemeral_server::{
1001-
EphemeralExe, EphemeralExeVersion, TemporalDevServerConfigBuilder, default_cached_download,
1014+
EphemeralExe, EphemeralExeVersion, TemporalDevServerConfig, default_cached_download,
10021015
};
10031016

10041017
#[cfg(feature = "ephemeral-server")]
10051018
pub(crate) fn integ_dev_server_config(
10061019
mut extra_args: Vec<String>,
1007-
) -> TemporalDevServerConfigBuilder {
1020+
ui: bool,
1021+
) -> TemporalDevServerConfig {
10081022
let cli_version = if let Ok(ver_override) = env::var(CLI_VERSION_OVERRIDE_ENV_VAR) {
10091023
EphemeralExe::CachedDownload {
10101024
version: EphemeralExeVersion::Fixed(ver_override.to_owned()),
@@ -1041,7 +1055,9 @@ pub(crate) fn integ_dev_server_config(
10411055
.map(Into::into),
10421056
);
10431057

1044-
let mut config = TemporalDevServerConfigBuilder::default();
1045-
config.exe(cli_version).extra_args(extra_args);
1046-
config
1058+
TemporalDevServerConfig::builder()
1059+
.exe(cli_version)
1060+
.extra_args(extra_args)
1061+
.ui(ui)
1062+
.build()
10471063
}

crates/sdk-core/tests/global_metric_tests.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use common::CoreWfStarter;
77
use parking_lot::Mutex;
88
use std::{sync::Arc, time::Duration};
99
use temporalio_common::telemetry::{
10-
Logger, OtelCollectorOptionsBuilder, TelemetryOptionsBuilder, metrics::CoreMeter,
10+
Logger, OtelCollectorOptions, TelemetryOptions, metrics::CoreMeter,
1111
};
1212
use temporalio_sdk_core::{
1313
CoreRuntime,
@@ -58,18 +58,16 @@ async fn otel_errors_logged_as_errors() {
5858
.with_env_filter("debug")
5959
.finish(),
6060
);
61-
let opts = OtelCollectorOptionsBuilder::default()
61+
let opts = OtelCollectorOptions::builder()
6262
.url("https://localhost:12345/v1/metrics".parse().unwrap()) // Nothing bound on that port
63-
.build()
64-
.unwrap();
63+
.build();
6564
let exporter = build_otlp_metric_exporter(opts).unwrap();
6665

6766
// Global initialization is needed to capture (some) otel logging.
6867
telemetry_init_global(
6968
TelemetryOptions::builder()
7069
.subscriber_override(subscriber)
71-
.build()
72-
.unwrap(),
70+
.build(),
7371
)
7472
.unwrap();
7573
let telemopts = TelemetryOptions::builder()
@@ -78,8 +76,7 @@ async fn otel_errors_logged_as_errors() {
7876
.logging(Logger::Console {
7977
filter: construct_filter_string(Level::INFO, Level::WARN),
8078
})
81-
.build()
82-
.unwrap();
79+
.build();
8380

8481
let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap();
8582
let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt);

crates/sdk-core/tests/heavy_tests.rs

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,10 @@ async fn activity_load() {
4040
const CONCURRENCY: usize = 512;
4141

4242
let mut starter = CoreWfStarter::new("activity_load");
43-
starter
44-
.worker_config
45-
.max_outstanding_workflow_tasks(CONCURRENCY)
46-
.max_cached_workflows(CONCURRENCY)
47-
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(10_usize))
48-
.max_outstanding_activities(CONCURRENCY);
43+
starter.worker_config.max_outstanding_workflow_tasks = Some(CONCURRENCY);
44+
starter.worker_config.max_cached_workflows = CONCURRENCY;
45+
starter.worker_config.activity_task_poller_behavior = PollerBehavior::SimpleMaximum(10);
46+
starter.worker_config.max_outstanding_activities = Some(CONCURRENCY);
4947
let mut worker = starter.worker().await;
5048

5149
let activity_id = "act-1";
@@ -110,11 +108,12 @@ async fn chunky_activities_resource_based() {
110108
const WORKFLOWS: usize = 100;
111109

112110
let mut starter = CoreWfStarter::new("chunky_activities_resource_based");
113-
starter
114-
.worker_config
115-
.clear_max_outstanding_opts()
116-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(10_usize))
117-
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(10_usize));
111+
starter.worker_config.max_outstanding_workflow_tasks = None;
112+
starter.worker_config.max_outstanding_local_activities = None;
113+
starter.worker_config.max_outstanding_activities = None;
114+
starter.worker_config.max_outstanding_nexus_tasks = None;
115+
starter.worker_config.workflow_task_poller_behavior = PollerBehavior::SimpleMaximum(10_usize);
116+
starter.worker_config.activity_task_poller_behavior = PollerBehavior::SimpleMaximum(10_usize);
118117
let mut tuner = ResourceBasedTuner::new(0.7, 0.7);
119118
tuner
120119
.with_workflow_slots_options(ResourceSlotOptions::new(
@@ -123,7 +122,7 @@ async fn chunky_activities_resource_based() {
123122
Duration::from_millis(0),
124123
))
125124
.with_activity_slots_options(ResourceSlotOptions::new(5, 1000, Duration::from_millis(50)));
126-
starter.worker_config.tuner(Arc::new(tuner));
125+
starter.worker_config.tuner = Some(Arc::new(tuner));
127126
let mut worker = starter.worker().await;
128127

129128
let activity_id = "act-1";
@@ -203,12 +202,10 @@ async fn workflow_load() {
203202
init_integ_telem();
204203
let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap();
205204
let mut starter = CoreWfStarter::new_with_runtime("workflow_load", rt);
206-
starter
207-
.worker_config
208-
.max_outstanding_workflow_tasks(5_usize)
209-
.max_cached_workflows(200_usize)
210-
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(10_usize))
211-
.max_outstanding_activities(100_usize);
205+
starter.worker_config.max_outstanding_workflow_tasks = Some(5);
206+
starter.worker_config.max_cached_workflows = 200;
207+
starter.worker_config.activity_task_poller_behavior = PollerBehavior::SimpleMaximum(10);
208+
starter.worker_config.max_outstanding_activities = Some(100);
212209
let mut worker = starter.worker().await;
213210
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
214211
let sigchan = ctx.make_signal_channel(SIGNAME).map(Ok);
@@ -282,10 +279,8 @@ async fn workflow_load() {
282279
async fn evict_while_la_running_no_interference() {
283280
let wf_name = "evict_while_la_running_no_interference";
284281
let mut starter = CoreWfStarter::new(wf_name);
285-
starter
286-
.worker_config
287-
.max_outstanding_local_activities(20_usize)
288-
.max_cached_workflows(20_usize);
282+
starter.worker_config.max_outstanding_local_activities = Some(20);
283+
starter.worker_config.max_cached_workflows = 20;
289284
// Though it doesn't make sense to set wft higher than cached workflows, leaving this commented
290285
// introduces more instability that can be useful in the test.
291286
// starter.max_wft(20);
@@ -350,11 +345,9 @@ pub async fn many_parallel_timers_longhist(ctx: WfContext) -> WorkflowResult<()>
350345
async fn can_paginate_long_history() {
351346
let wf_name = "can_paginate_long_history";
352347
let mut starter = CoreWfStarter::new(wf_name);
353-
starter
354-
.worker_config
355-
.task_types(WorkerTaskTypes::workflow_only())
356-
// Do not use sticky queues so we are forced to paginate once history gets long
357-
.max_cached_workflows(0_usize);
348+
starter.worker_config.task_types = WorkerTaskTypes::workflow_only();
349+
// Do not use sticky queues so we are forced to paginate once history gets long
350+
starter.worker_config.max_cached_workflows = 0;
358351

359352
let mut worker = starter.worker().await;
360353
worker.register_wf(wf_name.to_owned(), many_parallel_timers_longhist);
@@ -394,21 +387,19 @@ async fn poller_autoscaling_basic_loadtest() {
394387
let num_workflows = 100;
395388
let wf_name = "poller_load";
396389
let mut starter = CoreWfStarter::new("poller_load");
397-
starter
398-
.worker_config
399-
.max_cached_workflows(5000_usize)
400-
.max_outstanding_workflow_tasks(1000_usize)
401-
.max_outstanding_activities(1000_usize)
402-
.workflow_task_poller_behavior(PollerBehavior::Autoscaling {
403-
minimum: 1,
404-
maximum: 200,
405-
initial: 5,
406-
})
407-
.activity_task_poller_behavior(PollerBehavior::Autoscaling {
408-
minimum: 1,
409-
maximum: 200,
410-
initial: 5,
411-
});
390+
starter.worker_config.max_cached_workflows = 5000;
391+
starter.worker_config.max_outstanding_workflow_tasks = Some(1000);
392+
starter.worker_config.max_outstanding_activities = Some(1000);
393+
starter.worker_config.workflow_task_poller_behavior = PollerBehavior::Autoscaling {
394+
minimum: 1,
395+
maximum: 200,
396+
initial: 5,
397+
};
398+
starter.worker_config.activity_task_poller_behavior = PollerBehavior::Autoscaling {
399+
minimum: 1,
400+
maximum: 200,
401+
initial: 5,
402+
};
412403
let mut worker = starter.worker().await;
413404
let shutdown_handle = worker.inner_mut().shutdown_handle();
414405
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {

crates/sdk-core/tests/heavy_tests/fuzzy_workflow.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,9 @@ async fn fuzzy_workflow() {
7777
let num_workflows = 200;
7878
let wf_name = "fuzzy_wf";
7979
let mut starter = CoreWfStarter::new("fuzzy_workflow");
80-
starter
81-
.worker_config
82-
.max_outstanding_workflow_tasks(25_usize)
83-
.max_cached_workflows(25_usize)
84-
.max_outstanding_activities(25_usize);
80+
starter.worker_config.max_outstanding_workflow_tasks = Some(25);
81+
starter.worker_config.max_cached_workflows = 25;
82+
starter.worker_config.max_outstanding_activities = Some(25);
8583
let mut worker = starter.worker().await;
8684
worker.register_wf(wf_name.to_owned(), fuzzy_wf_def);
8785
worker.register_activity("echo_activity", echo);

0 commit comments

Comments
 (0)