Skip to content

Commit 8d5ee53

Browse files
committed
Cleanup
1 parent 28c8048 commit 8d5ee53

File tree

11 files changed

+264
-389
lines changed

11 files changed

+264
-389
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ Plan getPlan(
177177
manifest_files[manifest_file.manifest_file_absolute_path]->path = manifest_file.manifest_file_absolute_path;
178178
}
179179
manifest_files[manifest_file.manifest_file_absolute_path]->manifest_lists_path.push_back(snapshot.manifest_list_path);
180-
// Track which snapshots this manifest file belongs to
181180
plan.manifest_file_to_snapshots[manifest_file.manifest_file_absolute_path].insert(snapshot.snapshot_id);
182181
auto data_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::DATA);
183182
auto positional_delete_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::POSITION_DELETE);
@@ -195,19 +194,18 @@ Plan getPlan(
195194

196195
IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file, resolved_storage, resolved_key);
197196
std::shared_ptr<DataFilePlan> data_file_ptr;
198-
std::string storage_identifier = resolved_storage->getDescription() + ":" + resolved_storage->getObjectsNamespace();
199-
std::string composite_key = storage_identifier + "|" + resolved_key;
200-
if (!plan.path_to_data_file.contains(composite_key))
197+
std::string path_identifier = resolved_storage->getDescription() + ":" + resolved_storage->getObjectsNamespace() + "|" + resolved_key;
198+
if (!plan.path_to_data_file.contains(path_identifier))
201199
{
202200
data_file_ptr = std::make_shared<DataFilePlan>(DataFilePlan{
203201
.data_object_info = data_object_info,
204202
.manifest_list = manifest_files[manifest_file.manifest_file_absolute_path],
205203
.patched_path = plan.generator.generateDataFileName()});
206-
plan.path_to_data_file[composite_key] = data_file_ptr;
204+
plan.path_to_data_file[path_identifier] = data_file_ptr;
207205
}
208206
else
209207
{
210-
data_file_ptr = plan.path_to_data_file[composite_key];
208+
data_file_ptr = plan.path_to_data_file[path_identifier];
211209
}
212210
plan.partitions[partition_index].push_back(data_file_ptr);
213211
plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back());

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,22 @@ namespace Setting
3737
extern const SettingsBool use_roaring_bitmap_iceberg_positional_deletes;
3838
};
3939

40+
namespace
41+
{
42+
String toupper(String & str)
43+
{
44+
std::transform(str.begin(), str.end(), str.begin(), ::toupper);
45+
return str;
46+
}
47+
}
48+
4049
IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_)
4150
: PathWithMetadata(data_manifest_file_entry_.file_path, std::nullopt, data_manifest_file_entry_.file_path_key)
4251
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
4352
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
4453
, sequence_number(data_manifest_file_entry_.added_sequence_number)
4554
{
46-
if (!position_deletes_objects.empty() && Poco::toUpper(data_manifest_file_entry_.file_format) != "PARQUET")
55+
if (!position_deletes_objects.empty() && toupper(data_manifest_file_entry_.file_format) != "PARQUET")
4756
{
4857
throw Exception(
4958
ErrorCodes::NOT_IMPLEMENTED,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
2222
/// It is also used to create a filter for the data object in the position delete transform.
2323
explicit IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_);
2424

25-
/// Constructor with resolved storage and key for files that may be outside table location
25+
/// Sometimes data files are located outside the table location and even in a different storage.
2626
explicit IcebergDataObjectInfo(
2727
Iceberg::ManifestFileEntry data_manifest_file_entry_,
2828
ObjectStoragePtr resolved_storage,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,12 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
135135
data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number,
136136
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id,
137137
secondary_storages);
138-
current_files = files_generator(current_manifest_file_content);
139138
internal_data_index = 0;
140139
}
141-
while (internal_data_index < current_files.size())
140+
auto files = files_generator(current_manifest_file_content);
141+
while (internal_data_index < files.size())
142142
{
143-
const auto & manifest_file_entry = current_files[internal_data_index++];
143+
const auto & manifest_file_entry = files[internal_data_index++];
144144
if ((manifest_file_entry.schema_id != previous_entry_schema) && (use_partition_pruning))
145145
{
146146
previous_entry_schema = manifest_file_entry.schema_id;
@@ -175,7 +175,6 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
175175
}
176176
}
177177
current_manifest_file_content = nullptr;
178-
current_files.clear();
179178
current_pruner = std::nullopt;
180179
++manifest_file_index;
181180
internal_data_index = 0;
@@ -340,16 +339,14 @@ ObjectInfoPtr IcebergIterator::next(size_t)
340339
auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
341340
persistent_components.table_location, manifest_file_entry.file_path, object_storage, secondary_storages, local_context);
342341

