Skip to content

Commit 6d790e7

Browse files
authored
Allow empty inputs for stage compilation (#17009)
1 parent f4861d7 commit 6d790e7

File tree

2 files changed

+82
-11
lines changed

2 files changed

+82
-11
lines changed

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2211,6 +2211,73 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
22112211
UNIT_ASSERT(result.GetStatus() == NYdb::EStatus::OVERLOADED);
22122212
}
22132213
}
2214+
2215+
Y_UNIT_TEST(TableSinkWithSubquery) {
2216+
NKikimrConfig::TAppConfig appConfig;
2217+
auto settings = TKikimrSettings()
2218+
.SetAppConfig(appConfig)
2219+
.SetWithSampleTables(false);
2220+
2221+
TKikimrRunner kikimr(settings);
2222+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
2223+
2224+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
2225+
2226+
const TString query = R"(
2227+
CREATE TABLE `/Root/table1` (
2228+
p1 Utf8,
2229+
PRIMARY KEY (p1)
2230+
)
2231+
WITH (
2232+
STORE = ROW
2233+
);
2234+
2235+
CREATE TABLE `/Root/table2` (
2236+
p1 Utf8,
2237+
PRIMARY KEY (p1)
2238+
)
2239+
WITH (
2240+
STORE = ROW
2241+
);
2242+
)";
2243+
2244+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
2245+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2246+
2247+
auto client = kikimr.GetQueryClient();
2248+
2249+
{
2250+
auto prepareResult = client.ExecuteQuery(R"(
2251+
UPSERT INTO `/Root/table1` (p1) VALUES ("a") , ("b"), ("c");
2252+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2253+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
2254+
}
2255+
2256+
{
2257+
auto result = client.ExecuteQuery(R"(
2258+
$data2 = Cast(AsList() As List<Struct<p1: Utf8>>);
2259+
2260+
/* query */
2261+
SELECT d1.p1 AS p1,
2262+
FROM `/Root/table1` AS d1
2263+
CROSS JOIN AS_TABLE($data2) AS d2;
2264+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2265+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
2266+
}
2267+
2268+
{
2269+
auto result = client.ExecuteQuery(R"(
2270+
$data2 = Cast(AsList() As List<Struct<p1: Utf8>>);
2271+
2272+
/* query */
2273+
INSERT INTO `/Root/table1`
2274+
SELECT d1.p1 AS p1,
2275+
FROM `/Root/table2` AS d1
2276+
CROSS JOIN AS_TABLE($data2) AS d2;
2277+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2278+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
2279+
}
2280+
}
22142281
}
22152282

22162283
} // namespace NKqp

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -608,18 +608,22 @@ class TDqTaskRunner : public IDqTaskRunner {
608608
}
609609
}
610610

611-
auto entryNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(i, true);
612-
if (transform) {
613-
transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory, {}, Stats->StartTs, InputConsumed, PgBuilder_.get());
614-
inputs.clear();
615-
inputs.emplace_back(transform->TransformOutput);
616-
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
617-
CreateInputUnionValue(transform->TransformOutput->GetInputType(), std::move(inputs), holderFactory,
618-
{inputStats, transform->TransformOutputType}, Stats->StartTs, InputConsumed));
611+
auto entryNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(i, false);
612+
if (entryNode) {
613+
if (transform) {
614+
transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory, {}, Stats->StartTs, InputConsumed, PgBuilder_.get());
615+
inputs.clear();
616+
inputs.emplace_back(transform->TransformOutput);
617+
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
618+
CreateInputUnionValue(transform->TransformOutput->GetInputType(), std::move(inputs), holderFactory,
619+
{inputStats, transform->TransformOutputType}, Stats->StartTs, InputConsumed));
620+
} else {
621+
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
622+
DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory,
623+
{inputStats, entry->InputItemTypes[i]}, Stats->StartTs, InputConsumed, PgBuilder_.get()));
624+
}
619625
} else {
620-
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
621-
DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory,
622-
{inputStats, entry->InputItemTypes[i]}, Stats->StartTs, InputConsumed, PgBuilder_.get()));
626+
// In some cases we don't need input. For example, for joining EmptyIterator with table.
623627
}
624628
}
625629

0 commit comments

Comments
 (0)