Commit 9015f22

Merge branch 'antalya-25.8' into mf_25.8_hybrid2

2 parents: 6940b68 + d6bcd47

12 files changed: +122 -91 lines
Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+1
+raw_blob String
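
This is likely the reference file for the new SQL test below (its path is not shown on this page): the 1 matches count(distinct year) over the single partition written, and raw_blob String matches the DESCRIBE of the RawBLOB read.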
Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+-- Tags: no-parallel, no-fasttest, no-random-settings
+
+INSERT INTO FUNCTION s3(
+    s3_conn,
+    filename='03631',
+    format=Parquet,
+    partition_strategy='hive',
+    partition_columns_in_data_file=1) PARTITION BY (year, country) SELECT 'Brazil' as country, 2025 as year, 1 as id;
+
+-- distinct because minio isn't cleaned up
+SELECT count(distinct year) FROM s3(s3_conn, filename='03631/**.parquet', format=RawBLOB) SETTINGS use_hive_partitioning=1;
+
+DESCRIBE s3(s3_conn, filename='03631/**.parquet', format=RawBLOB) SETTINGS use_hive_partitioning=1;
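
The test writes one row with partition_strategy='hive', so the object lands under a key shaped like 03631/year=2025/country=Brazil/<file>.parquet, then reads it back as RawBLOB with use_hive_partitioning=1, recovering year and country from the path rather than the file contents. A minimal sketch of that path convention (illustrative code, not ClickHouse's implementation):

#include <iostream>
#include <map>
#include <string>

// Recover hive partition columns from a key such as
// "03631/year=2025/country=Brazil/data.parquet".
std::map<std::string, std::string> hivePartitionsFromPath(const std::string & path)
{
    std::map<std::string, std::string> columns;
    size_t start = 0;
    while (true)
    {
        size_t end = path.find('/', start);
        if (end == std::string::npos)
            break; // the last component is the file name, not a partition
        std::string part = path.substr(start, end - start);
        if (size_t eq = part.find('='); eq != std::string::npos)
            columns.emplace(part.substr(0, eq), part.substr(eq + 1));
        start = end + 1;
    }
    return columns;
}

int main()
{
    for (const auto & [name, value] : hivePartitionsFromPath("03631/year=2025/country=Brazil/data.parquet"))
        std::cout << name << '=' << value << '\n'; // prints country=Brazil and year=2025
}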

src/Storages/IStorageCluster.cpp

Lines changed: 1 addition & 1 deletion

@@ -573,7 +573,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
                 "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

         auto info = getQueryTreeInfo(query_info.query_tree, context);
-        if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
+        if (info.has_join || info.has_cross_join /*|| info.has_local_columns_in_where*/)
             return QueryProcessingStage::Enum::FetchColumns;
     }
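With has_local_columns_in_where commented out, a query whose WHERE clause refers only to local columns is no longer forced down to the FetchColumns stage; plain and CROSS JOINs still are. This lines up with the integration-test changes further down, where the IN-subquery cases are disabled and explicit WHERE filters are added to the JOIN queries.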
src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 9 additions & 7 deletions

@@ -141,7 +141,7 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
         auto files = files_generator(current_manifest_file_content);
         while (internal_data_index < files.size())
         {
-            const auto & manifest_file_entry = files[internal_data_index++];
+            auto & manifest_file_entry = files[internal_data_index++];
             if ((manifest_file_entry.schema_id != previous_entry_schema) && (use_partition_pruning))
             {
                 previous_entry_schema = manifest_file_entry.schema_id;
@@ -164,7 +164,13 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
             switch (pruning_status)
             {
                 case PruningReturnStatus::NOT_PRUNED:
+                {
+                    auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
+                        persistent_components.table_location, manifest_file_entry.file_path, object_storage, *secondary_storages, local_context);
+                    manifest_file_entry.storage_to_use = storage_to_use;
+                    manifest_file_entry.resolved_key = resolved_key;
                     return manifest_file_entry;
+                }
                 case PruningReturnStatus::MIN_MAX_INDEX_PRUNED: {
                     ++min_max_index_pruned_files;
                     break;
@@ -242,7 +248,6 @@ IcebergIterator::IcebergIterator(
     std::shared_ptr<SecondaryStorages> secondary_storages_)
     : filter_dag(filter_dag_ ? std::make_unique<ActionsDAG>(filter_dag_->clone()) : nullptr)
     , object_storage(std::move(object_storage_))
-    , local_context(local_context_)
     , data_files_iterator(
         object_storage,
         local_context_,
@@ -336,11 +341,8 @@ ObjectInfoPtr IcebergIterator::next(size_t)
     Iceberg::ManifestFileEntry manifest_file_entry;
     if (blocking_queue.pop(manifest_file_entry))
    {
-        // Resolve the data file path to get the correct storage and key
-        auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
-            persistent_components.table_location, manifest_file_entry.file_path, object_storage, *secondary_storages, local_context);
-
-        IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(manifest_file_entry, storage_to_use, resolved_key);
+        IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(
+            manifest_file_entry, manifest_file_entry.storage_to_use, manifest_file_entry.resolved_key);

         for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, false))
             object_info->addPositionDeleteObject(position_delete);
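
Taken together, these hunks move storage/key resolution from the queue consumer (IcebergIterator::next) into the single-threaded producer, caching the result on the ManifestFileEntry itself; that is why the entry is no longer taken by const reference and why IcebergIterator can drop its local_context member in the header below. A minimal sketch of the pattern, with hypothetical types standing in for ManifestFileEntry and resolveObjectStorageForPath:

#include <string>

// Hypothetical stand-in for ManifestFileEntry: carries its resolved key
// with it once the producer has filled it in.
struct Entry
{
    std::string file_path;
    std::string resolved_key; // set by the producer, before queueing
};

// Producer side: resolve while the context and storage registry are at hand.
void resolveBeforeQueueing(Entry & entry)
{
    entry.resolved_key = "s3://bucket/" + entry.file_path; // stand-in for real resolution
}

// Consumer side: read the cached field; no context is needed any more.
const std::string & keyOf(const Entry & entry)
{
    return entry.resolved_key;
}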

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 0 additions & 1 deletion

@@ -103,7 +103,6 @@ class IcebergIterator : public IObjectIterator
 private:
     std::unique_ptr<ActionsDAG> filter_dag;
     ObjectStoragePtr object_storage;
-    ContextPtr local_context;
     Iceberg::SingleThreadIcebergKeysIterator data_files_iterator;
     Iceberg::SingleThreadIcebergKeysIterator deletes_iterator;
     ConcurrentBoundedQueue<Iceberg::ManifestFileEntry> blocking_queue;

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h

Lines changed: 5 additions & 0 deletions

@@ -26,6 +26,7 @@ struct ColumnInfo
 #include <Storages/KeyDescription.h>
 #include <Storages/MergeTree/KeyCondition.h>
 #include <Core/Field.h>
+#include <Disks/ObjectStorages/IObjectStorage_fwd.h>

 #include <cstdint>

@@ -84,6 +85,10 @@ struct ManifestFileEntry
     String file_format;
     std::optional<String> reference_data_file_path; // For position delete files only.
     std::optional<std::vector<Int32>> equality_ids;
+
+    // Resolved storage and key (set by SingleThreadIcebergKeysIterator)
+    ObjectStoragePtr storage_to_use;
+    String resolved_key;
 };

 /**
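
IObjectStorage_fwd.h presumably provides just the ObjectStoragePtr alias, so the header gains the new field without pulling in the full IObjectStorage definition.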

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 4 additions & 4 deletions

@@ -111,7 +111,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t
         auto next_file = files.back();
         files.pop_back();

-        auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath();
+        auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
         auto it = unprocessed_files.find(file_path);
         if (it == unprocessed_files.end())
             continue;
@@ -221,7 +221,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
         auto next_file = it->second.first;
         unprocessed_files.erase(it);

-        auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath();
+        auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
         LOG_TRACE(
             log,
             "Iterator exhausted. Assigning unprocessed file {} to replica {}",
@@ -282,8 +282,8 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
     replica_to_files_to_be_processed.erase(number_of_current_replica);
     for (const auto & file : processed_file_list_ptr->second)
     {
-        auto file_replica_idx = getReplicaForFile(file->getPath());
-        unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx));
+        auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath()));
+        unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx));
         connection_to_files[file_replica_idx].push_back(file);
     }
 }

src/Storages/ObjectStorage/Utils.cpp

Lines changed: 1 addition & 2 deletions

@@ -110,8 +110,7 @@ std::pair<ObjectStoragePtr, std::string> getOrCreateStorageAndKey(

     configure_fn(*cfg, config_prefix);

-    auto & factory = ObjectStorageFactory::instance();
-    ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true);
+    ObjectStoragePtr storage = ObjectStorageFactory::instance().create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true);

     {
         std::lock_guard lock(secondary_storages.mutex);

src/Storages/prepareReadingFromFormat.cpp

Lines changed: 6 additions & 1 deletion

@@ -234,7 +234,12 @@ ReadFromFormatInfo prepareReadingFromFormat(
     }

     /// Create header for InputFormat with columns that will be read from the data.
-    info.format_header = storage_snapshot->getSampleBlockForColumns(info.columns_description.getNamesOfPhysical());
+    for (const auto & column : info.columns_description)
+    {
+        /// Never read hive partition columns from the data file. This fixes https://github.com/ClickHouse/ClickHouse/issues/87515
+        if (!hive_parameters.hive_partition_columns_to_read_from_file_path_map.contains(column.name))
+            info.format_header.insert(ColumnWithTypeAndName{column.type, column.name});
+    }

     info.serialization_hints = getSerializationHintsForFileLikeStorage(storage_snapshot->metadata, context);
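
Under hive partitioning, values like year=2025 and country=Brazil come from the object path, so those columns must be kept out of the header handed to the input format even when partition_columns_in_data_file=1 also wrote them into the file; otherwise a format such as RawBLOB, which exposes only a raw_blob column, would be asked for columns it cannot produce. This appears to be the behavior the linked issue reports and the new 03631 test above exercises.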

tests/integration/test_database_iceberg/test.py

Lines changed: 35 additions & 32 deletions

@@ -724,6 +724,7 @@ def test_cluster_joins(started_cluster):
         FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1
         JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2
         ON t1.tag=t2.id
+        WHERE t1.tag < 10 AND t2.id < 20
         ORDER BY ALL
         SETTINGS
         object_storage_cluster='cluster_simple',
@@ -733,29 +734,30 @@ def test_cluster_joins(started_cluster):

     assert res == "Jack\tSparrow\nJohn\tDow\n"

-    res = node.query(
-        f"""
-        SELECT name
-        FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
-        WHERE tag in (
-            SELECT id
-            FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
-        )
-        ORDER BY ALL
-        SETTINGS
-        object_storage_cluster='cluster_simple',
-        object_storage_cluster_join_mode='local'
-        """
-    )
-
-    assert res == "Jack\nJohn\n"
+    #res = node.query(
+    #    f"""
+    #    SELECT name
+    #    FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
+    #    WHERE tag in (
+    #        SELECT id
+    #        FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
+    #    )
+    #    ORDER BY ALL
+    #    SETTINGS
+    #    object_storage_cluster='cluster_simple',
+    #    object_storage_cluster_join_mode='local'
+    #    """
+    #)
+
+    #assert res == "Jack\nJohn\n"

     res = node.query(
         f"""
         SELECT t1.name,t2.second_name
         FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1
         JOIN `{table_name_local}` AS t2
         ON t1.tag=t2.id
+        WHERE t1.tag < 10 AND t2.id < 20
         ORDER BY ALL
         SETTINGS
         object_storage_cluster='cluster_simple',
@@ -765,28 +767,29 @@ def test_cluster_joins(started_cluster):

     assert res == "Jack\tBlack\nJohn\tSilver\n"

-    res = node.query(
-        f"""
-        SELECT name
-        FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
-        WHERE tag in (
-            SELECT id
-            FROM `{table_name_local}`
-        )
-        ORDER BY ALL
-        SETTINGS
-        object_storage_cluster='cluster_simple',
-        object_storage_cluster_join_mode='local'
-        """
-    )
-
-    assert res == "Jack\nJohn\n"
+    #res = node.query(
+    #    f"""
+    #    SELECT name
+    #    FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
+    #    WHERE tag in (
+    #        SELECT id
+    #        FROM `{table_name_local}`
+    #    )
+    #    ORDER BY ALL
+    #    SETTINGS
+    #    object_storage_cluster='cluster_simple',
+    #    object_storage_cluster_join_mode='local'
+    #    """
+    #)
+
+    #assert res == "Jack\nJohn\n"

     res = node.query(
         f"""
         SELECT t1.name,t2.second_name
         FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1
         CROSS JOIN `{table_name_local}` AS t2
+        WHERE t1.tag < 10 AND t2.id < 20
         ORDER BY ALL
         SETTINGS
         object_storage_cluster='cluster_simple',
