Skip to content

Commit 68ade36

Browse files
authored
support consumers' availability_period setting in the yql (#26297)
KIKIMR-24054
1 parent f9757b8 commit 68ade36

File tree

4 files changed

+128
-19
lines changed

4 files changed

+128
-19
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,10 @@ namespace {
470470
protoConsumer->set_important(FromString<bool>(
471471
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
472472
));
473+
} else if (name == "setAvailabilityPeriod"sv) {
474+
auto period = TDuration::MicroSeconds(FromString<ui64>(setting.Value().Cast<TCoInterval>().Literal().Value()));
475+
protoConsumer->mutable_availability_period()->set_seconds(period.Seconds());
476+
protoConsumer->mutable_availability_period()->set_nanos(period.NanoSecondsOfSecond());
473477
} else if (name == "setReadFromTs") {
474478
ui64 tsValue = 0;
475479
if(setting.Value().Maybe<TCoDatetime>()) {
@@ -515,6 +519,12 @@ namespace {
515519
protoConsumer->set_set_important(FromString<bool>(
516520
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
517521
));
522+
} else if (name == "setAvailabilityPeriod"sv) {
523+
auto period = TDuration::MicroSeconds(FromString<ui64>(setting.Value().Cast<TCoInterval>().Literal().Value()));
524+
protoConsumer->mutable_set_availability_period()->set_seconds(period.Seconds());
525+
protoConsumer->mutable_set_availability_period()->set_nanos(period.NanoSecondsOfSecond());
526+
} else if (name == "resetAvailabilityPeriod"sv) {
527+
protoConsumer->mutable_reset_availability_period();
518528
} else if (name == "setReadFromTs") {
519529
ui64 tsValue = 0;
520530
if(setting.Value().Maybe<TCoDatetime>()) {
@@ -1717,7 +1727,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
17171727
return SyncError();
17181728
} else if (constraint.Name().Value() == "default") {
17191729
if (table.Metadata->Kind == EKikimrTableKind::Olap) {
1720-
ctx.AddError(TIssue(ctx.GetPosition(constraint.Pos()),
1730+
ctx.AddError(TIssue(ctx.GetPosition(constraint.Pos()),
17211731
"Default values are not supported in column tables"));
17221732
return SyncError();
17231733
}
@@ -2079,7 +2089,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
20792089
break;
20802090
}
20812091
default:
2082-
ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()), TStringBuilder()
2092+
ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()), TStringBuilder()
20832093
<< "Unknown index setting: " << name.StringValue()));
20842094
return SyncError();
20852095
}

