Skip to content

Commit d4eb94d

Browse files
authored
Merge branch 'master' into otel-tracing-error
2 parents f390cfc + 4a2368d commit d4eb94d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+776
-557
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ license-file = "LICENSE.txt"
99
[workspace.dependencies]
1010
derive_builder = "0.20"
1111
derive_more = { version = "1.0", features = ["constructor", "display", "from", "into", "debug"] }
12+
thiserror = "2"
1213
tonic = "0.12"
1314
tonic-build = "0.12"
14-
opentelemetry = { version = "0.24", features = ["metrics"] }
15+
opentelemetry = { version = "0.26", features = ["metrics"] }
1516
prost = "0.13"
1617
prost-types = "0.13"
1718

client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ opentelemetry = { workspace = true, features = ["metrics"], optional = true }
3030
parking_lot = "0.12"
3131
prost-types = { workspace = true }
3232
slotmap = "1.0"
33-
thiserror = "1.0"
33+
thiserror = { workspace = true }
3434
tokio = "1.1"
3535
tonic = { workspace = true, features = ["tls", "tls-roots"] }
3636
tower = { version = "0.5", features = ["util"] }

client/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub use crate::{
1818
proxy::HttpConnectProxyOptions,
1919
retry::{CallType, RetryClient, RETRYABLE_ERROR_CODES},
2020
};
21+
pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
2122
pub use raw::{CloudService, HealthService, OperatorService, TestService, WorkflowService};
2223
pub use temporal_sdk_core_protos::temporal::api::{
2324
enums::v1::ArchivalState,
@@ -42,13 +43,12 @@ use crate::{
4243
use backoff::{exponential, ExponentialBackoff, SystemClock};
4344
use http::{uri::InvalidUri, Uri};
4445
use parking_lot::RwLock;
45-
use std::sync::OnceLock;
4646
use std::{
4747
collections::HashMap,
4848
fmt::{Debug, Formatter},
4949
ops::{Deref, DerefMut},
5050
str::FromStr,
51-
sync::Arc,
51+
sync::{Arc, OnceLock},
5252
time::{Duration, Instant},
5353
};
5454
use temporal_sdk_core_api::telemetry::metrics::TemporalMeter;

client/src/metrics.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ use temporal_sdk_core_api::telemetry::metrics::{
1212
use tonic::{body::BoxBody, transport::Channel, Code};
1313
use tower::Service;
1414

15+
/// The string name (which may be prefixed) for this metric
16+
pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency";
17+
/// The string name (which may be prefixed) for this metric
18+
pub static LONG_REQUEST_LATENCY_HISTOGRAM_NAME: &str = "long_request_latency";
19+
1520
/// Used to track context associated with metrics, and record/update them
1621
// Possible improvement: make generic over some type tag so that methods are only exposed if the
1722
// appropriate k/vs have already been set.
@@ -58,12 +63,12 @@ impl MetricsContext {
5863
unit: "".into(),
5964
}),
6065
svc_request_latency: meter.histogram_duration(MetricParameters {
61-
name: "request_latency".into(),
66+
name: REQUEST_LATENCY_HISTOGRAM_NAME.into(),
6267
unit: "duration".into(),
6368
description: "Histogram of client request latencies".into(),
6469
}),
6570
long_svc_request_latency: meter.histogram_duration(MetricParameters {
66-
name: "long_request_latency".into(),
71+
name: LONG_REQUEST_LATENCY_HISTOGRAM_NAME.into(),
6772
unit: "duration".into(),
6873
description: "Histogram of client long-poll request latencies".into(),
6974
}),

core-api/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ opentelemetry = { workspace = true, optional = true }
2323
prost = { workspace = true }
2424
prost-types = { workspace = true }
2525
serde_json = "1.0"
26-
thiserror = "1.0"
26+
thiserror = { workspace = true }
2727
tonic = { workspace = true }
2828
tracing-core = "0.1"
2929
url = "2.3"

core-api/src/telemetry.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ pub struct OtelCollectorOptions {
6868
/// If set to true, use f64 seconds for durations instead of u64 milliseconds
6969
#[builder(default)]
7070
pub use_seconds_for_durations: bool,
71+
/// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
72+
#[builder(default)]
73+
pub histogram_bucket_overrides: HistogramBucketOverrides,
7174
}
7275

7376
/// Options for exporting metrics to Prometheus
@@ -78,15 +81,33 @@ pub struct PrometheusExporterOptions {
7881
#[builder(default)]
7982
pub global_tags: HashMap<String, String>,
8083
/// If set true, all counters will include a "_total" suffix
81-
#[builder(default = "false")]
84+
#[builder(default)]
8285
pub counters_total_suffix: bool,
8386
/// If set true, all histograms will include the unit in their name as a suffix.
8487
/// Ex: "_milliseconds".
85-
#[builder(default = "false")]
88+
#[builder(default)]
8689
pub unit_suffix: bool,
8790
/// If set to true, use f64 seconds for durations instead of u64 milliseconds
8891
#[builder(default)]
8992
pub use_seconds_for_durations: bool,
93+
/// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
94+
#[builder(default)]
95+
pub histogram_bucket_overrides: HistogramBucketOverrides,
96+
}
97+
98+
/// Allows overriding the buckets used by histogram metrics
99+
#[derive(Debug, Clone, Default)]
100+
pub struct HistogramBucketOverrides {
101+
/// Overrides where the key is the metric name and the value is the list of bucket boundaries.
102+
/// The metric name will apply regardless of name prefixing, if any. IE: the name acts like
103+
/// `*metric_name`.
104+
///
105+
/// The string names of core's built-in histogram metrics are publicly available on the
106+
/// `core::telemetry` module and the `client` crate.
107+
///
108+
/// See [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries)
109+
/// for the exact meaning of boundaries.
110+
pub overrides: HashMap<String, Vec<f64>>,
90111
}
91112

92113
/// Control where logs go
@@ -102,7 +123,8 @@ pub enum Logger {
102123
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
103124
filter: String,
104125
},
105-
// Push logs to Lang. Can used with temporal_sdk_core::telemetry::CoreLogBufferedConsumer to buffer.
126+
/// Push logs to Lang. Can be used with
127+
/// temporal_sdk_core::telemetry::log_export::CoreLogBufferedConsumer to buffer.
106128
Push {
107129
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
108130
filter: String,

core-api/src/telemetry/metrics.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,32 @@ mod otel_impls {
377377
}
378378
}
379379

380+
impl Gauge for metrics::Gauge<u64> {
381+
fn record(&self, value: u64, attributes: &MetricAttributes) {
382+
if let MetricAttributes::OTel { kvs } = attributes {
383+
self.record(value, kvs);
384+
} else {
385+
debug_assert!(
386+
false,
387+
"Must use OTel attributes with an OTel metric implementation"
388+
);
389+
}
390+
}
391+
}
392+
393+
impl GaugeF64 for metrics::Gauge<f64> {
394+
fn record(&self, value: f64, attributes: &MetricAttributes) {
395+
if let MetricAttributes::OTel { kvs } = attributes {
396+
self.record(value, kvs);
397+
} else {
398+
debug_assert!(
399+
false,
400+
"Must use OTel attributes with an OTel metric implementation"
401+
);
402+
}
403+
}
404+
}
405+
380406
impl Histogram for metrics::Histogram<u64> {
381407
fn record(&self, value: u64, attributes: &MetricAttributes) {
382408
if let MetricAttributes::OTel { kvs } = attributes {

core-api/src/worker.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ pub struct WorkerConfig {
8080
/// server-side. Note that this only takes effect upon an activity poll request. If multiple
8181
/// workers on the same queue have different values set, they will thrash with the last poller
8282
/// winning.
83+
///
84+
/// Setting this to a nonzero value will also disable eager activity execution.
8385
#[builder(default)]
8486
pub max_task_queue_activities_per_second: Option<f64>,
8587

core/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ itertools = "0.13"
4343
lru = "0.12"
4444
mockall = "0.13"
4545
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
46-
opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true }
47-
opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true }
48-
opentelemetry-prometheus = { version = "0.17", optional = true }
46+
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true }
47+
opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls"], optional = true }
48+
opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true }
4949
parking_lot = { version = "0.12", features = ["send_guard"] }
5050
pid = "4.0"
5151
pin-project = "1.0"
@@ -61,7 +61,7 @@ siphasher = "1.0"
6161
slotmap = "1.0"
6262
sysinfo = { version = "0.32", default-features = false, features = ["system"] }
6363
tar = { version = "0.4", optional = true }
64-
thiserror = "1.0"
64+
thiserror = { workspace = true }
6565
tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] }
6666
tokio-util = { version = "0.7", features = ["io", "io-util"] }
6767
tokio-stream = "0.1"

