Skip to content

Commit 2e942f9

Browse files
committed
desperate step: add debug output to see what happens in CI
maybe I'm onto something another attempt polish
1 parent 07f9c57 commit 2e942f9

19 files changed

+134
-104
lines changed

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ void PathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore
148148
{
149149
const auto & path = isArchive() ? getPathToArchive() : getPath();
150150

151-
auto storage_to_use = object_storage_to_use.value_or(object_storage);
151+
auto storage_to_use = object_storage_to_use ? object_storage_to_use : object_storage;
152152

153153
if (ignore_non_existent_file)
154154
metadata = storage_to_use->tryGetObjectMetadata(path);

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,19 +143,19 @@ struct PathWithMetadata
143143
/// Retry request after short pause
144144
CommandInTaskResponse command;
145145
std::optional<String> absolute_path;
146-
std::optional<ObjectStoragePtr> object_storage_to_use = std::nullopt;
146+
ObjectStoragePtr object_storage_to_use = nullptr;
147147

148148
PathWithMetadata() = default;
149149

150150
explicit PathWithMetadata(
151151
const String & command_or_path,
152152
std::optional<ObjectMetadata> metadata_ = std::nullopt,
153153
std::optional<String> absolute_path_ = std::nullopt,
154-
std::optional<ObjectStoragePtr> object_storage_to_use_ = std::nullopt)
154+
ObjectStoragePtr object_storage_to_use_ = nullptr)
155155
: relative_path(std::move(command_or_path))
156156
, metadata(std::move(metadata_))
157157
, command(relative_path)
158-
, absolute_path(absolute_path_)
158+
, absolute_path((absolute_path_.has_value() && !absolute_path_.value().empty()) ? absolute_path_ : std::nullopt)
159159
, object_storage_to_use(object_storage_to_use_)
160160
{
161161
if (command.is_parsed())
@@ -183,7 +183,7 @@ struct PathWithMetadata
183183

184184
void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file = true);
185185

186-
std::optional<ObjectStoragePtr> getObjectStorage() const { return object_storage_to_use; }
186+
ObjectStoragePtr getObjectStorage() const { return object_storage_to_use; }
187187
};
188188

189189
struct ObjectKeyWithMetadata

