Skip to content

Commit b203ac9

Browse files
Sergey Pershin authored and facebook-github-bot committed
fix: Add blockedWaitFor runtime metrics in Driver::closeByTask()
Summary: When a query fails while some drivers are blocked, we fail to submit the blockedWaitFor runtime metrics. This deprives us of potentially valuable debugging information, for example when investigating stuck queries. Differential Revision: D96055408
1 parent 1038972 commit b203ac9

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

velox/exec/Driver.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ BlockingState::BlockingState(
202202
.count()) {
203203
// Set before leaving the thread.
204204
driver_->state().hasBlockingFuture = true;
205+
driver_->state().blockingStartUs = sinceUs_;
205206
numBlockedDrivers_++;
206207
}
207208

@@ -994,6 +995,9 @@ void Driver::close() {
994995
if (!isOnThread() && !isTerminated()) {
995996
LOG(FATAL) << "Driver::close is only allowed from the Driver's thread";
996997
}
998+
// A driver that completes normally should not have a blocking future.
999+
// If it does, it indicates a bug in the driver lifecycle.
1000+
VELOX_CHECK(!state_.hasBlockingFuture);
9971001
closeOperators();
9981002
updateStats();
9991003
closed_ = true;
@@ -1003,6 +1007,15 @@ void Driver::close() {
10031007
void Driver::closeByTask() {
10041008
VELOX_CHECK(isOnThread());
10051009
VELOX_CHECK(isTerminated());
1010+
// Record blocked time if the driver was blocked when terminated.
1011+
// This ensures we don't lose blocked time metrics when a query is aborted.
1012+
if (state_.hasBlockingFuture && state_.blockingStartUs != 0 &&
1013+
blockedOperatorId_ < operators_.size()) {
1014+
TestValue::adjust(
1015+
"facebook::velox::exec::Driver::closeByTask::recordBlockingTime", this);
1016+
operators_[blockedOperatorId_]->recordBlockingTime(
1017+
state_.blockingStartUs, blockingReason_);
1018+
}
10061019
closeOperators();
10071020
updateStats();
10081021
closed_ = true;

velox/exec/Driver.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ struct ThreadState {
105105
/// True if there is a future outstanding that will schedule this on an
106106
/// executor thread when some promise is realized.
107107
bool hasBlockingFuture{false};
108+
/// Timestamp in microseconds when the driver became blocked. Used to record
109+
/// blocked time when the driver is terminated while still blocked.
110+
uint64_t blockingStartUs{0};
108111
/// The number of suspension requests on a on-thread driver. If > 0, this
109112
/// driver thread is in a (recursive) section waiting for RPC or memory
110113
/// strategy decision. The thread is not supposed to access its memory, which
@@ -170,6 +173,7 @@ struct ThreadState {
170173
obj["isTerminated"] = isTerminated.load();
171174
obj["isEnqueued"] = isEnqueued.load();
172175
obj["hasBlockingFuture"] = hasBlockingFuture;
176+
obj["blockingStartUs"] = blockingStartUs;
173177
obj["isSuspended"] = suspended();
174178
obj["startExecTime"] = startExecTimeMs;
175179
return obj;

velox/exec/tests/TaskTest.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3559,4 +3559,87 @@ DEBUG_ONLY_TEST_F(TaskTest, operatorShouldYieldMethod) {
35593559
}
35603560
}
35613561

3562+
// Verifies that blocked wait time is recorded even when an operator is
3563+
// terminated while blocked.
3564+
DEBUG_ONLY_TEST_F(TaskTest, blockedWaitTimeOnAbort) {
3565+
// Test that blocked time is recorded even when a task is aborted while
3566+
// an operator is blocked.
3567+
//
3568+
// We use a simple table scan that blocks waiting for splits. By starting
3569+
// the task without adding splits, the scan operator will block on
3570+
// kWaitForSplit. We then abort the task while blocked and verify that
3571+
// the blocked time is recorded via closeByTask().
3572+
constexpr int kBlockTimeMs = 100;
3573+
3574+
// Track if recordBlockingTime is called in closeByTask via TestValue.
3575+
std::atomic_bool recordBlockingTimeCalled{false};
3576+
SCOPED_TESTVALUE_SET(
3577+
"facebook::velox::exec::Driver::closeByTask::recordBlockingTime",
3578+
std::function<void(Driver*)>(
3579+
[&](Driver* /*driver*/) { recordBlockingTimeCalled = true; }));
3580+
3581+
// Build a simple plan with a table scan.
3582+
core::PlanNodeId scanNodeId;
3583+
auto plan = PlanBuilder()
3584+
.tableScan(ROW({"c0"}, {BIGINT()}))
3585+
.capturePlanNodeId(scanNodeId)
3586+
.planFragment();
3587+
3588+
auto task = Task::create(
3589+
"blockedWaitTimeOnAbort",
3590+
plan,
3591+
0,
3592+
core::QueryCtx::create(driverExecutor_.get()),
3593+
Task::ExecutionMode::kParallel);
3594+
3595+
task->start(1, 1);
3596+
3597+
// Wait for the driver to become blocked waiting for splits.
3598+
auto startTime = std::chrono::steady_clock::now();
3599+
while (BlockingState::numBlockedDrivers() == 0) {
3600+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
3601+
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
3602+
std::chrono::steady_clock::now() - startTime)
3603+
.count();
3604+
if (elapsed > 5) {
3605+
task->requestAbort().wait();
3606+
GTEST_SKIP() << "Operator did not block in time";
3607+
}
3608+
}
3609+
3610+
// Let some time pass while blocked to have measurable blocked time.
3611+
std::this_thread::sleep_for(std::chrono::milliseconds(kBlockTimeMs));
3612+
3613+
// Abort the task while the operator is still blocked waiting for splits.
3614+
task->requestAbort().wait();
3615+
3616+
// Wait for the task to be fully aborted.
3617+
ASSERT_TRUE(waitForTaskAborted(task.get()));
3618+
3619+
// Verify that recordBlockingTime was called in closeByTask.
3620+
// This is the key assertion that validates the fix.
3621+
EXPECT_TRUE(recordBlockingTimeCalled)
3622+
<< "recordBlockingTime should be called in closeByTask when "
3623+
<< "driver is terminated while blocked";
3624+
3625+
// Verify that blocked wait time was recorded despite the abort.
3626+
const auto stats = task->taskStats().pipelineStats;
3627+
ASSERT_FALSE(stats.empty());
3628+
ASSERT_FALSE(stats[0].operatorStats.empty());
3629+
3630+
// Find operator stats with blocked time recorded.
3631+
bool foundBlockedTime = false;
3632+
for (const auto& opStats : stats[0].operatorStats) {
3633+
if (opStats.blockedWallNanos > 0) {
3634+
foundBlockedTime = true;
3635+
// Verify the blocked time is at least what we waited (with tolerance).
3636+
EXPECT_GE(opStats.blockedWallNanos, (kBlockTimeMs - 30) * 1'000'000)
3637+
<< "Blocked time should be at least " << (kBlockTimeMs - 30) << "ms";
3638+
break;
3639+
}
3640+
}
3641+
EXPECT_TRUE(foundBlockedTime)
3642+
<< "Operator should have recorded blocked time despite task abort";
3643+
}
3644+
35623645
} // namespace facebook::velox::exec::test

0 commit comments

Comments
 (0)