Skip to content

Commit 54df6c4

Browse files
committed
Autoformat
1 parent 7071c0e commit 54df6c4

File tree

12 files changed

+97
-139
lines changed

12 files changed

+97
-139
lines changed

crates/common/src/worker.rs

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,40 @@ pub struct WorkerConfig {
240240
pub skip_client_worker_set_check: bool,
241241
}
242242

243+
impl WorkerConfig {
244+
/// Returns true if the configuration specifies we should fail a workflow on a certain error
245+
/// type rather than failing the workflow task.
246+
pub fn should_fail_workflow(
247+
&self,
248+
workflow_type: &str,
249+
error_type: &WorkflowErrorType,
250+
) -> bool {
251+
self.workflow_failure_errors.contains(error_type)
252+
|| self
253+
.workflow_types_to_failure_errors
254+
.get(workflow_type)
255+
.map(|s| s.contains(error_type))
256+
.unwrap_or(false)
257+
}
258+
259+
pub fn computed_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
260+
let wdv = match self.versioning_strategy {
261+
WorkerVersioningStrategy::None { ref build_id } => WorkerDeploymentVersion {
262+
deployment_name: "".to_owned(),
263+
build_id: build_id.clone(),
264+
},
265+
WorkerVersioningStrategy::WorkerDeploymentBased(ref opts) => opts.version.clone(),
266+
WorkerVersioningStrategy::LegacyBuildIdBased { ref build_id } => {
267+
WorkerDeploymentVersion {
268+
deployment_name: "".to_owned(),
269+
build_id: build_id.clone(),
270+
}
271+
}
272+
};
273+
if wdv.is_empty() { None } else { Some(wdv) }
274+
}
275+
}
276+
243277
impl<S: worker_config_builder::IsComplete> WorkerConfigBuilder<S> {
244278
pub fn build(self) -> Result<WorkerConfig, String> {
245279
let config = self.build_internal();
@@ -260,12 +294,12 @@ impl<S: worker_config_builder::IsComplete> WorkerConfigBuilder<S> {
260294
config.nexus_task_poller_behavior.validate()?;
261295

262296
if let Some(ref x) = config.max_worker_activities_per_second
263-
&& (!x.is_normal() || x.is_sign_negative()) {
264-
return Err(
265-
"`max_worker_activities_per_second` must be positive and nonzero"
266-
.to_string(),
267-
);
268-
}
297+
&& (!x.is_normal() || x.is_sign_negative())
298+
{
299+
return Err(
300+
"`max_worker_activities_per_second` must be positive and nonzero".to_string(),
301+
);
302+
}
269303

270304
if matches!(config.max_outstanding_workflow_tasks, Some(v) if v == 0) {
271305
return Err("`max_outstanding_workflow_tasks` must be > 0".to_string());
@@ -282,9 +316,13 @@ impl<S: worker_config_builder::IsComplete> WorkerConfigBuilder<S> {
282316

283317
if config.max_cached_workflows > 0 {
284318
if let Some(max_wft) = config.max_outstanding_workflow_tasks
285-
&& max_wft < 2 {
286-
return Err("`max_cached_workflows` > 0 requires `max_outstanding_workflow_tasks` >= 2".to_string());
287-
}
319+
&& max_wft < 2
320+
{
321+
return Err(
322+
"`max_cached_workflows` > 0 requires `max_outstanding_workflow_tasks` >= 2"
323+
.to_string(),
324+
);
325+
}
288326
if matches!(config.workflow_task_poller_behavior, PollerBehavior::SimpleMaximum(u) if u < 2)
289327
{
290328
return Err("`max_cached_workflows` > 0 requires `workflow_task_poller_behavior` to be at least 2".to_string());
@@ -325,44 +363,6 @@ impl<S: worker_config_builder::IsComplete> WorkerConfigBuilder<S> {
325363
}
326364
}
327365

328-
impl WorkerConfig {
329-
/// Returns true if the configuration specifies we should fail a workflow on a certain error
330-
/// type rather than failing the workflow task.
331-
pub fn should_fail_workflow(
332-
&self,
333-
workflow_type: &str,
334-
error_type: &WorkflowErrorType,
335-
) -> bool {
336-
self.workflow_failure_errors.contains(error_type)
337-
|| self
338-
.workflow_types_to_failure_errors
339-
.get(workflow_type)
340-
.map(|s| s.contains(error_type))
341-
.unwrap_or(false)
342-
}
343-
344-
pub fn computed_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
345-
let wdv = match self.versioning_strategy {
346-
WorkerVersioningStrategy::None { ref build_id } => WorkerDeploymentVersion {
347-
deployment_name: "".to_owned(),
348-
build_id: build_id.clone(),
349-
},
350-
WorkerVersioningStrategy::WorkerDeploymentBased(ref opts) => opts.version.clone(),
351-
WorkerVersioningStrategy::LegacyBuildIdBased { ref build_id } => {
352-
WorkerDeploymentVersion {
353-
deployment_name: "".to_owned(),
354-
build_id: build_id.clone(),
355-
}
356-
}
357-
};
358-
if wdv.is_empty() { None } else { Some(wdv) }
359-
}
360-
}
361-
362-
// Note: WorkerConfigBuilder is now generated by bon.
363-
// The custom clear_max_outstanding_opts method is removed because bon's builder
364-
// doesn't support custom methods the same way. Users should manually unset these fields if needed.
365-
366366
/// This trait allows users to customize the performance characteristics of workers dynamically.
367367
/// For more, see the docstrings of the traits in the return types of its functions.
368368
pub trait WorkerTuner {

crates/sdk-core-c-bridge/src/runtime.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ use std::{
1717
};
1818
use temporalio_common::telemetry::{
1919
CoreLog, CoreLogConsumer, HistogramBucketOverrides, Logger, MetricTemporality,
20-
OtelCollectorOptions, PrometheusExporterOptions,
21-
TelemetryOptions as CoreTelemetryOptions, metrics::CoreMeter,
20+
OtelCollectorOptions, PrometheusExporterOptions, TelemetryOptions as CoreTelemetryOptions,
21+
metrics::CoreMeter,
2222
};
2323
use temporalio_sdk_core::{
24-
CoreRuntime, RuntimeOptions as CoreRuntimeOptions,
25-
TokioRuntimeBuilder as TokioRuntime,
24+
CoreRuntime, RuntimeOptions as CoreRuntimeOptions, TokioRuntimeBuilder as TokioRuntime,
2625
telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
2726
};
2827
use tracing::Level;
@@ -143,11 +142,8 @@ pub extern "C" fn temporal_core_runtime_new(options: *const RuntimeOptions) -> R
143142
// freeable
144143
let mut runtime = Runtime {
145144
core: Arc::new(
146-
CoreRuntime::new(
147-
CoreRuntimeOptions::default(),
148-
TokioRuntime::default(),
149-
)
150-
.unwrap(),
145+
CoreRuntime::new(CoreRuntimeOptions::default(), TokioRuntime::default())
146+
.unwrap(),
151147
),
152148
log_forwarder: None,
153149
};
@@ -208,13 +204,15 @@ impl Runtime {
208204
let mut log_forwarder = None;
209205
let telemetry_options = if let Some(v) = unsafe { options.telemetry.as_ref() } {
210206
// Prepare values
211-
let (attach_service_name, metric_prefix) = if let Some(v) = unsafe { v.metrics.as_ref() } {
212-
(v.attach_service_name, v.metric_prefix.to_option_string())
213-
} else {
214-
(true, None)
215-
};
207+
let (attach_service_name, metric_prefix) =
208+
if let Some(v) = unsafe { v.metrics.as_ref() } {
209+
(v.attach_service_name, v.metric_prefix.to_option_string())
210+
} else {
211+
(true, None)
212+
};
216213

217-
let logging = unsafe { v.logging.as_ref() }.map(|v| if let Some(callback) = v.forward_to {
214+
let logging = unsafe { v.logging.as_ref() }.map(|v| {
215+
if let Some(callback) = v.forward_to {
218216
let consumer = Arc::new(LogForwarder {
219217
callback,
220218
active: AtomicBool::new(false),
@@ -228,7 +226,8 @@ impl Runtime {
228226
Logger::Console {
229227
filter: v.filter.to_string(),
230228
}
231-
});
229+
}
230+
});
232231

233232
// Build with all values
234233
CoreTelemetryOptions::builder()

crates/sdk-core-c-bridge/src/tests/utils.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ use crate::{
55
};
66
use std::{collections::HashMap, ops::Deref};
77
use temporalio_client::ClientOptions;
8-
use temporalio_sdk_core::ephemeral_server::{
9-
TemporalDevServerConfig, default_cached_download,
10-
};
8+
use temporalio_sdk_core::ephemeral_server::{TemporalDevServerConfig, default_cached_download};
119
use url::Url;
1210

1311
pub fn byte_array_to_vec(runtime: *mut Runtime, byte_array: *const ByteArray) -> Option<Vec<u8>> {

crates/sdk-core/benches/workflow_replay_bench.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -86,21 +86,9 @@ pub fn bench_metrics(c: &mut Criterion) {
8686
c.bench_function("Record with new attributes on each call", move |b| {
8787
b.iter_batched(
8888
|| {
89-
let c = meter.counter(
90-
MetricParameters::builder()
91-
.name("c")
92-
.build(),
93-
);
94-
let h = meter.histogram(
95-
MetricParameters::builder()
96-
.name("h")
97-
.build(),
98-
);
99-
let g = meter.gauge(
100-
MetricParameters::builder()
101-
.name("g")
102-
.build(),
103-
);
89+
let c = meter.counter(MetricParameters::builder().name("c").build());
90+
let h = meter.histogram(MetricParameters::builder().name("h").build());
91+
let g = meter.gauge(MetricParameters::builder().name("g").build());
10492

10593
let vals = [1, 2, 3, 4, 5];
10694
let labels = ["l1", "l2"];

crates/sdk-core/src/worker/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1458,7 +1458,7 @@ mod tests {
14581458
// The validation would need to be added manually via a builder validator
14591459
// For now, this test just documents that a config with 0 pollers can be created
14601460
// but would fail at runtime when attempting to create the worker
1461-
use temporalio_common::worker::{WorkerConfig, WorkerVersioningStrategy, WorkerTaskTypes};
1461+
use temporalio_common::worker::{WorkerConfig, WorkerTaskTypes, WorkerVersioningStrategy};
14621462
let _cfg = WorkerConfig::builder()
14631463
.namespace("test")
14641464
.task_queue("test")

crates/sdk-core/src/worker/workflow/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ use tokio::{
9898
task::{LocalSet, spawn_blocking},
9999
};
100100
use tokio_stream::wrappers::UnboundedReceiverStream;
101-
use tokio_util::either::Either;
102-
use tokio_util::sync::CancellationToken;
101+
use tokio_util::{either::Either, sync::CancellationToken};
103102
use tracing::{Span, Subscriber};
104103

105104
/// Id used by server for "legacy" queries. IE: Queries that come in the `query` rather than

crates/sdk-core/tests/common/mod.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@ use temporalio_common::{
4646
},
4747
},
4848
telemetry::{
49-
Logger, OtelCollectorOptions, PrometheusExporterOptions,
50-
TelemetryOptions,
49+
Logger, OtelCollectorOptions, PrometheusExporterOptions, TelemetryOptions,
5150
metrics::CoreMeter,
5251
},
5352
worker::{WorkerTaskTypes, WorkerVersioningStrategy},
@@ -62,8 +61,7 @@ use temporalio_sdk::{
6261
#[cfg(any(feature = "test-utilities", test))]
6362
pub(crate) use temporalio_sdk_core::test_help::NAMESPACE;
6463
use temporalio_sdk_core::{
65-
ClientOptions, CoreRuntime, RuntimeOptions, WorkerConfig,
66-
init_replay_worker, init_worker,
64+
ClientOptions, CoreRuntime, RuntimeOptions, WorkerConfig, init_replay_worker, init_worker,
6765
replay::{HistoryForReplay, ReplayWorkerInput},
6866
telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
6967
test_help::{MockPollCfg, build_mock_pollers, mock_worker},
@@ -807,9 +805,7 @@ pub(crate) fn get_integ_telem_options() -> TelemetryOptions {
807805
.ok()
808806
.map(|x| x.parse::<Url>().unwrap())
809807
{
810-
let opts = OtelCollectorOptions::builder()
811-
.url(url)
812-
.build();
808+
let opts = OtelCollectorOptions::builder().url(url).build();
813809
TelemetryOptions::builder()
814810
.metrics(Arc::new(build_otlp_metric_exporter(opts).unwrap()) as Arc<dyn CoreMeter>)
815811
.logging(Logger::Console {

crates/sdk-core/tests/integ_tests/ephemeral_server_tests.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,11 @@ async fn temporal_cli_shutdown_port_reuse() {
6363
#[ignore]
6464
async fn temporal_cli_concurrent_starts() -> Result<(), Box<dyn std::error::Error>> {
6565
stream::iter((0..80).map(|_| {
66-
Ok::<TemporalDevServerConfig, Box<dyn std::error::Error>>(TemporalDevServerConfig::builder()
67-
.exe(default_cached_download())
68-
.build())
66+
Ok::<TemporalDevServerConfig, Box<dyn std::error::Error>>(
67+
TemporalDevServerConfig::builder()
68+
.exe(default_cached_download())
69+
.build(),
70+
)
6971
}))
7072
.try_for_each_concurrent(8, |config| async move {
7173
let mut server = config.start_server().await?;

crates/sdk-core/tests/integ_tests/metrics_tests.rs

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,17 @@ use temporalio_common::{
4848
},
4949
},
5050
telemetry::{
51-
HistogramBucketOverrides, OtelCollectorOptions, OtlpProtocol,
52-
PrometheusExporterOptions, TaskQueueLabelStrategy, TelemetryOptions,
51+
HistogramBucketOverrides, OtelCollectorOptions, OtlpProtocol, PrometheusExporterOptions,
52+
TaskQueueLabelStrategy, TelemetryOptions,
5353
metrics::{
5454
CoreMeter, CounterBase, Gauge, GaugeBase, HistogramBase, MetricKeyValue,
5555
MetricParameters, NewAttributes,
5656
},
5757
},
5858
worker::{
5959
PollerBehavior, SlotKind, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext,
60-
SlotSupplier, SlotSupplierPermit, WorkerConfig, WorkerTaskTypes,
61-
WorkerVersioningStrategy, WorkflowSlotKind,
60+
SlotSupplier, SlotSupplierPermit, WorkerConfig, WorkerTaskTypes, WorkerVersioningStrategy,
61+
WorkflowSlotKind,
6262
},
6363
};
6464
use temporalio_sdk::{
@@ -668,19 +668,15 @@ async fn request_fail_codes_otel() {
668668
.ok()
669669
.map(|x| x.parse::<Url>().unwrap())
670670
{
671-
let opts = OtelCollectorOptions::builder()
672-
.url(url)
673-
.build();
671+
let opts = OtelCollectorOptions::builder().url(url).build();
674672
build_otlp_metric_exporter(opts).unwrap()
675673
} else {
676674
// skip
677675
return;
678676
};
679677
let exporter = Arc::new(exporter);
680-
let telemopts = TelemetryOptions::builder()
681-
.metrics(exporter as Arc<dyn CoreMeter>);
682-
let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts.build()))
683-
.unwrap();
678+
let telemopts = TelemetryOptions::builder().metrics(exporter as Arc<dyn CoreMeter>);
679+
let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts.build())).unwrap();
684680
let opts = get_integ_server_options();
685681
let mut client = opts
686682
.connect(NAMESPACE, rt.telemetry().get_temporal_metric_meter())
@@ -1313,16 +1309,11 @@ async fn test_prometheus_metric_format_consistency() {
13131309

13141310
#[tokio::test]
13151311
async fn prometheus_label_nonsense() {
1316-
let opts_builder = PrometheusExporterOptions::builder()
1317-
.socket_addr(ANY_PORT.parse().unwrap());
1312+
let opts_builder = PrometheusExporterOptions::builder().socket_addr(ANY_PORT.parse().unwrap());
13181313
let (telemopts, addr, _aborter) = prom_metrics(Some(opts_builder.build()));
13191314
let meter = telemopts.metrics.clone().unwrap();
13201315

1321-
let ctr = meter.counter(
1322-
MetricParameters::builder()
1323-
.name("some_counter")
1324-
.build(),
1325-
);
1316+
let ctr = meter.counter(MetricParameters::builder().name("some_counter").build());
13261317
let a1 = meter.new_attributes(NewAttributes::from([MetricKeyValue::new("thing", "foo")]));
13271318
let a2 = meter.new_attributes(NewAttributes::from([MetricKeyValue::new("blerp", "baz")]));
13281319
ctr.add(1, &a1);

crates/sdk-core/tests/integ_tests/polling_tests.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,7 @@ async fn switching_worker_client_changes_poll() {
193193
let mut config = integ_worker_config("my-task-queue");
194194
// We want a cache so we don't get extra remove-job activations
195195
config.max_cached_workflows = 100_usize;
196-
let worker = init_worker(
197-
init_integ_telem().unwrap(),
198-
config,
199-
client1.clone(),
200-
)
201-
.unwrap();
196+
let worker = init_worker(init_integ_telem().unwrap(), config, client1.clone()).unwrap();
202197

203198
// Poll for first task, confirm it's first wf, complete, and wait for complete
204199
info!("Doing initial poll");
@@ -398,14 +393,8 @@ async fn replace_client_works_after_polling_failure() {
398393

399394
let mut config = integ_worker_config(&task_queue);
400395
config.max_cached_workflows = 100_usize;
401-
let worker = Arc::new(
402-
init_worker(
403-
&rt,
404-
config,
405-
client_for_initial_server.clone(),
406-
)
407-
.unwrap(),
408-
);
396+
let worker =
397+
Arc::new(init_worker(&rt, config, client_for_initial_server.clone()).unwrap());
409398

410399
// Polling the initial server the first time is successful.
411400
let wf_1 = client_for_initial_server

0 commit comments

Comments
 (0)