diff --git a/src/timely-util/src/builder_async.rs b/src/timely-util/src/builder_async.rs index 9b593243ce56c..6a5848131098d 100644 --- a/src/timely-util/src/builder_async.rs +++ b/src/timely-util/src/builder_async.rs @@ -23,6 +23,7 @@ use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{Context, Poll, Waker, ready}; +use std::time::{Duration, Instant}; use differential_dataflow::containers::{Columnation, TimelyStack}; use futures_util::Stream; @@ -571,11 +572,18 @@ impl OperatorBuilder { B: FnOnce(Vec>) -> L, L: Future + 'static, { + // Threshold for warning about slow operator polls. + // Polls that exceed this duration may indicate operators doing too much + // synchronous work before hitting an await point, which can block the + // timely worker thread and prevent heartbeat tasks from running. + const SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(10); + let operator_waker = self.operator_waker; let mut input_frontiers = self.input_frontiers; let mut input_queues = self.input_queues; let mut output_flushes = self.output_flushes; let mut shutdown_handle = self.shutdown_handle; + let operator_info = self.builder.operator_info(); self.builder.build_reschedule(move |caps| { let mut logic_fut = Some(Box::pin(constructor(caps))); move |new_frontiers| { @@ -616,7 +624,21 @@ impl OperatorBuilder { let waker = futures_util::task::waker_ref(&operator_waker); let mut cx = Context::from_waker(&waker); operator_waker.task_ready.store(false, Ordering::SeqCst); - if Pin::new(fut).poll(&mut cx).is_ready() { + // Track poll duration to identify operators doing too much + // synchronous work before hitting an await point. + let poll_start = Instant::now(); + let poll_result = Pin::new(fut).poll(&mut cx); + let poll_duration = poll_start.elapsed(); + if poll_duration >= SLOW_POLL_THRESHOLD { + tracing::warn!( + operator = ?operator_info.address, + global_id = operator_info.global_id, + poll_ms = poll_duration.as_millis(), + "slow operator poll detected; operator may be doing \ + too much synchronous work before await points" + ); + } + if poll_result.is_ready() { // We're done with logic so deallocate the task logic_fut = None; }