Skip to content

Commit ac4a27d

Browse files
committed
proper plan
1 parent f026269 commit ac4a27d

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -185,19 +185,24 @@ Plan getPlan(
185185
if (plan.partitions.size() <= partition_index)
186186
plan.partitions.push_back({});
187187

188-
IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file);
188+
auto [resolved_storage, resolved_key] = resolveObjectStorageForPath(
189+
persistent_table_components.table_location, data_file.file_path, object_storage, secondary_storages, context);
190+
191+
IcebergDataObjectInfoPtr data_object_info = std::make_shared<IcebergDataObjectInfo>(data_file, resolved_storage, resolved_key);
189192
std::shared_ptr<DataFilePlan> data_file_ptr;
190-
if (!plan.path_to_data_file.contains(manifest_file.manifest_file_absolute_path))
193+
std::string storage_identifier = resolved_storage->getDescription() + ":" + resolved_storage->getObjectsNamespace();
194+
std::string composite_key = storage_identifier + "|" + resolved_key;
195+
if (!plan.path_to_data_file.contains(composite_key))
191196
{
192197
data_file_ptr = std::make_shared<DataFilePlan>(DataFilePlan{
193198
.data_object_info = data_object_info,
194199
.manifest_list = manifest_files[manifest_file.manifest_file_absolute_path],
195200
.patched_path = plan.generator.generateDataFileName()});
196-
plan.path_to_data_file[manifest_file.manifest_file_absolute_path] = data_file_ptr;
201+
plan.path_to_data_file[composite_key] = data_file_ptr;
197202
}
198203
else
199204
{
200-
data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_absolute_path];
205+
data_file_ptr = plan.path_to_data_file[composite_key];
201206
}
202207
plan.partitions[partition_index].push_back(data_file_ptr);
203208
plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back());
@@ -238,8 +243,9 @@ void writeDataFiles(
238243
auto delete_file_transform = std::make_shared<IcebergBitmapPositionDeleteTransform>(
239244
sample_block, data_file->data_object_info, object_storage, format_settings, context, table_location, secondary_storages);
240245

246+
ObjectStoragePtr storage_to_use = data_file->data_object_info->getObjectStorage().value_or(object_storage);
241247
StorageObjectStorage::ObjectInfo object_info(data_file->data_object_info->getPath());
242-
auto read_buffer = createReadBuffer(object_info, object_storage, context, getLogger("IcebergCompaction"));
248+
auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction"));
243249

244250
const Settings & settings = context->getSettingsRef();
245251
auto parser_shared_resources = std::make_shared<FormatParserSharedResources>(

0 commit comments

Comments
 (0)