Skip to content

Commit 6c43da6

Browse files
committed
try to fix bugs
1 parent eea96b2 commit 6c43da6

File tree

14 files changed

+223
-92
lines changed

14 files changed

+223
-92
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ Plan getPlan(
111111
IcebergHistory snapshots_info,
112112
const PersistentTableComponents & persistent_table_components,
113113
ObjectStoragePtr object_storage,
114-
std::map<String, DB::ObjectStoragePtr> secondary_storages,
114+
std::map<String, DB::ObjectStoragePtr> & secondary_storages,
115115
StorageObjectStorageConfigurationPtr configuration,
116116
ContextPtr context,
117117
CompressionMethod compression_method)
@@ -229,12 +229,14 @@ void writeDataFiles(
229229
ObjectStoragePtr object_storage,
230230
const std::optional<FormatSettings> & format_settings,
231231
ContextPtr context,
232-
StorageObjectStorageConfigurationPtr configuration)
232+
StorageObjectStorageConfigurationPtr configuration,
233+
const String & table_location,
234+
std::map<String, ObjectStoragePtr> & secondary_storages)
233235
{
234236
for (auto & [_, data_file] : initial_plan.path_to_data_file)
235237
{
236238
auto delete_file_transform = std::make_shared<IcebergBitmapPositionDeleteTransform>(
237-
sample_block, data_file->data_object_info, object_storage, format_settings, context);
239+
sample_block, data_file->data_object_info, object_storage, format_settings, context, table_location, secondary_storages);
238240

239241
StorageObjectStorage::ObjectInfo object_info(data_file->data_object_info->getPath());
240242
auto read_buffer = createReadBuffer(object_info, object_storage, context, getLogger("IcebergCompaction"));
@@ -513,7 +515,7 @@ void compactIcebergTable(
513515
IcebergHistory snapshots_info,
514516
const PersistentTableComponents & persistent_table_components,
515517
ObjectStoragePtr object_storage_,
516-
std::map<String, DB::ObjectStoragePtr> secondary_storages_,
518+
std::map<String, DB::ObjectStoragePtr> & secondary_storages_,
517519
StorageObjectStorageConfigurationPtr configuration_,
518520
const std::optional<FormatSettings> & format_settings_,
519521
SharedHeader sample_block_,
@@ -525,7 +527,7 @@ void compactIcebergTable(
525527
if (plan.need_optimize)
526528
{
527529
auto old_files = getOldFiles(object_storage_, configuration_);
528-
writeDataFiles(plan, sample_block_, object_storage_, format_settings_, context_, configuration_);
530+
writeDataFiles(plan, sample_block_, object_storage_, format_settings_, context_, configuration_, persistent_table_components.table_location, secondary_storages_);
529531
writeMetadataFiles(plan, object_storage_, configuration_, context_, sample_block_);
530532
clearOldFiles(object_storage_, old_files);
531533
}

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ void compactIcebergTable(
1515
IcebergHistory snapshots_info,
1616
const PersistentTableComponents & persistent_table_components,
1717
DB::ObjectStoragePtr object_storage_,
18-
std::map<String, DB::ObjectStoragePtr> secondary_storages_,
18+
std::map<String, DB::ObjectStoragePtr> & secondary_storages_,
1919
DB::StorageObjectStorageConfigurationPtr configuration_,
2020
const std::optional<DB::FormatSettings> & format_settings_,
2121
DB::SharedHeader sample_block_,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,15 @@ std::shared_ptr<ISimpleTransform> IcebergDataObjectInfo::getPositionDeleteTransf
8383
ObjectStoragePtr object_storage,
8484
const SharedHeader & header,
8585
const std::optional<FormatSettings> & format_settings,
86-
ContextPtr context_)
86+
ContextPtr context_,
87+
const String & table_location,
88+
std::map<String, ObjectStoragePtr> & secondary_storages)
8789
{
8890
IcebergDataObjectInfoPtr self = shared_from_this();
8991
if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value)
90-
return std::make_shared<IcebergStreamingPositionDeleteTransform>(header, self, object_storage, format_settings, context_);
92+
return std::make_shared<IcebergStreamingPositionDeleteTransform>(header, self, object_storage, format_settings, context_, table_location, secondary_storages);
9193
else
92-
return std::make_shared<IcebergBitmapPositionDeleteTransform>(header, self, object_storage, format_settings, context_);
94+
return std::make_shared<IcebergBitmapPositionDeleteTransform>(header, self, object_storage, format_settings, context_, table_location, secondary_storages);
9395
}
9496
}
9597

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
3232
ObjectStoragePtr object_storage,
3333
const SharedHeader & header,
3434
const std::optional<FormatSettings> & format_settings,
35-
ContextPtr context_);
35+
ContextPtr context_,
36+
const String & table_location,
37+
std::map<String, ObjectStoragePtr> & secondary_storages);
3638

