Skip to content

Commit f79f0bf

Browse files
Sergey Pershin, facebook-github-bot
authored and committed
fix: Add blockedWaitFor runtime metrics in Driver::closeByTask() (facebookincubator#16711)
Summary: When a query fails while some drivers are blocked, we fail to submit the blockedWaitFor runtime metrics. This deprives us of potentially valuable debugging information, such as in cases with stuck queries. Reviewed By: vandreykiv, amitkdutta Differential Revision: D96055408
1 parent b423dc1 commit f79f0bf

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

velox/exec/Driver.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ BlockingState::BlockingState(
202202
.count()) {
203203
// Set before leaving the thread.
204204
driver_->state().hasBlockingFuture = true;
205+
driver_->state().blockingStartUs = sinceUs_;
205206
numBlockedDrivers_++;
206207
}
207208

@@ -1003,6 +1004,15 @@ void Driver::close() {
10031004
void Driver::closeByTask() {
10041005
VELOX_CHECK(isOnThread());
10051006
VELOX_CHECK(isTerminated());
1007+
// Record blocked time if the driver was blocked when terminated.
1008+
// This ensures we don't lose blocked time metrics when a query is aborted.
1009+
if (state_.hasBlockingFuture && state_.blockingStartUs != 0 &&
1010+
blockedOperatorId_ < operators_.size()) {
1011+
TestValue::adjust(
1012+
"facebook::velox::exec::Driver::closeByTask::recordBlockingTime", this);
1013+
operators_[blockedOperatorId_]->recordBlockingTime(
1014+
state_.blockingStartUs, blockingReason_);
1015+
}
10061016
closeOperators();
10071017
updateStats();
10081018
closed_ = true;

velox/exec/Driver.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ struct ThreadState {
105105
/// True if there is a future outstanding that will schedule this on an
106106
/// executor thread when some promise is realized.
107107
bool hasBlockingFuture{false};
108+
/// Timestamp in microseconds when the driver became blocked. Used to record
109+
/// blocked time when the driver is terminated while still blocked.
110+
uint64_t blockingStartUs{0};
108111
/// The number of suspension requests on an on-thread driver. If > 0, this
109112
/// driver thread is in a (recursive) section waiting for RPC or memory
110113
/// strategy decision. The thread is not supposed to access its memory, which
@@ -170,6 +173,7 @@ struct ThreadState {
170173
obj["isTerminated"] = isTerminated.load();
171174
obj["isEnqueued"] = isEnqueued.load();
172175
obj["hasBlockingFuture"] = hasBlockingFuture;
176+
obj["blockingStartUs"] = blockingStartUs;
173177
obj["isSuspended"] = suspended();
174178
obj["startExecTime"] = startExecTimeMs;
175179
return obj;

velox/exec/tests/TaskTest.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3559,4 +3559,87 @@ DEBUG_ONLY_TEST_F(TaskTest, operatorShouldYieldMethod) {
35593559
}
35603560
}
35613561

3562+
// Verifies that blocked wait time is recorded even when an operator is
3563+
// terminated while blocked.
3564+
DEBUG_ONLY_TEST_F(TaskTest, blockedWaitTimeOnAbort) {
3565+
// Test that blocked time is recorded even when a task is aborted while
3566+
// an operator is blocked.
3567+
//
3568+
// We use a simple table scan that blocks waiting for splits. By starting
3569+
// the task without adding splits, the scan operator will block on
3570+
// kWaitForSplit. We then abort the task while blocked and verify that
3571+
// the blocked time is recorded via closeByTask().
3572+
constexpr int kBlockTimeMs = 100;
3573+
3574+
// Track if recordBlockingTime is called in closeByTask via TestValue.
3575+
std::atomic_bool recordBlockingTimeCalled{false};
3576+
SCOPED_TESTVALUE_SET(
3577+
"facebook::velox::exec::Driver::closeByTask::recordBlockingTime",
3578+
std::function<void(Driver*)>(
3579+
[&](Driver* /*driver*/) { recordBlockingTimeCalled = true; }));
3580+
3581+
// Build a simple plan with a table scan.
3582+
core::PlanNodeId scanNodeId;
3583+
auto plan = PlanBuilder()
3584+
.tableScan(ROW({"c0"}, {BIGINT()}))
3585+
.capturePlanNodeId(scanNodeId)
3586+
.planFragment();
3587+
3588+
auto task = Task::create(
3589+
"blockedWaitTimeOnAbort",
3590+
plan,
3591+
0,
3592+
core::QueryCtx::create(driverExecutor_.get()),
3593+
Task::ExecutionMode::kParallel);
3594+
3595+
task->start(1, 1);
3596+
3597+
// Wait for the driver to become blocked waiting for splits.
3598+
auto startTime = std::chrono::steady_clock::now();
3599+
while (BlockingState::numBlockedDrivers() == 0) {
3600+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
3601+
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
3602+
std::chrono::steady_clock::now() - startTime)
3603+
.count();
3604+
if (elapsed > 5) {
3605+
task->requestAbort().wait();
3606+
GTEST_SKIP() << "Operator did not block in time";
3607+
}
3608+
}
3609+
3610+
// Let some time pass while blocked to have measurable blocked time.
3611+
std::this_thread::sleep_for(std::chrono::milliseconds(kBlockTimeMs));
3612+
3613+
// Abort the task while the operator is still blocked waiting for splits.
3614+
task->requestAbort().wait();
3615+
3616+
// Wait for the task to be fully aborted.
3617+
ASSERT_TRUE(waitForTaskAborted(task.get()));
3618+
3619+
// Verify that recordBlockingTime was called in closeByTask.
3620+
// This is the key assertion that validates the fix.
3621+
EXPECT_TRUE(recordBlockingTimeCalled)
3622+
<< "recordBlockingTime should be called in closeByTask when "
3623+
<< "driver is terminated while blocked";
3624+
3625+
// Verify that blocked wait time was recorded despite the abort.
3626+
const auto stats = task->taskStats().pipelineStats;
3627+
ASSERT_FALSE(stats.empty());
3628+
ASSERT_FALSE(stats[0].operatorStats.empty());
3629+
3630+
// Find operator stats with blocked time recorded.
3631+
bool foundBlockedTime = false;
3632+
for (const auto& opStats : stats[0].operatorStats) {
3633+
if (opStats.blockedWallNanos > 0) {
3634+
foundBlockedTime = true;
3635+
// Verify the blocked time is at least what we waited (with tolerance).
3636+
EXPECT_GE(opStats.blockedWallNanos, (kBlockTimeMs - 30) * 1'000'000)
3637+
<< "Blocked time should be at least " << (kBlockTimeMs - 30) << "ms";
3638+
break;
3639+
}
3640+
}
3641+
EXPECT_TRUE(foundBlockedTime)
3642+
<< "Operator should have recorded blocked time despite task abort";
3643+
}
3644+
35623645
} // namespace facebook::velox::exec::test

0 commit comments

Comments
 (0)