From 38064765b797034cf60df389704989c1525aa4b4 Mon Sep 17 00:00:00 2001 From: Jon Currey Date: Wed, 14 Jan 2026 07:17:49 -0500 Subject: [PATCH] timely-util: add instrumentation for slow operator polls Async operators running in the timely context can block the worker thread if they do significant synchronous work before hitting an await point. This can prevent heartbeat tasks from running and cause persist reader lease expirations. This change adds instrumentation to detect slow polls: - Track the duration of each future poll - Log a warning when a poll takes 10ms or longer - Include the operator address and global_id in the warning This provides visibility into problematic operators while the more invasive architectural fix (channel-based communication with tokio tasks) is planned for future work. Co-Authored-By: Claude Opus 4.5 --- src/timely-util/src/builder_async.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/timely-util/src/builder_async.rs b/src/timely-util/src/builder_async.rs index 9b593243ce56c..6a5848131098d 100644 --- a/src/timely-util/src/builder_async.rs +++ b/src/timely-util/src/builder_async.rs @@ -23,6 +23,7 @@ use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{Context, Poll, Waker, ready}; +use std::time::{Duration, Instant}; use differential_dataflow::containers::{Columnation, TimelyStack}; use futures_util::Stream; @@ -571,11 +572,18 @@ impl OperatorBuilder { B: FnOnce(Vec>) -> L, L: Future + 'static, { + // Threshold for warning about slow operator polls. + // Polls that exceed this duration may indicate operators doing too much + // synchronous work before hitting an await point, which can block the + // timely worker thread and prevent heartbeat tasks from running. 
+ const SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(10); + let operator_waker = self.operator_waker; let mut input_frontiers = self.input_frontiers; let mut input_queues = self.input_queues; let mut output_flushes = self.output_flushes; let mut shutdown_handle = self.shutdown_handle; + let operator_info = self.builder.operator_info(); self.builder.build_reschedule(move |caps| { let mut logic_fut = Some(Box::pin(constructor(caps))); move |new_frontiers| { @@ -616,7 +624,21 @@ impl OperatorBuilder { let waker = futures_util::task::waker_ref(&operator_waker); let mut cx = Context::from_waker(&waker); operator_waker.task_ready.store(false, Ordering::SeqCst); - if Pin::new(fut).poll(&mut cx).is_ready() { + // Track poll duration to identify operators doing too much + // synchronous work before hitting an await point. + let poll_start = Instant::now(); + let poll_result = Pin::new(fut).poll(&mut cx); + let poll_duration = poll_start.elapsed(); + if poll_duration >= SLOW_POLL_THRESHOLD { + tracing::warn!( + operator = ?operator_info.address, + global_id = operator_info.global_id, + poll_ms = poll_duration.as_millis(), + "slow operator poll detected; operator may be doing \ + too much synchronous work before await points" + ); + } + if poll_result.is_ready() { // We're done with logic so deallocate the task logic_fut = None; }