Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,16 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
AFL_ENSURE(stats.GetTasks().size() == 1);
const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0);
AFL_ENSURE(taskStats.GetTaskId() == taskId);

// Extract lock stats from task extra stats (populated by read actors for broken locks)
if (taskStats.HasExtra()) {
NKqpProto::TKqpTaskExtraStats extraStats;
if (taskStats.GetExtra().UnpackTo(&extraStats)) {
LocksBrokenAsBreaker += extraStats.GetLockStats().GetBrokenAsBreaker();
LocksBrokenAsVictim += extraStats.GetLockStats().GetBrokenAsVictim();
}
}

auto stageId = TasksGraph->GetTask(taskId).StageId;
auto [it, inserted] = StageStats.try_emplace(stageId);
if (inserted) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4389,6 +4389,10 @@ class TKqpBufferWriteActor : public TActorBootstrapped<TKqpBufferWriteActor>, pu
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
if (ev->Get()->Record.HasTxStats()) {
LocksBrokenAsBreaker += ev->Get()->Record.GetTxStats().GetLocksBrokenAsBreaker();
LocksBrokenAsVictim += ev->Get()->Record.GetTxStats().GetLocksBrokenAsVictim();
}
TxManager->BreakLock(ev->Get()->Record.GetOrigin());
YQL_ENSURE(TxManager->BrokenLocks());
TxManager->SetError(ev->Get()->Record.GetOrigin());
Expand Down
1 change: 1 addition & 0 deletions ydb/core/sys_view/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ YQL_LAST_ABI_VERSION()
SRCS(
ut_auth.cpp
ut_kqp.cpp
ut_tli.cpp
ut_common.cpp
ut_counters.cpp
ut_labeled.cpp
Expand Down
128 changes: 0 additions & 128 deletions ydb/core/sys_view/ut_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4344,134 +4344,6 @@ R"(CREATE TABLE `test_show_create` (
[[0u]];
])", ysonString);
}

Y_UNIT_TEST_TWIN(QueryMetricsLocksBroken, UseSink) {
TTestEnvSettings settings;
settings.EnableSVP = true;
settings.TableServiceConfig.SetEnableOltpSink(UseSink);
TTestEnv env(1, 2, settings);
CreateTenant(env, "Tenant1", true);

auto driverConfig = TDriverConfig()
.SetEndpoint(env.GetEndpoint())
.SetDiscoveryMode(EDiscoveryMode::Off)
.SetDatabase("/Root/Tenant1");
auto driver = TDriver(driverConfig);

TTableClient client(driver);
auto session = client.CreateSession().GetValueSync().GetSession();
auto victimSession = client.CreateSession().GetValueSync().GetSession();

// Create table and insert initial data
NKqp::AssertSuccessResult(session.ExecuteSchemeQuery(R"(
CREATE TABLE `/Root/Tenant1/TableLocks` (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
)").GetValueSync());

NKqp::AssertSuccessResult(session.ExecuteDataQuery(
"UPSERT INTO `/Root/Tenant1/TableLocks` (Key, Value) VALUES (1u, \"Initial\")",
TTxControl::BeginTx().CommitTx()
).GetValueSync());

// Establish locks by reading in a transaction (victim)
std::optional<TTransaction> victimTx;
while (!victimTx) {
auto result = victimSession.ExecuteDataQuery(
"SELECT * FROM `/Root/Tenant1/TableLocks` WHERE Key = 1u /* victim-query */",
TTxControl::BeginTx()
).ExtractValueSync();
UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());

TString yson = FormatResultSetYson(result.GetResultSet(0));
if (yson == "[]") {
continue; // Data not visible yet, retry
}

victimTx = result.GetTransaction();
UNIT_ASSERT(victimTx);
}

// Breaker transaction: writes to key 1, breaking victim's read lock
NKqp::AssertSuccessResult(session.ExecuteDataQuery(
"UPSERT INTO `/Root/Tenant1/TableLocks` (Key, Value) VALUES (1u, \"BreakerValue\") /* lock-breaker */",
TTxControl::BeginTx().CommitTx()
).GetValueSync());

// Victim tries to commit with write to the same key
// This triggers lock validation, which fails because the lock on key 1 was broken
auto commitResult = victimSession.ExecuteDataQuery(
"UPSERT INTO `/Root/Tenant1/TableLocks` (Key, Value) VALUES (1u, \"VictimValue\") /* victim-commit */",
TTxControl::Tx(*victimTx).CommitTx()
).ExtractValueSync();

// Victim should be ABORTED because its locks were broken
UNIT_ASSERT_VALUES_EQUAL(commitResult.GetStatus(), EStatus::ABORTED);

// Wait for stats to be collected and check both LocksBrokenAsBreaker and LocksBrokenAsVictim
ui64 locksBrokenAsBreaker = 0;
ui64 locksBrokenAsVictim = 0;
bool foundBreaker = false;
bool foundVictim = false;

for (size_t iter = 0; iter < 30 && (!foundBreaker || !foundVictim); ++iter) {
// Query both breaker and victim metrics in one pass
auto it = client.StreamExecuteScanQuery(R"(
SELECT QueryText, LocksBrokenAsBreaker, LocksBrokenAsVictim
FROM `/Root/Tenant1/.sys/query_metrics_one_minute`
WHERE QueryText LIKE '%lock-breaker%' OR QueryText LIKE '%victim-commit%';
)").GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString ysonString = NKqp::StreamResultToYson(it);
Cerr << "Query metrics result: " << ysonString << Endl;

auto node = NYT::NodeFromYsonString(ysonString, ::NYson::EYsonType::Node);
UNIT_ASSERT(node.IsList());

for (const auto& row : node.AsList()) {
if (!row.IsList() || row.AsList().size() < 3) continue;

auto getStringValue = [](const NYT::TNode& n) -> TString {
if (n.IsList() && !n.AsList().empty()) {
return n.AsList()[0].AsString();
}
return n.AsString();
};
auto getUint64Value = [](const NYT::TNode& n) -> ui64 {
if (n.IsList() && !n.AsList().empty()) {
return n.AsList()[0].AsUint64();
}
return n.AsUint64();
};

TString queryText = getStringValue(row.AsList()[0]);
ui64 breaker = getUint64Value(row.AsList()[1]);
ui64 victim = getUint64Value(row.AsList()[2]);

if (queryText.Contains("lock-breaker") && !queryText.Contains("query_metrics")) {
locksBrokenAsBreaker = breaker;
foundBreaker = true;
}
if (queryText.Contains("victim-commit") && !queryText.Contains("query_metrics")) {
locksBrokenAsVictim = victim;
foundVictim = true;
}
}

if (!foundBreaker || !foundVictim) {
Sleep(TDuration::Seconds(5));
}
}

UNIT_ASSERT_C(foundBreaker, "Breaker not found in metrics");
UNIT_ASSERT_C(foundVictim, "Victim not found in metrics");

UNIT_ASSERT_VALUES_EQUAL(locksBrokenAsBreaker, 1u);
UNIT_ASSERT_VALUES_EQUAL(locksBrokenAsVictim, 1u);
}
}

Y_UNIT_TEST_SUITE(ShowCreateView) {
Expand Down
Loading
Loading