Skip to content

Commit 170e1f8

Browse files
committed
try to fix deletes
1 parent b078e30 commit 170e1f8

File tree

5 files changed

+41
-18
lines changed

5 files changed

+41
-18
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,15 @@ std::shared_ptr<ISimpleTransform> IcebergDataObjectInfo::getPositionDeleteTransf
8383
ObjectStoragePtr object_storage,
8484
const SharedHeader & header,
8585
const std::optional<FormatSettings> & format_settings,
86-
ContextPtr context_)
86+
ContextPtr context_,
87+
const String & table_location,
88+
std::map<String, ObjectStoragePtr> & secondary_storages)
8789
{
8890
IcebergDataObjectInfoPtr self = shared_from_this();
8991
if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value)
90-
return std::make_shared<IcebergStreamingPositionDeleteTransform>(header, self, object_storage, format_settings, context_);
92+
return std::make_shared<IcebergStreamingPositionDeleteTransform>(header, self, object_storage, format_settings, context_, table_location, secondary_storages);
9193
else
92-
return std::make_shared<IcebergBitmapPositionDeleteTransform>(header, self, object_storage, format_settings, context_);
94+
return std::make_shared<IcebergBitmapPositionDeleteTransform>(header, self, object_storage, format_settings, context_, table_location, secondary_storages);
9395
}
9496
}
9597

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
3232
ObjectStoragePtr object_storage,
3333
const SharedHeader & header,
3434
const std::optional<FormatSettings> & format_settings,
35-
ContextPtr context_);
35+
ContextPtr context_,
36+
const String & table_location,
37+
std::map<String, ObjectStoragePtr> & secondary_storages);
3638

3739
void addPositionDeleteObject(Iceberg::ManifestFileEntry position_delete_object)
3840
{

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ void IcebergMetadata::addDeleteTransformers(
10421042
LOG_DEBUG(log, "Constructing filter transform for position delete, there are {} delete objects", iceberg_object_info->position_deletes_objects.size());
10431043
builder.addSimpleTransform(
10441044
[&](const SharedHeader & header)
1045-
{ return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context); });
1045+
{ return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context, persistent_components.table_location, secondary_storages); });
10461046
}
10471047
const auto & delete_files = iceberg_object_info->equality_deletes_objects;
10481048
if (!delete_files.empty())
@@ -1053,9 +1053,14 @@ void IcebergMetadata::addDeleteTransformers(
10531053
{
10541054
/// get header of delete file
10551055
Block delete_file_header;
1056-
ObjectInfo delete_file_object(delete_file.file_path);
1056+
// Resolve the delete file path to get the correct storage and key
1057+
// This handles cases where delete files are outside the table location
1058+
auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
1059+
persistent_components.table_location, delete_file.file_path, object_storage, secondary_storages, local_context);
1060+
1061+
PathWithMetadata delete_file_object(resolved_delete_key, std::nullopt, delete_file.file_path, delete_storage_to_use);
10571062
{
1058-
auto schema_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log);
1063+
auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log);
10591064
auto schema_reader = FormatFactory::instance().getSchemaReader(delete_file.file_format, *schema_read_buffer, local_context);
10601065
auto columns_with_names = schema_reader->readSchema();
10611066
ColumnsWithTypeAndName initial_header_data;

src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,19 @@ void IcebergPositionDeleteTransform::initializeDeleteSources()
7070
&& position_deletes_object.reference_data_file_path != iceberg_data_path)
7171
continue;
7272

73-
auto object_path = position_deletes_object.file_path;
74-
auto object_metadata = object_storage->getObjectMetadata(object_path);
75-
auto object_info = std::make_shared<ObjectInfo>(object_path, object_metadata);
73+
/// Resolve the position delete file path to get the correct storage and key
74+
/// This handles cases where delete files are outside the table location
75+
auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
76+
table_location, position_deletes_object.file_path, object_storage, secondary_storages, context);
77+
78+
auto object_metadata = delete_storage_to_use->getObjectMetadata(resolved_delete_key);
79+
PathWithMetadata delete_file_object(resolved_delete_key, object_metadata, position_deletes_object.file_path, delete_storage_to_use);
7680