core/src/core_tests/activity_tasks.rs

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use crate::{
22
advance_fut, job_assert, prost_dur,
33
test_help::{
44
build_fake_worker, build_mock_pollers, canned_histories, gen_assert_and_reply,
5-
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_worker, poll_and_reply,
6-
single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder,
7-
QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
5+
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_sdk_cfg, mock_worker,
6+
poll_and_reply, single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs,
7+
MocksHolder, QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
88
},
99
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
1010
ActivityHeartbeat, Worker,
@@ -45,17 +45,18 @@ use temporal_sdk_core_protos::{
4545
},
4646
temporal::api::{
4747
command::v1::{command::Attributes, ScheduleActivityTaskCommandAttributes},
48-
enums::v1::EventType,
48+
enums::v1::{CommandType, EventType},
4949
history::v1::{
5050
history_event::Attributes as EventAttributes, ActivityTaskScheduledEventAttributes,
5151
},
52+
sdk::v1::UserMetadata,
5253
workflowservice::v1::{
5354
PollActivityTaskQueueResponse, RecordActivityTaskHeartbeatResponse,
5455
RespondActivityTaskCanceledResponse, RespondActivityTaskCompletedResponse,
5556
RespondActivityTaskFailedResponse, RespondWorkflowTaskCompletedResponse,
5657
},
5758
},
58-
TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE,
59+
TestHistoryBuilder, DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE,
5960
};
6061
use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, TestWorker};
6162
use tokio::{join, sync::Barrier, time::sleep};
@@ -633,11 +634,11 @@ async fn max_tq_acts_set_passed_to_poll_properly() {
633634
worker.poll_activity_task().await.unwrap();
634635
}
635636

