Skip to content

Commit 7f1cc56

Browse files
committed
works for file
1 parent cf964c0 commit 7f1cc56

File tree

16 files changed

+322
-117
lines changed

16 files changed

+322
-117
lines changed

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -148,10 +148,7 @@ void PathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore
148148
{
149149
const auto & path = isArchive() ? getPathToArchive() : getPath();
150150

151-
auto storage_to_use = object_storage;
152-
153-
if (auto storage_for_file = getObjectStorage(); storage_for_file.has_value())
154-
storage_to_use = storage_for_file.value();
151+
auto storage_to_use = object_storage_to_use.value_or(object_storage);
155152

156153
if (ignore_non_existent_file)
157154
metadata = storage_to_use->tryGetObjectMetadata(path);

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp

Lines changed: 38 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,7 @@ struct Plan
6969
IcebergHistory history;
7070
std::unordered_map<String, Int64> manifest_file_to_first_snapshot;
7171
std::unordered_map<String, std::vector<String>> manifest_list_to_manifest_files;
72+
std::unordered_map<String, std::unordered_set<Int64>> manifest_file_to_snapshots;
7273
std::unordered_map<Int64, std::vector<std::shared_ptr<DataFilePlan>>> snapshot_id_to_data_files;
7374
std::unordered_map<String, std::shared_ptr<DataFilePlan>> path_to_data_file;
7475
FileNamesGenerator generator;
@@ -111,7 +112,7 @@ Plan getPlan(
111112
IcebergHistory snapshots_info,
112113
const PersistentTableComponents & persistent_table_components,
113114
ObjectStoragePtr object_storage,
114-
std::map<String, DB::ObjectStoragePtr> secondary_storages,
115+
std::map<String, DB::ObjectStoragePtr> & secondary_storages,
115116
StorageObjectStorageConfigurationPtr configuration,
116117
ContextPtr context,
117118
CompressionMethod compression_method)
@@ -156,7 +157,9 @@ Plan getPlan(
156157
{
157158
plan.manifest_list_to_manifest_files[snapshot.manifest_list_absolute_path].push_back(manifest_file.manifest_file_absolute_path);
158159
if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_absolute_path))
160+
{
159161
plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_absolute_path] = snapshot.snapshot_id;
162+
}
160163
auto manifest_file_content = getManifestFile(
161164
object_storage,
162165
configuration,
@@ -174,6 +177,8 @@ Plan getPlan(
174177
manifest_files[manifest_file.manifest_file_absolute_path]->path = manifest_file.manifest_file_absolute_path;
175178
}
176179
manifest_files[manifest_file.manifest_file_absolute_path]->manifest_lists_path.push_back(snapshot.manifest_list_path);
180+
// Track which snapshots this manifest file belongs to
181+
plan.manifest_file_to_snapshots[manifest_file.manifest_file_absolute_path].insert(snapshot.snapshot_id);
177182
auto data_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::DATA);
178183
auto positional_delete_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::POSITION_DELETE);
179184
for (const auto & pos_delete_file : positional_delete_files)
@@ -185,19 +190,24 @@ Plan getPlan(
185190
if (plan.partitions.size() <= partition_index)
186191
plan.partitions.push_back({});
187192

188-
IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file);
193+
auto [resolved_storage, resolved_key] = resolveObjectStorageForPath(
194+
persistent_table_components.table_location, data_file.file_path, object_storage, secondary_storages, context);
195+
196+
IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file, resolved_storage, resolved_key);
189197
std::shared_ptr<DataFilePlan> data_file_ptr;
190-
if (!plan.path_to_data_file.contains(manifest_file.manifest_file_absolute_path))
198+
std::string storage_identifier = resolved_storage->getDescription() + ":" + resolved_storage->getObjectsNamespace();
199+
std::string composite_key = storage_identifier + "|" + resolved_key;
200+
if (!plan.path_to_data_file.contains(composite_key))
191201
{
192202
data_file_ptr = std::make_shared<DataFilePlan>(DataFilePlan{
193203
.data_object_info = data_object_info,
194204
.manifest_list = manifest_files[manifest_file.manifest_file_absolute_path],
195205
.patched_path = plan.generator.generateDataFileName()});
196-
plan.path_to_data_file[manifest_file.manifest_file_absolute_path] = data_file_ptr;
206+
plan.path_to_data_file[composite_key] = data_file_ptr;
197207
}
198208
else
199209
{
200-
data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_absolute_path];
210+
data_file_ptr = plan.path_to_data_file[composite_key];
201211
}
202212
plan.partitions[partition_index].push_back(data_file_ptr);
203213
plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back());
@@ -229,15 +239,18 @@ void writeDataFiles(
229239
ObjectStoragePtr object_storage,
230240
const std::optional<FormatSettings> & format_settings,
231241
ContextPtr context,
232-
StorageObjectStorageConfigurationPtr configuration)
242+
StorageObjectStorageConfigurationPtr configuration,
243+
const String & table_location,
244+
std::map<String, ObjectStoragePtr> & secondary_storages)
233245
{
234246
for (auto & [_, data_file] : initial_plan.path_to_data_file)
235247
{
236248
auto delete_file_transform = std::make_shared<IcebergBitmapPositionDeleteTransform>(
237-
sample_block, data_file->data_object_info, object_storage, format_settings, context);
249+
sample_block, data_file->data_object_info, object_storage, format_settings, context, table_location, secondary_storages);
238250

251+
ObjectStoragePtr storage_to_use = data_file->data_object_info->getObjectStorage().value_or(object_storage);
239252
StorageObjectStorage::ObjectInfo object_info(data_file->data_object_info->getPath());
240-
auto read_buffer = createReadBuffer(object_info, object_storage, context, getLogger("IcebergCompaction"));
253+
auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction"));
241254

242255
const Settings & settings = context->getSettingsRef();
243256
auto parser_shared_resources = std::make_shared<FormatParserSharedResources>(
@@ -395,6 +408,9 @@ void writeMetadataFiles(
395408
{
396409
manifest_entry->patched_path = plan.generator.generateManifestEntryName();
397410
manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path.path_in_metadata;
411+
412+
std::vector<String> unique_data_filenames(data_filenames.begin(), data_filenames.end());
413+
398414
auto buffer_manifest_entry = object_storage->writeObject(
399415
StoredObject(manifest_entry->patched_path.path_in_storage),
400416
WriteMode::Rewrite,
@@ -412,7 +428,7 @@ void writeMetadataFiles(
412428
partition_columns,
413429
plan.partition_encoder.getPartitionValue(grouped_by_manifest_files_partitions[manifest_entry]),
414430
ChunkPartitioner(fields_from_partition_spec, current_schema, context, sample_block_).getResultTypes(),
415-
std::vector(data_filenames.begin(), data_filenames.end()),
431+
unique_data_filenames,
416432
manifest_entry->statistics,
417433
sample_block_,
418434
snapshot,
@@ -441,16 +457,25 @@ void writeMetadataFiles(
441457
if (plan.history[i].added_files == 0)
442458
continue;
443459

444-
auto initial_manifest_list_name = plan.history[i].manifest_list_path;
460+
auto initial_manifest_list_name = plan.history[i].manifest_list_absolute_path;
445461
auto initial_manifest_entries = plan.manifest_list_to_manifest_files[initial_manifest_list_name];
446-
auto renamed_manifest_list = manifest_list_renamings[initial_manifest_list_name];
462+
auto renamed_manifest_list = manifest_list_renamings[plan.history[i].manifest_list_path];
447463
std::vector<String> renamed_manifest_entries;
464+
std::unordered_set<String> seen_manifest_entries; // Deduplicate manifest entries
448465
Int32 total_manifest_file_sizes = 0;
449466
for (const auto & initial_manifest_entry : initial_manifest_entries)
450467
{
451468
auto renamed_manifest_entry = manifest_file_renamings[initial_manifest_entry];
452469
if (!renamed_manifest_entry.empty())
453470
{
471+
auto it = plan.manifest_file_to_snapshots.find(initial_manifest_entry);
472+
if (it != plan.manifest_file_to_snapshots.end() && !it->second.contains(plan.history[i].snapshot_id))
473+
continue;
474+
475+
if (seen_manifest_entries.contains(renamed_manifest_entry))
476+
continue;
477+
478+
seen_manifest_entries.insert(renamed_manifest_entry);
454479
renamed_manifest_entries.push_back(renamed_manifest_entry);
455480
total_manifest_file_sizes += manifest_file_sizes[renamed_manifest_entry];
456481
}
@@ -513,7 +538,7 @@ void compactIcebergTable(
513538
IcebergHistory snapshots_info,
514539
const PersistentTableComponents & persistent_table_components,
515540
ObjectStoragePtr object_storage_,
516-
std::map<String, DB::ObjectStoragePtr> secondary_storages_,
541+
std::map<String, DB::ObjectStoragePtr> & secondary_storages_,
517542
StorageObjectStorageConfigurationPtr configuration_,
518543
const std::optional<FormatSettings> & format_settings_,
519544
SharedHeader sample_block_,
@@ -525,7 +550,7 @@ void compactIcebergTable(
525550
if (plan.need_optimize)
526551
{
527552
auto old_files = getOldFiles(object_storage_, configuration_);
528-
writeDataFiles(plan, sample_block_, object_storage_, format_settings_, context_, configuration_);
553+
writeDataFiles(plan, sample_block_, object_storage_, format_settings_, context_, configuration_, persistent_table_components.table_location, secondary_storages_);
529554
writeMetadataFiles(plan, object_storage_, configuration_, context_, sample_block_);
530555
clearOldFiles(object_storage_, old_files);
531556
}

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ void compactIcebergTable(
1515
IcebergHistory snapshots_info,
1616
const PersistentTableComponents & persistent_table_components,
1717
DB::ObjectStoragePtr object_storage_,
18-
std::map<String, DB::ObjectStoragePtr> secondary_storages_,
18+
std::map<String, DB::ObjectStoragePtr> & secondary_storages_,
1919
DB::StorageObjectStorageConfigurationPtr configuration_,
2020
const std::optional<DB::FormatSettings> & format_settings_,
2121
DB::SharedHeader sample_block_,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp

Lines changed: 7 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
#include <Poco/String.h>
12
#include "config.h"
23

34
#if USE_AVRO
@@ -42,12 +43,7 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_man
4243
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
4344
, sequence_number(data_manifest_file_entry_.added_sequence_number)
4445
{
45-
auto toupper = [](String & str)
46-
{
47-
std::transform(str.begin(), str.end(), str.begin(), ::toupper);
48-
return str;
49-
};
50-
if (!position_deletes_objects.empty() && toupper(data_manifest_file_entry_.file_format) != "PARQUET")
46+
if (!position_deletes_objects.empty() && Poco::toUpper(data_manifest_file_entry_.file_format) != "PARQUET")
5147
{
5248
throw Exception(
5349
ErrorCodes::NOT_IMPLEMENTED,
@@ -83,13 +79,15 @@ std::shared_ptr<ISimpleTransform> IcebergDataObjectInfo::getPositionDeleteTransf
8379
ObjectStoragePtr object_storage,
8480
const SharedHeader & header,
8581
const std::optional<FormatSettings> & format_settings,
86-
ContextPtr context_)
82+
ContextPtr context_,
83+
const String & table_location,
84+
std::map<String, ObjectStoragePtr> & secondary_storages)
8785
{
8886
IcebergDataObjectInfoPtr self = shared_from_this();
8987
if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value)
90-
return std::make_shared<IcebergStreamingPositionDeleteTransform>(header, self, object_storage, format_settings, context_);
88+
return std::make_shared<IcebergStreamingPositionDeleteTransform>(header, self, object_storage, format_settings, context_, table_location, secondary_storages);
9189
else
92-
return std::make_shared<IcebergBitmapPositionDeleteTransform>(header, self, object_storage, format_settings, context_);
90+
return std::make_shared<IcebergBitmapPositionDeleteTransform>(header, self, object_storage, format_settings, context_, table_location, secondary_storages);
9391
}
9492
}
9593

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,9 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
3232
ObjectStoragePtr object_storage,
3333
const SharedHeader & header,
3434
const std::optional<FormatSettings> & format_settings,
35-
ContextPtr context_);
35+
ContextPtr context_,
36+
const String & table_location,
37+
std::map<String, ObjectStoragePtr> & secondary_storages);
3638

3739
void addPositionDeleteObject(Iceberg::ManifestFileEntry position_delete_object)
3840
{

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -125,7 +125,6 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
125125
++manifest_file_index;
126126
continue;
127127
}
128-
std::map<String, ObjectStoragePtr> empty_secondary_storages;
129128
current_manifest_file_content = Iceberg::getManifestFile(
130129
object_storage,
131130
configuration.lock(),
@@ -135,7 +134,7 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
135134
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path,
136135
data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number,
137136
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id,
138-
empty_secondary_storages);
137+
secondary_storages);
139138
internal_data_index = 0;
140139
}
141140
auto files = files_generator(current_manifest_file_content);
@@ -202,7 +201,8 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator(
202201
const ActionsDAG * filter_dag_,
203202
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
204203
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
205-
PersistentTableComponents persistent_components_)
204+
PersistentTableComponents persistent_components_,
205+
std::map<String, ObjectStoragePtr> & secondary_storages_)
206206
: object_storage(object_storage_)
207207
, filter_dag(filter_dag_ ? std::make_shared<ActionsDAG>(filter_dag_->clone()) : nullptr)
208208
, local_context(local_context_)
@@ -224,6 +224,7 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator(
224224
, persistent_components(persistent_components_)
225225
, files_generator(files_generator_)
226226
, log(getLogger("IcebergIterator"))
227+
, secondary_storages(secondary_storages_)
227228
, manifest_file_content_type(manifest_file_content_type_)
228229
{
229230
}
@@ -236,7 +237,8 @@ IcebergIterator::IcebergIterator(
236237
IDataLakeMetadata::FileProgressCallback callback_,
237238
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
238239
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
239-
PersistentTableComponents persistent_components_)
240+
PersistentTableComponents persistent_components_,
241+
std::map<String, ObjectStoragePtr> & secondary_storages_)
240242
: filter_dag(filter_dag_ ? std::make_unique<ActionsDAG>(filter_dag_->clone()) : nullptr)
241243
, object_storage(std::move(object_storage_))
242244
, local_context(local_context_)
@@ -249,7 +251,8 @@ IcebergIterator::IcebergIterator(
249251
filter_dag.get(),
250252
table_snapshot_,
251253
data_snapshot_,
252-
persistent_components_)
254+
persistent_components_,
255+
secondary_storages_)
253256
, deletes_iterator(
254257
object_storage,
255258
local_context_,
@@ -265,7 +268,8 @@ IcebergIterator::IcebergIterator(
265268
filter_dag.get(),
266269
table_snapshot_,
267270
data_snapshot_,
268-
persistent_components_)
271+
persistent_components_,
272+
secondary_storages_)
269273
, blocking_queue(100)
270274
, producer_task(local_context_->getSchedulePool().createTask(
271275
"IcebergMetaReaderThread",
@@ -305,6 +309,7 @@ IcebergIterator::IcebergIterator(
305309
, compression_method(configuration_.lock()->getCompressionMethod())
306310
, persistent_components(persistent_components_)
307311
, table_schema_id(table_snapshot_->schema_id)
312+
, secondary_storages(secondary_storages_)
308313
{
309314
auto delete_file = deletes_iterator.next();
310315
while (delete_file.has_value())
@@ -331,9 +336,8 @@ ObjectInfoPtr IcebergIterator::next(size_t)
331336
if (blocking_queue.pop(manifest_file_entry))
332337
{
333338
// Resolve the data file path to get the correct storage and key
334-
std::map<String, ObjectStoragePtr> empty_secondary_storages;
335339
auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
336-
persistent_components.table_location, manifest_file_entry.file_path, object_storage, empty_secondary_storages, local_context);
340+
persistent_components.table_location, manifest_file_entry.file_path, object_storage, secondary_storages, local_context);
337341

338342
// Create IcebergDataObjectInfo with resolved storage
339343
IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(manifest_file_entry, storage_to_use, resolved_key);

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,8 @@ class SingleThreadIcebergKeysIterator
4545
const ActionsDAG * filter_dag_,
4646
IcebergTableStateSnapshotPtr table_snapshot_,
4747
IcebergDataSnapshotPtr data_snapshot_,
48-
PersistentTableComponents persistent_components);
48+
PersistentTableComponents persistent_components,
49+
std::map<String, ObjectStoragePtr> & secondary_storages_);
4950

5051
std::optional<DB::Iceberg::ManifestFileEntry> next();
5152

@@ -62,7 +63,7 @@ class SingleThreadIcebergKeysIterator
6263
PersistentTableComponents persistent_components;
6364
FilesGenerator files_generator;
6465
LoggerPtr log;
65-
66+
std::map<String, ObjectStoragePtr> & secondary_storages;
6667

6768
// By Iceberg design it is difficult to avoid storing position deletes in memory.
6869
size_t manifest_file_index = 0;
@@ -90,7 +91,8 @@ class IcebergIterator : public IObjectIterator
9091
IDataLakeMetadata::FileProgressCallback callback_,
9192
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
9293
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
93-
Iceberg::PersistentTableComponents persistent_components_);
94+
Iceberg::PersistentTableComponents persistent_components_,
95+
std::map<String, ObjectStoragePtr> & secondary_storages_);
9496

9597
ObjectInfoPtr next(size_t) override;
9698

@@ -114,6 +116,7 @@ class IcebergIterator : public IObjectIterator
114116
std::mutex exception_mutex;
115117
Iceberg::PersistentTableComponents persistent_components;
116118
Int32 table_schema_id;
119+
std::map<String, ObjectStoragePtr> & secondary_storages;
117120
};
118121
}
119122

0 commit comments

Comments
 (0)