Skip to content

Commit 148774c

Browse files
authored
Fix autoscaler returning fatal error on GOAWAY if no scaling seen (#973)
1 parent 3fdbcae commit 148774c

File tree

5 files changed

+104
-27
lines changed

5 files changed

+104
-27
lines changed

client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ static TEMPORAL_NAMESPACE_HEADER_KEY: &str = "temporal-namespace";
8989

9090
/// Key used to communicate when a GRPC message is too large
9191
pub static MESSAGE_TOO_LARGE_KEY: &str = "message-too-large";
92+
/// Key used to indicate a error was returned by the retryer because of the short-circuit predicate
93+
pub static ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT: &str = "short-circuit";
9294

9395
/// The server times out polls after 60 seconds. Set our timeout to be slightly beyond that.
9496
const LONG_POLL_TIMEOUT: Duration = Duration::from_secs(70);

client/src/retry.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
Client, IsWorkerTaskLongPoll, MESSAGE_TOO_LARGE_KEY, NamespacedClient, NoRetryOnMatching,
3-
Result, RetryConfig, raw::IsUserLongPoll,
2+
Client, ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT, IsWorkerTaskLongPoll, MESSAGE_TOO_LARGE_KEY,
3+
NamespacedClient, NoRetryOnMatching, Result, RetryConfig, raw::IsUserLongPoll,
44
};
55
use backoff::{Clock, SystemClock, backoff::Backoff, exponential::ExponentialBackoff};
66
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
@@ -214,6 +214,10 @@ where
214214
if let Some(sc) = self.retry_short_circuit.as_ref()
215215
&& (sc.predicate)(&e)
216216
{
217+
e.metadata_mut().insert(
218+
ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT,
219+
tonic::metadata::MetadataValue::from(0),
220+
);
217221
return RetryPolicy::ForwardError(e);
218222
}
219223

@@ -441,7 +445,12 @@ mod tests {
441445
FixedClock(Instant::now()),
442446
);
443447
let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone"));
444-
assert_matches!(result, RetryPolicy::ForwardError(_))
448+
let e = assert_matches!(result, RetryPolicy::ForwardError(e) => e);
449+
assert!(
450+
e.metadata()
451+
.get(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT)
452+
.is_some()
453+
);
445454
}
446455

447456
#[tokio::test]

core/src/pollers/poll_buffer.rs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{
1818
},
1919
time::Duration,
2020
};
21-
use temporal_client::NoRetryOnMatching;
21+
use temporal_client::{ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT, NoRetryOnMatching};
2222
use temporal_sdk_core_api::worker::{
2323
ActivitySlotKind, NexusSlotKind, PollerBehavior, SlotKind, WorkflowSlotKind,
2424
};
@@ -538,20 +538,27 @@ impl PollScalerReportHandle {
538538
}
539539
}
540540
Err(e) => {
541-
// We should only see (and react to) errors in autoscaling mode
542-
if matches!(self.behavior, PollerBehavior::Autoscaling { .. })
543-
&& self.ever_saw_scaling_decision.load(Ordering::Relaxed)
544-
{
545-
debug!("Got error from server while polling: {:?}", e);
546-
if e.code() == Code::ResourceExhausted {
547-
// Scale down significantly for resource exhaustion
548-
self.change_target(usize::saturating_div, 2);
549-
} else {
550-
// Other codes that would normally have made us back off briefly can
551-
// reclaim this poller
552-
self.change_target(usize::saturating_sub, 1);
541+
if matches!(self.behavior, PollerBehavior::Autoscaling { .. }) {
542+
// We should only react to errors in autoscaling mode if we saw a scaling
543+
// decision
544+
if self.ever_saw_scaling_decision.load(Ordering::Relaxed) {
545+
debug!("Got error from server while polling: {:?}", e);
546+
if e.code() == Code::ResourceExhausted {
547+
// Scale down significantly for resource exhaustion
548+
self.change_target(usize::saturating_div, 2);
549+
} else {
550+
// Other codes that would normally have made us back off briefly can
551+
// reclaim this poller
552+
self.change_target(usize::saturating_sub, 1);
553+
}
553554
}
554-
return false;
555+
// Only propagate errors out if they weren't because of the short-circuiting
556+
// logic. IE: We don't want to fail callers because we said we wanted to know
557+
// about ResourceExhausted errors, but we haven't seen a scaling decision yet,
558+
// so we're not reacting to errors, only propagating them.
559+
return !e
560+
.metadata()
561+
.contains_key(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT);
555562
}
556563
}
557564
}
@@ -748,4 +755,46 @@ mod tests {
748755
pb.poll().await.unwrap().unwrap();
749756
pb.shutdown().await;
750757
}
758+
759+
#[tokio::test]
760+
async fn autoscale_wont_fail_caller_on_short_circuited_error() {
761+
let mut mock_client = mock_manual_worker_client();
762+
mock_client
763+
.expect_poll_workflow_task()
764+
.times(1)
765+
.returning(move |_, _| {
766+
async {
767+
let mut st = tonic::Status::cancelled("whatever");
768+
st.metadata_mut()
769+
.insert(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT, 1.into());
770+
Err(st)
771+
}
772+
.boxed()
773+
});
774+
mock_client
775+
.expect_poll_workflow_task()
776+
.times(1)
777+
.returning(move |_, _| async { Ok(Default::default()) }.boxed());
778+
779+
let pb = LongPollBuffer::new_workflow_task(
780+
Arc::new(mock_client),
781+
"sometq".to_string(),
782+
None,
783+
PollerBehavior::Autoscaling {
784+
minimum: 1,
785+
maximum: 1,
786+
initial: 1,
787+
},
788+
fixed_size_permit_dealer(1),
789+
CancellationToken::new(),
790+
None::<fn(usize)>,
791+
WorkflowTaskOptions {
792+
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(1)))),
793+
},
794+
);
795+
796+
// Should not see error, unwraps should get empty response
797+
pb.poll().await.unwrap().unwrap();
798+
pb.shutdown().await;
799+
}
751800
}

