Skip to content

Commit 5a9fcfc

Browse files
authored
Move LWTRACK before co_await to prevent ResponseEv race (#26347)
2 parents 2f95c2b + 85dde8c commit 5a9fcfc

File tree

3 files changed

+71
-80
lines changed

3 files changed

+71
-80
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
606606
const auto& databaseId = GetUserRequestContext()->DatabaseId;
607607
const auto& poolId = GetUserRequestContext()->PoolId.empty() ? NResourcePool::DEFAULT_POOL_ID : GetUserRequestContext()->PoolId;
608608

609+
LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);
610+
609611
if (!databaseId.empty() && (poolId != NResourcePool::DEFAULT_POOL_ID || AccountDefaultPoolInScheduler)) {
610612
const auto schedulerServiceId = MakeKqpSchedulerServiceId(SelfId().NodeId());
611613

@@ -628,6 +630,10 @@ class TKqpExecuterBase : public TActor<TDerived> {
628630
Query = (co_await ActorWaitForEvent<NScheduler::TEvQueryResponse>(TxId))->Get()->Query; // TODO: Y_DEFER
629631
}
630632

633+
if (!ResponseEv) {
634+
co_return;
635+
}
636+
631637
auto lockTxId = Request.AcquireLocksTxId;
632638
if (lockTxId.Defined() && *lockTxId == 0) {
633639
lockTxId = TxId;
@@ -645,7 +651,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
645651
break;
646652
}
647653

648-
LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);
649654
if (IsDebugLogEnabled()) {
650655
for (auto& tx : Request.Transactions) {
651656
LOG_D("Executing physical tx, type: " << (ui32) tx.Body->GetType() << ", stages: " << tx.Body->StagesSize());
Lines changed: 64 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,77 @@
1-
#include <ydb/core/kqp/common/kqp_yql.h>
21
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
3-
#include <ydb/core/kqp/host/kqp_host.h>
4-
#include <ydb/core/ydb_convert/ydb_convert.h>
5-
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
62

7-
#include <ydb/library/yql/dq/common/dq_value.h>
8-
#include <yql/essentials/core/services/mounts/yql_mounts.h>
3+
#include <ydb/core/kqp/common/events/events.h>
4+
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
5+
#include <ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_service.h>
96

10-
#include <library/cpp/protobuf/util/pb_io.h>
11-
#include <ydb/core/protos/config.pb.h>
7+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/status_codes.h>
128

13-
namespace NKikimr {
14-
namespace NKqp {
9+
namespace NKikimr::NKqp {
1510

16-
using namespace NYql;
17-
using namespace NYql::NNodes;
11+
using namespace NYdb;
1812
using namespace NYdb::NTable;
1913

20-
namespace {
21-
22-
[[maybe_unused]]
23-
NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway> gateway,
24-
const TKikimrConfiguration::TPtr& config, NActors::TActorId* actorSystem)
25-
{
26-
auto cluster = TString(DefaultKikimrClusterName);
27-
28-
TExprContext moduleCtx;
29-
IModuleResolver::TPtr moduleResolver;
30-
UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));
31-
32-
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, NKikimrConfig::TQueryServiceConfig(), Nothing(), nullptr, nullptr, false, false, nullptr, actorSystem, nullptr);
33-
auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings());
34-
result.Issues().PrintTo(Cerr);
35-
UNIT_ASSERT(result.Success());
36-
37-
auto& phyQuery = result.PreparedQuery.GetPhysicalQuery();
38-
UNIT_ASSERT(phyQuery.TransactionsSize() == 1);
39-
return phyQuery.GetTransactions(0);
40-
}
41-
42-
[[maybe_unused]]
43-
TIntrusivePtr<IKqpGateway> MakeIcGateway(const TKikimrRunner& kikimr) {
44-
auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem();
45-
return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", "/Root", TKqpGatewaySettings(),
46-
actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0),
47-
TAlignedPagePoolCounters(), kikimr.GetTestServer().GetSettings().AppConfig->GetQueryServiceConfig());
48-
}
49-
50-
[[maybe_unused]]
51-
TKikimrParamsMap GetParamsMap(const NYdb::TParams& params) {
52-
TKikimrParamsMap paramsMap;
53-
54-
auto paramValues = params.GetValues();
55-
for (auto& pair : paramValues) {
56-
Ydb::TypedValue protoParam;
57-
protoParam.mutable_type()->CopyFrom(NYdb::TProtoAccessor::GetProto(pair.second.GetType()));
58-
protoParam.mutable_value()->CopyFrom(NYdb::TProtoAccessor::GetProto(pair.second));
59-
60-
NKikimrMiniKQL::TParams mkqlParam;
61-
ConvertYdbTypeToMiniKQLType(protoParam.type(), *mkqlParam.MutableType());
62-
ConvertYdbValueToMiniKQLValue(protoParam.type(), protoParam.value(), *mkqlParam.MutableValue());
63-
64-
paramsMap.insert(std::make_pair(pair.first, mkqlParam));
65-
}
66-
67-
return paramsMap;
68-
}
69-
70-
[[maybe_unused]]
71-
TKqpParamsRefMap GetParamRefsMap(const TKikimrParamsMap& paramsMap) {
72-
TKqpParamsRefMap refsMap;
14+
Y_UNIT_TEST_SUITE(KqpExecuter) {
7315

74-
for (auto& pair : paramsMap) {
75-
refsMap.emplace(std::make_pair(pair.first, NDq::TMkqlValueRef(pair.second)));
16+
/* Scenario:
17+
- Start query execution and receive TEvTxRequest.
18+
- When sending TEvAddQuery from executer to scheduler, immediately receive TEvAbortExecution.
19+
- Imitate receiving TEvQueryResponse before receiving self TEvPoison by executer.
20+
- Do not crash or get undefined behavior.
21+
*/
22+
Y_UNIT_TEST(TestSuddenAbortAfterReady) {
23+
TKikimrSettings settings = TKikimrSettings().SetUseRealThreads(false);
24+
settings.AppConfig.MutableTableServiceConfig()->MutableComputeSchedulerSettings()->SetAccountDefaultPool(true);
25+
26+
TKikimrRunner kikimr(settings);
27+
auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
28+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } );
29+
30+
auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"(
31+
SELECT COUNT(*) FROM `/Root/TwoShard`;
32+
)")).GetValueSync();
33+
});
34+
UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString());
35+
auto dataQuery = prepareResult.GetQuery();
36+
37+
TActorId executerId, targetId;
38+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
39+
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
40+
{
41+
TStringStream ss;
42+
ss << "Got " << ev->GetTypeName() << " " << ev->Recipient << " " << ev->Sender << Endl;
43+
Cerr << ss.Str();
44+
}
45+
46+
if (ev->GetTypeRewrite() == TEvKqpExecuter::TEvTxRequest::EventType) {
47+
targetId = ActorIdFromProto(ev->Get<TEvKqpExecuter::TEvTxRequest>()->Record.GetTarget());
48+
}
49+
50+
if (ev->GetTypeRewrite() == NScheduler::TEvAddQuery::EventType) {
51+
executerId = ev->Sender;
52+
auto* abortExecution = new TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues());
53+
runtime.Send(new IEventHandle(ev->Sender, targetId, abortExecution));
54+
}
55+
56+
if (ev->GetTypeRewrite() == NActors::TEvents::TEvPoison::EventType && ev->Sender == executerId && ev->Recipient == executerId) {
57+
return TTestActorRuntime::EEventAction::DROP;
58+
}
59+
60+
return TTestActorRuntime::EEventAction::PROCESS;
61+
});
62+
63+
auto future = kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), TExecDataQuerySettings()).GetValueSync(); });
64+
65+
TDispatchOptions opts;
66+
opts.FinalEvents.emplace_back([&](IEventHandle& ev) {
67+
return ev.GetTypeRewrite() == TEvKqpExecuter::TEvTxResponse::EventType;
68+
});
69+
runtime.DispatchEvents(opts);
70+
71+
auto result = runtime.WaitFuture(future);
72+
UNIT_ASSERT(!result.IsSuccess());
7673
}
7774

78-
return refsMap;
79-
}
80-
81-
} // namespace
82-
83-
Y_UNIT_TEST_SUITE(KqpExecuter) {
8475
// TODO: Test shard write shuffle.
8576
/*
8677
Y_UNIT_TEST(BlindWriteDistributed) {
@@ -150,5 +141,4 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
150141
*/
151142
}
152143

153-
} // namspace NKqp
154144
} // namespace NKikimr

ydb/core/kqp/executer_actor/ut/ya.make

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,12 @@ IF (SANITIZER_TYPE OR WITH_VALGRIND)
77
ENDIF()
88

99
SRCS(
10-
# kqp_executer_ut.cpp
10+
kqp_executer_ut.cpp
1111
)
1212

1313
PEERDIR(
14-
ydb/core/kqp
1514
ydb/core/kqp/common
16-
ydb/core/kqp/host
1715
ydb/core/kqp/ut/common
18-
ydb/public/sdk/cpp/src/client/proto
19-
ydb/library/yql/providers/common/http_gateway
2016
yql/essentials/sql/pg_dummy
2117
)
2218

0 commit comments

Comments
 (0)