@@ -69,6 +69,7 @@ struct Plan
6969 IcebergHistory history;
7070 std::unordered_map<String, Int64> manifest_file_to_first_snapshot;
7171 std::unordered_map<String, std::vector<String>> manifest_list_to_manifest_files;
72+ std::unordered_map<String, std::unordered_set<Int64>> manifest_file_to_snapshots;
7273 std::unordered_map<Int64, std::vector<std::shared_ptr<DataFilePlan>>> snapshot_id_to_data_files;
7374 std::unordered_map<String, std::shared_ptr<DataFilePlan>> path_to_data_file;
7475 FileNamesGenerator generator;
@@ -156,7 +157,9 @@ Plan getPlan(
156157 {
157158 plan.manifest_list_to_manifest_files [snapshot.manifest_list_absolute_path ].push_back (manifest_file.manifest_file_absolute_path );
158159 if (!plan.manifest_file_to_first_snapshot .contains (manifest_file.manifest_file_absolute_path ))
160+ {
159161 plan.manifest_file_to_first_snapshot [manifest_file.manifest_file_absolute_path ] = snapshot.snapshot_id ;
162+ }
160163 auto manifest_file_content = getManifestFile (
161164 object_storage,
162165 configuration,
@@ -174,6 +177,8 @@ Plan getPlan(
174177 manifest_files[manifest_file.manifest_file_absolute_path ]->path = manifest_file.manifest_file_absolute_path ;
175178 }
176179 manifest_files[manifest_file.manifest_file_absolute_path ]->manifest_lists_path .push_back (snapshot.manifest_list_path );
180+ // Track which snapshots this manifest file belongs to
181+ plan.manifest_file_to_snapshots [manifest_file.manifest_file_absolute_path ].insert (snapshot.snapshot_id );
177182 auto data_files = manifest_file_content->getFilesWithoutDeleted (FileContentType::DATA);
178183 auto positional_delete_files = manifest_file_content->getFilesWithoutDeleted (FileContentType::POSITION_DELETE);
179184 for (const auto & pos_delete_file : positional_delete_files)
@@ -403,6 +408,9 @@ void writeMetadataFiles(
403408 {
404409 manifest_entry->patched_path = plan.generator .generateManifestEntryName ();
405410 manifest_file_renamings[manifest_entry->path ] = manifest_entry->patched_path .path_in_metadata ;
411+
412+ std::vector<String> unique_data_filenames (data_filenames.begin (), data_filenames.end ());
413+
406414 auto buffer_manifest_entry = object_storage->writeObject (
407415 StoredObject (manifest_entry->patched_path .path_in_storage ),
408416 WriteMode::Rewrite,
@@ -420,7 +428,7 @@ void writeMetadataFiles(
420428 partition_columns,
421429 plan.partition_encoder .getPartitionValue (grouped_by_manifest_files_partitions[manifest_entry]),
422430 ChunkPartitioner (fields_from_partition_spec, current_schema, context, sample_block_).getResultTypes (),
423- std::vector (data_filenames. begin (), data_filenames. end ()) ,
431+ unique_data_filenames ,
424432 manifest_entry->statistics ,
425433 sample_block_,
426434 snapshot,
@@ -453,12 +461,21 @@ void writeMetadataFiles(
453461 auto initial_manifest_entries = plan.manifest_list_to_manifest_files [initial_manifest_list_name];
454462 auto renamed_manifest_list = manifest_list_renamings[plan.history [i].manifest_list_path ];
455463 std::vector<String> renamed_manifest_entries;
464+ std::unordered_set<String> seen_manifest_entries; // Deduplicate manifest entries
456465 Int32 total_manifest_file_sizes = 0 ;
457466 for (const auto & initial_manifest_entry : initial_manifest_entries)
458467 {
459468 auto renamed_manifest_entry = manifest_file_renamings[initial_manifest_entry];
460469 if (!renamed_manifest_entry.empty ())
461470 {
471+ auto it = plan.manifest_file_to_snapshots .find (initial_manifest_entry);
472+ if (it != plan.manifest_file_to_snapshots .end () && !it->second .contains (plan.history [i].snapshot_id ))
473+ continue ;
474+
475+ if (seen_manifest_entries.contains (renamed_manifest_entry))
476+ continue ;
477+
478+ seen_manifest_entries.insert (renamed_manifest_entry);
462479 renamed_manifest_entries.push_back (renamed_manifest_entry);
463480 total_manifest_file_sizes += manifest_file_sizes[renamed_manifest_entry];
464481 }
0 commit comments