343-
// Create IcebergDataObjectInfo with resolved storage
344342
IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(manifest_file_entry, storage_to_use, resolved_key);
343+
345344
for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, false))
346-
{
347345
object_info->addPositionDeleteObject(position_delete);
348-
}
346+
349347
for (const auto & equality_delete : defineDeletesSpan(manifest_file_entry, equality_deletes_files, true))
350-
{
351348
object_info->addEqualityDeleteObject(equality_delete);
352-
}
349+
353350
object_info->setFileMetaInfo(std::make_shared<DataFileMetaInfo>(
354351
*persistent_components.schema_processor,
355352
table_schema_id, /// current schema id to use current column names

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ class SingleThreadIcebergKeysIterator
6969
size_t manifest_file_index = 0;
7070
size_t internal_data_index = 0;
7171
Iceberg::ManifestFilePtr current_manifest_file_content;
72-
std::vector<ManifestFileEntry> current_files;
7372
Int32 previous_entry_schema = -1;
7473
std::optional<Iceberg::ManifestFilesPruner> current_pruner;
7574

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,8 +1063,7 @@ void IcebergMetadata::addDeleteTransformers(
10631063
{
10641064
/// get header of delete file
10651065
Block delete_file_header;
1066-
// Resolve the delete file path to get the correct storage and key
1067-
// This handles cases where delete files are outside the table location
1066+
10681067
auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
10691068
persistent_components.table_location, delete_file.file_path, object_storage, secondary_storages, local_context);
10701069

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ struct IcebergMetadataFilesCacheCell : private boost::noncopyable
7474
size_t total_size = 0;
7575
for (const auto & entry: manifest_file_cache_keys)
7676
{
77-
total_size += sizeof(ManifestFileCacheKey) + entry.manifest_file_absolute_path.capacity();
77+
total_size += sizeof(ManifestFileCacheKey) + entry.manifest_file_path.capacity() + entry.manifest_file_absolute_path.capacity();
7878
}
7979
return total_size;
8080
}

src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ Iceberg::ManifestFilePtr getManifestFile(
8585

8686
auto create_fn = [&, use_iceberg_metadata_cache]()
8787
{
88-
// Resolve the absolute path to get the correct storage and key_in_storage
8988
auto [storage_to_use, resolved_key_in_storage] = resolveObjectStorageForPath(
9089
persistent_table_components.table_location, absolute_path, object_storage, secondary_storages, local_context);
9190

src/Storages/ObjectStorage/StorageObjectStorageSource.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri
179179
std::atomic_size_t index = 0;
180180
bool is_archive;
181181
ObjectStoragePtr object_storage;
182-
mutable std::map<String, ObjectStoragePtr> secondary_storages; // Sometimes data can be located on a different storage
182+
std::map<String, ObjectStoragePtr> secondary_storages; // Sometimes data can be located on a different storage
183183
/// path_to_archive -> archive reader.
184184
std::unordered_map<std::string, std::shared_ptr<IArchiveReader>> archive_readers;
185185
std::mutex archive_readers_mutex;

0 commit comments

Comments
 (0)