Skip to content

Commit ae6c714

Browse files
committed
almost working draft
1 parent b729e3f commit ae6c714

File tree

5 files changed

+259
-10
lines changed

5 files changed

+259
-10
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6928,6 +6928,9 @@ Allows creation of tables with the [TimeSeries](../../engines/table-engines/inte
69286928
)", EXPERIMENTAL) \
69296929
DECLARE(Bool, allow_experimental_hybrid_table, false, R"(
69306930
Allows creation of tables with the [Hybrid](../../engines/table-engines/special/hybrid.md) table engine.
6931+
)", EXPERIMENTAL) \
6932+
DECLARE(Bool, hybrid_table_auto_cast_columns, false, R"(
6933+
Automatically cast columns to the schema defined in Hybrid tables when remote segments expose different physical types.
69316934
)", EXPERIMENTAL) \
69326935
DECLARE(Bool, allow_experimental_codecs, false, R"(
69336936
If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
5454
{"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
5555
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
5656
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
57+
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
58+
{"hybrid_table_auto_cast_columns", false, false, "New setting to automatically cast Hybrid table columns when segments disagree on types."}
5759
});
5860
addSettingsChanges(settings_changes_history, "25.8",
5961
{

src/Interpreters/ClusterProxy/SelectStreamFactory.cpp

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
#include <Interpreters/Cluster.h>
1616
#include <Interpreters/DatabaseCatalog.h>
1717
#include <Interpreters/AddDefaultDatabaseVisitor.h>
18+
#include <Interpreters/addTypeConversionToAST.h>
1819
#include <Interpreters/RequiredSourceColumnsVisitor.h>
1920
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
2021
#include <DataTypes/ObjectUtils.h>
2122
#include <Client/IConnections.h>
2223
#include <Parsers/ASTSelectQuery.h>
2324
#include <Parsers/ASTSetQuery.h>
25+
#include <Parsers/ASTIdentifier.h>
2426
#include <Processors/QueryPlan/QueryPlan.h>
2527
#include <Processors/QueryPlan/ReadFromRemote.h>
2628
#include <Processors/QueryPlan/ExpressionStep.h>
@@ -60,6 +62,46 @@ namespace FailPoints
6062
namespace ClusterProxy
6163
{
6264

65+
namespace
66+
{
67+
void applyHybridCastsToAST(
68+
ASTPtr & node,
69+
const ColumnsDescription * metadata_columns,
70+
const NameSet * columns_to_cast)
71+
{
72+
if (!metadata_columns || !columns_to_cast || columns_to_cast->empty() || !node)
73+
return;
74+
75+
if (auto * func = node->as<ASTFunction>(); func && func->name == "_CAST")
76+
return;
77+
78+
if (auto * identifier = node->as<ASTIdentifier>())
79+
{
80+
String candidate = identifier->name();
81+
String short_name = candidate;
82+
83+
auto dot_pos = candidate.rfind('.');
84+
if (dot_pos != String::npos && dot_pos + 1 < candidate.size())
85+
short_name = candidate.substr(dot_pos + 1);
86+
87+
if (columns_to_cast->contains(short_name))
88+
{
89+
if (auto expected_column_opt = metadata_columns->tryGetPhysical(short_name))
90+
{
91+
auto cast_ast = addTypeConversionToAST(node->clone(), expected_column_opt->type->getName());
92+
const auto & alias = identifier->alias.empty() ? short_name : identifier->alias;
93+
cast_ast->setAlias(alias);
94+
node = cast_ast;
95+
return;
96+
}
97+
}
98+
}
99+
100+
for (auto & child : node->children)
101+
applyHybridCastsToAST(child, metadata_columns, columns_to_cast);
102+
}
103+
}
104+
63105
/// select query has database, table and table function names as AST pointers
64106
/// Creates a copy of query, changes database, table and table function names.
65107
ASTPtr rewriteSelectQuery(
@@ -68,7 +110,9 @@ ASTPtr rewriteSelectQuery(
68110
const std::string & remote_database,
69111
const std::string & remote_table,
70112
ASTPtr table_function_ptr,
71-
ASTPtr additional_filter)
113+
ASTPtr additional_filter,
114+
const NameSet * columns_to_cast,
115+
const ColumnsDescription * metadata_columns)
72116
{
73117
auto modified_query_ast = query->clone();
74118

@@ -122,6 +166,9 @@ ASTPtr rewriteSelectQuery(
122166

123167
RestoreQualifiedNamesVisitor(data).visit(modified_query_ast);
124168
}
169+
170+
if (columns_to_cast && !columns_to_cast->empty() && metadata_columns)
171+
applyHybridCastsToAST(modified_query_ast, metadata_columns, columns_to_cast);
125172
}
126173

127174
/// To make local JOIN works, default database should be added to table names.

src/Interpreters/ClusterProxy/SelectStreamFactory.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
#include <Analyzer/IQueryTreeNode.h>
44
#include <Client/ConnectionPool.h>
55
#include <Core/QueryProcessingStage.h>
6+
#include <Core/Names.h>
67
#include <Interpreters/Cluster.h>
78
#include <Interpreters/StorageID.h>
89
#include <Parsers/IAST_fwd.h>
910
#include <Storages/IStorage_fwd.h>
11+
#include <Storages/ColumnsDescription.h>
1012
#include <Storages/StorageSnapshot.h>
1113

1214
namespace DB
@@ -42,7 +44,9 @@ ASTPtr rewriteSelectQuery(
4244
const std::string & remote_database,
4345
const std::string & remote_table,
4446
ASTPtr table_function_ptr = nullptr,
45-
ASTPtr additional_filter = nullptr);
47+
ASTPtr additional_filter = nullptr,
48+
const NameSet * columns_to_cast = nullptr,
49+
const ColumnsDescription * metadata_columns = nullptr);
4650

4751
/// Associates a UInt32 key with a ColumnsDescription.
/// NOTE(review): judging by the alias name the key is a shard number — confirm at the use sites.
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
4852
/// Callable that takes a uint64_t and returns an ASTPtr.
/// NOTE(review): by its name this produces an extra per-shard filter expression — confirm at the use sites.
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;

0 commit comments

Comments
 (0)