Skip to content

Commit 5bba307

Browse files
committed
Refactoring around data lakes
1 parent 4d2e00b commit 5bba307

12 files changed

+94
-78
lines changed

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
6464
ErrorCodes::FORMAT_VERSION_TOO_OLD,
6565
"Metadata is not consinsent with the one which was used to infer table schema. Please, retry the query.");
6666
}
67-
if (!supportsFileIterator())
68-
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
6967
}
7068
}
7169

@@ -81,14 +79,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
8179
return std::nullopt;
8280
}
8381

84-
void implementPartitionPruning(const ActionsDAG & filter_dag) override
85-
{
86-
if (!current_metadata || !current_metadata->supportsPartitionPruning())
87-
return;
88-
BaseStorageConfiguration::setPaths(current_metadata->makePartitionPruning(filter_dag));
89-
}
90-
91-
9282
std::optional<size_t> totalRows() override
9383
{
9484
if (!current_metadata)
@@ -123,20 +113,11 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
123113
ContextPtr context) override
124114
{
125115
BaseStorageConfiguration::update(object_storage, context);
126-
if (updateMetadataObjectIfNeeded(object_storage, context))
127-
{
128-
if (!supportsFileIterator())
129-
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
130-
}
131-
116+
updateMetadataObjectIfNeeded(object_storage, context);
132117
return ColumnsDescription{current_metadata->getTableSchema()};
133118
}
134119

135-
bool supportsFileIterator() const override
136-
{
137-
chassert(current_metadata);
138-
return current_metadata->supportsFileIterator();
139-
}
120+
bool supportsFileIterator() const override { return true; }
140121

141122
ObjectIterator iterate(
142123
const ActionsDAG * filter_dag,

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
4141

4242
DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
4343

44-
Strings getDataFiles() const override { return data_files; }
45-
4644
NamesAndTypesList getTableSchema() const override { return schema; }
4745

4846
DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }
@@ -80,6 +78,9 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
8078
static DataTypePtr getFieldValue(const Poco::JSON::Object::Ptr & field, const String & type_key, bool is_nullable);
8179
static Field getFieldValue(const String & value, DataTypePtr data_type);
8280

81+
protected:
82+
Strings getDataFiles(const ActionsDAG *) const override { return data_files; }
83+
8384
private:
8485
mutable Strings data_files;
8586
NamesAndTypesList schema;

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,6 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
3232
return table_snapshot->update();
3333
}
3434

35-
Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
36-
{
37-
throwNotImplemented("getDataFiles()");
38-
}
39-
4035
ObjectIterator DeltaLakeMetadataDeltaKernel::iterate(
4136
const ActionsDAG * filter_dag,
4237
FileProgressCallback callback,

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
4040

4141
bool update(const ContextPtr & context) override;
4242

43-
Strings getDataFiles() const override;
44-
4543
NamesAndTypesList getTableSchema() const override;
4644

4745
NamesAndTypesList getReadSchema() const override;
@@ -61,8 +59,6 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
6159
settings_ref[StorageObjectStorageSetting::delta_lake_read_schema_same_as_table_schema]);
6260
}
6361

