Skip to content

Commit 97db55e

Browse files
committed
Various cleanup
1 parent b8336a8 commit 97db55e

File tree

17 files changed

+218
-246
lines changed

17 files changed

+218
-246
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ license = "MIT"
1414
license-file = "LICENSE.txt"
1515

1616
[workspace.dependencies]
17-
bon = "3"
17+
bon = { version = "3", features = ["implied-bounds"] }
1818
derive_more = { version = "2.0", features = [
1919
"constructor",
2020
"display",

crates/common/src/worker.rs

Lines changed: 62 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -277,89 +277,84 @@ impl WorkerConfig {
277277
impl<S: worker_config_builder::IsComplete> WorkerConfigBuilder<S> {
278278
pub fn build(self) -> Result<WorkerConfig, String> {
279279
let config = self.build_internal();
280+
let task_types = &config.task_types;
281+
if task_types.is_empty() {
282+
return Err("At least one task type must be enabled in `task_types`".to_string());
283+
}
284+
if !task_types.enable_workflows && task_types.enable_local_activities {
285+
return Err(
286+
"`task_types` cannot enable local activities without workflows".to_string(),
287+
);
288+
}
289+
290+
config.workflow_task_poller_behavior.validate()?;
291+
config.activity_task_poller_behavior.validate()?;
292+
config.nexus_task_poller_behavior.validate()?;
293+
294+
if let Some(ref x) = config.max_worker_activities_per_second
295+
&& (!x.is_normal() || x.is_sign_negative())
280296
{
281-
let config = config;
282-
let task_types = &config.task_types;
283-
if task_types.is_empty() {
284-
return Err("At least one task type must be enabled in `task_types`".to_string());
285-
}
286-
if !task_types.enable_workflows && task_types.enable_local_activities {
287-
return Err(
288-
"`task_types` cannot enable local activities without workflows".to_string(),
289-
);
290-
}
297+
return Err(
298+
"`max_worker_activities_per_second` must be positive and nonzero".to_string(),
299+
);
300+
}
291301

292-
config.workflow_task_poller_behavior.validate()?;
293-
config.activity_task_poller_behavior.validate()?;
294-
config.nexus_task_poller_behavior.validate()?;
302+
if matches!(config.max_outstanding_workflow_tasks, Some(v) if v == 0) {
303+
return Err("`max_outstanding_workflow_tasks` must be > 0".to_string());
304+
}
305+
if matches!(config.max_outstanding_activities, Some(v) if v == 0) {
306+
return Err("`max_outstanding_activities` must be > 0".to_string());
307+
}
308+
if matches!(config.max_outstanding_local_activities, Some(v) if v == 0) {
309+
return Err("`max_outstanding_local_activities` must be > 0".to_string());
310+
}
311+
if matches!(config.max_outstanding_nexus_tasks, Some(v) if v == 0) {
312+
return Err("`max_outstanding_nexus_tasks` must be > 0".to_string());
313+
}
295314

296-
if let Some(ref x) = config.max_worker_activities_per_second
297-
&& (!x.is_normal() || x.is_sign_negative())
315+
if config.max_cached_workflows > 0 {
316+
if let Some(max_wft) = config.max_outstanding_workflow_tasks
317+
&& max_wft < 2
298318
{
299319
return Err(
300-
"`max_worker_activities_per_second` must be positive and nonzero".to_string(),
320+
"`max_cached_workflows` > 0 requires `max_outstanding_workflow_tasks` >= 2"
321+
.to_string(),
301322
);
302323
}
303-
304-
if matches!(config.max_outstanding_workflow_tasks, Some(v) if v == 0) {
305-
return Err("`max_outstanding_workflow_tasks` must be > 0".to_string());
306-
}
307-
if matches!(config.max_outstanding_activities, Some(v) if v == 0) {
308-
return Err("`max_outstanding_activities` must be > 0".to_string());
309-
}
310-
if matches!(config.max_outstanding_local_activities, Some(v) if v == 0) {
311-
return Err("`max_outstanding_local_activities` must be > 0".to_string());
312-
}
313-
if matches!(config.max_outstanding_nexus_tasks, Some(v) if v == 0) {
314-
return Err("`max_outstanding_nexus_tasks` must be > 0".to_string());
324+
if matches!(config.workflow_task_poller_behavior, PollerBehavior::SimpleMaximum(u) if u < 2)
325+
{
326+
return Err("`max_cached_workflows` > 0 requires `workflow_task_poller_behavior` to be at least 2".to_string());
315327
}
328+
}
316329

317-
if config.max_cached_workflows > 0 {
318-
if let Some(max_wft) = config.max_outstanding_workflow_tasks
319-
&& max_wft < 2
330+
if config.tuner.is_some()
331+
&& (config.max_outstanding_workflow_tasks.is_some()
332+
|| config.max_outstanding_activities.is_some()
333+
|| config.max_outstanding_local_activities.is_some())
334+
{
335+
return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_string());
336+
}
337+
338+
match &config.versioning_strategy {
339+
WorkerVersioningStrategy::None { .. } => {}
340+
WorkerVersioningStrategy::WorkerDeploymentBased(d) => {
341+
if d.use_worker_versioning
342+
&& (d.version.build_id.is_empty() || d.version.deployment_name.is_empty())
320343
{
344+
return Err("WorkerDeploymentVersion must have a non-empty build_id and deployment_name when deployment-based versioning is enabled".to_string());
345+
}
346+
}
347+
WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => {
348+
if build_id.is_empty() {
321349
return Err(
322-
"`max_cached_workflows` > 0 requires `max_outstanding_workflow_tasks` >= 2"
350+
"Legacy build id-based versioning must have a non-empty build_id"
323351
.to_string(),
324352
);
325353
}
326-
if matches!(config.workflow_task_poller_behavior, PollerBehavior::SimpleMaximum(u) if u < 2)
327-
{
328-
return Err("`max_cached_workflows` > 0 requires `workflow_task_poller_behavior` to be at least 2".to_string());
329-
}
330-
}
331-
332-
if config.tuner.is_some()
333-
&& (config.max_outstanding_workflow_tasks.is_some()
334-
|| config.max_outstanding_activities.is_some()
335-
|| config.max_outstanding_local_activities.is_some())
336-
{
337-
return Err(
338-
"max_outstanding_* fields are mutually exclusive with `tuner`".to_string(),
339-
);
340-
}
341-
342-
match &config.versioning_strategy {
343-
WorkerVersioningStrategy::None { .. } => {}
344-
WorkerVersioningStrategy::WorkerDeploymentBased(d) => {
345-
if d.use_worker_versioning
346-
&& (d.version.build_id.is_empty() || d.version.deployment_name.is_empty())
347-
{
348-
return Err("WorkerDeploymentVersion must have a non-empty build_id and deployment_name when deployment-based versioning is enabled".to_string());
349-
}
350-
}
351-
WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => {
352-
if build_id.is_empty() {
353-
return Err(
354-
"Legacy build id-based versioning must have a non-empty build_id"
355-
.to_string(),
356-
);
357-
}
358-
}
359354
}
360-
361-
Ok(config)
362355
}
356+
357+
Ok(config)
363358
}
364359
}
365360

crates/sdk-core-c-bridge/src/runtime.rs

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ impl Runtime {
203203
// Build telemetry options
204204
let mut log_forwarder = None;
205205
let telemetry_options = if let Some(v) = unsafe { options.telemetry.as_ref() } {
206-
// Prepare values
207206
let (attach_service_name, metric_prefix) =
208207
if let Some(v) = unsafe { v.metrics.as_ref() } {
209208
(v.attach_service_name, v.metric_prefix.to_option_string())
@@ -229,7 +228,6 @@ impl Runtime {
229228
}
230229
});
231230

232-
// Build with all values
233231
CoreTelemetryOptions::builder()
234232
.attach_service_name(attach_service_name)
235233
.maybe_metric_prefix(metric_prefix)
@@ -239,17 +237,15 @@ impl Runtime {
239237
CoreTelemetryOptions::default()
240238
};
241239

242-
let heartbeat_interval = if options.worker_heartbeat_interval_millis == 0 {
243-
None
244-
} else {
245-
Some(Duration::from_millis(
246-
options.worker_heartbeat_interval_millis,
247-
))
248-
};
249-
250240
let core_runtime_options = CoreRuntimeOptions::builder()
251241
.telemetry_options(telemetry_options)
252-
.maybe_heartbeat_interval(heartbeat_interval)
242+
.maybe_heartbeat_interval(if options.worker_heartbeat_interval_millis == 0 {
243+
None
244+
} else {
245+
Some(Duration::from_millis(
246+
options.worker_heartbeat_interval_millis,
247+
))
248+
})
253249
.build()
254250
.map_err(|e| anyhow::anyhow!(e))?;
255251

@@ -392,29 +388,30 @@ fn create_meter(
392388
));
393389
}
394390
// Build OTel exporter
395-
let metric_periodicity = if otel_options.metric_periodicity_millis > 0 {
396-
Some(Duration::from_millis(
397-
otel_options.metric_periodicity_millis.into(),
398-
))
399-
} else {
400-
None
401-
};
402-
let build = OtelCollectorOptions::builder()
403-
.url(Url::parse(otel_options.url.to_str())?)
404-
.headers(otel_options.headers.to_string_map_on_newlines())
405-
.metric_temporality(match otel_options.metric_temporality {
406-
OpenTelemetryMetricTemporality::Cumulative => MetricTemporality::Cumulative,
407-
OpenTelemetryMetricTemporality::Delta => MetricTemporality::Delta,
408-
})
409-
.global_tags(options.global_tags.to_string_map_on_newlines())
410-
.use_seconds_for_durations(otel_options.durations_as_seconds)
411-
.histogram_bucket_overrides(HistogramBucketOverrides {
412-
overrides: parse_histogram_bucket_overrides(
413-
&otel_options.histogram_bucket_overrides,
414-
)?,
415-
})
416-
.maybe_metric_periodicity(metric_periodicity);
417-
Ok(Arc::new(build_otlp_metric_exporter(build.build())?))
391+
Ok(Arc::new(build_otlp_metric_exporter(
392+
OtelCollectorOptions::builder()
393+
.url(Url::parse(otel_options.url.to_str())?)
394+
.headers(otel_options.headers.to_string_map_on_newlines())
395+
.metric_temporality(match otel_options.metric_temporality {
396+
OpenTelemetryMetricTemporality::Cumulative => MetricTemporality::Cumulative,
397+
OpenTelemetryMetricTemporality::Delta => MetricTemporality::Delta,
398+
})
399+
.global_tags(options.global_tags.to_string_map_on_newlines())
400+
.use_seconds_for_durations(otel_options.durations_as_seconds)
401+
.histogram_bucket_overrides(HistogramBucketOverrides {
402+
overrides: parse_histogram_bucket_overrides(
403+
&otel_options.histogram_bucket_overrides,
404+
)?,
405+
})
406+
.maybe_metric_periodicity(if otel_options.metric_periodicity_millis > 0 {
407+
Some(Duration::from_millis(
408+
otel_options.metric_periodicity_millis.into(),
409+
))
410+
} else {
411+
None
412+
})
413+
.build(),
414+
)?))
418415
} else if let Some(prom_options) = unsafe { options.prometheus.as_ref() } {
419416
if custom_meter.is_some() {
420417
return Err(anyhow::anyhow!(

crates/sdk-core-c-bridge/src/testing.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -203,24 +203,23 @@ impl TryFrom<&DevServerOptions> for ephemeral_server::TemporalDevServerConfig {
203203

204204
fn try_from(options: &DevServerOptions) -> anyhow::Result<Self> {
205205
let test_server_options = unsafe { &*options.test_server };
206-
let ui_port = if options.ui_port != 0 && options.ui {
207-
Some(options.ui_port)
208-
} else {
209-
None
210-
};
211206
Ok(ephemeral_server::TemporalDevServerConfig::builder()
212207
.exe(test_server_options.exe())
213208
.namespace(options.namespace.to_string())
214209
.ip(options.ip.to_string())
210+
.maybe_port(test_server_options.port())
211+
.maybe_db_filename(options.database_filename.to_option_string())
215212
.ui(options.ui)
213+
.maybe_ui_port(if options.ui_port != 0 && options.ui {
214+
Some(options.ui_port)
215+
} else {
216+
None
217+
})
216218
.log((
217219
options.log_format.to_string(),
218220
options.log_level.to_string(),
219221
))
220222
.extra_args(test_server_options.extra_args())
221-
.maybe_port(test_server_options.port())
222-
.maybe_db_filename(options.database_filename.to_option_string())
223-
.maybe_ui_port(ui_port)
224223
.build())
225224
}
226225
}
@@ -231,8 +230,8 @@ impl TryFrom<&TestServerOptions> for ephemeral_server::TestServerConfig {
231230
fn try_from(options: &TestServerOptions) -> anyhow::Result<Self> {
232231
Ok(ephemeral_server::TestServerConfig::builder()
233232
.exe(options.exe())
234-
.extra_args(options.extra_args())
235233
.maybe_port(options.port())
234+
.extra_args(options.extra_args())
236235
.build())
237236
}
238237
}

crates/sdk-core-c-bridge/src/worker.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,7 +1164,7 @@ impl TryFrom<&WorkerOptions> for temporalio_sdk_core::WorkerConfig {
11641164

11651165
fn try_from(opt: &WorkerOptions) -> anyhow::Result<Self> {
11661166
let converted_tuner: temporalio_sdk_core::TunerHolder = (&opt.tuner).try_into()?;
1167-
let config = WorkerConfig::builder()
1167+
WorkerConfig::builder()
11681168
.namespace(opt.namespace.to_str())
11691169
.task_queue(opt.task_queue.to_str())
11701170
.versioning_strategy({
@@ -1274,8 +1274,7 @@ impl TryFrom<&WorkerOptions> for temporalio_sdk_core::WorkerConfig {
12741274
.collect::<HashSet<_>>(),
12751275
)
12761276
.build()
1277-
.map_err(|e| anyhow::anyhow!(e))?;
1278-
Ok(config)
1277+
.map_err(|err| anyhow::anyhow!(err))
12791278
}
12801279
}
12811280

@@ -1325,21 +1324,17 @@ impl TryFrom<&TunerHolder> for temporalio_sdk_core::TunerHolder {
13251324
bail!("All resource-based slot suppliers must have the same ResourceBasedTunerOptions",);
13261325
}
13271326

1328-
let resource_based = first.map(|f| {
1329-
temporalio_sdk_core::ResourceBasedSlotsOptions::builder()
1330-
.target_mem_usage(f.target_memory_usage)
1331-
.target_cpu_usage(f.target_cpu_usage)
1332-
.build()
1333-
});
1334-
1335-
let builder = temporalio_sdk_core::TunerHolderOptions::builder()
1327+
temporalio_sdk_core::TunerHolderOptions::builder()
13361328
.workflow_slot_options(holder.workflow_slot_supplier.try_into()?)
13371329
.activity_slot_options(holder.activity_slot_supplier.try_into()?)
13381330
.local_activity_slot_options(holder.local_activity_slot_supplier.try_into()?)
13391331
.nexus_slot_options(holder.nexus_task_slot_supplier.try_into()?)
1340-
.maybe_resource_based_options(resource_based);
1341-
1342-
builder
1332+
.maybe_resource_based_options(first.map(|f| {
1333+
temporalio_sdk_core::ResourceBasedSlotsOptions::builder()
1334+
.target_mem_usage(f.target_memory_usage)
1335+
.target_cpu_usage(f.target_cpu_usage)
1336+
.build()
1337+
}))
13431338
.build()
13441339
.map_err(|e| anyhow::anyhow!("Failed building tuner holder options: {}", e))?
13451340
.build_tuner_holder()

0 commit comments

Comments
 (0)