Skip to content

Commit 7962533

Browse files
authored
kqp: add DqSourceStreamLookupJoin featureflag (#34055)
1 parent 7c749a5 commit 7962533

File tree

3 files changed

+42
-24
lines changed

3 files changed

+42
-24
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2011,7 +2011,7 @@ class TKqpHost : public IKqpHost {
20112011
return WaitAll(futures);
20122012
};
20132013
}
2014-
TypesCtx->StreamLookupJoin = true;
2014+
TypesCtx->StreamLookupJoin = Config->GetEnableDqSourceStreamLookupJoin();
20152015
}
20162016

20172017
InitPgProvider();

ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2989,8 +2989,14 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
29892989
}, /* sort */ true);
29902990
}
29912991

2992-
Y_UNIT_TEST_F(StreamingQueryWithStreamLookupJoin, TStreamingTestFixture) {
2993-
SetupAppConfig().MutableQueryServiceConfig()->SetProgressStatsPeriodMs(0);
2992+
Y_UNIT_TEST_TWIN_F(StreamingQueryWithStreamLookupJoin, WithFeatureFlag, TStreamingTestFixture) {
2993+
{
2994+
auto& setupAppConfig = SetupAppConfig();
2995+
setupAppConfig.MutableQueryServiceConfig()->SetProgressStatsPeriodMs(0);
2996+
if (WithFeatureFlag) {
2997+
setupAppConfig.MutableTableServiceConfig()->SetEnableDqSourceStreamLookupJoin(true);
2998+
}
2999+
}
29943000

29953001
const auto connectorClient = SetupMockConnectorClient();
29963002
const auto pqGateway = SetupMockPqGateway();
@@ -3026,29 +3032,31 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
30263032
.DescribeCount = 2,
30273033
// Now List Split is done after type annotation, that is the
30283034
// reason why this value equal to 4 not 5
3029-
.ListSplitsCount = 4,
3035+
.ListSplitsCount = WithFeatureFlag ? 4 : 0,
30303036
.ValidateListSplitsArgs = false
30313037
});
30323038

3033-
ui64 readSplitsCount = 0;
3034-
const std::vector<std::string> fqdnColumn = {"host1.example.com", "host2.example.com", "host3.example.com"};
3035-
SetupMockConnectorTableData(connectorClient, {
3036-
.TableName = ydbTable,
3037-
.Columns = columns,
3038-
.NumberReadSplits = 3,
3039-
.ValidateReadSplitsArgs = false,
3040-
.ResultFactory = [&]() {
3041-
readSplitsCount += 1;
3042-
const auto payloadColumn = readSplitsCount < 3
3043-
? std::vector<std::string>{"P1", "P2", "P3"}
3044-
: std::vector<std::string>{"P4", "P5", "P6"};
3045-
3046-
return MakeRecordBatch(
3047-
MakeArray<arrow::BinaryBuilder>("fqdn", fqdnColumn, arrow::binary()),
3048-
MakeArray<arrow::BinaryBuilder>("payload", payloadColumn, arrow::binary())
3049-
);
3050-
}
3051-
});
3039+
if (WithFeatureFlag) {
3040+
ui64 readSplitsCount = 0;
3041+
const std::vector<std::string> fqdnColumn = {"host1.example.com", "host2.example.com", "host3.example.com"};
3042+
SetupMockConnectorTableData(connectorClient, {
3043+
.TableName = ydbTable,
3044+
.Columns = columns,
3045+
.NumberReadSplits = 3,
3046+
.ValidateReadSplitsArgs = false,
3047+
.ResultFactory = [&]() {
3048+
readSplitsCount += 1;
3049+
const auto payloadColumn = readSplitsCount < 3
3050+
? std::vector<std::string>{"P1", "P2", "P3"}
3051+
: std::vector<std::string>{"P4", "P5", "P6"};
3052+
3053+
return MakeRecordBatch(
3054+
MakeArray<arrow::BinaryBuilder>("fqdn", fqdnColumn, arrow::binary()),
3055+
MakeArray<arrow::BinaryBuilder>("payload", payloadColumn, arrow::binary())
3056+
);
3057+
}
3058+
});
3059+
}
30523060
}
30533061

30543062
constexpr char queryName[] = "streamingQuery";
@@ -3079,7 +3087,12 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
30793087
"ydb_table"_a = ydbTable,
30803088
"input_topic"_a = inputTopicName,
30813089
"output_topic"_a = outputTopicName
3082-
));
3090+
),
3091+
WithFeatureFlag ? EStatus::SUCCESS : EStatus::GENERIC_ERROR,
3092+
WithFeatureFlag ? "" : "Unsupported join strategy: streamlookup");
3093+
if (!WithFeatureFlag) {
3094+
return;
3095+
}
30833096

30843097
CheckScriptExecutionsCount(1, 1);
30853098

@@ -3923,6 +3936,10 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
39233936
}
39243937

39253938
Y_UNIT_TEST_F(CheckpointPropagationWithStreamLookupJoinHanging, TStreamingTestFixture) {
3939+
{
3940+
auto& setupAppConfig = SetupAppConfig();
3941+
setupAppConfig.MutableTableServiceConfig()->SetEnableDqSourceStreamLookupJoin(true);
3942+
}
39263943
const auto connectorClient = SetupMockConnectorClient();
39273944

39283945
constexpr char inputTopicName[] = "sljInputTopicName";

ydb/core/protos/table_service_config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,4 +478,5 @@ message TTableServiceConfig {
478478
optional bool Antlr4ParserIsAmbiguityError = 119 [default = false, (InvalidateCompileCache) = true];
479479

480480
optional bool DqHashOperatorsUseBlocks = 120 [default = false, (InvalidateCompileCache) = true];
481+
optional bool EnableDqSourceStreamLookupJoin = 121 [default = false, (InvalidateCompileCache) = true];
481482
};

0 commit comments

Comments
 (0)