src/Interpreters/ClusterFunctionReadTask.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const
5151
auto object = std::make_shared<ObjectInfo>(path);
5252
object->data_lake_metadata = data_lake_metadata;
5353
object->file_meta_info = file_meta_info;
54-
object->absolute_path = absolute_path;
54+
if (absolute_path.has_value() && !absolute_path.value().empty())
55+
object->absolute_path = absolute_path;
56+
5557
return object;
5658
}
5759

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ Plan getPlan(
112112
IcebergHistory snapshots_info,
113113
const PersistentTableComponents & persistent_table_components,
114114
ObjectStoragePtr object_storage,
115-
std::map<String, DB::ObjectStoragePtr> & secondary_storages,
115+
SecondaryStorages & secondary_storages,
116116
StorageObjectStorageConfigurationPtr configuration,
117117
ContextPtr context,
118118
CompressionMethod compression_method)
@@ -239,14 +239,16 @@ void writeDataFiles(
239239
ContextPtr context,
240240
StorageObjectStorageConfigurationPtr configuration,
241241
const String & table_location,
242-
std::map<String, ObjectStoragePtr> & secondary_storages)
242+
SecondaryStorages & secondary_storages)
243243
{
244244
for (auto & [_, data_file] : initial_plan.path_to_data_file)
245245
{
246246
auto delete_file_transform = std::make_shared<IcebergBitmapPositionDeleteTransform>(
247247
sample_block, data_file->data_object_info, object_storage, format_settings, context, table_location, secondary_storages);
248248

249-
ObjectStoragePtr storage_to_use = data_file->data_object_info->getObjectStorage().value_or(object_storage);
249+
ObjectStoragePtr storage_to_use = data_file->data_object_info->getObjectStorage();
250+
if (!storage_to_use)
251+
storage_to_use = object_storage;
250252
StorageObjectStorage::ObjectInfo object_info(data_file->data_object_info->getPath());
251253
auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction"));
252254

@@ -536,7 +538,7 @@ void compactIcebergTable(
536538
IcebergHistory snapshots_info,
537539
const PersistentTableComponents & persistent_table_components,
538540
ObjectStoragePtr object_storage_,
539-
std::map<String, DB::ObjectStoragePtr> & secondary_storages_,
541+
SecondaryStorages & secondary_storages_,
540542
StorageObjectStorageConfigurationPtr configuration_,
541543
const std::optional<FormatSettings> & format_settings_,
542544
SharedHeader sample_block_,

src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <Storages/ObjectStorage/DataLakes/Iceberg/PersistentTableComponents.h>
66
#include <Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h>
77
#include <Storages/ObjectStorage/StorageObjectStorage.h>
8+
#include <Storages/ObjectStorage/Utils.h>
89

910

1011
namespace DB::Iceberg
@@ -15,7 +16,7 @@ void compactIcebergTable(
1516
IcebergHistory snapshots_info,
1617
const PersistentTableComponents & persistent_table_components,
1718
DB::ObjectStoragePtr object_storage_,
18-
std::map<String, DB::ObjectStoragePtr> & secondary_storages_,
19+
SecondaryStorages & secondary_storages_,
1920
DB::StorageObjectStorageConfigurationPtr configuration_,
2021
const std::optional<DB::FormatSettings> & format_settings_,
2122
DB::SharedHeader sample_block_,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,15 @@ namespace Setting
3737
extern const SettingsBool use_roaring_bitmap_iceberg_positional_deletes;
3838
};
3939

40-
namespace
41-
{
42-
String toupper(String & str)
43-
{
44-
std::transform(str.begin(), str.end(), str.begin(), ::toupper);
45-
return str;
46-
}
47-
}
4840

4941
IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_)
50-
: PathWithMetadata(data_manifest_file_entry_.file_path, std::nullopt, data_manifest_file_entry_.file_path_key)
42+
: PathWithMetadata(data_manifest_file_entry_.file_path, std::nullopt,
43+
data_manifest_file_entry_.file_path_key.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path_key))
5144
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
5245
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
5346
, sequence_number(data_manifest_file_entry_.added_sequence_number)
5447
{
55-
if (!position_deletes_objects.empty() && toupper(data_manifest_file_entry_.file_format) != "PARQUET")
48+
if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")
5649
{
5750
throw Exception(
5851
ErrorCodes::NOT_IMPLEMENTED,
@@ -65,17 +58,14 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(
6558
Iceberg::ManifestFileEntry data_manifest_file_entry_,
6659
ObjectStoragePtr resolved_storage,
6760
const String & resolved_key)
68-
: PathWithMetadata(resolved_key, std::nullopt, data_manifest_file_entry_.file_path, resolved_storage)
61+
: PathWithMetadata(resolved_key, std::nullopt,
62+
data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
63+
resolved_storage)
6964
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
7065
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
7166
, sequence_number(data_manifest_file_entry_.added_sequence_number)
7267
{
73-
auto toupper = [](String & str)
74-
{
75-
std::transform(str.begin(), str.end(), str.begin(), ::toupper);
76-
return str;
77-
};
78-
if (!position_deletes_objects.empty() && toupper(data_manifest_file_entry_.file_format) != "PARQUET")
68+
if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")
7969
{
8070
throw Exception(
8171
ErrorCodes::NOT_IMPLEMENTED,
@@ -90,7 +80,7 @@ std::shared_ptr<ISimpleTransform> IcebergDataObjectInfo::getPositionDeleteTransf
9080
const std::optional<FormatSettings> & format_settings,
9181
ContextPtr context_,
9282
const String & table_location,
93-
std::map<String, ObjectStoragePtr> & secondary_storages)
83+
SecondaryStorages & secondary_storages)
9484
{
9585
IcebergDataObjectInfoPtr self = shared_from_this();
9686
if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value)

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
1010
#include <Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteObject.h>
11+
#include <Storages/ObjectStorage/Utils.h>
1112
#include <base/defines.h>
1213

1314

@@ -34,7 +35,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
3435
const std::optional<FormatSettings> & format_settings,
3536
ContextPtr context_,
3637
const String & table_location,
37-
std::map<String, ObjectStoragePtr> & secondary_storages);
38+
SecondaryStorages & secondary_storages);
3839

3940
void addPositionDeleteObject(Iceberg::ManifestFileEntry position_delete_object)
4041
{

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <IO/ReadBufferFromString.h>
2525
#include <IO/ReadHelpers.h>
2626
#include <Interpreters/Context.h>
27+
#include <Disks/DiskType.h>
2728

2829
#include <IO/CompressedReadBufferWrapper.h>
2930
#include <Interpreters/ExpressionActions.h>
@@ -134,7 +135,7 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
134135
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path,
135136
data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number,
136137
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id,
137-
secondary_storages);
138+
*secondary_storages);
138139
internal_data_index = 0;
139140
}
140141
auto files = files_generator(current_manifest_file_content);
@@ -202,7 +203,7 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator(
202203
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
203204
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
204205
PersistentTableComponents persistent_components_,
205-
std::map<String, ObjectStoragePtr> & secondary_storages_)
206+
std::shared_ptr<SecondaryStorages> secondary_storages_)
206207
: object_storage(object_storage_)
207208
, filter_dag(filter_dag_ ? std::make_shared<ActionsDAG>(filter_dag_->clone()) : nullptr)
208209
, local_context(local_context_)
@@ -238,7 +239,7 @@ IcebergIterator::IcebergIterator(
238239
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
239240
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
240241
PersistentTableComponents persistent_components_,
241-
std::map<String, ObjectStoragePtr> & secondary_storages_)
242+
std::shared_ptr<SecondaryStorages> secondary_storages_)
242243
: filter_dag(filter_dag_ ? std::make_unique<ActionsDAG>(filter_dag_->clone()) : nullptr)
243244
, object_storage(std::move(object_storage_))
244245
, local_context(local_context_)
@@ -337,7 +338,7 @@ ObjectInfoPtr IcebergIterator::next(size_t)
337338
{
338339
// Resolve the data file path to get the correct storage and key
339340
auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
340-
persistent_components.table_location, manifest_file_entry.file_path, object_storage, secondary_storages, local_context);
341+
persistent_components.table_location, manifest_file_entry.file_path, object_storage, *secondary_storages, local_context);
341342

