Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/timely-util/src/builder_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, Waker, ready};
use std::time::{Duration, Instant};

use differential_dataflow::containers::{Columnation, TimelyStack};
use futures_util::Stream;
Expand Down Expand Up @@ -571,11 +572,18 @@ impl<G: Scope> OperatorBuilder<G> {
B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
L: Future + 'static,
{
// Threshold for warning about slow operator polls.
// Polls that exceed this duration may indicate operators doing too much
// synchronous work before hitting an await point, which can block the
// timely worker thread and prevent heartbeat tasks from running.
const SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(10);

let operator_waker = self.operator_waker;
let mut input_frontiers = self.input_frontiers;
let mut input_queues = self.input_queues;
let mut output_flushes = self.output_flushes;
let mut shutdown_handle = self.shutdown_handle;
let operator_info = self.builder.operator_info();
self.builder.build_reschedule(move |caps| {
let mut logic_fut = Some(Box::pin(constructor(caps)));
move |new_frontiers| {
Expand Down Expand Up @@ -616,7 +624,21 @@ impl<G: Scope> OperatorBuilder<G> {
let waker = futures_util::task::waker_ref(&operator_waker);
let mut cx = Context::from_waker(&waker);
operator_waker.task_ready.store(false, Ordering::SeqCst);
if Pin::new(fut).poll(&mut cx).is_ready() {
// Track poll duration to identify operators doing too much
// synchronous work before hitting an await point.
let poll_start = Instant::now();
let poll_result = Pin::new(fut).poll(&mut cx);
let poll_duration = poll_start.elapsed();
if poll_duration >= SLOW_POLL_THRESHOLD {
tracing::warn!(
operator = ?operator_info.address,
global_id = operator_info.global_id,
poll_ms = poll_duration.as_millis(),
"slow operator poll detected; operator may be doing \
too much synchronous work before await points"
);
}
if poll_result.is_ready() {
// We're done with logic so deallocate the task
logic_fut = None;
}
Expand Down