3739
void addPositionDeleteObject(Iceberg::ManifestFileEntry position_delete_object)
3840
{

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
125125
++manifest_file_index;
126126
continue;
127127
}
128-
std::map<String, ObjectStoragePtr> empty_secondary_storages;
129128
current_manifest_file_content = Iceberg::getManifestFile(
130129
object_storage,
131130
configuration.lock(),
@@ -135,7 +134,7 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
135134
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path,
136135
data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number,
137136
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id,
138-
empty_secondary_storages);
137+
secondary_storages);
139138
internal_data_index = 0;
140139
}
141140
auto files = files_generator(current_manifest_file_content);
@@ -202,7 +201,8 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator(
202201
const ActionsDAG * filter_dag_,
203202
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
204203
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
205-
PersistentTableComponents persistent_components_)
204+
PersistentTableComponents persistent_components_,
205+
std::map<String, ObjectStoragePtr> & secondary_storages_)
206206
: object_storage(object_storage_)
207207
, filter_dag(filter_dag_ ? std::make_shared<ActionsDAG>(filter_dag_->clone()) : nullptr)
208208
, local_context(local_context_)
@@ -224,6 +224,7 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator(
224224
, persistent_components(persistent_components_)
225225
, files_generator(files_generator_)
226226
, log(getLogger("IcebergIterator"))
227+
, secondary_storages(secondary_storages_)
227228
, manifest_file_content_type(manifest_file_content_type_)
228229
{
229230
}
@@ -236,7 +237,8 @@ IcebergIterator::IcebergIterator(
236237
IDataLakeMetadata::FileProgressCallback callback_,
237238
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
238239
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
239-
PersistentTableComponents persistent_components_)
240+
PersistentTableComponents persistent_components_,
241+
std::map<String, ObjectStoragePtr> & secondary_storages_)
240242
: filter_dag(filter_dag_ ? std::make_unique<ActionsDAG>(filter_dag_->clone()) : nullptr)
241243
, object_storage(std::move(object_storage_))
242244
, local_context(local_context_)
@@ -249,7 +251,8 @@ IcebergIterator::IcebergIterator(
249251
filter_dag.get(),
250252
table_snapshot_,
251253
data_snapshot_,
252-
persistent_components_)
254+
persistent_components_,
255+
secondary_storages_)
253256
, deletes_iterator(
254257
object_storage,
255258
local_context_,
@@ -265,7 +268,8 @@ IcebergIterator::IcebergIterator(
265268
filter_dag.get(),
266269
table_snapshot_,
267270
data_snapshot_,
268-
persistent_components_)
271+
persistent_components_,
272+
secondary_storages_)
269273
, blocking_queue(100)
270274
, producer_task(local_context_->getSchedulePool().createTask(
271275
"IcebergMetaReaderThread",
@@ -305,6 +309,7 @@ IcebergIterator::IcebergIterator(
305309
, compression_method(configuration_.lock()->getCompressionMethod())
306310
, persistent_components(persistent_components_)
307311
, table_schema_id(table_snapshot_->schema_id)
312+
, secondary_storages(secondary_storages_)
308313
{
309314
auto delete_file = deletes_iterator.next();
310315
while (delete_file.has_value())
@@ -331,9 +336,8 @@ ObjectInfoPtr IcebergIterator::next(size_t)
331336
if (blocking_queue.pop(manifest_file_entry))
332337
{
333338
// Resolve the data file path to get the correct storage and key
334-
std::map<String, ObjectStoragePtr> empty_secondary_storages;
335339
auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
336-
persistent_components.table_location, manifest_file_entry.file_path, object_storage, empty_secondary_storages, local_context);
340+
persistent_components.table_location, manifest_file_entry.file_path, object_storage, secondary_storages, local_context);
337341

338342
// Create IcebergDataObjectInfo with resolved storage
339343
IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(manifest_file_entry, storage_to_use, resolved_key);

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class SingleThreadIcebergKeysIterator
4545
const ActionsDAG * filter_dag_,
4646
IcebergTableStateSnapshotPtr table_snapshot_,
4747
IcebergDataSnapshotPtr data_snapshot_,
48-
PersistentTableComponents persistent_components);
48+
PersistentTableComponents persistent_components,
49+
std::map<String, ObjectStoragePtr> & secondary_storages_);
4950

5051
std::optional<DB::Iceberg::ManifestFileEntry> next();
5152

@@ -62,7 +63,7 @@ class SingleThreadIcebergKeysIterator
6263
PersistentTableComponents persistent_components;
6364
FilesGenerator files_generator;
6465
LoggerPtr log;
65-
66+
std::map<String, ObjectStoragePtr> & secondary_storages;
6667

6768
// By Iceberg design it is difficult to avoid storing position deletes in memory.
6869
size_t manifest_file_index = 0;
@@ -90,7 +91,8 @@ class IcebergIterator : public IObjectIterator
9091
IDataLakeMetadata::FileProgressCallback callback_,
9192
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
9293
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
93-
Iceberg::PersistentTableComponents persistent_components_);
94+
Iceberg::PersistentTableComponents persistent_components_,
95+
std::map<String, ObjectStoragePtr> & secondary_storages_);
9496