342343
IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(manifest_file_entry, storage_to_use, resolved_key);
343344

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h>
2626
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>
2727
#include <Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h>
28+
#include <Storages/ObjectStorage/Utils.h>
2829

2930
namespace DB
3031
{
@@ -46,7 +47,7 @@ class SingleThreadIcebergKeysIterator
4647
IcebergTableStateSnapshotPtr table_snapshot_,
4748
IcebergDataSnapshotPtr data_snapshot_,
4849
PersistentTableComponents persistent_components,
49-
std::map<String, ObjectStoragePtr> & secondary_storages_);
50+
std::shared_ptr<SecondaryStorages> secondary_storages_);
5051

5152
std::optional<DB::Iceberg::ManifestFileEntry> next();
5253

@@ -63,7 +64,7 @@ class SingleThreadIcebergKeysIterator
6364
PersistentTableComponents persistent_components;
6465
FilesGenerator files_generator;
6566
LoggerPtr log;
66-
std::map<String, ObjectStoragePtr> & secondary_storages;
67+
std::shared_ptr<SecondaryStorages> secondary_storages;
6768

6869
// By Iceberg design it is difficult to avoid storing position deletes in memory.
6970
size_t manifest_file_index = 0;
@@ -92,7 +93,7 @@ class IcebergIterator : public IObjectIterator
9293
Iceberg::IcebergTableStateSnapshotPtr table_snapshot_,
9394
Iceberg::IcebergDataSnapshotPtr data_snapshot_,
9495
Iceberg::PersistentTableComponents persistent_components_,
95-
std::map<String, ObjectStoragePtr> & secondary_storages_);
96+
std::shared_ptr<SecondaryStorages> secondary_storages_);
9697

