Commit 8fb2aa2

Fix column indexes

1 parent dd98094 commit 8fb2aa2

6 files changed: +82 -67 lines changed

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp

Lines changed: 27 additions & 9 deletions
@@ -92,12 +92,12 @@ DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info)
         {
             auto column = columns->getObject(static_cast<UInt32>(i));

-            Int32 id;
-            if (column->has("id"))
-                id = column->get("id");
+            std::string name;
+            if (column->has("name"))
+                name = column->get("name").toString();
             else
             {
-                LOG_WARNING(log, "Can't read column id, ignored");
+                LOG_WARNING(log, "Can't read column name, ignored");
                 continue;
             }

@@ -117,20 +117,38 @@ DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info)
                 }
                 catch (const Exception & e)
                 {
-                    LOG_WARNING(log, "Can't read range for column {}, range '{}' ignored, error: {}", id, r, e.what());
+                    LOG_WARNING(log, "Can't read range for column {}, range '{}' ignored, error: {}", name, r, e.what());
                 }
             }

-            columns_info[id] = column_info;
+            columns_info[name] = column_info;
         }
     }
 }

-DataFileMetaInfo::DataFileMetaInfo(const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_)
+DataFileMetaInfo::DataFileMetaInfo(
+    const IcebergSchemaProcessor & schema_processor,
+    Int32 schema_id,
+    const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_)
 {
+    std::vector<Int32> column_ids;
     for (const auto & column : columns_info_)
+        column_ids.push_back(column.first);
+    auto name_and_types = schema_processor.tryGetFieldsCharacteristics(schema_id, column_ids);
+    std::unordered_map<Int32, std::string> name_by_index;
+    for (const auto & name_and_type : name_and_types)
     {
-        columns_info[column.first] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle};
+        const auto name = name_and_type.getNameInStorage();
+        auto index = schema_processor.tryGetColumnIDByName(schema_id, name);
+        if (index.has_value())
+            name_by_index[index.value()] = name;
+    }
+
+    for (const auto & column : columns_info_)
+    {
+        auto i_name = name_by_index.find(column.first);
+        if (i_name != name_by_index.end())
+            columns_info[i_name->second] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle};
     }
 }

@@ -145,7 +163,7 @@ Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const
     for (const auto & column : columns_info)
     {
         Poco::JSON::Object::Ptr column_info = new Poco::JSON::Object();
-        column_info->set("id", column.first);
+        column_info->set("name", column.first);
         if (column.second.rows_count.has_value())
             column_info->set("rows", column.second.rows_count.value());
         if (column.second.nulls_count.has_value())
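Note on the new constructor above: manifest statistics arrive keyed by Iceberg field ID and are re-keyed by storage column name via the schema processor; IDs without a counterpart in the schema are dropped rather than guessed at. A minimal self-contained sketch of that remapping (the toy types below stand in for IcebergSchemaProcessor and Iceberg::ColumnInfo):

// Sketch only; plain std types replace the real ClickHouse classes.
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

struct ColumnInfo { uint64_t rows_count = 0; };

int main()
{
    // Normally obtained from IcebergSchemaProcessor for a given schema_id:
    // field ID -> storage column name.
    std::unordered_map<int32_t, std::string> name_by_id = {{1, "user_id"}, {2, "event_time"}};

    // Per-file statistics as read from the manifest, keyed by field ID;
    // ID 7 has no counterpart in the current schema.
    std::unordered_map<int32_t, ColumnInfo> stats_by_id = {{1, {100}}, {2, {100}}, {7, {100}}};

    // Re-key by name; unknown IDs are silently skipped.
    std::unordered_map<std::string, ColumnInfo> stats_by_name;
    for (const auto & [id, info] : stats_by_id)
        if (auto it = name_by_id.find(id); it != name_by_id.end())
            stats_by_name[it->second] = info;

    for (const auto & [name, info] : stats_by_name)
        std::cout << name << ": " << info.rows_count << " rows\n";
}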

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 7 additions & 2 deletions
@@ -8,6 +8,8 @@
 #include <Storages/prepareReadingFromFormat.h>
 #include <Poco/JSON/Object.h>

+#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
+
 namespace Iceberg
 {

@@ -29,7 +31,10 @@ class DataFileMetaInfo
     DataFileMetaInfo() = default;

     // Extract metadata from Iceberg structure
-    explicit DataFileMetaInfo(const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_);
+    explicit DataFileMetaInfo(
+        const IcebergSchemaProcessor & schema_processor,
+        Int32 schema_id,
+        const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_);

     // Deserialize from json in distributed requests
     explicit DataFileMetaInfo(const Poco::JSON::Object::Ptr file_info);

@@ -44,7 +49,7 @@ class DataFileMetaInfo
         std::optional<DB::Range> hyperrectangle;
     };

-    std::unordered_map<Int32, ColumnInfo> columns_info;
+    std::unordered_map<std::string, ColumnInfo> columns_info;
 };

 using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;
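Because columns_info is serialized for distributed requests, the key change is also visible on the wire: each per-column JSON entry now carries "name" instead of "id". A hedged round-trip sketch using the same Poco::JSON calls that appear in the diff (the surrounding DataFileMetaInfo structure is omitted):

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <iostream>
#include <sstream>

int main()
{
    // Serialize one per-column entry, identified by column name, as toJson() now does.
    Poco::JSON::Object::Ptr column_info = new Poco::JSON::Object();
    column_info->set("name", "region");
    column_info->set("rows", 100);

    std::ostringstream out;
    column_info->stringify(out);

    // The receiving side parses the object and looks the key up again,
    // mirroring the has("name") / get("name").toString() pattern in the constructor.
    Poco::JSON::Parser parser;
    auto parsed = parser.parse(out.str()).extract<Poco::JSON::Object::Ptr>();
    if (parsed->has("name"))
        std::cout << "column: " << parsed->get("name").toString() << '\n';
}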

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 4 additions & 1 deletion
@@ -1125,7 +1125,10 @@ DataFileInfos IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, C
             {
                 data_files.push_back(DataFileInfo(std::get<DataFileEntry>(manifest_file_entry.file).file_name));
                 if (use_iceberg_read_optimization)
-                    data_files.back().file_meta_info = std::make_shared<DataFileMetaInfo>(manifest_file_entry.columns_infos);
+                    data_files.back().file_meta_info = std::make_shared<DataFileMetaInfo>(
+                        schema_processor,
+                        relevant_snapshot_schema_id,
+                        manifest_file_entry.columns_infos);
             }
         }
     }

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 44 additions & 52 deletions
@@ -340,6 +340,11 @@ Chunk StorageObjectStorageSource::generate()
     {
         for (const auto & constant_column : reader.constant_columns_with_values)
         {
+            LOG_DEBUG(log, "Restore constant column '{}' index {} with value '{}'",
+                constant_column.second.name_and_type.name,
+                constant_column.first,
+                constant_column.second.value
+            );
             chunk.addColumn(constant_column.first,
                 constant_column.second.name_and_type.type->createColumnConst(
                     chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst());

@@ -543,77 +548,64 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReader
     std::map<size_t, ConstColumnWithValue> constant_columns_with_values;
     std::unordered_set<String> constant_columns;

+    NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns;
+
     std::unordered_map<String, std::pair<size_t, NameAndTypePair>> requested_columns_list;
     {
         size_t column_index = 0;
-        for (const auto & column : read_from_format_info.requested_columns)
+        for (const auto & column : requested_columns_copy)
             requested_columns_list[column.getNameInStorage()] = std::make_pair(column_index++, column);
     }

-    std::unordered_map<Int32, String> physical_columns_names;
-    Int32 column_counter = 0;
-    /// In Iceberg metadata columns' numbers starts from 1, so preincrement used
-    for (const auto & column : read_from_format_info.physical_columns)
-        physical_columns_names[++column_counter] = column.getNameInStorage();
-    /// now column_counter contains maximum column index
-
-    NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns;
-
     if (context_->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization])
     {
-        auto file_meta_data = object_info->getFileMetaInfo();
-        if (file_meta_data.has_value())
+        auto schema = configuration->tryGetTableStructureFromMetadata();
+        if (schema.has_value())
         {
-            for (const auto & column : file_meta_data.value()->columns_info)
+            auto file_meta_data = object_info->getFileMetaInfo();
+            if (file_meta_data.has_value())
             {
-                if (column.second.hyperrectangle.has_value())
+                for (const auto & column : file_meta_data.value()->columns_info)
                 {
-                    if (column.second.hyperrectangle.value().isPoint())
+                    if (column.second.hyperrectangle.has_value())
                     {
-                        auto column_id = column.first;
-
-                        if (column_id <= 0 || column_id > column_counter)
-                        { /// Something wrong, ignore file metadata
-                            LOG_WARNING(log, "Incorrect column ID: {}, ignoring file metadata", column_id);
-                            constant_columns.clear();
-                            break;
+                        if (column.second.hyperrectangle.value().isPoint())
+                        {
+                            auto column_name = column.first;
+
+                            auto i_column = requested_columns_list.find(column_name);
+                            if (i_column == requested_columns_list.end())
+                                continue;
+
+                            /// isPoint() method checks that left == right
+                            constant_columns_with_values[i_column->second.first] =
+                                ConstColumnWithValue{
+                                    i_column->second.second,
+                                    column.second.hyperrectangle.value().left
+                                };
+                            constant_columns.insert(column_name);
+
+                            LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value '{}'",
+                                object_info->getPath(),
+                                column_name,
+                                i_column->second.first,
+                                i_column->second.second.type,
+                                column.second.hyperrectangle.value().left.dump());
                         }
-
-                        const auto & column_name = physical_columns_names[column_id];
-
-                        auto i_column = requested_columns_list.find(column_name);
-                        if (i_column == requested_columns_list.end())
-                            continue;
-
-                        /// isPoint() method checks that left == right
-                        constant_columns_with_values[i_column->second.first] =
-                            ConstColumnWithValue{
-                                i_column->second.second,
-                                column.second.hyperrectangle.value().left
-                            };
-                        constant_columns.insert(column_name);
-
-                        LOG_DEBUG(log, "In file {} constant column {} with value {}",
-                            object_info->getPath(), column_name, column.second.hyperrectangle.value().left.dump());
                     }
                 }
             }
-        }

-        if (!constant_columns.empty())
-        {
-            size_t original_columns = requested_columns_copy.size();
-            requested_columns_copy = requested_columns_copy.eraseNames(constant_columns);
-            if (requested_columns_copy.size() + constant_columns.size() != original_columns)
+            if (!constant_columns.empty())
             {
-                LOG_WARNING(log, "Can't remove constant columns for file {} correct, fallback to read. Founded constant columns: [{}]",
-                    object_info->getPath(), constant_columns);
-                requested_columns_copy = read_from_format_info.requested_columns;
-                constant_columns.clear();
-                constant_columns_with_values.clear();
+                size_t original_columns = requested_columns_copy.size();
+                requested_columns_copy = requested_columns_copy.eraseNames(constant_columns);
+                if (requested_columns_copy.size() + constant_columns.size() != original_columns)
+                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't remove constant columns correctly for file {}. Found constant columns: [{}]",
+                        object_info->getPath(), constant_columns);
+                if (requested_columns_copy.empty())
+                    need_only_count = true;
             }
-            else if (requested_columns_copy.empty())
-                need_only_count = true;
         }
     }
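What the reworked createReader block buys: when a file's min/max range for a requested column collapses to a single point, the column is pruned from the format read list and restored afterwards as a constant; if every requested column turns out constant, only the row count is needed. A standalone sketch of that pruning decision (a toy Range and plain containers stand in for DB::Range, Field, and NamesAndTypesList):

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Toy stand-in for DB::Range restricted to integers.
struct Range
{
    int left = 0;
    int right = 0;
    bool isPoint() const { return left == right; } // min == max
};

int main()
{
    // Columns requested by the query, in output order.
    std::vector<std::string> requested = {"id", "region", "value"};

    // Per-file statistics keyed by column name (the new map layout).
    std::map<std::string, Range> file_ranges = {{"region", {7, 7}}, {"value", {1, 9}}};

    std::map<size_t, int> constants;   // output position -> constant value
    std::vector<std::string> to_read;  // columns that must still be read
    for (size_t pos = 0; pos < requested.size(); ++pos)
    {
        auto it = file_ranges.find(requested[pos]);
        if (it != file_ranges.end() && it->second.isPoint())
            constants[pos] = it->second.left; // skip reading; restore as const later
        else
            to_read.push_back(requested[pos]);
    }

    bool need_only_count = to_read.empty(); // all columns constant: counting rows suffices

    std::cout << "read from file:";
    for (const auto & name : to_read)
        std::cout << ' ' << name;
    std::cout << "\nconstant columns: " << constants.size()
              << ", need_only_count = " << std::boolalpha << need_only_count << '\n';
}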

src/Storages/prepareReadingFromFormat.cpp

Lines changed: 0 additions & 1 deletion
@@ -88,7 +88,6 @@ ReadFromFormatInfo prepareReadingFromFormat(
     /// Create header for InputFormat with columns that will be read from the data.
     info.format_header = storage_snapshot->getSampleBlockForColumns(info.columns_description.getNamesOfPhysical());
     info.serialization_hints = getSerializationHintsForFileLikeStorage(storage_snapshot->metadata, context);
-    info.physical_columns = storage_snapshot->metadata->getColumns().getAllPhysical();
     return info;
 }

src/Storages/prepareReadingFromFormat.h

Lines changed: 0 additions & 2 deletions
@@ -26,8 +26,6 @@ namespace DB
     SerializationInfoByName serialization_hints;
     /// The list of hive partition columns. It shall be read from the path regardless if it is present in the file
     NamesAndTypesList hive_partition_columns_to_read_from_file_path;
-    /// The list of all physical columns is source. Required sometimes for some read optimization.
-    NamesAndTypesList physical_columns;
 };

 struct PrepareReadingFromFormatHiveParams