64-
bool supportsFileIterator() const override { return true; }
65-
6662
ObjectIterator iterate(
6763
const ActionsDAG * filter_dag,
6864
FileProgressCallback callback,

src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
9191
{
9292
}
9393

94-
Strings HudiMetadata::getDataFiles() const
94+
Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
9595
{
9696
if (data_files.empty())
9797
data_files = getDataFilesImpl();

src/Storages/ObjectStorage/DataLakes/HudiMetadata.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
1919

2020
HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
2121

22-
Strings getDataFiles() const override;
23-
2422
NamesAndTypesList getTableSchema() const override { return {}; }
2523

2624
bool operator ==(const IDataLakeMetadata & other) const override
@@ -39,6 +37,9 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
3937
return std::make_unique<HudiMetadata>(object_storage, configuration, local_context);
4038
}
4139

40+
protected:
41+
Strings getDataFiles(const ActionsDAG * filter_dag) const override;
42+
4243
private:
4344
const ObjectStoragePtr object_storage;
4445
const ConfigurationObserverPtr configuration;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#include "IDataLakeMetadata.h"
2+
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
3+
4+
namespace DB
5+
{
6+
7+
namespace
8+
{
9+
10+
class KeysIterator : public IObjectIterator
11+
{
12+
public:
13+
KeysIterator(
14+
Strings && data_files_,
15+
IDataLakeMetadata::FileProgressCallback callback_)
16+
: data_files(data_files_)
17+
, callback(callback_)
18+
{
19+
}
20+
21+
size_t estimatedKeysCount() override
22+
{
23+
return data_files.size();
24+
}
25+
26+
ObjectInfoPtr next(size_t) override
27+
{
28+
while (true)
29+
{
30+
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
31+
if (current_index >= data_files.size())
32+
return nullptr;
33+
34+
auto key = data_files[current_index];
35+
ObjectMetadata object_metadata;
36+
// auto object_metadata = object_storage->getObjectMetadata(key);
37+
38+
// if (callback)
39+
// callback(FileProgress(0, object_metadata.size_bytes));
40+
41+
return std::make_shared<ObjectInfo>(key, std::move(object_metadata));
42+
}
43+
}
44+
45+
private:
46+
Strings data_files;
47+
std::atomic<size_t> index = 0;
48+
IDataLakeMetadata::FileProgressCallback callback;
49+
};
50+
51+
}
52+
53+
ObjectIterator IDataLakeMetadata::iterate(
54+
const ActionsDAG * filter_dag,
55+
FileProgressCallback callback,
56+
size_t /* list_batch_size */) const
57+
{
58+
return std::make_shared<KeysIterator>(getDataFiles(filter_dag), callback);
59+
}
60+
61+
Strings IDataLakeMetadata::getDataFiles(const ActionsDAG * /* filter_dag */) const
62+
{
63+
throwNotImplemented("getDataFiles");
64+
}
65+
66+
}

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,12 @@ class IDataLakeMetadata : boost::noncopyable
2020

2121
virtual bool operator==(const IDataLakeMetadata & other) const = 0;
2222

23-
/// List all data files.
24-
/// For better parallelization, iterate() method should be used.
25-
virtual Strings getDataFiles() const = 0;
26-
/// Whether `iterate()` method is supported for the data lake.
27-
virtual bool supportsFileIterator() const { return false; }
2823
/// Return iterator to `data files`.
2924
using FileProgressCallback = std::function<void(FileProgress)>;
3025
virtual ObjectIterator iterate(
3126
const ActionsDAG * /* filter_dag */,
3227
FileProgressCallback /* callback */,
33-
size_t /* list_batch_size */) const { throwNotImplemented("iterate()"); }
28+
size_t /* list_batch_size */) const;
3429

3530
/// Table schema from data lake metadata.
3631
virtual NamesAndTypesList getTableSchema() const = 0;
@@ -39,9 +34,6 @@ class IDataLakeMetadata : boost::noncopyable
3934
/// Return nothing if read schema is the same as table schema.
4035
virtual NamesAndTypesList getReadSchema() const { return {}; }
4136

42-
virtual bool supportsPartitionPruning() { return false; }
43-
virtual Strings makePartitionPruning(const ActionsDAG &) { throwNotImplemented("makePartitionPrunning()"); }
44-
4537
virtual std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String &) const { return {}; }
4638
virtual std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String &) const { return {}; }
4739

@@ -56,7 +48,12 @@ class IDataLakeMetadata : boost::noncopyable
5648

5749
virtual std::optional<size_t> totalRows() const { return {}; }
5850
virtual std::optional<size_t> totalBytes() const { return {}; }
51+
5952
protected:
53+
/// List all data files.
54+
/// For better parallelization, iterate() method should be used.
55+
virtual Strings getDataFiles(const ActionsDAG * filter_dag) const;
56+
6057
[[noreturn]] void throwNotImplemented(std::string_view method) const
6158
{
6259
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Method `{}` is not implemented", method);

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ namespace Setting
4949
{
5050
extern const SettingsInt64 iceberg_timestamp_ms;
5151
extern const SettingsInt64 iceberg_snapshot_id;
52+
extern const SettingsBool use_iceberg_partition_pruning;
5253
}
5354

5455

@@ -557,18 +558,23 @@ ManifestListPtr IcebergMetadata::getManifestList(const String & filename) const
557558
return manifest_list_ptr;
558559
}
559560