636-
/// This test doesn't test the real worker config since [mock_worker] bypasses the worker
637-
/// constructor, [mock_worker] will not pass an activity poller to the worker when
638-
/// `no_remote_activities` is set to `true`.
637+
#[rstest::rstest]
639638
#[tokio::test]
640-
async fn no_eager_activities_requested_when_worker_options_disable_remote_activities() {
639+
async fn no_eager_activities_requested_when_worker_options_disable_it(
640+
#[values("no_remote", "throttle")] reason: &'static str,
641+
) {
641642
let wfid = "fake_wf_id";
642643
let mut t = TestHistoryBuilder::default();
643644
t.add_by_type(EventType::WorkflowExecutionStarted);
@@ -648,7 +649,6 @@ async fn no_eager_activities_requested_when_worker_options_disable_remote_activi
648649
t.add_full_wf_task();
649650
t.add_workflow_execution_completed();
650651
let num_eager_requested = Arc::new(AtomicUsize::new(0));
651-
// Clone it to move into the callback below
652652
let num_eager_requested_clone = num_eager_requested.clone();
653653

654654
let mut mock = mock_workflow_client();
@@ -677,14 +677,13 @@ async fn no_eager_activities_requested_when_worker_options_disable_remote_activi
677677
})
678678
});
679679
let mut mock = single_hist_mock_sg(wfid, t, [1], mock, true);
680-
let mut mock_poller = mock_manual_poller();
681-
mock_poller
682-
.expect_poll()
683-
.returning(|| futures_util::future::pending().boxed());
684-
mock.set_act_poller(Box::new(mock_poller));
685680
mock.worker_cfg(|wc| {
686681
wc.max_cached_workflows = 2;
687-
wc.no_remote_activities = true;
682+
if reason == "no_remote" {
683+
wc.no_remote_activities = true;
684+
} else {
685+
wc.max_task_queue_activities_per_second = Some(1.0);
686+
}
688687
});
689688
let core = mock_worker(mock);
690689

@@ -1187,3 +1186,54 @@ async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)]
11871186
};
11881187
join!(shutdown_task, complete_task);
11891188
}
1189+
1190+
#[tokio::test]
1191+
async fn pass_activity_summary_to_metadata() {
1192+
let t = canned_histories::single_activity("1");
1193+
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
1194+
let wf_id = mock_cfg.hists[0].wf_id.clone();
1195+
let wf_type = DEFAULT_WORKFLOW_TYPE;
1196+
let expected_user_metadata = Some(UserMetadata {
1197+
summary: Some(b"activity summary".into()),
1198+
details: None,
1199+
});
1200+
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
1201+
asserts
1202+
.then(move |wft| {
1203+
assert_eq!(wft.commands.len(), 1);
1204+
assert_eq!(
1205+
wft.commands[0].command_type(),
1206+
CommandType::ScheduleActivityTask
1207+
);
1208+
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
1209+
})
1210+
.then(move |wft| {
1211+
assert_eq!(wft.commands.len(), 1);
1212+
assert_eq!(
1213+
wft.commands[0].command_type(),
1214+
CommandType::CompleteWorkflowExecution
1215+
);
1216+
});
1217+
});
1218+
1219+
let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
1220+
worker.register_wf(wf_type, |ctx: WfContext| async move {
1221+
ctx.activity(ActivityOptions {
1222+
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
1223+
summary: Some("activity summary".to_string()),
1224+
..Default::default()
1225+
})
1226+
.await;
1227+
Ok(().into())
1228+
});
1229+
worker
1230+
.submit_wf(
1231+
wf_id.to_owned(),
1232+
wf_type.to_owned(),
1233+
vec![],
1234+
WorkflowOptions::default(),
1235+
)
1236+
.await
1237+
.unwrap();
1238+
worker.run_until_done().await.unwrap();
1239+
}

0 commit comments

Comments
 (0)