9798
ObjectInfoPtr next(size_t) override;
9899

@@ -116,7 +117,7 @@ class IcebergIterator : public IObjectIterator
116117
std::mutex exception_mutex;
117118
Iceberg::PersistentTableComponents persistent_components;
118119
Int32 table_schema_id;
119-
std::map<String, ObjectStoragePtr> & secondary_storages;
120+
std::shared_ptr<SecondaryStorages> secondary_storages; // Sometimes data or manifests can be located on another storage
120121
};
121122
}
122123

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ IcebergMetadata::IcebergMetadata(
133133
IcebergMetadataFilesCachePtr cache_ptr,
134134
CompressionMethod metadata_compression_method_)
135135
: object_storage(std::move(object_storage_))
136+
, secondary_storages(std::make_shared<SecondaryStorages>())
136137
, configuration(std::move(configuration_))
137138
, persistent_components(PersistentTableComponents{
138139
.schema_processor = std::make_shared<IcebergSchemaProcessor>(context_),
@@ -502,7 +503,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
502503
auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object);
503504

504505
String manifest_list_path = snapshot->getValue<String>(f_manifest_list);
505-
auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath(persistent_components.table_location, manifest_list_path, object_storage, secondary_storages, local_context);
506+
auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath(persistent_components.table_location, manifest_list_path, object_storage, *secondary_storages, local_context);
506507

507508
relevant_snapshot = std::make_shared<IcebergDataSnapshot>(
508509
getManifestList(
@@ -549,7 +550,7 @@ bool IcebergMetadata::optimize(const StorageMetadataPtr & metadata_snapshot, Con
549550
snapshots_info,
550551
persistent_components,
551552
object_storage,
552-
secondary_storages,
553+
*secondary_storages,
553554
configuration_ptr,
554555
format_settings,
555556
sample_block,
@@ -930,7 +931,7 @@ std::optional<size_t> IcebergMetadata::totalRows(ContextPtr local_context) const
930931
manifest_list_entry.manifest_file_absolute_path,
931932
manifest_list_entry.added_sequence_number,
932933
manifest_list_entry.added_snapshot_id,
933-
secondary_storages);
934+
*secondary_storages);
934935
auto data_count = manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::DATA);
935936
auto position_deletes_count = manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::POSITION_DELETE);
936937
if (!data_count.has_value() || !position_deletes_count.has_value())
@@ -971,7 +972,7 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) cons
971972
manifest_list_entry.manifest_file_absolute_path,
972973
manifest_list_entry.added_sequence_number,
973974
manifest_list_entry.added_snapshot_id,
974-
secondary_storages);
975+
*secondary_storages);
975976
auto count = manifest_file_ptr->getBytesCountInAllDataFilesExcludingDeleted();
976977
if (!count.has_value())
977978
return {};
@@ -1052,7 +1053,7 @@ void IcebergMetadata::addDeleteTransformers(
10521053
LOG_DEBUG(log, "Constructing filter transform for position delete, there are {} delete objects", iceberg_object_info->position_deletes_objects.size());
10531054
builder.addSimpleTransform(
10541055
[&](const SharedHeader & header)
1055-
{ return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context, persistent_components.table_location, secondary_storages); });
1056+
{ return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context, persistent_components.table_location, *secondary_storages); });
10561057
}
10571058
const auto & delete_files = iceberg_object_info->equality_deletes_objects;
10581059
if (!delete_files.empty())
@@ -1065,7 +1066,7 @@ void IcebergMetadata::addDeleteTransformers(
10651066
Block delete_file_header;
10661067

10671068
auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
1068-
persistent_components.table_location, delete_file.file_path, object_storage, secondary_storages, local_context);
1069+
persistent_components.table_location, delete_file.file_path, object_storage, *secondary_storages, local_context);
10691070

10701071
PathWithMetadata delete_file_object(resolved_delete_key, std::nullopt, delete_file.file_path, delete_storage_to_use);
10711072
{

0 commit comments

Comments
 (0)