9597
ObjectInfoPtr next(size_t) override;
9698

@@ -114,6 +116,7 @@ class IcebergIterator : public IObjectIterator
114116
std::mutex exception_mutex;
115117
Iceberg::PersistentTableComponents persistent_components;
116118
Int32 table_schema_id;
119+
std::map<String, ObjectStoragePtr> & secondary_storages;
117120
};
118121
}
119122

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -635,15 +635,15 @@ std::shared_ptr<NamesAndTypesList> IcebergMetadata::getInitialSchemaByPath(Conte
635635
: nullptr;
636636
}
637637

638-
std::shared_ptr<const ActionsDAG> IcebergMetadata::getSchemaTransformer(ContextPtr context_, ObjectInfoPtr object_info) const
638+
std::shared_ptr<const ActionsDAG> IcebergMetadata::getSchemaTransformer(ContextPtr local_context, ObjectInfoPtr object_info) const
639639
{
640640
IcebergDataObjectInfo * iceberg_object_info = dynamic_cast<IcebergDataObjectInfo *>(object_info.get());
641641
SharedLockGuard lock(mutex);
642642
if (!iceberg_object_info)
643643
return nullptr;
644644
return (iceberg_object_info->underlying_format_read_schema_id != relevant_snapshot_schema_id)
645645
? persistent_components.schema_processor->getSchemaTransformationDagByIds(
646-
context_,
646+
local_context,
647647
iceberg_object_info->underlying_format_read_schema_id,
648648
relevant_snapshot_schema_id)
649649
: nullptr;
@@ -1012,7 +1012,8 @@ ObjectIterator IcebergMetadata::iterate(
10121012
callback,
10131013
table_snapshot,
10141014
relevant_snapshot,
1015-
persistent_components);
1015+
persistent_components,
1016+
secondary_storages);
10161017
}
10171018

10181019
NamesAndTypesList IcebergMetadata::getTableSchema() const
@@ -1042,7 +1043,7 @@ void IcebergMetadata::addDeleteTransformers(
10421043
LOG_DEBUG(log, "Constructing filter transform for position delete, there are {} delete objects", iceberg_object_info->position_deletes_objects.size());
10431044
builder.addSimpleTransform(
10441045
[&](const SharedHeader & header)
1045-
{ return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context); });
1046+
{ return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context, persistent_components.table_location, secondary_storages); });
10461047
}
10471048
const auto & delete_files = iceberg_object_info->equality_deletes_objects;
10481049
if (!delete_files.empty())
@@ -1053,9 +1054,14 @@ void IcebergMetadata::addDeleteTransformers(
10531054
{
10541055
/// get header of delete file
10551056
Block delete_file_header;
1056-
ObjectInfo delete_file_object(delete_file.file_path);
1057+
// Resolve the delete file path to get the correct storage and key
1058+
// This handles cases where delete files are outside the table location
1059+
auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
1060+
persistent_components.table_location, delete_file.file_path, object_storage, secondary_storages, local_context);
1061+
1062+
PathWithMetadata delete_file_object(resolved_delete_key, std::nullopt, delete_file.file_path, delete_storage_to_use);
10571063
{
1058-
auto schema_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log);
1064+
auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log);
10591065
auto schema_reader = FormatFactory::instance().getSchemaReader(delete_file.file_format, *schema_read_buffer, local_context);
10601066
auto columns_with_names = schema_reader->readSchema();
10611067
ColumnsWithTypeAndName initial_header_data;

src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,19 @@ void IcebergPositionDeleteTransform::initializeDeleteSources()
7070
&& position_deletes_object.reference_data_file_path != iceberg_data_path)
7171
continue;
7272