ydb/core/kqp/provider/yql_kikimr_type_ann.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,7 +1054,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
10541054
break;
10551055
}
10561056
default:
1057-
ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), TStringBuilder()
1057+
ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), TStringBuilder()
10581058
<< "Unknown index setting: " << name.StringValue()));
10591059
return IGraphTransformer::TStatus::Error;
10601060
}
@@ -1689,9 +1689,9 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
16891689
}
16901690
static bool CheckConsumerSettings(const TCoNameValueTupleList& settings, TExprContext& ctx) {
16911691
for (const auto& setting : settings) {
1692-
auto name = setting.Name().Value();
1693-
auto val = TString(setting.Value().Cast<TCoDataCtor>().Literal().template Cast<TCoAtom>().Value());
1692+
const auto name = setting.Name().Value();
16941693
if (name == "setSupportedCodecs") {
1694+
auto val = TString(setting.Value().Cast<TCoDataCtor>().Literal().template Cast<TCoAtom>().Value());
16951695
auto codecsList = GetTopicCodecsFromString(val);
16961696
if (codecsList.empty()) {
16971697
ctx.AddError(TIssue(ctx.GetPosition(setting.Value().Ref().Pos()),

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4125,7 +4125,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
41254125
} else {
41264126
query = Sprintf(R"(
41274127
--!syntax_v1
4128-
ALTER TABLE `/Root/TestTable` ADD INDEX vector_idx%d
4128+
ALTER TABLE `/Root/TestTable` ADD INDEX vector_idx%d
41294129
GLOBAL USING vector_kmeans_tree
41304130
ON (Embedding)
41314131
WITH (%s);
@@ -4149,18 +4149,18 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
41494149
// valid settings:
41504150
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=3, clusters=10", "");
41514151

4152-
// unknown index setting:
4152+
// unknown index setting:
41534153
check("XxX=YyY, similarity=inner_product, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41544154
"Unknown index setting: xxx");
41554155
check("XxX=42, similarity=inner_product, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41564156
"Unknown index setting: xxx");
4157-
4157+
41584158
// distance:
41594159
check("distance=XxX, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41604160
"Invalid distance: xxx");
41614161
check("distance=42, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41624162
"Invalid distance: 42");
4163-
4163+
41644164
// similarity
41654165
check("similarity=XxX, vector_type=float, vector_dimension=1024, levels=3, clusters=10",
41664166
"Invalid similarity: xxx");
@@ -4182,7 +4182,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
41824182
"Invalid vector_type: 42");
41834183
check("similarity=inner_product, vector_dimension=1024, levels=3, clusters=10",
41844184
"vector_type should be set");
4185-
4185+
41864186
// vector_dimension
41874187
check("similarity=inner_product, vector_type=float, vector_dimension=XxX, levels=3, clusters=10",
41884188
"Invalid vector_dimension: xxx");
@@ -4199,7 +4199,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
41994199
"Invalid vector_dimension: 99999999999999999999");
42004200
check("similarity=inner_product, vector_type=float, levels=3, clusters=10",
42014201
"vector_dimension should be set");
4202-
4202+
42034203
// levels
42044204
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=XxX, clusters=2",
42054205
"Invalid levels: xxx");
@@ -4235,13 +4235,13 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
42354235
"Invalid clusters: 99999999999999999999");
42364236
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=1",
42374237
"clusters should be set");
4238-
4238+
42394239
// clusters^levels
42404240
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=10, clusters=10",
42414241
"Invalid clusters^levels: 10^10 should be less than 1073741824");
42424242
check("similarity=inner_product, vector_type=float, vector_dimension=1024, levels=16, clusters=1024",
42434243
"Invalid clusters^levels: 1024^16 should be less than 1073741824");
4244-
4244+
42454245
// vector_dimension*clusters
42464246
check("similarity=inner_product, vector_type=float, vector_dimension=2048, levels=1, clusters=2048", "");
42474247
check("similarity=inner_product, vector_type=float, vector_dimension=2049, levels=1, clusters=2048",
@@ -11500,6 +11500,77 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
1150011500
}
1150111501
}
1150211502

