Skip to content

Commit e84a676

Browse files
Sergey Pershinmeta-codesync[bot]
authored andcommitted
fix: Add blockedWaitFor runtime metrics in Driver::closeByTask() (#16711)
Summary: Pull Request resolved: #16711 When a query fails and some drivers were blocked we fail to submit the blockedWaitFor runtime metrics. This deprives us from possibly valuable debugging information, such in cases with stuck queries. Reviewed By: xiaoxmeng, vandreykiv, amitkdutta Differential Revision: D96055408 fbshipit-source-id: 1eb06adfede26857a580f22f400b5bca9dc6c6fa
1 parent 87c8f7e commit e84a676

File tree

3 files changed

+89
-0
lines changed

3 files changed

+89
-0
lines changed

velox/exec/Driver.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ BlockingState::BlockingState(
202202
.count()) {
203203
// Set before leaving the thread.
204204
driver_->state().hasBlockingFuture = true;
205+
driver_->state().blockingStartUs = sinceUs_;
205206
numBlockedDrivers_++;
206207
}
207208

@@ -897,6 +898,15 @@ void Driver::updateStats() {
897898
task()->addDriverStats(ctx_->pipelineId, std::move(stats));
898899
}
899900

901+
void Driver::updateOperatorBlockingStats() {
902+
// Record blocked time if the driver was blocked when terminated.
903+
// This ensures we don't lose blocked time metrics when a query is aborted.
904+
if (state_.hasBlockingFuture && blockedOperatorId_ < operators_.size()) {
905+
operators_[blockedOperatorId_]->recordBlockingTime(
906+
state_.blockingStartUs, blockingReason_);
907+
}
908+
}
909+
900910
void Driver::startBarrier() {
901911
VELOX_CHECK(ctx_->task->underBarrier());
902912
VELOX_CHECK(
@@ -1003,6 +1013,7 @@ void Driver::close() {
10031013
void Driver::closeByTask() {
10041014
VELOX_CHECK(isOnThread());
10051015
VELOX_CHECK(isTerminated());
1016+
updateOperatorBlockingStats();
10061017
closeOperators();
10071018
updateStats();
10081019
closed_ = true;

velox/exec/Driver.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ struct ThreadState {
105105
/// True if there is a future outstanding that will schedule this on an
106106
/// executor thread when some promise is realized.
107107
bool hasBlockingFuture{false};
108+
/// Timestamp in microseconds when the driver became blocked. Used to record
109+
/// blocked time when the driver is terminated while still blocked.
110+
uint64_t blockingStartUs{0};
108111
/// The number of suspension requests on a on-thread driver. If > 0, this
109112
/// driver thread is in a (recursive) section waiting for RPC or memory
110113
/// strategy decision. The thread is not supposed to access its memory, which
@@ -170,6 +173,7 @@ struct ThreadState {
170173
obj["isTerminated"] = isTerminated.load();
171174
obj["isEnqueued"] = isEnqueued.load();
172175
obj["hasBlockingFuture"] = hasBlockingFuture;
176+
obj["blockingStartUs"] = blockingStartUs;
173177
obj["isSuspended"] = suspended();
174178
obj["startExecTime"] = startExecTimeMs;
175179
return obj;
@@ -595,6 +599,10 @@ class Driver : public std::enable_shared_from_this<Driver> {
595599

596600
void updateStats();
597601

602+
/// Records operator blocked time ins case the driver was off the thrread and
603+
/// blocked when terminated.
604+
void updateOperatorBlockingStats();
605+
598606
// Defines the driver barrier processing state.
599607
struct BarrierState {
600608
// True if the driver is under barrier processing. This is set by

velox/exec/tests/TaskTest.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3559,4 +3559,74 @@ DEBUG_ONLY_TEST_F(TaskTest, operatorShouldYieldMethod) {
35593559
}
35603560
}
35613561

3562+
// Verifies that blocked wait time is recorded even when an operator is
3563+
// terminated while blocked.
3564+
DEBUG_ONLY_TEST_F(TaskTest, blockedWaitTimeOnAbort) {
3565+
// Test that blocked time is recorded even when a task is aborted while
3566+
// an operator is blocked.
3567+
//
3568+
// We use a simple table scan that blocks waiting for splits. By starting
3569+
// the task without adding splits, the scan operator will block on
3570+
// kWaitForSplit. We then abort the task while blocked and verify that
3571+
// the blocked time is recorded via closeByTask().
3572+
constexpr int kBlockTimeMs = 100;
3573+
3574+
// Build a simple plan with a table scan.
3575+
core::PlanNodeId scanNodeId;
3576+
auto plan = PlanBuilder()
3577+
.tableScan(ROW({"c0"}, {BIGINT()}))
3578+
.capturePlanNodeId(scanNodeId)
3579+
.planFragment();
3580+
3581+
auto task = Task::create(
3582+
"blockedWaitTimeOnAbort",
3583+
plan,
3584+
0,
3585+
core::QueryCtx::create(driverExecutor_.get()),
3586+
Task::ExecutionMode::kParallel);
3587+
3588+
task->start(1, 1);
3589+
3590+
// Wait for the driver to become blocked waiting for splits.
3591+
auto startTime = std::chrono::steady_clock::now();
3592+
while (BlockingState::numBlockedDrivers() == 0) {
3593+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
3594+
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
3595+
std::chrono::steady_clock::now() - startTime)
3596+
.count();
3597+
if (elapsed > 5) {
3598+
task->requestAbort().wait();
3599+
GTEST_SKIP() << "Operator did not block in time";
3600+
}
3601+
}
3602+
3603+
// Let some time pass while blocked to have measurable blocked time.
3604+
std::this_thread::sleep_for(std::chrono::milliseconds(kBlockTimeMs));
3605+
3606+
// Abort the task while the operator is still blocked waiting for splits.
3607+
task->requestAbort().wait();
3608+
3609+
// Wait for the task to be fully aborted.
3610+
ASSERT_TRUE(waitForTaskAborted(task.get()));
3611+
3612+
// Verify that blocked wait time was recorded despite the abort.
3613+
const auto stats = task->taskStats().pipelineStats;
3614+
ASSERT_FALSE(stats.empty());
3615+
ASSERT_FALSE(stats[0].operatorStats.empty());
3616+
3617+
// Find operator stats with blocked time recorded.
3618+
bool foundBlockedTime = false;
3619+
for (const auto& opStats : stats[0].operatorStats) {
3620+
if (opStats.blockedWallNanos > 0) {
3621+
foundBlockedTime = true;
3622+
// Verify the blocked time is at least what we waited (with tolerance).
3623+
EXPECT_GE(opStats.blockedWallNanos, (kBlockTimeMs - 30) * 1'000'000)
3624+
<< "Blocked time should be at least " << (kBlockTimeMs - 30) << "ms";
3625+
break;
3626+
}
3627+
}
3628+
EXPECT_TRUE(foundBlockedTime)
3629+
<< "Operator should have recorded blocked time despite task abort";
3630+
}
3631+
35623632
} // namespace facebook::velox::exec::test

0 commit comments

Comments
 (0)