Commit 7d5a42c

Merge branch 'antalya-25.8' into fp_antalya_25_8_export_mt_part
2 parents 97cd90f + 410f2f3 commit 7d5a42c

26 files changed: +1455 −69 lines changed
Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
---
description: 'Hybrid unions multiple data sources behind per-layer predicates so queries behave like a single table while data is migrated or tiered.'
slug: /engines/table-engines/special/tiered-distributed
title: 'Hybrid Table Engine'
sidebar_label: 'Hybrid'
sidebar_position: 11
---

# Hybrid table engine

`Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
The engine rewrites incoming queries so that each layer receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.

It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source.
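
To make the rewrite concrete, here is a minimal sketch (the `events` table, its two layers, and the watermark date are hypothetical): a query against a two-layer `Hybrid` table is forwarded to each layer with that layer's predicate appended via `AND`.

```sql
-- Query issued against the Hybrid table
SELECT count() FROM events WHERE user_id = 42;

-- What each layer effectively executes (conceptual rewrite, names are hypothetical)
-- Layer 1, predicate: event_date >= '2025-09-01'
SELECT count() FROM events WHERE user_id = 42 AND event_date >= '2025-09-01';
-- Layer 2, predicate: event_date < '2025-09-01'
SELECT count() FROM events WHERE user_id = 42 AND event_date < '2025-09-01';
```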

Typical use cases include:

- Zero-downtime migrations where "old" and "new" replicas temporarily overlap.
- Tiered storage, for example fresh data on a local cluster and historical data in S3.
- Gradual roll-outs where only a subset of rows should be served from a new backend.

By giving mutually exclusive predicates to the layers (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.

## Engine definition

```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
    column1 type1,
    column2 type2,
    ...
)
ENGINE = Hybrid(table_function_1, predicate_1 [, table_function_2, predicate_2 ...])
```

You must pass at least two arguments – the first table function and its predicate. Additional sources are appended as `table_function, predicate` pairs. The first table function is also used for `INSERT` statements.
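
For instance, a minimal two-layer definition with mutually exclusive predicates might look like the following sketch (the cluster addresses, table names, and watermark date are hypothetical; the statement mirrors the documented `CREATE ... ENGINE = Hybrid(...) AS ...` pattern shown in the example below):

```sql
-- Hypothetical two-layer table: fresh rows from one cluster, older rows from another
CREATE TABLE events_hybrid ENGINE = Hybrid(
    remote('new-cluster:9000', currentDatabase(), 'events_local'), event_date >= '2025-09-01',
    remote('old-cluster:9000', currentDatabase(), 'events_local'), event_date <  '2025-09-01'
) AS events_local;
```

Because the two predicates cover disjoint date ranges, every row is served by exactly one layer, and `INSERT INTO events_hybrid` is routed to the first layer.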

### Arguments and behaviour

- `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage.
- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the layer's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
- The query planner picks the same processing stage for every layer as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources (see the sketch after this list).
- Align schemas across the layers. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.

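As a sketch of that multi-destination pattern (the `staging_events` source and the per-layer target tables are hypothetical), write to each backing source directly instead of going through the `Hybrid` table:

```sql
-- Writes through the Hybrid table reach the first layer only,
-- so duplicate the write explicitly when every backend must receive its share.
INSERT INTO new_cluster_events SELECT * FROM staging_events WHERE event_date >= '2025-09-01';
INSERT INTO old_cluster_events SELECT * FROM staging_events WHERE event_date <  '2025-09-01';
```
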
## Example: local cluster plus S3 historical tier

The following commands illustrate a two-layer layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.

```sql
-- Local MergeTree table that keeps current data
CREATE OR REPLACE TABLE btc_blocks_local
(
    `hash` FixedString(64),
    `version` Int64,
    `mediantime` DateTime64(9),
    `nonce` Int64,
    `bits` FixedString(8),
    `difficulty` Float64,
    `chainwork` FixedString(64),
    `size` Int64,
    `weight` Int64,
    `coinbase_param` String,
    `number` Int64,
    `transaction_count` Int64,
    `merkle_root` FixedString(64),
    `stripped_size` Int64,
    `timestamp` DateTime64(9),
    `date` Date
)
ENGINE = MergeTree
ORDER BY (timestamp)
PARTITION BY toYYYYMM(date);

-- Hybrid table that unions the local shard with historical data in S3
CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
    remote('localhost:9000', currentDatabase(), 'btc_blocks_local'), date >= '2025-09-01',
    s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
) AS btc_blocks_local;

-- Writes target the first (remote) layer
INSERT INTO btc_blocks
SELECT *
FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
WHERE date BETWEEN '2025-09-01' AND '2025-09-30';

-- Reads seamlessly combine both predicates
SELECT * FROM btc_blocks WHERE date = '2025-08-01'; -- data from s3
SELECT * FROM btc_blocks WHERE date = '2025-09-05'; -- data from MergeTree (TODO: still analyzes s3)
SELECT * FROM btc_blocks WHERE date IN ('2025-08-31', '2025-09-01'); -- data from both sources, single copy always

-- Run analytic queries as usual
SELECT
    date,
    count(),
    uniqExact(CAST(hash, 'Nullable(String)')) AS hashes,
    sum(CAST(number, 'Nullable(Int64)')) AS blocks_seen
FROM btc_blocks
WHERE date BETWEEN '2025-08-01' AND '2025-09-30'
GROUP BY date
ORDER BY date;
```

Because the predicates are applied inside every layer, queries that use `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
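
For instance (a sketch only; the plan output depends on the ClickHouse version and settings), `EXPLAIN` can be run against the Hybrid table exactly as against a `Distributed` one to inspect how a query is planned across the layers:

```sql
-- Inspect the plan for a query over the Hybrid table
EXPLAIN
SELECT count()
FROM btc_blocks
WHERE date >= '2025-08-15';
```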

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 158 additions & 15 deletions
@@ -317,11 +317,31 @@ bool GlueCatalog::tryGetTableMetadata(
     {
         result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
     }
+    else if (table_outcome.GetStorageDescriptor().LocationHasBeenSet())
+    {
+        const auto & location = table_outcome.GetStorageDescriptor().GetLocation();
+
+        std::string location_with_slash = location;
+        if (!location_with_slash.ends_with('/'))
+            location_with_slash += '/';
+
+        // Resolve the actual metadata file path based on table location
+        std::string resolved_metadata_path = resolveMetadataPathFromTableLocation(location_with_slash, result);
+        if (resolved_metadata_path.empty())
+        {
+            result.setTableIsNotReadable(fmt::format("Could not determine metadata_location of table `{}`. ",
+                database_name + "." + table_name));
+        }
+        else
+        {
+            result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = resolved_metadata_path});
+        }
+    }
     else
     {
-        result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
-            "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
-            database_name + "." + table_name));
+        result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
+            "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
+            database_name + "." + table_name));
     }
 };

@@ -415,37 +435,41 @@ bool GlueCatalog::empty() const
 bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
 {
     String metadata_path;
+    String metadata_uri;
     if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
         table_specific_properties.has_value())
     {
         metadata_path = table_specific_properties->iceberg_metadata_file_location;
+        metadata_uri = metadata_path;
         if (metadata_path.starts_with("s3:/"))
             metadata_path = metadata_path.substr(5);

-        // Delete bucket
+        // Delete bucket from path
         std::size_t pos = metadata_path.find('/');
         if (pos != std::string::npos)
             metadata_path = metadata_path.substr(pos + 1);
     }
     else
-        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to read table metadata, reason why table is unreadable: {}", table_metadata.getReasonWhyTableIsUnreadable());

-    if (!metadata_objects.get(metadata_path))
+    if (!metadata_objects.get(metadata_uri))
     {
         DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
         DB::ASTs args = storage->engine->arguments->children;

-        auto table_endpoint = settings.storage_endpoint;
+        String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : metadata_uri;
+
         if (args.empty())
-            args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
+            args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
         else
-            args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
+            args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);

-        if (args.size() == 1 && table_metadata.hasStorageCredentials())
+        if (args.size() == 1)
         {
-            auto storage_credentials = table_metadata.getStorageCredentials();
-            if (storage_credentials)
-                storage_credentials->addCredentialsToEngineArgs(args);
+            if (table_metadata.hasStorageCredentials())
+                table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
+            else if (!credentials.IsExpiredOrEmpty())
+                DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
         }

         auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
@@ -464,9 +488,9 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
         Poco::JSON::Parser parser;
         Poco::Dynamic::Var result = parser.parse(metadata_file);
         auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
-        metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
+        metadata_objects.set(metadata_uri, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
     }
-    auto metadata_object = *metadata_objects.get(metadata_path);
+    auto metadata_object = *metadata_objects.get(metadata_uri);
     auto current_schema_id = metadata_object->getValue<Int64>("current-schema-id");
     auto schemas = metadata_object->getArray(DB::Iceberg::f_schemas);
     for (size_t i = 0; i < schemas->size(); ++i)
@@ -487,6 +511,125 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
     return false;
 }

+/// This function tries to resolve the metadata file path by the following means:
+/// 1. Tries to read version-hint.text to get the latest version.
+/// 2. Lists all *.metadata.json files in the metadata directory and takes the most recent one.
+String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_location, const TableMetadata & table_metadata) const
+{
+    // Construct path to version-hint.text
+    String version_hint_path = table_location + "metadata/version-hint.text";
+
+    DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
+    DB::ASTs args = storage->engine->arguments->children;
+
+    String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : table_location;
+    if (args.empty())
+        args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
+    else
+        args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);
+
+    if (args.size() == 1 && table_metadata.hasStorageCredentials())
+    {
+        auto storage_credentials = table_metadata.getStorageCredentials();
+        if (storage_credentials)
+            storage_credentials->addCredentialsToEngineArgs(args);
+    }
+
+    auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
+    storage_settings->loadFromSettingsChanges(settings.allChanged());
+    auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
+    configuration->initialize(args, getContext(), false);
+
+    auto object_storage = configuration->createObjectStorage(getContext(), true);
+    const auto & read_settings = getContext()->getReadSettings();
+
+    try
+    {
+        // Try to read version-hint.text to get the latest version
+        String version_hint_object_path = version_hint_path;
+        if (version_hint_object_path.starts_with("s3://"))
+        {
+            version_hint_object_path = version_hint_object_path.substr(5);
+            // Remove bucket from path
+            std::size_t pos = version_hint_object_path.find('/');
+            if (pos != std::string::npos)
+                version_hint_object_path = version_hint_object_path.substr(pos + 1);
+        }
+
+        DB::StoredObject version_hint_stored_object(version_hint_object_path);
+        auto version_hint_buf = object_storage->readObject(version_hint_stored_object, read_settings);
+        String version_str;
+        readString(version_str, *version_hint_buf);
+
+        boost::algorithm::trim(version_str);
+
+        LOG_TRACE(log, "Read version {} from version-hint.text for table location '{}'", version_str, table_location);
+
+        return table_location + "metadata/v" + version_str + "-metadata.json";
+    }
+    catch (...)
+    {
+        LOG_TRACE(log, "Could not read version-hint.text from '{}', trying to find latest metadata file", version_hint_path);
+
+        try
+        {
+            String bucket_with_prefix;
+            String metadata_dir = table_location + "metadata/";
+            String metadata_dir_path = metadata_dir;
+
+            if (metadata_dir_path.starts_with("s3://"))
+            {
+                metadata_dir_path = metadata_dir_path.substr(5);
+                // Remove bucket from path
+                std::size_t pos = metadata_dir_path.find('/');
+                if (pos != std::string::npos)
+                {
+                    metadata_dir_path = metadata_dir_path.substr(pos + 1);
+                    bucket_with_prefix = table_location.substr(0, pos + 6);
+                }
+            }
+            else
+                return "";
+
+            // List all files in metadata directory
+            DB::RelativePathsWithMetadata files;
+            object_storage->listObjects(metadata_dir_path, files, 0);
+
+            // Filter for .metadata.json files and find the most recent one
+            String latest_metadata_file;
+            std::optional<DB::ObjectMetadata> latest_metadata;
+
+            for (const auto & file : files)
+            {
+                if (file->getPath().ends_with(".metadata.json"))
+                {
+                    // Get file metadata to check last modified time
+                    if (!latest_metadata.has_value() ||
+                        (file->metadata->last_modified > latest_metadata->last_modified))
+                    {
+                        latest_metadata_file = file->getPath();
+                        latest_metadata = file->metadata;
+                    }
+                }
+            }
+
+            if (!latest_metadata_file.empty())
+            {
+                LOG_TRACE(log, "Found latest metadata file: {}", latest_metadata_file);
+                return bucket_with_prefix + latest_metadata_file;
+            }
+
+            LOG_TRACE(log, "No <...>.metadata.json files found,");
+            return "";
+        }
+        catch (...)
+        {
+            LOG_TRACE(log, "Failed to list metadata directory");
+            return "";
+        }
+    }
+}
+
 void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) const
 {
     Aws::Glue::Model::CreateDatabaseRequest create_request;

src/Databases/DataLake/GlueCatalog.h

Lines changed: 2 additions & 0 deletions
@@ -81,6 +81,8 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
     /// This method allows to clarify the actual type of the timestamp column.
     bool classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const;

+    String resolveMetadataPathFromTableLocation(const String & table_location, const TableMetadata & table_metadata) const;
+
     mutable DB::CacheBase<String, Poco::JSON::Object::Ptr> metadata_objects;
 };

src/Interpreters/ClusterProxy/SelectStreamFactory.cpp

Lines changed: 29 additions & 2 deletions
@@ -67,7 +67,8 @@ ASTPtr rewriteSelectQuery(
     const ASTPtr & query,
     const std::string & remote_database,
     const std::string & remote_table,
-    ASTPtr table_function_ptr)
+    ASTPtr table_function_ptr,
+    ASTPtr additional_filter)
 {
     auto modified_query_ast = query->clone();

@@ -80,8 +81,33 @@ ASTPtr rewriteSelectQuery(

     if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
     {
+        // Apply additional filter if provided
+        if (additional_filter)
+        {
+            if (select_query.where())
+            {
+                /// WHERE <old> AND <filter>
+                select_query.setExpression(
+                    ASTSelectQuery::Expression::WHERE,
+                    makeASTFunction("and", select_query.where(), additional_filter->clone()));
+            }
+            else
+            {
+                /// No WHERE – simply set it
+                select_query.setExpression(
+                    ASTSelectQuery::Expression::WHERE, additional_filter->clone());
+            }
+        }
+
         if (table_function_ptr)
-            select_query.addTableFunction(table_function_ptr);
+        {
+            select_query.addTableFunction(table_function_ptr->clone());
+
+            // Reset semantic table information for all column identifiers to prevent
+            // RestoreQualifiedNamesVisitor from adding wrong table names
+            ResetSemanticTableVisitor::Data data;
+            ResetSemanticTableVisitor(data).visit(modified_query_ast);
+        }
         else
             select_query.replaceDatabaseAndTable(remote_database, remote_table);

@@ -93,6 +119,7 @@ ASTPtr rewriteSelectQuery(
             data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query->as<ASTSelectQuery &>(), 0));
             data.remote_table.database = remote_database;
             data.remote_table.table = remote_table;
+
             RestoreQualifiedNamesVisitor(data).visit(modified_query_ast);
         }
     }

0 commit comments
