Skip to content

Commit bb85c02

Browse files
ksseniizvonand
authored andcommitted
Merge pull request ClickHouse#82131 from ClickHouse/fix-partiton-pruning-in-data-lake-cluster-functions
Fix partition pruning with data lake cluster functions
1 parent 5ccff48 commit bb85c02

15 files changed

+75
-19
lines changed

src/Planner/Planner.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include <Storages/StorageDistributed.h>
5353
#include <Storages/StorageDummy.h>
5454
#include <Storages/StorageMerge.h>
55+
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
5556

5657
#include <AggregateFunctions/IAggregateFunction.h>
5758

@@ -231,6 +232,11 @@ FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr &
231232
collect_filters = true;
232233
break;
233234
}
235+
if (typeid_cast<const StorageObjectStorageCluster *>(storage.get()))
236+
{
237+
collect_filters = true;
238+
break;
239+
}
234240
}
235241

236242
if (!collect_filters)

src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ void ReadFromObjectStorageStep::createIterator()
113113
auto context = getContext();
114114
iterator_wrapper = StorageObjectStorageSource::createFileIterator(
115115
configuration, configuration->getQuerySettings(context), object_storage, distributed_processing,
116-
context, predicate, filter_actions_dag, virtual_columns, info.hive_partition_columns_to_read_from_file_path, nullptr, context->getFileProgressCallback());
116+
context, predicate, filter_actions_dag.has_value() ? &filter_actions_dag.value() : nullptr,
117+
virtual_columns, info.hive_partition_columns_to_read_from_file_path, nullptr, context->getFileProgressCallback());
117118
}
118119

119120

src/Storages/IStorageCluster.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t
117117
if (extension)
118118
return;
119119

120-
extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
120+
extension = storage->getTaskIteratorExtension(
121+
predicate,
122+
filter_actions_dag.has_value() ? &filter_actions_dag.value() : query_info.filter_actions_dag.get(),
123+
context,
124+
number_of_replicas);
121125
}
122126

123127
/// The code executes on initiator

src/Storages/IStorageCluster.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,13 @@ class IStorageCluster : public IStorage
3434
size_t /*num_streams*/) override;
3535

3636
ClusterPtr getCluster(ContextPtr context) const;
37+
3738
/// Query is needed for pruning by virtual columns (_file, _path)
38-
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
39+
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
40+
const ActionsDAG::Node * predicate,
41+
const ActionsDAG * filter_actions_dag,
42+
const ContextPtr & context,
43+
size_t number_of_replicas) const = 0;
3944

4045
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4146

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,27 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
189189
}
190190
}
191191

192+
192193
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
193-
const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
194+
const ActionsDAG::Node * predicate,
195+
const ActionsDAG * filter,
196+
const ContextPtr & local_context,
197+
const size_t number_of_replicas) const
194198
{
195199
auto iterator = StorageObjectStorageSource::createFileIterator(
196-
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
197-
local_context, predicate, {}, virtual_columns, hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
200+
configuration,
201+
configuration->getQuerySettings(local_context),
202+
object_storage,
203+
/* distributed_processing */false,
204+
local_context,
205+
predicate,
206+
filter,
207+
virtual_columns,
208+
hive_partition_columns_to_read_from_file_path,
209+
nullptr,
210+
local_context->getFileProgressCallback(),
211+
/*ignore_archive_globs=*/true,
212+
/*skip_object_metadata=*/true);
198213

199214
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
200215

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ class StorageObjectStorageCluster : public IStorageCluster
2525
std::string getName() const override;
2626

2727
RemoteQueryExecutor::Extension getTaskIteratorExtension(
28-
const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
28+
const ActionsDAG::Node * predicate,
29+
const ActionsDAG * filter,
30+
const ContextPtr & context,
31+
size_t number_of_replicas) const override;
2932

3033
String getPathSample(ContextPtr context);
3134

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ std::shared_ptr<IObjectIterator> StorageObjectStorageSource::createFileIterator(
137137
bool distributed_processing,
138138
const ContextPtr & local_context,
139139
const ActionsDAG::Node * predicate,
140-
const std::optional<ActionsDAG> & filter_actions_dag,
140+
const ActionsDAG * filter_actions_dag,
141141
const NamesAndTypesList & virtual_columns,
142142
const NamesAndTypesList & hive_columns,
143143
ObjectInfos * read_keys,
@@ -178,7 +178,7 @@ std::shared_ptr<IObjectIterator> StorageObjectStorageSource::createFileIterator(
178178
else if (configuration->supportsFileIterator())
179179
{
180180
return configuration->iterate(
181-
filter_actions_dag.has_value() ? &filter_actions_dag.value() : nullptr,
181+
filter_actions_dag,
182182
file_progress_callback,
183183
query_settings.list_object_keys_size,
184184
local_context);

src/Storages/ObjectStorage/StorageObjectStorageSource.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class StorageObjectStorageSource : public SourceWithKeyCondition
5757
bool distributed_processing,
5858
const ContextPtr & local_context,
5959
const ActionsDAG::Node * predicate,
60-
const std::optional<ActionsDAG> & filter_actions_dag,
60+
const ActionsDAG * filter_actions_dag,
6161
const NamesAndTypesList & virtual_columns,
6262
const NamesAndTypesList & hive_columns,
6363
ObjectInfos * read_keys,

src/Storages/StorageDistributed.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1318,7 +1318,8 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
13181318

13191319
/// Select query is needed for pruining on virtual columns
13201320
auto number_of_replicas = static_cast<UInt64>(cluster->getShardsInfo().size());
1321-
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas);
1321+
auto extension = src_storage_cluster.getTaskIteratorExtension(
1322+
predicate, filter.has_value() ? &filter.value() : nullptr, local_context, number_of_replicas);
13221323

13231324
/// Here we take addresses from destination cluster and assume source table exists on these nodes
13241325
size_t replica_index = 0;

src/Storages/StorageFileCluster.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
9494
);
9595
}
9696

97-
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const
97+
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
98+
const ActionsDAG::Node * predicate,
99+
const ActionsDAG * /* filter */,
100+
const ContextPtr & context,
101+
const size_t) const
98102
{
99103
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context);
100104
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });

0 commit comments

Comments
 (0)