Skip to content

Commit c5d68ad

Browse files
authored
Merge pull request ClickHouse#78775 from ClickHouse/refactor-code-around-data-lakes
Small refactoring around data lakes
2 parents bb33c6b + f9168f6 commit c5d68ad

13 files changed

+132
-78
lines changed

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
6464
ErrorCodes::FORMAT_VERSION_TOO_OLD,
6565
"Metadata is not consinsent with the one which was used to infer table schema. Please, retry the query.");
6666
}
67-
if (!supportsFileIterator())
68-
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
6967
}
7068
}
7169

@@ -81,14 +79,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
8179
return std::nullopt;
8280
}
8381

84-
void implementPartitionPruning(const ActionsDAG & filter_dag) override
85-
{
86-
if (!current_metadata || !current_metadata->supportsPartitionPruning())
87-
return;
88-
BaseStorageConfiguration::setPaths(current_metadata->makePartitionPruning(filter_dag));
89-
}
90-
91-
9282
std::optional<size_t> totalRows() override
9383
{
9484
if (!current_metadata)
@@ -123,20 +113,11 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
123113
ContextPtr context) override
124114
{
125115
BaseStorageConfiguration::update(object_storage, context);
126-
if (updateMetadataObjectIfNeeded(object_storage, context))
127-
{
128-
if (!supportsFileIterator())
129-
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
130-
}
131-
116+
updateMetadataObjectIfNeeded(object_storage, context);
132117
return ColumnsDescription{current_metadata->getTableSchema()};
133118
}
134119

135-
bool supportsFileIterator() const override
136-
{
137-
chassert(current_metadata);
138-
return current_metadata->supportsFileIterator();
139-
}
120+
bool supportsFileIterator() const override { return true; }
140121

141122
ObjectIterator iterate(
142123
const ActionsDAG * filter_dag,

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,7 @@ DeltaLakeMetadata::DeltaLakeMetadata(ObjectStoragePtr object_storage_, Configura
600600
data_files = result.data_files;
601601
schema = result.schema;
602602
partition_columns = result.partition_columns;
603+
object_storage = object_storage_;
603604

604605
LOG_TRACE(impl.log, "Found {} data files, {} partition files, schema: {}",
605606
data_files.size(), partition_columns.size(), schema.toString());
@@ -712,6 +713,14 @@ Field DeltaLakeMetadata::getFieldValue(const String & value, DataTypePtr data_ty
712713
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type for {}", check_type->getColumnType());
713714
}
714715

716+
ObjectIterator DeltaLakeMetadata::iterate(
717+
const ActionsDAG * filter_dag,
718+
FileProgressCallback callback,
719+
size_t /* list_batch_size */) const
720+
{
721+
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
722+
}
723+
715724
}
716725

717726
#endif

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
4141

4242
DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
4343

44-
Strings getDataFiles() const override { return data_files; }
45-
4644
NamesAndTypesList getTableSchema() const override { return schema; }
4745

4846
DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }
@@ -80,10 +78,19 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
8078
static DataTypePtr getFieldValue(const Poco::JSON::Object::Ptr & field, const String & type_key, bool is_nullable);
8179
static Field getFieldValue(const String & value, DataTypePtr data_type);
8280

81+
protected:
82+
ObjectIterator iterate(
83+
const ActionsDAG * filter_dag,
84+
FileProgressCallback callback,
85+
size_t list_batch_size) const override;
86+
8387
private:
8488
mutable Strings data_files;
8589
NamesAndTypesList schema;
8690
DeltaLakePartitionColumns partition_columns;
91+
ObjectStoragePtr object_storage;
92+
93+
Strings getDataFiles(const ActionsDAG *) const { return data_files; }
8794
};
8895

8996
}

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,6 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
3232
return table_snapshot->update();
3333
}
3434

35-
Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
36-
{
37-
throwNotImplemented("getDataFiles()");
38-
}
39-
4035
ObjectIterator DeltaLakeMetadataDeltaKernel::iterate(
4136
const ActionsDAG * filter_dag,
4237
FileProgressCallback callback,

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
4040

4141
bool update(const ContextPtr & context) override;
4242

43-
Strings getDataFiles() const override;
44-
4543
NamesAndTypesList getTableSchema() const override;
4644

4745
NamesAndTypesList getReadSchema() const override;
@@ -61,8 +59,6 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
6159
settings_ref[StorageObjectStorageSetting::delta_lake_read_schema_same_as_table_schema]);
6260
}
6361

