Skip to content

Commit f9168f6

Browse files
committed
Add object_metadata
1 parent 5bba307 commit f9168f6

File tree

8 files changed

+59
-21
lines changed

8 files changed

+59
-21
lines changed

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,7 @@ DeltaLakeMetadata::DeltaLakeMetadata(ObjectStoragePtr object_storage_, Configura
600600
data_files = result.data_files;
601601
schema = result.schema;
602602
partition_columns = result.partition_columns;
603+
object_storage = object_storage_;
603604

604605
LOG_TRACE(impl.log, "Found {} data files, {} partition files, schema: {}",
605606
data_files.size(), partition_columns.size(), schema.toString());
@@ -712,6 +713,14 @@ Field DeltaLakeMetadata::getFieldValue(const String & value, DataTypePtr data_ty
712713
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type for {}", check_type->getColumnType());
713714
}
714715

716+
ObjectIterator DeltaLakeMetadata::iterate(
717+
const ActionsDAG * filter_dag,
718+
FileProgressCallback callback,
719+
size_t /* list_batch_size */) const
720+
{
721+
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
722+
}
723+
715724
}
716725

717726
#endif

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,18 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
7979
static Field getFieldValue(const String & value, DataTypePtr data_type);
8080

8181
protected:
82-
Strings getDataFiles(const ActionsDAG *) const override { return data_files; }
82+
ObjectIterator iterate(
83+
const ActionsDAG * filter_dag,
84+
FileProgressCallback callback,
85+
size_t list_batch_size) const override;
8386

8487
private:
8588
mutable Strings data_files;
8689
NamesAndTypesList schema;
8790
DeltaLakePartitionColumns partition_columns;
91+
ObjectStoragePtr object_storage;
92+
93+
Strings getDataFiles(const ActionsDAG *) const { return data_files; }
8894
};
8995

9096
}

src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,12 @@ Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
9898
return data_files;
9999
}
100100

101+
ObjectIterator HudiMetadata::iterate(
102+
const ActionsDAG * filter_dag,
103+
FileProgressCallback callback,
104+
size_t /* list_batch_size */) const
105+
{
106+
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
107+
}
108+
101109
}

src/Storages/ObjectStorage/DataLakes/HudiMetadata.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,18 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
3838
}
3939

4040
protected:
41-
Strings getDataFiles(const ActionsDAG * filter_dag) const override;
41+
ObjectIterator iterate(
42+
const ActionsDAG * filter_dag,
43+
FileProgressCallback callback,
44+
size_t list_batch_size) const override;
4245

4346
private:
4447
const ObjectStoragePtr object_storage;
4548
const ConfigurationObserverPtr configuration;
4649
mutable Strings data_files;
4750

4851
Strings getDataFilesImpl() const;
52+
Strings getDataFiles(const ActionsDAG * filter_dag) const;
4953
};
5054

5155
}

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ class KeysIterator : public IObjectIterator
1212
public:
1313
KeysIterator(
1414
Strings && data_files_,
15+
ObjectStoragePtr object_storage_,
1516
IDataLakeMetadata::FileProgressCallback callback_)
1617
: data_files(data_files_)
18+
, object_storage(object_storage_)
1719
, callback(callback_)
1820
{
1921
}
@@ -32,35 +34,30 @@ class KeysIterator : public IObjectIterator
3234
return nullptr;
3335

3436
auto key = data_files[current_index];
35-
ObjectMetadata object_metadata;
36-
// auto object_metadata = object_storage->getObjectMetadata(key);
37+
auto object_metadata = object_storage->getObjectMetadata(key);
3738

38-
// if (callback)
39-
// callback(FileProgress(0, object_metadata.size_bytes));
39+
if (callback)
40+
callback(FileProgress(0, object_metadata.size_bytes));
4041

4142
return std::make_shared<ObjectInfo>(key, std::move(object_metadata));
4243
}
4344
}
4445

4546
private:
4647
Strings data_files;
48+
ObjectStoragePtr object_storage;
4749
std::atomic<size_t> index = 0;
4850
IDataLakeMetadata::FileProgressCallback callback;
4951
};
5052

5153
}
5254

53-
ObjectIterator IDataLakeMetadata::iterate(
54-
const ActionsDAG * filter_dag,
55-
FileProgressCallback callback,
56-
size_t /* list_batch_size */) const
55+
ObjectIterator IDataLakeMetadata::createKeysIterator(
56+
Strings && data_files_,
57+
ObjectStoragePtr object_storage_,
58+
IDataLakeMetadata::FileProgressCallback callback_) const
5759
{
58-
return std::make_shared<KeysIterator>(getDataFiles(filter_dag), callback);
59-
}
60-
61-
Strings IDataLakeMetadata::getDataFiles(const ActionsDAG * /* filter_dag */) const
62-
{
63-
throwNotImplemented("getDataFiles");
60+
return std::make_shared<KeysIterator>(std::move(data_files_), object_storage_, callback_);
6461
}
6562

6663
}

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class IDataLakeMetadata : boost::noncopyable
2525
virtual ObjectIterator iterate(
2626
const ActionsDAG * /* filter_dag */,
2727
FileProgressCallback /* callback */,
28-
size_t /* list_batch_size */) const;
28+
size_t /* list_batch_size */) const = 0;
2929

3030
/// Table schema from data lake metadata.
3131
virtual NamesAndTypesList getTableSchema() const = 0;
@@ -50,9 +50,10 @@ class IDataLakeMetadata : boost::noncopyable
5050
virtual std::optional<size_t> totalBytes() const { return {}; }
5151

5252
protected:
53-
/// List all data files.
54-
/// For better parallelization, iterate() method should be used.
55-
virtual Strings getDataFiles(const ActionsDAG * filter_dag) const;
53+
ObjectIterator createKeysIterator(
54+
Strings && data_files_,
55+
ObjectStoragePtr object_storage_,
56+
IDataLakeMetadata::FileProgressCallback callback_) const;
5657

5758
[[noreturn]] void throwNotImplemented(std::string_view method) const
5859
{

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,14 @@ std::optional<size_t> IcebergMetadata::totalBytes() const
660660
return result;
661661
}
662662

663+
ObjectIterator IcebergMetadata::iterate(
664+
const ActionsDAG * filter_dag,
665+
FileProgressCallback callback,
666+
size_t /* list_batch_size */) const
667+
{
668+
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
669+
}
670+
663671
}
664672

665673
#endif

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,10 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
8484
std::optional<size_t> totalBytes() const override;
8585

8686
protected:
87-
Strings getDataFiles(const ActionsDAG * filter_dag) const override;
87+
ObjectIterator iterate(
88+
const ActionsDAG * filter_dag,
89+
FileProgressCallback callback,
90+
size_t list_batch_size) const override;
8891

8992
private:
9093
using ManifestEntryByDataFile = std::unordered_map<String, Iceberg::ManifestFilePtr>;
@@ -114,6 +117,8 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
114117

115118
mutable std::optional<Strings> cached_unprunned_files_for_last_processed_snapshot;
116119

120+
Strings getDataFiles(const ActionsDAG * filter_dag) const;
121+
117122
void updateState(const ContextPtr & local_context);
118123

119124
void updateSnapshot();

0 commit comments

Comments
 (0)