tests/integ_tests/client_tests.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ struct GenericService<F> {
110110
}
111111
impl<F> Service<tonic::codegen::http::Request<Body>> for GenericService<F>
112112
where
113-
F: FnMut() -> Response<Body>,
113+
F: FnMut() -> BoxFuture<'static, Response<Body>>,
114114
{
115115
type Response = Response<Body>;
116116
type Error = Infallible;
@@ -133,7 +133,7 @@ where
133133
)
134134
.unwrap();
135135
let r = (self.response_maker)();
136-
async move { Ok(r) }.boxed()
136+
async move { Ok(r.await) }.boxed()
137137
}
138138
}
139139
impl<F> NamedService for GenericService<F> {
@@ -144,12 +144,12 @@ struct FakeServer {
144144
addr: std::net::SocketAddr,
145145
shutdown_tx: oneshot::Sender<()>,
146146
header_rx: tokio::sync::mpsc::UnboundedReceiver<String>,
147-
server_handle: tokio::task::JoinHandle<()>,
147+
pub server_handle: tokio::task::JoinHandle<()>,
148148
}
149149

150150
async fn fake_server<F>(response_maker: F) -> FakeServer
151151
where
152-
F: FnMut() -> Response<Body> + Clone + Send + Sync + 'static,
152+
F: FnMut() -> BoxFuture<'static, Response<Body>> + Clone + Send + Sync + 'static,
153153
{
154154
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
155155
let (header_tx, header_rx) = tokio::sync::mpsc::unbounded_channel();
@@ -191,7 +191,7 @@ impl FakeServer {
191191

192192
#[tokio::test]
193193
async fn timeouts_respected_one_call_fake_server() {
194-
let mut fs = fake_server(|| Response::new(Body::empty())).await;
194+
let mut fs = fake_server(|| async { Response::new(Body::empty()) }.boxed()).await;
195195
let header_rx = &mut fs.header_rx;
196196

197197
let mut opts = get_integ_server_options();
@@ -260,7 +260,11 @@ async fn non_retryable_errors() {
260260
Code::Unauthenticated,
261261
Code::Unimplemented,
262262
] {
263-
let mut fs = fake_server(move || Status::new(code, "bla").into_http()).await;
263+
let mut fs = fake_server(move || {
264+
let s = Status::new(code, "bla").into_http();
265+
async { s }.boxed()
266+
})
267+
.await;
264268

265269
let mut opts = get_integ_server_options();
266270
let uri = format!("http://localhost:{}", fs.addr.port())
@@ -292,13 +296,13 @@ async fn retryable_errors() {
292296
{
293297
let count = Arc::new(AtomicUsize::new(0));
294298
let mut fs = fake_server(move || {
295-
dbg!("Making resp");
296299
let prev = count.fetch_add(1, Ordering::Relaxed);
297-
if prev < 3 {
300+
let r = if prev < 3 {
298301
Status::new(code, "bla").into_http()
299302
} else {
300303
make_ok_response(RespondActivityTaskCanceledResponse::default())
301-
}
304+
};
305+
async { r }.boxed()
302306
})
303307
.await;
304308

@@ -335,7 +339,7 @@ async fn namespace_header_attached_to_relevant_calls() {
335339
.add_service(GenericService {
336340
header_to_parse: "Temporal-Namespace",
337341
header_tx,
338-
response_maker: || Response::new(Body::empty()),
342+
response_maker: || async { Response::new(Body::empty()) }.boxed(),
339343
})
340344
.serve_with_incoming_shutdown(
341345
tokio_stream::wrappers::TcpListenerStream::new(listener),

tests/integ_tests/workflow_tests/activities.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use temporal_sdk::{
1111
ActContext, ActExitValue, ActivityError, ActivityOptions, CancellableFuture, WfContext,
1212
WfExitValue, WorkflowResult,
1313
};
14+
use temporal_sdk_core_api::worker::PollerBehavior;
1415
use temporal_sdk_core_protos::{
1516
DEFAULT_ACTIVITY_TYPE, TaskToken,
1617
coresdk::{
@@ -1066,11 +1067,23 @@ async fn activity_can_be_cancelled_by_local_timeout() {
10661067

10671068
#[tokio::test]
10681069
#[ignore] // Runs forever, used to manually attempt to repro spurious activity completion rpc errs
1070+
// Unfortunately there is no way to unit test this as tonic doesn't publicly expose the necessary
1071+
// machinery to construct the right kind of error.
10691072
async fn long_activity_timeout_repro() {
10701073
let wf_name = "long_activity_timeout_repro";
10711074
let mut starter = CoreWfStarter::new(wf_name);
10721075
starter
10731076
.worker_config
1077+
.workflow_task_poller_behavior(PollerBehavior::Autoscaling {
1078+
minimum: 1,
1079+
maximum: 10,
1080+
initial: 5,
1081+
})
1082+
.activity_task_poller_behavior(PollerBehavior::Autoscaling {
1083+
minimum: 1,
1084+
maximum: 10,
1085+
initial: 5,
1086+
})
10741087
.local_timeout_buffer_for_activities(Duration::from_secs(0));
10751088
let mut worker = starter.worker().await;
10761089
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {

0 commit comments

Comments
 (0)