11503+
Y_UNIT_TEST_TWIN(CreateAndAlterTopicAvailabilityPeriod, UseQueryService) {
11504+
TKikimrRunner kikimr;
11505+
auto queryClient = kikimr.GetQueryClient();
11506+
auto db = kikimr.GetTableClient();
11507+
auto session = db.CreateSession().GetValueSync().GetSession();
11508+
11509+
auto executeQuery = [&queryClient, &session](const TString& query) {
11510+
return ExecuteGeneric<UseQueryService>(queryClient, session, query);
11511+
};
11512+
11513+
// ok
11514+
{
11515+
const auto query = R"(
11516+
--!syntax_v1
11517+
CREATE TOPIC `/Root/topic` (
11518+
CONSUMER cons1 WITH (availability_period = Interval('PT1H'))
11519+
)
11520+
)";
11521+
const auto result = executeQuery(query);
11522+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11523+
}
11524+
{
11525+
const auto query = R"(
11526+
--!syntax_v1
11527+
ALTER TOPIC `/Root/topic`
11528+
ALTER CONSUMER cons1 SET (availability_period = Interval('PT9H'))
11529+
)";
11530+
const auto result = executeQuery(query);
11531+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11532+
}
11533+
{
11534+
const auto query = R"(
11535+
--!syntax_v1
11536+
ALTER TOPIC `/Root/topic`
11537+
DROP CONSUMER cons1,
11538+
ADD CONSUMER cons2 WITH (availability_period = Interval('PT8H'))
11539+
)";
11540+
const auto result = executeQuery(query);
11541+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11542+
}
11543+
{
11544+
const auto query = R"(
11545+
--!syntax_v1
11546+
ALTER TOPIC `/Root/topic`
11547+
ALTER CONSUMER cons2 RESET (availability_period)
11548+
)";
11549+
const auto result = executeQuery(query);
11550+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11551+
}
11552+
// bad
11553+
{
11554+
const auto query = R"(
11555+
--!syntax_v1
11556+
ALTER TOPIC `/Root/topic`
11557+
ALTER CONSUMER cons2 SET (availability_period = 0)
11558+
)";
11559+
const auto result = executeQuery(query);
11560+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
11561+
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Interval type is expected", result.GetIssues().ToString());
11562+
}
11563+
{
11564+
const auto query = R"(
11565+
--!syntax_v1
11566+
ALTER TOPIC `/Root/topic`
11567+
ADD CONSUMER cons_neg WITH (availability_period = Interval('-PT8H'))
11568+
)";
11569+
const auto result = executeQuery(query);
11570+
UNIT_ASSERT_VALUES_UNEQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
11571+
}
11572+
}
11573+
1150311574
Y_UNIT_TEST(DisableResourcePools) {
1150411575
NKikimrConfig::TAppConfig config;
1150511576
config.MutableFeatureFlags()->SetEnableResourcePools(false);

ydb/services/persqueue_v1/topic_yql_ut.cpp

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
6969
const char *query = R"__(
7070
CREATE TOPIC `/Root/PQ/rt3.dc1--legacy--topic1` (
7171
CONSUMER c1,
72-
CONSUMER c2 WITH (important = true, read_from = 100, supported_codecs = 'RAW, LZOP, GZIP')
72+
CONSUMER c2 WITH (important = true, read_from = 100, supported_codecs = 'RAW, LZOP, GZIP'),
73+
CONSUMER c4 WITH (availability_period = Interval('PT9H'))
7374
) WITH (min_active_partitions = 2,
7475
max_active_partitions = 5,
7576
auto_partitioning_stabilization_window = Interval('PT1M'),
@@ -104,7 +105,7 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
104105
UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(describe.GetPartitionStrategy().GetPartitionStrategyType()), static_cast<int>(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT));
105106
UNIT_ASSERT_VALUES_EQUAL(pqGroup.GetTotalGroupCount(), 2);
106107

107-
UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 2);
108+
UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 3);
108109

109110
auto& consumer0 = describe.GetConsumers()[0];
110111
UNIT_ASSERT_VALUES_EQUAL(consumer0.GetName(), "c1");
@@ -115,6 +116,11 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
115116
UNIT_ASSERT_VALUES_EQUAL(consumer1.GetImportant(), true);
116117
UNIT_ASSERT_VALUES_EQUAL(consumer1.GetCodec().IdsSize(), 3);
117118
UNIT_ASSERT_VALUES_EQUAL(consumer1.GetReadFromTimestampsMs(), 100 * 1000);
119+
120+
auto& consumer4 = describe.GetConsumers()[2];
121+
UNIT_ASSERT_VALUES_EQUAL(consumer4.GetName(), "c4");
122+
UNIT_ASSERT_VALUES_EQUAL(consumer4.GetImportant(), false);
123+
UNIT_ASSERT_VALUES_EQUAL(consumer4.GetAvailabilityPeriodMs(), TDuration::Hours(9).MilliSeconds());
118124
}
119125
auto expectedDescr = pqGroup.GetPQTabletConfig();
120126
{
@@ -125,11 +131,13 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
125131

126132
expectedDescr.MutablePartitionStrategy()->SetMinPartitionCount(3);
127133
expectedDescr.MutableConsumers(1)->SetReadFromTimestampsMs(1609462861000);
134+
expectedDescr.MutableConsumers(2)->SetAvailabilityPeriodMs(TDuration::Hours(48).MilliSeconds());
128135
}
129136

