@@ -3559,4 +3559,87 @@ DEBUG_ONLY_TEST_F(TaskTest, operatorShouldYieldMethod) {
35593559 }
35603560}
35613561
3562+ // Verifies that blocked wait time is recorded even when an operator is
3563+ // terminated while blocked.
3564+ DEBUG_ONLY_TEST_F (TaskTest, blockedWaitTimeOnAbort) {
3565+ // Test that blocked time is recorded even when a task is aborted while
3566+ // an operator is blocked.
3567+ //
3568+ // We use a simple table scan that blocks waiting for splits. By starting
3569+ // the task without adding splits, the scan operator will block on
3570+ // kWaitForSplit. We then abort the task while blocked and verify that
3571+ // the blocked time is recorded via closeByTask().
3572+ constexpr int kBlockTimeMs = 100 ;
3573+
3574+ // Track if recordBlockingTime is called in closeByTask via TestValue.
3575+ std::atomic_bool recordBlockingTimeCalled{false };
3576+ SCOPED_TESTVALUE_SET (
3577+ " facebook::velox::exec::Driver::closeByTask::recordBlockingTime" ,
3578+ std::function<void (Driver*)>(
3579+ [&](Driver* /* driver*/ ) { recordBlockingTimeCalled = true ; }));
3580+
3581+ // Build a simple plan with a table scan.
3582+ core::PlanNodeId scanNodeId;
3583+ auto plan = PlanBuilder ()
3584+ .tableScan (ROW ({" c0" }, {BIGINT ()}))
3585+ .capturePlanNodeId (scanNodeId)
3586+ .planFragment ();
3587+
3588+ auto task = Task::create (
3589+ " blockedWaitTimeOnAbort" ,
3590+ plan,
3591+ 0 ,
3592+ core::QueryCtx::create (driverExecutor_.get ()),
3593+ Task::ExecutionMode::kParallel );
3594+
3595+ task->start (1 , 1 );
3596+
3597+ // Wait for the driver to become blocked waiting for splits.
3598+ auto startTime = std::chrono::steady_clock::now ();
3599+ while (BlockingState::numBlockedDrivers () == 0 ) {
3600+ std::this_thread::sleep_for (std::chrono::milliseconds (10 ));
3601+ auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
3602+ std::chrono::steady_clock::now () - startTime)
3603+ .count ();
3604+ if (elapsed > 5 ) {
3605+ task->requestAbort ().wait ();
3606+ GTEST_SKIP () << " Operator did not block in time" ;
3607+ }
3608+ }
3609+
3610+ // Let some time pass while blocked to have measurable blocked time.
3611+ std::this_thread::sleep_for (std::chrono::milliseconds (kBlockTimeMs ));
3612+
3613+ // Abort the task while the operator is still blocked waiting for splits.
3614+ task->requestAbort ().wait ();
3615+
3616+ // Wait for the task to be fully aborted.
3617+ ASSERT_TRUE (waitForTaskAborted (task.get ()));
3618+
3619+ // Verify that recordBlockingTime was called in closeByTask.
3620+ // This is the key assertion that validates the fix.
3621+ EXPECT_TRUE (recordBlockingTimeCalled)
3622+ << " recordBlockingTime should be called in closeByTask when "
3623+ << " driver is terminated while blocked" ;
3624+
3625+ // Verify that blocked wait time was recorded despite the abort.
3626+ const auto stats = task->taskStats ().pipelineStats ;
3627+ ASSERT_FALSE (stats.empty ());
3628+ ASSERT_FALSE (stats[0 ].operatorStats .empty ());
3629+
3630+ // Find operator stats with blocked time recorded.
3631+ bool foundBlockedTime = false ;
3632+ for (const auto & opStats : stats[0 ].operatorStats ) {
3633+ if (opStats.blockedWallNanos > 0 ) {
3634+ foundBlockedTime = true ;
3635+ // Verify the blocked time is at least what we waited (with tolerance).
3636+ EXPECT_GE (opStats.blockedWallNanos , (kBlockTimeMs - 30 ) * 1'000'000 )
3637+ << " Blocked time should be at least " << (kBlockTimeMs - 30 ) << " ms" ;
3638+ break ;
3639+ }
3640+ }
3641+ EXPECT_TRUE (foundBlockedTime)
3642+ << " Operator should have recorded blocked time despite task abort" ;
3643+ }
3644+
35623645} // namespace facebook::velox::exec::test
0 commit comments