Skip to content

Commit 42cfd73

Browse files
authored
YQ-4778 fixed streaming query creation with another operations (#26550)
1 parent e217045 commit 42cfd73

File tree

2 files changed

+33
-37
lines changed

2 files changed

+33
-37
lines changed

ydb/core/kqp/provider/yql_kikimr_datasink.cpp

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,8 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
200200
}
201201

202202
TStatus HandleModifyPermissions(TKiModifyPermissions node, TExprContext& ctx) override {
203-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
204-
<< "ModifyPermissions is not yet implemented for intent determination transformer"));
205-
return TStatus::Error;
203+
Y_UNUSED(ctx, node);
204+
return TStatus::Ok;
206205
}
207206

208207
TStatus HandleCreateBackupCollection(TKiCreateBackupCollection node, TExprContext& ctx) override {
@@ -242,27 +241,23 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
242241
}
243242

244243
TStatus HandleCreateUser(TKiCreateUser node, TExprContext& ctx) override {
245-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
246-
<< "CreateUser is not yet implemented for intent determination transformer"));
247-
return TStatus::Error;
244+
Y_UNUSED(ctx, node);
245+
return TStatus::Ok;
248246
}
249247

250248
TStatus HandleAlterUser(TKiAlterUser node, TExprContext& ctx) override {
251-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
252-
<< "AlterUser is not yet implemented for intent determination transformer"));
253-
return TStatus::Error;
249+
Y_UNUSED(ctx, node);
250+
return TStatus::Ok;
254251
}
255252

256253
TStatus HandleDropUser(TKiDropUser node, TExprContext& ctx) override {
257-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
258-
<< "DropUser is not yet implemented for intent determination transformer"));
259-
return TStatus::Error;
254+
Y_UNUSED(ctx, node);
255+
return TStatus::Ok;
260256
}
261257

262258
TStatus HandleUpsertObject(TKiUpsertObject node, TExprContext& ctx) override {
263-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
264-
<< "UpsertObject is not yet implemented for intent determination transformer"));
265-
return TStatus::Error;
259+
Y_UNUSED(ctx, node);
260+
return TStatus::Ok;
266261
}
267262

268263
TStatus HandleCreateObject(TKiCreateObject node, TExprContext& ctx) override {
@@ -284,50 +279,42 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
284279
}
285280

286281
TStatus HandleCreateGroup(TKiCreateGroup node, TExprContext& ctx) override {
287-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
288-
<< "CreateGroup is not yet implemented for intent determination transformer"));
289-
return TStatus::Error;
282+
Y_UNUSED(ctx, node);
283+
return TStatus::Ok;
290284
}
291285

292286
TStatus HandleAlterGroup(TKiAlterGroup node, TExprContext& ctx) override {
293-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
294-
<< "AlterGroup is not yet implemented for intent determination transformer"));
295-
return TStatus::Error;
287+
Y_UNUSED(ctx, node);
288+
return TStatus::Ok;
296289
}
297290

298291
TStatus HandleRenameGroup(TKiRenameGroup node, TExprContext& ctx) override {
299-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
300-
<< "RenameGroup is not yet implemented for intent determination transformer"));
301-
return TStatus::Error;
292+
Y_UNUSED(ctx, node);
293+
return TStatus::Ok;
302294
}
303295

304296
TStatus HandleDropGroup(TKiDropGroup node, TExprContext& ctx) override {
305-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
306-
<< "DropGroup is not yet implemented for intent determination transformer"));
307-
return TStatus::Error;
297+
Y_UNUSED(ctx, node);
298+
return TStatus::Ok;
308299
}
309300

310301
TStatus HandlePgDropObject(TPgDropObject node, TExprContext& ctx) override {
311-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
312-
<< "PgDropObject is not yet implemented for intent determination transformer"));
313-
return TStatus::Error;
302+
Y_UNUSED(ctx, node);
303+
return TStatus::Ok;
314304
}
315305

316306
TStatus HandleCreateSecret(TKiCreateSecret node, TExprContext& ctx) override {
317-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
318-
<< "CreateSecret is not yet implemented for intent determination transformer"));
307+
Y_UNUSED(ctx, node);
319308
return TStatus::Ok;
320309
}
321310

322311
TStatus HandleAlterSecret(TKiAlterSecret node, TExprContext& ctx) override {
323-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
324-
<< "AlterSecret is not yet implemented for intent determination transformer"));
312+
Y_UNUSED(ctx, node);
325313
return TStatus::Ok;
326314
}
327315

328316
TStatus HandleDropSecret(TKiDropSecret node, TExprContext& ctx) override {
329-
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
330-
<< "DropSecret is not yet implemented for intent determination transformer"));
317+
Y_UNUSED(ctx, node);
331318
return TStatus::Ok;
332319
}
333320

ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1385,7 +1385,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) {
13851385
}
13861386

13871387
Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
1388-
Y_UNIT_TEST_F(CreateAndAlterStreamingQuery, TStreamingTestFixture) {
1388+
Y_UNIT_TEST_F(CreateAndAlterStreamingQuery, TStreamingWithSchemaSecretsTestFixture) {
13891389
constexpr char inputTopicName[] = "createAndAlterStreamingQueryInputTopic";
13901390
constexpr char outputTopicName[] = "createAndAlterStreamingQueryOutputTopic";
13911391
CreateTopic(inputTopicName);
@@ -1396,7 +1396,9 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
13961396

13971397
constexpr char queryName[] = "streamingQuery";
13981398
ExecQuery(fmt::format(R"(
1399+
CREATE SECRET test_secret WITH (value = "1234");
13991400
CREATE TABLE test_table1 (Key Int32 NOT NULL, PRIMARY KEY (Key));
1401+
GRANT ALL ON `/Root/test_table1` TO `test@builtin`;
14001402
CREATE STREAMING QUERY `{query_name}` AS
14011403
DO BEGIN
14021404
INSERT INTO `{pq_source}`.`{output_topic}`
@@ -1415,6 +1417,13 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
14151417
"output_topic"_a = outputTopicName
14161418
));
14171419

1420+
{
1421+
const auto tableDesc = Navigate(GetRuntime(), GetRuntime().AllocateEdgeActor(), "/Root/test_table1", NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
1422+
const auto& table = tableDesc->ResultSet.at(0);
1423+
UNIT_ASSERT_VALUES_EQUAL(table.Kind, NSchemeCache::TSchemeCacheNavigate::EKind::KindTable);
1424+
UNIT_ASSERT(table.SecurityObject->CheckAccess(NACLib::GenericFull, NACLib::TUserToken("test@builtin", {})));
1425+
}
1426+
14181427
CheckScriptExecutionsCount(1, 1);
14191428
Sleep(TDuration::Seconds(1));
14201429

0 commit comments

Comments
 (0)