73-
auto object_path = position_deletes_object.file_path;
74-
auto object_metadata = object_storage->getObjectMetadata(object_path);
75-
auto object_info = std::make_shared<ObjectInfo>(object_path, object_metadata);
73+
/// Resolve the position delete file path to get the correct storage and key
74+
/// This handles cases where delete files are outside the table location
75+
auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
76+
table_location, position_deletes_object.file_path, object_storage, secondary_storages, context);
77+
78+
auto object_metadata = delete_storage_to_use->getObjectMetadata(resolved_delete_key);
79+
PathWithMetadata delete_file_object(resolved_delete_key, object_metadata, position_deletes_object.file_path, delete_storage_to_use);
7680

7781

7882
String format = position_deletes_object.file_format;
7983
Block initial_header;
8084
{
81-
std::unique_ptr<ReadBuffer> read_buf_schema = createReadBuffer(*object_info, object_storage, context, log);
85+
std::unique_ptr<ReadBuffer> read_buf_schema = createReadBuffer(delete_file_object, delete_storage_to_use, context, log);
8286
auto schema_reader = FormatFactory::instance().getSchemaReader(format, *read_buf_schema, context);
8387
auto columns_with_names = schema_reader->readSchema();
8488
ColumnsWithTypeAndName initial_header_data;
@@ -89,9 +93,9 @@ void IcebergPositionDeleteTransform::initializeDeleteSources()
8993
initial_header = Block(initial_header_data);
9094
}
9195

92-
CompressionMethod compression_method = chooseCompressionMethod(object_path, "auto");
96+
CompressionMethod compression_method = chooseCompressionMethod(resolved_delete_key, "auto");
9397

94-
delete_read_buffers.push_back(createReadBuffer(*object_info, object_storage, context, log));
98+
delete_read_buffers.push_back(createReadBuffer(delete_file_object, delete_storage_to_use, context, log));
9599

96100
auto syntax_result = TreeRewriter(context).analyze(where_ast, initial_header.getNamesAndTypesList());
97101
ExpressionAnalyzer analyzer(where_ast, syntax_result, context);

src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@ class IcebergPositionDeleteTransform : public ISimpleTransform
2727
IcebergDataObjectInfoPtr iceberg_object_info_,
2828
ObjectStoragePtr object_storage_,
2929
const std::optional<FormatSettings> & format_settings_,
30-
ContextPtr context_)
30+
ContextPtr context_,
31+
const String & table_location_,
32+
std::map<String, ObjectStoragePtr> & secondary_storages_)
3133
: ISimpleTransform(header_, header_, false)
3234
, header(header_)
3335
, iceberg_object_info(iceberg_object_info_)
3436
, object_storage(object_storage_)
3537
, format_settings(format_settings_)
3638
, context(context_)
39+
, table_location(table_location_)
40+
, secondary_storages(secondary_storages_)
3741
{
3842
initializeDeleteSources();
3943
}
@@ -52,6 +56,8 @@ class IcebergPositionDeleteTransform : public ISimpleTransform
5256
const ObjectStoragePtr object_storage;
5357
const std::optional<FormatSettings> format_settings;
5458
ContextPtr context;
59+
const String table_location;
60+
std::map<String, ObjectStoragePtr> & secondary_storages;
5561

5662
/// We need to keep the read buffers alive since the delete_sources depends on them.
5763
std::vector<std::unique_ptr<ReadBuffer>> delete_read_buffers;
@@ -66,8 +72,10 @@ class IcebergBitmapPositionDeleteTransform : public IcebergPositionDeleteTransfo
6672
IcebergDataObjectInfoPtr iceberg_object_info_,
6773
ObjectStoragePtr object_storage_,
6874
const std::optional<FormatSettings> & format_settings_,
69-
ContextPtr context_)
70-
: IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_)
75+
ContextPtr context_,
76+
const String & table_location_,
77+
std::map<String, ObjectStoragePtr> & secondary_storages_)
78+
: IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_, table_location_, secondary_storages_)
7179
{
7280
initialize();
7381
}
@@ -90,8 +98,10 @@ class IcebergStreamingPositionDeleteTransform : public IcebergPositionDeleteTran
9098
IcebergDataObjectInfoPtr iceberg_object_info_,
9199
ObjectStoragePtr object_storage_,
92100
const std::optional<FormatSettings> & format_settings_,
93-
ContextPtr context_)
94-
: IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_)
101+
ContextPtr context_,
102+
const String & table_location_,
103+
std::map<String, ObjectStoragePtr> & secondary_storages_)
104+
: IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_, table_location_, secondary_storages_)
95105
{
96106
initialize();
97107
}

0 commit comments

Comments
 (0)