64-
bool supportsFileIterator() const override { return true; }
65-
6662
ObjectIterator iterate(
6763
const ActionsDAG * filter_dag,
6864
FileProgressCallback callback,

src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,19 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
9191
{
9292
}
9393

94-
Strings HudiMetadata::getDataFiles() const
94+
Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
9595
{
9696
if (data_files.empty())
9797
data_files = getDataFilesImpl();
9898
return data_files;
9999
}
100100

101+
ObjectIterator HudiMetadata::iterate(
102+
const ActionsDAG * filter_dag,
103+
FileProgressCallback callback,
104+
size_t /* list_batch_size */) const
105+
{
106+
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
107+
}
108+
101109
}

src/Storages/ObjectStorage/DataLakes/HudiMetadata.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
1919

2020
HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
2121

22-
Strings getDataFiles() const override;
23-
2422
NamesAndTypesList getTableSchema() const override { return {}; }
2523

2624
bool operator ==(const IDataLakeMetadata & other) const override
@@ -39,12 +37,19 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
3937
return std::make_unique<HudiMetadata>(object_storage, configuration, local_context);
4038
}
4139

40+
protected:
41+
ObjectIterator iterate(
42+
const ActionsDAG * filter_dag,
43+
FileProgressCallback callback,
44+
size_t list_batch_size) const override;
45+
4246
private:
4347
const ObjectStoragePtr object_storage;
4448
const ConfigurationObserverPtr configuration;
4549
mutable Strings data_files;
4650

4751
Strings getDataFilesImpl() const;
52+
Strings getDataFiles(const ActionsDAG * filter_dag) const;
4853
};
4954

5055
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include "IDataLakeMetadata.h"
2+
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
3+
4+
namespace DB
5+
{
6+
7+
namespace
8+
{
9+
10+
class KeysIterator : public IObjectIterator
11+
{
12+
public:
13+
KeysIterator(
14+
Strings && data_files_,
15+
ObjectStoragePtr object_storage_,
16+
IDataLakeMetadata::FileProgressCallback callback_)
17+
: data_files(data_files_)
18+
, object_storage(object_storage_)
19+
, callback(callback_)
20+
{
21+
}
22+
23+
size_t estimatedKeysCount() override
24+
{
25+
return data_files.size();
26+
}
27+
28+
ObjectInfoPtr next(size_t) override
29+
{
30+
while (true)
31+
{
32+
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
33+
if (current_index >= data_files.size())
34+
return nullptr;
35+
36+
auto key = data_files[current_index];
37+
auto object_metadata = object_storage->getObjectMetadata(key);
38+
39+
if (callback)
40+
callback(FileProgress(0, object_metadata.size_bytes));
41+
42+
return std::make_shared<ObjectInfo>(key, std::move(object_metadata));
43+
}
44+
}
45+
46+
private:
47+
Strings data_files;
48+
ObjectStoragePtr object_storage;
49+
std::atomic<size_t> index = 0;
50+
IDataLakeMetadata::FileProgressCallback callback;
51+
};
52+
53+
}
54+
55+
ObjectIterator IDataLakeMetadata::createKeysIterator(
56+
Strings && data_files_,
57+
ObjectStoragePtr object_storage_,
58+
IDataLakeMetadata::FileProgressCallback callback_) const
59+
{
60+
return std::make_shared<KeysIterator>(std::move(data_files_), object_storage_, callback_);
61+
}
62+
63+
}

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,12 @@ class IDataLakeMetadata : boost::noncopyable
2020

2121
virtual bool operator==(const IDataLakeMetadata & other) const = 0;
2222

23-
/// List all data files.
24-
/// For better parallelization, iterate() method should be used.
25-
virtual Strings getDataFiles() const = 0;
26-
/// Whether `iterate()` method is supported for the data lake.
27-
virtual bool supportsFileIterator() const { return false; }
2823
/// Return iterator to `data files`.
2924
using FileProgressCallback = std::function<void(FileProgress)>;
3025
virtual ObjectIterator iterate(
3126
const ActionsDAG * /* filter_dag */,
3227
FileProgressCallback /* callback */,
33-
size_t /* list_batch_size */) const { throwNotImplemented("iterate()"); }
28+
size_t /* list_batch_size */) const = 0;
3429