7781

7882
String format = position_deletes_object.file_format;
7983
Block initial_header;
8084
{
81-
std::unique_ptr<ReadBuffer> read_buf_schema = createReadBuffer(*object_info, object_storage, context, log);
85+
std::unique_ptr<ReadBuffer> read_buf_schema = createReadBuffer(delete_file_object, delete_storage_to_use, context, log);
8286
auto schema_reader = FormatFactory::instance().getSchemaReader(format, *read_buf_schema, context);
8387
auto columns_with_names = schema_reader->readSchema();
8488
ColumnsWithTypeAndName initial_header_data;
@@ -89,9 +93,9 @@ void IcebergPositionDeleteTransform::initializeDeleteSources()
8993
initial_header = Block(initial_header_data);
9094
}
9195

92-
CompressionMethod compression_method = chooseCompressionMethod(object_path, "auto");
96+
CompressionMethod compression_method = chooseCompressionMethod(resolved_delete_key, "auto");
9397

94-
delete_read_buffers.push_back(createReadBuffer(*object_info, object_storage, context, log));
98+
delete_read_buffers.push_back(createReadBuffer(delete_file_object, delete_storage_to_use, context, log));
9599

96100
auto syntax_result = TreeRewriter(context).analyze(where_ast, initial_header.getNamesAndTypesList());
97101
ExpressionAnalyzer analyzer(where_ast, syntax_result, context);

src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@ class IcebergPositionDeleteTransform : public ISimpleTransform
2727
IcebergDataObjectInfoPtr iceberg_object_info_,
2828
ObjectStoragePtr object_storage_,
2929
const std::optional<FormatSettings> & format_settings_,
30-
ContextPtr context_)
30+
ContextPtr context_,
31+
const String & table_location_,
32+
std::map<String, ObjectStoragePtr> & secondary_storages_)
3133
: ISimpleTransform(header_, header_, false)
3234
, header(header_)
3335
, iceberg_object_info(iceberg_object_info_)
3436
, object_storage(object_storage_)
3537
, format_settings(format_settings_)
3638
, context(context_)
39+
, table_location(table_location_)
40+
, secondary_storages(secondary_storages_)
3741
{
3842
initializeDeleteSources();
3943
}
@@ -52,6 +56,8 @@ class IcebergPositionDeleteTransform : public ISimpleTransform
5256
const ObjectStoragePtr object_storage;
5357
const std::optional<FormatSettings> format_settings;
5458
ContextPtr context;
59+
const String table_location;
60+
std::map<String, ObjectStoragePtr> & secondary_storages;
5561

5662
/// We need to keep the read buffers alive since the delete_sources depends on them.
5763
std::vector<std::unique_ptr<ReadBuffer>> delete_read_buffers;
@@ -66,8 +72,10 @@ class IcebergBitmapPositionDeleteTransform : public IcebergPositionDeleteTransfo
6672
IcebergDataObjectInfoPtr iceberg_object_info_,
6773
ObjectStoragePtr object_storage_,
6874
const std::optional<FormatSettings> & format_settings_,
69-
ContextPtr context_)
70-
: IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_)
75+
ContextPtr context_,
76+
const String & table_location_,
77+
std::map<String, ObjectStoragePtr> & secondary_storages_)
78+
: IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_, table_location_, secondary_storages_)
7179
{
7280
initialize();
7381
}
@@ -90,8 +98,10 @@ class IcebergStreamingPositionDeleteTransform : public IcebergPositionDeleteTran
9098
IcebergDataObjectInfoPtr iceberg_object_info_,
9199
ObjectStoragePtr object_storage_,
92100
const std::optional<FormatSettings> & format_settings_,
93-
ContextPtr context_)
94-
: IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_)
101+
ContextPtr context_,
102+
const String & table_location_,
103+
std::map<String, ObjectStoragePtr> & secondary_storages_)
104+
: IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_, table_location_, secondary_storages_)
95105
{
96106
initialize();
97107
}

0 commit comments

Comments
 (0)