Skip to content

Commit 87afc29

Browse files
Emit GrpcMessageTooLarge as failure_reason for WFT (#1069)
Emit GrpcMessageTooLarge as failure_reason for WFT
1 parent aed367e commit 87afc29

File tree

3 files changed

+40
-5
lines changed

3 files changed

+40
-5
lines changed

crates/sdk-core/src/telemetry/metrics.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ pub(crate) enum FailureReason {
642642
Timeout,
643643
NexusOperation(String),
644644
NexusHandlerError(String),
645+
GrpcMessageTooLarge,
645646
}
646647
impl Display for FailureReason {
647648
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -651,6 +652,7 @@ impl Display for FailureReason {
651652
FailureReason::Timeout => "timeout".to_owned(),
652653
FailureReason::NexusOperation(op) => format!("operation_{op}"),
653654
FailureReason::NexusHandlerError(op) => format!("handler_error_{op}"),
655+
FailureReason::GrpcMessageTooLarge => "GrpcMessageTooLarge".to_owned(),
654656
};
655657
write!(f, "{str}")
656658
}

crates/sdk-core/src/worker/workflow/mod.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ use crate::{
2323
internal_flags::InternalFlags,
2424
pollers::TrackedPermittedTqResp,
2525
protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage},
26-
telemetry::{VecDisplayer, set_trace_subscriber_for_current_thread},
26+
telemetry::{
27+
VecDisplayer,
28+
metrics::{self, FailureReason},
29+
set_trace_subscriber_for_current_thread,
30+
},
2731
worker::{
2832
LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
2933
PostActivateHookData,
@@ -66,9 +70,8 @@ use temporalio_common::{
6670
remove_from_cache::EvictionReason, workflow_activation_job,
6771
},
6872
workflow_commands::*,
69-
workflow_completion,
7073
workflow_completion::{
71-
Failure, WorkflowActivationCompletion, workflow_activation_completion,
74+
self, Failure, WorkflowActivationCompletion, workflow_activation_completion,
7275
},
7376
},
7477
temporal::api::{
@@ -134,6 +137,7 @@ pub(crate) struct Workflows {
134137
local_act_mgr: Option<Arc<LocalActivityManager>>,
135138
ever_polled: AtomicBool,
136139
default_versioning_behavior: Option<VersioningBehavior>,
140+
metrics: MetricsContext,
137141
}
138142

139143
pub(crate) struct WorkflowBasics {
@@ -176,6 +180,7 @@ impl Workflows {
176180
let (fetch_tx, fetch_rx) = unbounded_channel();
177181
let shutdown_tok = basics.shutdown_token.clone();
178182
let task_queue = basics.worker_config.task_queue.clone();
183+
let metrics = basics.metrics.clone();
179184
let default_versioning_behavior = basics.default_versioning_behavior;
180185
let extracted_wft_stream = WFTExtractor::build(
181186
client.clone(),
@@ -267,6 +272,7 @@ impl Workflows {
267272
local_act_mgr,
268273
ever_polled: AtomicBool::new(false),
269274
default_versioning_behavior,
275+
metrics,
270276
}
271277
}
272278

@@ -431,6 +437,11 @@ impl Workflows {
431437
);
432438
self.handle_activation_failed(run_id, completion_time, new_outcome)
433439
.await;
440+
self.metrics
441+
.with_new_attrs([metrics::failure_reason(
442+
FailureReason::GrpcMessageTooLarge,
443+
)])
444+
.wf_task_failed();
434445
return Err(e);
435446
}
436447
e => {

crates/sdk-core/tests/integ_tests/worker_tests.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,12 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() {
210210

211211
#[tokio::test]
212212
async fn oversize_grpc_message() {
213+
use crate::common::{NAMESPACE, prom_metrics};
213214
let wf_name = "oversize_grpc_message";
214-
let mut starter = CoreWfStarter::new(wf_name);
215+
// Enable Prometheus metrics for this test and capture the address
216+
let (telemopts, addr, _aborter) = prom_metrics(None);
217+
let runtime = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap();
218+
let mut starter = CoreWfStarter::new_with_runtime(wf_name, runtime);
215219
starter
216220
.worker_config
217221
.task_types(WorkerTaskTypes::workflow_only());
@@ -238,7 +242,25 @@ async fn oversize_grpc_message() {
238242
} else {
239243
false
240244
}
241-
}))
245+
}));
246+
247+
// Verify the workflow task failure metric includes the GrpcMessageTooLarge reason
248+
let tq = starter.get_task_queue();
249+
crate::common::eventually(
250+
|| async {
251+
let body = crate::integ_tests::metrics_tests::get_text(format!("http://{addr}/metrics")).await;
252+
if body.contains(&format!(
253+
"temporal_workflow_task_execution_failed{{failure_reason=\"GrpcMessageTooLarge\",namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",task_queue=\"{tq}\"}} 1"
254+
)) {
255+
Ok(())
256+
} else {
257+
Err(())
258+
}
259+
},
260+
Duration::from_secs(2),
261+
)
262+
.await
263+
.unwrap();
242264
}
243265

244266
#[tokio::test]

0 commit comments

Comments
 (0)