130137
const char *query2 = R"__(
131138
ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1`
132139
ALTER CONSUMER c2 SET (read_from = Timestamp('2021-01-01T01:01:01Z')),
140+
ALTER CONSUMER c4 SET (availability_period = Interval('PT48H')),
133141
SET (min_active_partitions = 3,
134142
retention_period = Interval('PT2H'),
135143
retention_storage_mb = 10,
@@ -138,6 +146,7 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
138146
partition_write_speed_bytes_per_second = 9001
139147
);
140148
)__";
149+
Cerr << "\nRun query: \n" << query2 << Endl;
141150
server.AnnoyingClient->RunYqlSchemeQuery(query2);
142151
auto pqGroup2 = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription()
143152
.GetPersQueueGroup();
@@ -153,9 +162,10 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
153162
DROP CONSUMER c1,
154163
ALTER CONSUMER c2 SET (important = false, read_from = Datetime('2021-01-01T01:01:01Z'), supported_codecs = 'RAW, GZIP'),
155164
ADD CONSUMER c3 WITH (important = true),
165+
ALTER CONSUMER c4 RESET (availability_period),
156166
SET (supported_codecs = 'RAW');
157167
)__";
158-
Cerr << "\nRun query: \n" << query2 << Endl;
168+
Cerr << "\nRun query: \n" << query3 << Endl;
159169
server.AnnoyingClient->RunYqlSchemeQuery(query3);
160170

161171
pqGroup2 = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription()
@@ -169,15 +179,20 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
169179
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetBurstSize(), 100501);
170180
UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9001);
171181

172-
UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 2);
182+
UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 3);
173183

174184
auto& consumer1 = describe.GetConsumers(0);
175185
UNIT_ASSERT_VALUES_EQUAL(consumer1.GetName(), "c2");
176186
UNIT_ASSERT_VALUES_EQUAL(consumer1.GetImportant(), false);
177187
UNIT_ASSERT_VALUES_EQUAL(consumer1.GetReadFromTimestampsMs(), 1609462861000);
178188
UNIT_ASSERT_VALUES_EQUAL(consumer1.GetCodec().IdsSize(), 2);
179189

180-
auto& consumer2 = describe.GetConsumers(1);
190+
auto& consumer4 = describe.GetConsumers(1);
191+
UNIT_ASSERT_VALUES_EQUAL(consumer4.GetName(), "c4");
192+
UNIT_ASSERT_VALUES_EQUAL(consumer4.GetImportant(), false);
193+
UNIT_ASSERT_VALUES_EQUAL(consumer4.HasAvailabilityPeriodMs(), false);
194+
195+
auto& consumer2 = describe.GetConsumers(2);
181196
UNIT_ASSERT_VALUES_EQUAL(consumer2.GetName(), "c3");
182197
UNIT_ASSERT_VALUES_EQUAL(consumer2.GetImportant(), true);
183198
UNIT_ASSERT_VALUES_EQUAL(consumer2.GetCodec().IdsSize(), 0);
@@ -271,7 +286,20 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
271286
)__";
272287
server.AnnoyingClient->RunYqlSchemeQuery(query, false);
273288
}
274-
289+
{
290+
const char *query = R"__(
291+
ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1`
292+
ADD CONSUMER c4 WITH (availability_period = true);
293+
)__";
294+
server.AnnoyingClient->RunYqlSchemeQuery(query, false);
295+
}
296+
{
297+
const char *query = R"__(
298+
ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1`
299+
ADD CONSUMER c4 WITH (availability_period = Interval('PT9H'), important = true);
300+
)__";
301+
server.AnnoyingClient->RunYqlSchemeQuery(query, false);
302+
}
275303
}
276304
};
277305

0 commit comments

Comments
 (0)