560-
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
561+
Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const
561562
{
562563
if (!relevant_snapshot)
563564
return {};
564565

565-
if (!filter_dag && cached_unprunned_files_for_last_processed_snapshot.has_value())
566+
bool use_partition_pruning = filter_dag && getContext()->getSettingsRef()[Setting::use_iceberg_partition_pruning];
567+
568+
if (!use_partition_pruning && cached_unprunned_files_for_last_processed_snapshot.has_value())
566569
return cached_unprunned_files_for_last_processed_snapshot.value();
567570

568571
Strings data_files;
569572
for (const auto & manifest_file_ptr : *(relevant_snapshot->manifest_list))
570573
{
571-
ManifestFilesPruner pruner(schema_processor, relevant_snapshot_schema_id, filter_dag, *manifest_file_ptr, getContext());
574+
ManifestFilesPruner pruner(
575+
schema_processor, relevant_snapshot_schema_id,
576+
use_partition_pruning ? filter_dag : nullptr,
577+
*manifest_file_ptr, getContext());
572578
const auto & data_files_in_manifest = manifest_file_ptr->getFiles();
573579
for (const auto & manifest_file_entry : data_files_in_manifest)
574580
{
@@ -583,7 +589,7 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
583589
}
584590
}
585591

586-
if (!filter_dag)
592+
if (!use_partition_pruning)
587593
{
588594
cached_unprunned_files_for_last_processed_snapshot = data_files;
589595
return cached_unprunned_files_for_last_processed_snapshot.value();
@@ -592,16 +598,6 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
592598
return data_files;
593599
}
594600

595-
Strings IcebergMetadata::makePartitionPruning(const ActionsDAG & filter_dag)
596-
{
597-
auto configuration_ptr = configuration.lock();
598-
if (!configuration_ptr)
599-
{
600-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is expired");
601-
}
602-
return getDataFilesImpl(&filter_dag);
603-
}
604-
605601
std::optional<size_t> IcebergMetadata::totalRows() const
606602
{
607603
auto configuration_ptr = configuration.lock();

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,6 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
4040
Int32 format_version_,
4141
const Poco::JSON::Object::Ptr & metadata_object);
4242

43-
44-
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
45-
/// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed
46-
/// without changing metadata file). Drops on every snapshot update.
47-
Strings getDataFiles() const override { return getDataFilesImpl(nullptr); }
48-
4943
/// Get table schema parsed from metadata.
5044
NamesAndTypesList getTableSchema() const override
5145
{
@@ -86,13 +80,12 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
8680

8781
bool update(const ContextPtr & local_context) override;
8882

89-
Strings makePartitionPruning(const ActionsDAG & filter_dag) override;
90-
91-
bool supportsPartitionPruning() override { return true; }
92-
9383
std::optional<size_t> totalRows() const override;
9484
std::optional<size_t> totalBytes() const override;
9585

86+
protected:
87+
Strings getDataFiles(const ActionsDAG * filter_dag) const override;
88+
9689
private:
9790
using ManifestEntryByDataFile = std::unordered_map<String, Iceberg::ManifestFilePtr>;
9891
using ManifestFilesStorage = std::unordered_map<String, Iceberg::ManifestFilePtr>;
@@ -140,8 +133,6 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
140133

141134
Poco::JSON::Object::Ptr readJSON(const String & metadata_file_path, const ContextPtr & local_context) const;
142135

143-
Strings getDataFilesImpl(const ActionsDAG * filter_dag) const;
144-
145136
Iceberg::ManifestFilePtr tryGetManifestFile(const String & filename) const;
146137
};
147138
}

0 commit comments

Comments
 (0)