Skip to content

Commit c0a911c

Browse files
authored
simplify compile service invalidation (#26287)
1 parent 959cf5a commit c0a911c

File tree

10 files changed

+287
-159
lines changed

10 files changed

+287
-159
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include "kqp_compile_service_helpers.h"
2+
#include "google/protobuf/descriptor.pb.h"
3+
#include "util/generic/yexception.h"
4+
#include "util/string/builder.h"
5+
6+
#include <ydb/core/protos/table_service_config.pb.h>
7+
#include <ydb/core/protos/kqp_compile_settings.pb.h>
8+
9+
#include <google/protobuf/util/message_differencer.h>
10+
#include <google/protobuf/descriptor.h>
11+
12+
namespace NKikimr::NKqp {
13+
14+
std::vector<const google::protobuf::FieldDescriptor*> BuildFieldsToInvalidateCacheOnDiff(const google::protobuf::Descriptor* d) {
15+
std::vector<const google::protobuf::FieldDescriptor*> invalidateCacheOn;
16+
17+
for (int fieldIndex = 0; fieldIndex < d->field_count(); ++fieldIndex) {
18+
const auto* field = d->field(fieldIndex);
19+
if (field->options().GetExtension(NKikimrConfig::InvalidateCompileCache)) {
20+
invalidateCacheOn.push_back(field);
21+
}
22+
}
23+
24+
return invalidateCacheOn;
25+
}
26+
27+
28+
std::optional<TString> ShouldInvalidateCompileCache(const NKikimrConfig::TTableServiceConfig& prev, const NKikimrConfig::TTableServiceConfig& next)
29+
{
30+
TStringBuilder finalMessage;
31+
32+
bool ok = true;
33+
static const auto TopLevelOptions = BuildFieldsToInvalidateCacheOnDiff(NKikimrConfig::TTableServiceConfig::descriptor());
34+
static const auto RMOptions = BuildFieldsToInvalidateCacheOnDiff(NKikimrConfig::TTableServiceConfig::TResourceManager::descriptor());
35+
36+
{
37+
TString logMessage;
38+
::google::protobuf::util::MessageDifferencer differ;
39+
differ.ReportDifferencesToString(&logMessage);
40+
if (!differ.CompareWithFields(prev, next, TopLevelOptions, TopLevelOptions)) {
41+
ok = false;
42+
finalMessage << logMessage;
43+
}
44+
}
45+
46+
{
47+
TString logMessage;
48+
::google::protobuf::util::MessageDifferencer differ;
49+
differ.ReportDifferencesToString(&logMessage);
50+
if (!differ.CompareWithFields(prev.GetResourceManager(), next.GetResourceManager(), RMOptions, RMOptions)) {
51+
ok = false;
52+
finalMessage << logMessage;
53+
}
54+
}
55+
56+
if (ok) {
57+
return std::nullopt;
58+
}
59+
60+
return TString(finalMessage);
61+
}
62+
63+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
#include <vector>
4+
#include <util/generic/string.h>
5+
6+
namespace NKikimrConfig {
7+
class TTableServiceConfig;
8+
}
9+
10+
namespace NKikimr::NKqp {
11+
12+
std::optional<TString> ShouldInvalidateCompileCache(const NKikimrConfig::TTableServiceConfig& prev, const NKikimrConfig::TTableServiceConfig& next);
13+
14+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
2+
#include <ydb/core/kqp/compile_service/helpers/kqp_compile_service_helpers.h>
3+
#include <ydb/core/protos/table_service_config.pb.h>
4+
#include <ydb/core/protos/config.pb.h>
5+
#include <ydb/core/protos/kqp_compile_settings.pb.h>
6+
7+
#include <library/cpp/testing/unittest/registar.h>
8+
#include <google/protobuf/descriptor.h>
9+
#include "google/protobuf/descriptor.pb.h"
10+
11+
namespace NKikimr {
12+
namespace NKqp {
13+
14+
Y_UNIT_TEST_SUITE(KqpCompileServiceHelpers) {
15+
Y_UNIT_TEST(CheckInvalidator) {
16+
17+
NKikimrConfig::TTableServiceConfig prev;
18+
NKikimrConfig::TTableServiceConfig defaultEmpty;
19+
Cerr << ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty") << Endl;
20+
UNIT_ASSERT_VALUES_EQUAL(ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty"), "empty");
21+
22+
prev.MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(31457280);
23+
24+
Cerr << prev.ShortUtf8DebugString() << Endl;
25+
Cerr << defaultEmpty.ShortUtf8DebugString() << Endl;
26+
Cerr << ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty") << Endl;
27+
UNIT_ASSERT_VALUES_EQUAL(ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty"), "empty");
28+
29+
prev.MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(31457281);
30+
31+
Cerr << prev.ShortUtf8DebugString() << Endl;
32+
Cerr << defaultEmpty.ShortUtf8DebugString() << Endl;
33+
Cerr << ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty") << Endl;
34+
UNIT_ASSERT_VALUES_EQUAL(ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty"), "modified: MkqlHeavyProgramMemoryLimit: 31457281 -> 31457280\n");
35+
36+
37+
prev.SetEnableKqpScanQuerySourceRead(true);
38+
Cerr << prev.ShortUtf8DebugString() << Endl;
39+
Cerr << defaultEmpty.ShortUtf8DebugString() << Endl;
40+
UNIT_ASSERT_VALUES_EQUAL(ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty"), "modified: EnableKqpScanQuerySourceRead: true -> false\nmodified: MkqlHeavyProgramMemoryLimit: 31457281 -> 31457280\n");
41+
42+
prev.SetEnableKqpScanQuerySourceRead(true);
43+
prev.SetEnableKqpScanQueryStreamIdxLookupJoin(true);
44+
Cerr << prev.ShortUtf8DebugString() << Endl;
45+
Cerr << defaultEmpty.ShortUtf8DebugString() << Endl;
46+
UNIT_ASSERT_VALUES_EQUAL(ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty"), "modified: EnableKqpScanQuerySourceRead: true -> false\nmodified: EnableKqpScanQueryStreamIdxLookupJoin: true -> false\nmodified: MkqlHeavyProgramMemoryLimit: 31457281 -> 31457280\n");
47+
48+
defaultEmpty.SetEnableKqpScanQuerySourceRead(true);
49+
defaultEmpty.SetEnableKqpScanQueryStreamIdxLookupJoin(true);
50+
Cerr << prev.ShortUtf8DebugString() << Endl;
51+
Cerr << defaultEmpty.ShortUtf8DebugString() << Endl;
52+
UNIT_ASSERT_VALUES_EQUAL(ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty"), "modified: MkqlHeavyProgramMemoryLimit: 31457281 -> 31457280\n");
53+
54+
defaultEmpty.SetEnableKqpScanQuerySourceRead(true);
55+
defaultEmpty.MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(31457281);
56+
Cerr << prev.ShortUtf8DebugString() << Endl;
57+
Cerr << defaultEmpty.ShortUtf8DebugString() << Endl;
58+
UNIT_ASSERT_VALUES_EQUAL(ShouldInvalidateCompileCache(prev, defaultEmpty).value_or("empty"), "empty");
59+
}
60+
61+
Y_UNIT_TEST(OnlyRmAndTopLevelOptionsAreSupportedToInvalidate)
62+
{
63+
auto topLevel = NKikimrConfig::TTableServiceConfig::descriptor();
64+
auto rm = NKikimrConfig::TTableServiceConfig::TResourceManager::descriptor();
65+
66+
std::deque<const ::google::protobuf::Descriptor *> traversal;
67+
traversal.push_back(NKikimrConfig::TAppConfig::descriptor());
68+
std::unordered_set<const ::google::protobuf::Descriptor *> visited;
69+
70+
while(!traversal.empty()) {
71+
const auto* d = traversal.front();
72+
traversal.pop_front();
73+
74+
if (visited.contains(d)) {
75+
continue;
76+
}
77+
78+
visited.emplace(d);
79+
80+
for (int fieldIndex = 0; fieldIndex < d->field_count(); ++fieldIndex) {
81+
const auto* field = d->field(fieldIndex);
82+
if (field->options().GetExtension(NKikimrConfig::InvalidateCompileCache)) {
83+
UNIT_ASSERT_C(d->name() == topLevel->name() || d->name() == rm->name(),
84+
"Only TTableServiceConfig or ResourceManager fields can have InvalidateCompileCache extention, "
85+
"but " << d->name() << " found." <<
86+
"If you want more, update ShouldInvalidateCompileCache accordingly.");
87+
}
88+
89+
if (field->cpp_type() == ::google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE) {
90+
traversal.push_back(field->message_type());
91+
}
92+
}
93+
}
94+
UNIT_ASSERT(visited.contains(rm));
95+
UNIT_ASSERT(visited.contains(topLevel));
96+
}
97+
}
98+
}
99+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
UNITTEST_FOR(ydb/core/kqp/compile_service/helpers)
2+
3+
FORK_SUBTESTS()
4+
SPLIT_FACTOR(50)
5+
6+
IF (WITH_VALGRIND)
7+
SIZE(LARGE)
8+
TAG(ya:fat)
9+
ELSE()
10+
SIZE(MEDIUM)
11+
ENDIF()
12+
13+
SRCS(
14+
kqp_compile_cache_helpers_ut.cpp
15+
)
16+
17+
PEERDIR(
18+
ydb/core/protos
19+
library/cpp/testing/unittest
20+
)
21+
22+
23+
END()
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
kqp_compile_service_helpers.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/protos
9+
)
10+
11+
END()
12+
13+
RECURSE_FOR_TESTS(ut)

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 9 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "kqp_compile_service.h"
2+
#include "helpers/kqp_compile_service_helpers.h"
23

34
#include <ydb/core/actorlib_impl/long_timer.h>
45
#include <ydb/core/base/appdata.h>
@@ -12,6 +13,7 @@
1213
#include <ydb/core/kqp/host/kqp_translate.h>
1314
#include <ydb/library/aclib/aclib.h>
1415

16+
1517
#include <ydb/library/actors/core/actor_bootstrapped.h>
1618
#include <ydb/library/actors/wilson/wilson_span.h>
1719
#include <ydb/library/actors/core/hfunc.h>
@@ -27,7 +29,6 @@ namespace NKqp {
2729
using namespace NKikimrConfig;
2830
using namespace NYql;
2931

30-
3132
struct TKqpCompileSettings {
3233
TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, bool perStatementResult,
3334
const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE)
@@ -350,123 +351,20 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
350351
void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
351352
auto &event = ev->Get()->Record;
352353

353-
bool allowMultiBroadcasts = TableServiceConfig.GetAllowMultiBroadcasts();
354-
bool enableKqpDataQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin();
355-
bool enableKqpScanQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
356-
357-
bool enableKqpScanQuerySourceRead = TableServiceConfig.GetEnableKqpScanQuerySourceRead();
358-
359-
bool defaultSyntaxVersion = TableServiceConfig.GetSqlVersion();
360-
361-
ui64 rangesLimit = TableServiceConfig.GetExtractPredicateRangesLimit();
362-
ui64 paramLimitSize = TableServiceConfig.GetExtractPredicateParameterListSizeLimit();
363-
ui64 idxLookupPointsLimit = TableServiceConfig.GetIdxLookupJoinPointsLimit();
364-
365-
bool allowOlapDataQuery = TableServiceConfig.GetAllowOlapDataQuery();
366-
bool enableOlapSink = TableServiceConfig.GetEnableOlapSink();
367-
bool enableOltpSink = TableServiceConfig.GetEnableOltpSink();
368-
bool enableHtapTx = TableServiceConfig.GetEnableHtapTx();
369-
bool enableStreamWrite = TableServiceConfig.GetEnableStreamWrite();
370-
bool enableCreateTableAs = TableServiceConfig.GetEnableCreateTableAs();
371-
bool enableDataShardCreateTableAs = TableServiceConfig.GetEnableDataShardCreateTableAs();
372-
auto blockChannelsMode = TableServiceConfig.GetBlockChannelsMode();
373-
374-
bool enableAstCache = TableServiceConfig.GetEnableAstCache();
375-
bool enableImplicitQueryParameterTypes = TableServiceConfig.GetEnableImplicitQueryParameterTypes();
376-
bool enablePgConstsToParams = TableServiceConfig.GetEnablePgConstsToParams();
377-
bool enablePerStatementQueryExecution = TableServiceConfig.GetEnablePerStatementQueryExecution();
378-
379-
auto mkqlHeavyLimit = TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit();
380-
381-
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
382-
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();
383-
bool enableFoldUdfs = TableServiceConfig.GetEnableFoldUdfs();
384-
385-
bool defaultEnableShuffleElimination = TableServiceConfig.GetDefaultEnableShuffleElimination();
386-
387-
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
388-
bool enableSpilling = TableServiceConfig.GetEnableQueryServiceSpilling();
389-
390-
bool enableSnapshotIsolationRW = TableServiceConfig.GetEnableSnapshotIsolationRW();
391-
392-
bool enableNewRBO = TableServiceConfig.GetEnableNewRBO();
393-
bool enableSpillingInHashJoinShuffleConnections = TableServiceConfig.GetEnableSpillingInHashJoinShuffleConnections();
394-
bool enableOlapScalarApply = TableServiceConfig.GetEnableOlapScalarApply();
395-
bool enableOlapSubstringPushdown = TableServiceConfig.GetEnableOlapSubstringPushdown();
396-
397-
bool enableIndexStreamWrite = TableServiceConfig.GetEnableIndexStreamWrite();
398-
399-
bool enableOlapPushdownProjections = TableServiceConfig.GetEnableOlapPushdownProjections();
400-
401-
bool enableTempTablesForUser = TableServiceConfig.GetEnableTempTablesForUser();
402-
403-
bool enableSimpleProgramsSinglePartitionOptimization = TableServiceConfig.GetEnableSimpleProgramsSinglePartitionOptimization();
404-
bool enableSimpleProgramsSinglePartitionOptimizationBroadPrograms = TableServiceConfig.GetEnableSimpleProgramsSinglePartitionOptimizationBroadPrograms();
405-
406-
ui32 defaultLangVer = TableServiceConfig.GetDefaultLangVer();
407-
408-
bool enableOlapPushdownAggregate = TableServiceConfig.GetEnableOlapPushdownAggregate();
409-
410-
bool enableOrderOptimizaionFSM = TableServiceConfig.GetEnableOrderOptimizaionFSM();
354+
auto diff = ShouldInvalidateCompileCache(TableServiceConfig, event.GetConfig().GetTableServiceConfig());
355+
if (diff.has_value()) {
356+
LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE,
357+
"Query cache was invalidated due to config change, config change differencer output: "
358+
<< diff.value());
411359

412-
bool enableTopSortSelectIndex = TableServiceConfig.GetEnableTopSortSelectIndex();
413-
bool enablePointPredicateSortAutoSelectIndex = TableServiceConfig.GetEnablePointPredicateSortAutoSelectIndex();
360+
QueryCache->Clear();
361+
}
414362

415363
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
416364
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
417365

418366
auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
419367
Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
420-
421-
if (TableServiceConfig.GetSqlVersion() != defaultSyntaxVersion ||
422-
TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin() != enableKqpScanQueryStreamIdxLookupJoin ||
423-
TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin() != enableKqpDataQueryStreamIdxLookupJoin ||
424-
TableServiceConfig.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead ||
425-
TableServiceConfig.GetAllowOlapDataQuery() != allowOlapDataQuery ||
426-
TableServiceConfig.GetEnableStreamWrite() != enableStreamWrite ||
427-
TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
428-
TableServiceConfig.GetEnableOltpSink() != enableOltpSink ||
429-
TableServiceConfig.GetEnableHtapTx() != enableHtapTx ||
430-
TableServiceConfig.GetEnableCreateTableAs() != enableCreateTableAs ||
431-
TableServiceConfig.GetEnableDataShardCreateTableAs() != enableDataShardCreateTableAs ||
432-
TableServiceConfig.GetBlockChannelsMode() != blockChannelsMode ||
433-
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
434-
TableServiceConfig.GetExtractPredicateParameterListSizeLimit() != paramLimitSize ||
435-
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
436-
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
437-
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
438-
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||
439-
TableServiceConfig.GetEnableConstantFolding() != enableConstantFolding ||
440-
TableServiceConfig.GetEnableFoldUdfs() != enableFoldUdfs ||
441-
TableServiceConfig.GetEnableAstCache() != enableAstCache ||
442-
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
443-
TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams ||
444-
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution ||
445-
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW ||
446-
TableServiceConfig.GetAllowMultiBroadcasts() != allowMultiBroadcasts ||
447-
TableServiceConfig.GetDefaultEnableShuffleElimination() != defaultEnableShuffleElimination ||
448-
TableServiceConfig.GetEnableQueryServiceSpilling() != enableSpilling ||
449-
TableServiceConfig.GetEnableNewRBO() != enableNewRBO ||
450-
TableServiceConfig.GetEnableSpillingInHashJoinShuffleConnections() != enableSpillingInHashJoinShuffleConnections ||
451-
TableServiceConfig.GetEnableOlapScalarApply() != enableOlapScalarApply ||
452-
TableServiceConfig.GetEnableOlapSubstringPushdown() != enableOlapSubstringPushdown ||
453-
TableServiceConfig.GetEnableIndexStreamWrite() != enableIndexStreamWrite ||
454-
TableServiceConfig.GetEnableOlapPushdownProjections() != enableOlapPushdownProjections ||
455-
TableServiceConfig.GetEnableTempTablesForUser() != enableTempTablesForUser ||
456-
TableServiceConfig.GetEnableSimpleProgramsSinglePartitionOptimization() != enableSimpleProgramsSinglePartitionOptimization ||
457-
TableServiceConfig.GetEnableSimpleProgramsSinglePartitionOptimizationBroadPrograms() != enableSimpleProgramsSinglePartitionOptimizationBroadPrograms ||
458-
TableServiceConfig.GetDefaultLangVer() != defaultLangVer ||
459-
TableServiceConfig.GetEnableOlapPushdownAggregate() != enableOlapPushdownAggregate ||
460-
TableServiceConfig.GetEnableOrderOptimizaionFSM() != enableOrderOptimizaionFSM ||
461-
TableServiceConfig.GetEnableTopSortSelectIndex() != enableTopSortSelectIndex ||
462-
TableServiceConfig.GetEnablePointPredicateSortAutoSelectIndex() != enablePointPredicateSortAutoSelectIndex)
463-
{
464-
465-
QueryCache->Clear();
466-
467-
LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE,
468-
"Query cache was invalidated due to config change");
469-
}
470368
}
471369

472370
void HandleUndelivery(TEvents::TEvUndelivered::TPtr& ev) {

ydb/core/kqp/compile_service/ya.make

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@ PEERDIR(
1313
ydb/core/kqp/federated_query
1414
ydb/core/kqp/host
1515
ydb/core/ydb_convert
16+
ydb/core/kqp/compile_service/helpers
1617
)
1718

1819
YQL_LAST_ABI_VERSION()
1920

2021
END()
22+
23+
RECURSE(
24+
helpers
25+
)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import "google/protobuf/descriptor.proto";
2+
3+
package NKikimrConfig;
4+
option java_package = "ru.yandex.kikimr.proto";
5+
6+
extend google.protobuf.FieldOptions {
7+
// Flags marked with (RequireCompileCacheInvalidate) = true requires cache invalidation right after the config changes
8+
optional bool InvalidateCompileCache = 56682;
9+
}

0 commit comments

Comments
 (0)