|
1 | 1 | #include "kqp_query_compiler.h"
|
2 | 2 |
|
| 3 | +#include <ydb/core/base/table_index.h> |
3 | 4 | #include <ydb/core/kqp/common/kqp_yql.h>
|
4 | 5 | #include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
|
5 | 6 | #include <ydb/core/kqp/opt/kqp_opt.h>
|
|
8 | 9 | #include <ydb/core/kqp/query_compiler/kqp_olap_compiler.h>
|
9 | 10 | #include <ydb/core/kqp/query_data/kqp_predictor.h>
|
10 | 11 | #include <ydb/core/kqp/query_data/kqp_request_predictor.h>
|
11 |
| -#include <ydb/core/ydb_convert/ydb_convert.h> |
12 |
| - |
13 |
| -#include <ydb/core/base/table_index.h> |
14 | 12 | #include <ydb/core/scheme/scheme_tabledefs.h>
|
| 13 | +#include <ydb/core/ydb_convert/ydb_convert.h> |
15 | 14 | #include <ydb/library/mkql_proto/mkql_proto.h>
|
16 |
| - |
17 |
| -#include <yql/essentials/core/dq_integration/yql_dq_integration.h> |
18 | 15 | #include <ydb/library/yql/dq/opt/dq_opt.h>
|
19 | 16 | #include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
|
20 | 17 | #include <ydb/library/yql/dq/tasks/dq_task_program.h>
|
21 |
| -#include <yql/essentials/minikql/mkql_node_serialization.h> |
22 |
| -#include <yql/essentials/providers/common/mkql/yql_type_mkql.h> |
23 |
| -#include <yql/essentials/providers/common/provider/yql_provider_names.h> |
24 |
| -#include <yql/essentials/providers/common/structured_token/yql_token_builder.h> |
| 18 | +#include <ydb/library/yql/providers/dq/common/yql_dq_common.h> |
25 | 19 | #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
|
26 | 20 | #include <ydb/library/yql/providers/s3/statistics/yql_s3_statistics.h>
|
| 21 | + |
| 22 | +#include <yql/essentials/core/dq_integration/yql_dq_integration.h> |
27 | 23 | #include <yql/essentials/core/yql_opt_utils.h>
|
28 | 24 | #include <yql/essentials/core/yql_type_helpers.h>
|
| 25 | +#include <yql/essentials/minikql/mkql_node_serialization.h> |
| 26 | +#include <yql/essentials/providers/common/mkql/yql_type_mkql.h> |
| 27 | +#include <yql/essentials/providers/common/provider/yql_provider_names.h> |
| 28 | +#include <yql/essentials/providers/common/structured_token/yql_token_builder.h> |
29 | 29 |
|
30 | 30 |
|
31 | 31 | namespace NKikimr {
|
@@ -589,6 +589,14 @@ TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpPro
|
589 | 589 | return issues;
|
590 | 590 | }
|
591 | 591 |
|
| 592 | +TStringBuf RemoveJoinAliases(TStringBuf keyName) { |
| 593 | + if (const auto idx = keyName.find_last_of('.'); idx != TString::npos) { |
| 594 | + return keyName.substr(idx + 1); |
| 595 | + } |
| 596 | + |
| 597 | + return keyName; |
| 598 | +} |
| 599 | + |
592 | 600 | class TKqpQueryCompiler : public IKqpQueryCompiler {
|
593 | 601 | public:
|
594 | 602 | TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
|
@@ -795,7 +803,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
|
795 | 803 | auto connection = input.Cast<TDqConnection>();
|
796 | 804 |
|
797 | 805 | auto& protoInput = *stageProto.AddInputs();
|
798 |
| - FillConnection(connection, stagesMap, protoInput, ctx, tablesMap, physicalStageByID); |
| 806 | + FillConnection(connection, stagesMap, protoInput, ctx, tablesMap, physicalStageByID, &stage, inputIndex); |
799 | 807 | protoInput.SetInputIndex(inputIndex);
|
800 | 808 | }
|
801 | 809 | }
|
@@ -1017,7 +1025,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
|
1017 | 1025 |
|
1018 | 1026 | auto& resultProto = *txProto.AddResults();
|
1019 | 1027 | auto& connectionProto = *resultProto.MutableConnection();
|
1020 |
| - FillConnection(connection, stagesMap, connectionProto, ctx, tablesMap, physicalStageByID); |
| 1028 | + FillConnection(connection, stagesMap, connectionProto, ctx, tablesMap, physicalStageByID, nullptr, 0); |
1021 | 1029 |
|
1022 | 1030 | const TTypeAnnotationNode* itemType = nullptr;
|
1023 | 1031 | switch (connectionProto.GetTypeCase()) {
|
@@ -1452,7 +1460,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
|
1452 | 1460 | NKqpProto::TKqpPhyConnection& connectionProto,
|
1453 | 1461 | TExprContext& ctx,
|
1454 | 1462 | THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap,
|
1455 |
| - THashMap<ui64, NKqpProto::TKqpPhyStage*>& physicalStageByID |
| 1463 | + THashMap<ui64, NKqpProto::TKqpPhyStage*>& physicalStageByID, |
| 1464 | + const TDqPhyStage* stage, |
| 1465 | + ui32 inputIndex |
1456 | 1466 | ) {
|
1457 | 1467 | auto inputStageIndex = stagesMap.FindPtr(connection.Output().Stage().Ref().UniqueId());
|
1458 | 1468 | YQL_ENSURE(inputStageIndex, "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map: "
|
@@ -1819,6 +1829,59 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
|
1819 | 1829 | return;
|
1820 | 1830 | }
|
1821 | 1831 |
|
| 1832 | + if (auto maybeDqSourceStreamLookup = connection.Maybe<TDqCnStreamLookup>()) { |
| 1833 | + const auto streamLookup = maybeDqSourceStreamLookup.Cast(); |
| 1834 | + const auto lookupSourceWrap = streamLookup.RightInput().Cast<TDqLookupSourceWrap>(); |
| 1835 | + |
| 1836 | + const TStringBuf dataSourceCategory = lookupSourceWrap.DataSource().Category(); |
| 1837 | + const auto provider = TypesCtx.DataSourceMap.find(dataSourceCategory); |
| 1838 | + YQL_ENSURE(provider != TypesCtx.DataSourceMap.end(), "Unsupported data source category: \"" << dataSourceCategory << "\""); |
| 1839 | + NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration(); |
| 1840 | + YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\""); |
| 1841 | + |
| 1842 | + auto& dqSourceLookupCn = *connectionProto.MutableDqSourceStreamLookup(); |
| 1843 | + auto& lookupSource = *dqSourceLookupCn.MutableLookupSource(); |
| 1844 | + auto& lookupSourceSettings = *lookupSource.MutableSettings(); |
| 1845 | + auto& lookupSourceType = *lookupSource.MutableType(); |
| 1846 | + dqIntegration->FillLookupSourceSettings(lookupSourceWrap.Ref(), lookupSourceSettings, lookupSourceType); |
| 1847 | + YQL_ENSURE(!lookupSourceSettings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node"); |
| 1848 | + YQL_ENSURE(lookupSourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node"); |
| 1849 | + |
| 1850 | + const auto& streamLookupOutput = streamLookup.Output(); |
| 1851 | + const auto connectionInputRowType = GetSeqItemType(streamLookupOutput.Ref().GetTypeAnn()); |
| 1852 | + YQL_ENSURE(connectionInputRowType->GetKind() == ETypeAnnotationKind::Struct); |
| 1853 | + const auto connectionOutputRowType = GetSeqItemType(streamLookup.Ref().GetTypeAnn()); |
| 1854 | + YQL_ENSURE(connectionOutputRowType->GetKind() == ETypeAnnotationKind::Struct); |
| 1855 | + YQL_ENSURE(stage); |
| 1856 | + dqSourceLookupCn.SetConnectionInputRowType(NYql::NCommon::GetSerializedTypeAnnotation(connectionInputRowType)); |
| 1857 | + dqSourceLookupCn.SetConnectionOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(connectionOutputRowType)); |
| 1858 | + dqSourceLookupCn.SetLookupRowType(NYql::NCommon::GetSerializedTypeAnnotation(lookupSourceWrap.RowType().Ref().GetTypeAnn())); |
| 1859 | + dqSourceLookupCn.SetInputStageRowType(NYql::NCommon::GetSerializedTypeAnnotation(GetSeqItemType(streamLookupOutput.Stage().Program().Ref().GetTypeAnn()))); |
| 1860 | + dqSourceLookupCn.SetOutputStageRowType(NYql::NCommon::GetSerializedTypeAnnotation(GetSeqItemType(stage->Program().Args().Arg(inputIndex).Ref().GetTypeAnn()))); |
| 1861 | + |
| 1862 | + const TString leftLabel(streamLookup.LeftLabel()); |
| 1863 | + dqSourceLookupCn.SetLeftLabel(leftLabel); |
| 1864 | + dqSourceLookupCn.SetRightLabel(streamLookup.RightLabel().StringValue()); |
| 1865 | + dqSourceLookupCn.SetJoinType(streamLookup.JoinType().StringValue()); |
| 1866 | + dqSourceLookupCn.SetCacheLimit(FromString<ui64>(streamLookup.MaxCachedRows())); |
| 1867 | + dqSourceLookupCn.SetCacheTtlSeconds(FromString<ui64>(streamLookup.TTL())); |
| 1868 | + dqSourceLookupCn.SetMaxDelayedRows(FromString<ui64>(streamLookup.MaxDelayedRows())); |
| 1869 | + |
| 1870 | + if (const auto maybeMultiget = streamLookup.IsMultiget()) { |
| 1871 | + dqSourceLookupCn.SetIsMultiGet(FromString<bool>(maybeMultiget.Cast())); |
| 1872 | + } |
| 1873 | + |
| 1874 | + for (const auto& key : streamLookup.LeftJoinKeyNames()) { |
| 1875 | + *dqSourceLookupCn.AddLeftJoinKeyNames() = leftLabel ? RemoveJoinAliases(key) : key; |
| 1876 | + } |
| 1877 | + |
| 1878 | + for (const auto& key : streamLookup.RightJoinKeyNames()) { |
| 1879 | + *dqSourceLookupCn.AddRightJoinKeyNames() = RemoveJoinAliases(key); |
| 1880 | + } |
| 1881 | + |
| 1882 | + return; |
| 1883 | + } |
| 1884 | + |
1822 | 1885 | YQL_ENSURE(false, "Unexpected connection type: " << connection.CallableName());
|
1823 | 1886 | }
|
1824 | 1887 |
|
|
0 commit comments