3530
/// Table schema from data lake metadata.
3631
virtual NamesAndTypesList getTableSchema() const = 0;
@@ -39,9 +34,6 @@ class IDataLakeMetadata : boost::noncopyable
3934
/// Return nothing if read schema is the same as table schema.
4035
virtual NamesAndTypesList getReadSchema() const { return {}; }
4136

42-
virtual bool supportsPartitionPruning() { return false; }
43-
virtual Strings makePartitionPruning(const ActionsDAG &) { throwNotImplemented("makePartitionPrunning()"); }
44-
4537
virtual std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String &) const { return {}; }
4638
virtual std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String &) const { return {}; }
4739

@@ -56,7 +48,13 @@ class IDataLakeMetadata : boost::noncopyable
5648

5749
virtual std::optional<size_t> totalRows() const { return {}; }
5850
virtual std::optional<size_t> totalBytes() const { return {}; }
51+
5952
protected:
53+
ObjectIterator createKeysIterator(
54+
Strings && data_files_,
55+
ObjectStoragePtr object_storage_,
56+
IDataLakeMetadata::FileProgressCallback callback_) const;
57+
6058
[[noreturn]] void throwNotImplemented(std::string_view method) const
6159
{
6260
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Method `{}` is not implemented", method);

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ namespace Setting
4949
{
5050
extern const SettingsInt64 iceberg_timestamp_ms;
5151
extern const SettingsInt64 iceberg_snapshot_id;
52+
extern const SettingsBool use_iceberg_partition_pruning;
5253
}
5354

5455

@@ -571,18 +572,23 @@ ManifestListPtr IcebergMetadata::getManifestList(const String & filename) const
571572
return manifest_list_ptr;
572573
}
573574

574-
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
575+
Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const
575576
{
576577
if (!relevant_snapshot)
577578
return {};
578579

579-
if (!filter_dag && cached_unprunned_files_for_last_processed_snapshot.has_value())
580+
bool use_partition_pruning = filter_dag && getContext()->getSettingsRef()[Setting::use_iceberg_partition_pruning];
581+
582+
if (!use_partition_pruning && cached_unprunned_files_for_last_processed_snapshot.has_value())
580583
return cached_unprunned_files_for_last_processed_snapshot.value();
581584

582585
Strings data_files;
583586
for (const auto & manifest_file_ptr : *(relevant_snapshot->manifest_list))
584587
{
585-
ManifestFilesPruner pruner(schema_processor, relevant_snapshot_schema_id, filter_dag, *manifest_file_ptr, getContext());
588+
ManifestFilesPruner pruner(
589+
schema_processor, relevant_snapshot_schema_id,
590+
use_partition_pruning ? filter_dag : nullptr,
591+
*manifest_file_ptr, getContext());
586592
const auto & data_files_in_manifest = manifest_file_ptr->getFiles();
587593
for (const auto & manifest_file_entry : data_files_in_manifest)
588594
{
@@ -597,7 +603,7 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
597603
}
598604
}
599605

600-
if (!filter_dag)
606+
if (!use_partition_pruning)
601607
{
602608
cached_unprunned_files_for_last_processed_snapshot = data_files;
603609
return cached_unprunned_files_for_last_processed_snapshot.value();
@@ -606,16 +612,6 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
606612
return data_files;
607613
}
608614

609-
Strings IcebergMetadata::makePartitionPruning(const ActionsDAG & filter_dag)
610-
{
611-
auto configuration_ptr = configuration.lock();
612-
if (!configuration_ptr)
613-
{
614-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is expired");
615-
}
616-
return getDataFilesImpl(&filter_dag);
617-
}
618-
619615
std::optional<size_t> IcebergMetadata::totalRows() const
620616
{
621617
auto configuration_ptr = configuration.lock();
@@ -678,6 +674,14 @@ std::optional<size_t> IcebergMetadata::totalBytes() const
678674
return result;
679675
}
680676

677+
ObjectIterator IcebergMetadata::iterate(
678+
const ActionsDAG * filter_dag,
679+
FileProgressCallback callback,
680+
size_t /* list_batch_size */) const
681+
{
682+
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
683+
}
684+
681685
}
682686

683687
#endif

0 commit comments

Comments
 (0)