Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ ::avro::LogicalType GetMapLogicalType() {
std::call_once(flag, []() {
// Register the map logical type with the avro custom logical type registry.
// See https://github.com/apache/avro/pull/3326 for details.
::avro::CustomLogicalTypeRegistry::instance().registerType(
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
RegisterLogicalTypes();
});
return ::avro::LogicalType(std::make_shared<MapLogicalType>());
}
Expand All @@ -73,6 +72,11 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {

} // namespace

/// \brief Register the custom "map" logical type with Avro's
/// CustomLogicalTypeRegistry.
///
/// The registered factory ignores its string argument and always produces a
/// fresh MapLogicalType instance.
void RegisterLogicalTypes() {
  auto& registry = ::avro::CustomLogicalTypeRegistry::instance();
  auto make_map_type = [](const std::string&) {
    return std::make_shared<MapLogicalType>();
  };
  registry.registerType("map", make_map_type);
}

std::string ToString(const ::avro::NodePtr& node) {
std::stringstream ss;
ss << *node;
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/avro/avro_schema_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,6 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
/// \return True if the node has a map logical type, false otherwise.
bool HasMapLogicalType(const ::avro::NodePtr& node);

void RegisterLogicalTypes();

} // namespace iceberg::avro
12 changes: 7 additions & 5 deletions src/iceberg/file_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/util/string_utils.h"

namespace iceberg {

Expand Down Expand Up @@ -54,12 +55,13 @@ ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) {
}

/// \brief Convert a string to a FileFormatType
ICEBERG_EXPORT constexpr Result<FileFormatType> FileFormatTypeFromString(
ICEBERG_EXPORT inline Result<FileFormatType> FileFormatTypeFromString(
std::string_view str) noexcept {
if (str == "parquet") return FileFormatType::kParquet;
if (str == "avro") return FileFormatType::kAvro;
if (str == "orc") return FileFormatType::kOrc;
if (str == "puffin") return FileFormatType::kPuffin;
auto lower = internal::StringUtils::ToLower(str);
if (lower == "parquet") return FileFormatType::kParquet;
if (lower == "avro") return FileFormatType::kAvro;
if (lower == "orc") return FileFormatType::kOrc;
if (lower == "puffin") return FileFormatType::kPuffin;
return InvalidArgument("Invalid file format type: {}", str);
}

Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/manifest_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition
kContent,
kFilePath,
kFileFormat,
SchemaField::MakeRequired(102, "partition", std::move(partition_type)),
SchemaField::MakeRequired(102, kPartitionField, std::move(partition_type)),
kRecordCount,
kFileSize,
kColumnSizes,
Expand Down Expand Up @@ -68,7 +68,7 @@ std::shared_ptr<StructType> ManifestEntry::TypeFromDataFileType(
std::shared_ptr<StructType> datafile_type) {
return std::make_shared<StructType>(std::vector<SchemaField>{
kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber,
SchemaField::MakeRequired(2, "data_file", std::move(datafile_type))});
SchemaField::MakeRequired(2, kDataFileField, std::move(datafile_type))});
}

} // namespace iceberg
4 changes: 3 additions & 1 deletion src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ struct ICEBERG_EXPORT DataFile {
inline static const SchemaField kFilePath = SchemaField::MakeRequired(
100, "file_path", iceberg::string(), "Location URI with FS scheme");
inline static const SchemaField kFileFormat = SchemaField::MakeRequired(
101, "file_format", iceberg::int32(), "File format name: avro, orc, or parquet");
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
inline static const std::string kPartitionField = "partition";
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
103, "record_count", iceberg::int64(), "Number of records in the file");
inline static const SchemaField kFileSize = SchemaField::MakeRequired(
Expand Down Expand Up @@ -299,6 +300,7 @@ struct ICEBERG_EXPORT ManifestEntry {
SchemaField::MakeOptional(3, "sequence_number", iceberg::int64());
inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
inline static const std::string kDataFileField = "data_file";

bool operator==(const ManifestEntry& other) const;

Expand Down
462 changes: 402 additions & 60 deletions src/iceberg/manifest_reader_internal.cc

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions src/iceberg/util/string_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <algorithm>
#include <cctype>
#include <ranges>
#include <string>
#include <string_view>

namespace iceberg::internal {

/// \brief Stateless helpers for byte-wise string case conversion.
///
/// Conversion is performed one `char` at a time through std::tolower /
/// std::toupper, so the result follows the current C locale and multi-byte
/// encodings are not interpreted as code points.
class StringUtils {
 public:
  /// \brief Return a lower-cased copy of `str`.
  /// \param str The text to convert; it is not modified.
  /// \return A new string of the same length with each byte lower-cased.
  static std::string ToLower(std::string_view str) {
    std::string input(str);
    // TODO(xiao.dong) gcc 13.3 didn't support std::ranges::to
    // The lambda takes `unsigned char` on purpose: std::tolower has undefined
    // behavior for negative arguments other than EOF, which a plain (signed)
    // `char` produces for any byte >= 0x80.
    std::transform(input.begin(), input.end(), input.begin(),  // NOLINT
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return input;
  }

  /// \brief Return an upper-cased copy of `str`.
  /// \param str The text to convert; it is not modified.
  /// \return A new string of the same length with each byte upper-cased.
  static std::string ToUpper(std::string_view str) {
    std::string input(str);
    // TODO(xiao.dong) gcc 13.3 didn't support std::ranges::to
    // See ToLower: the `unsigned char` parameter keeps std::toupper within its
    // defined input range for bytes >= 0x80.
    std::transform(input.begin(), input.end(), input.begin(),  // NOLINT
                   [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
    return input;
  }
};

} // namespace iceberg::internal
4 changes: 3 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ target_link_libraries(json_serde_test PRIVATE iceberg_static GTest::gtest_main
add_test(NAME json_serde_test COMMAND json_serde_test)

add_executable(util_test)
target_sources(util_test PRIVATE formatter_test.cc config_test.cc visit_type_test.cc)
target_sources(util_test PRIVATE formatter_test.cc config_test.cc visit_type_test.cc
string_utils_test.cc)
target_link_libraries(util_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
add_test(NAME util_test COMMAND util_test)

Expand All @@ -78,6 +79,7 @@ if(ICEBERG_BUILD_BUNDLE)
avro_schema_test.cc
avro_stream_test.cc
manifest_list_reader_test.cc
manifest_reader_test.cc
test_common.cc)
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
GTest::gmock)
Expand Down
121 changes: 121 additions & 0 deletions test/manifest_reader_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/manifest_reader.h"

#include <arrow/filesystem/localfs.h>
#include <gtest/gtest.h>

#include "iceberg/arrow/arrow_fs_file_io.h"
#include "iceberg/avro/avro_reader.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/schema.h"
#include "temp_file_test_base.h"
#include "test_common.h"

namespace iceberg {

class ManifestReaderTest : public TempFileTestBase {
protected:
static void SetUpTestSuite() { avro::AvroReader::Register(); }

void SetUp() override {
TempFileTestBase::SetUp();
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);

avro::RegisterLogicalTypes();
}

std::vector<ManifestEntry> prepare_manifest_entries() {
std::vector<ManifestEntry> manifest_entries;
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/data/";
std::vector<std::string> paths = {
"order_ts_hour=2021-01-27-00/"
"00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00001.parquet",
"order_ts_hour=2024-01-27-00/"
"00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00002.parquet",
"order_ts_hour=2023-01-26-00/"
"00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00003.parquet",
"order_ts_hour=2021-01-26-00/"
"00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00004.parquet"};
std::vector<int64_t> partitions = {447696, 473976, 465192, 447672};
std::vector<std::map<int32_t, std::vector<uint8_t>>> bounds = {
{{1, {0xd2, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{2, {'.', 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{3, {0x12, 0xe2}},
{4, {0xc0, 'y', 0xe7, 0x98, 0xd6, 0xb9, 0x05, 0x00}}},
{{1, {0xd2, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{2, {'.', 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{3, {0x12, 0xe3}},
{4, {0xc0, 0x19, '#', '=', 0xe2, 0x0f, 0x06, 0x00}}},
{{1, {'{', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{2, {0xc8, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{3, {0x0e, '"'}},
{4, {0xc0, 0xd9, '7', 0x93, 0x1f, 0xf3, 0x05, 0x00}}},
{{1, {'{', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{2, {0xc8, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{3, {0x0e, '!'}},
{4, {0xc0, 0x19, 0x10, '{', 0xc2, 0xb9, 0x05, 0x00}}},
};
for (int i = 0; i < 4; ++i) {
ManifestEntry entry;
entry.status = ManifestStatus::kAdded;
entry.snapshot_id = 6387266376565973956;
entry.data_file = std::make_shared<DataFile>();
entry.data_file->file_path = test_dir_prefix + paths[i];
entry.data_file->file_format = FileFormatType::kParquet;
entry.data_file->partition.emplace_back(Literal::Int(partitions[i]));
entry.data_file->record_count = 1;
entry.data_file->file_size_in_bytes = 1375;
entry.data_file->column_sizes = {{1, 49}, {2, 49}, {3, 49}, {4, 49}};
entry.data_file->value_counts = {{1, 1}, {2, 1}, {3, 1}, {4, 1}};
entry.data_file->null_value_counts = {{1, 0}, {2, 0}, {3, 0}, {4, 0}};
entry.data_file->split_offsets = {4};
entry.data_file->sort_order_id = 0;
entry.data_file->upper_bounds = bounds[i];
entry.data_file->lower_bounds = bounds[i];
manifest_entries.emplace_back(entry);
}
return manifest_entries;
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
};

// Opens a manifest resource file through ManifestReader, reads its entries and
// compares them with the entries built by the fixture.
TEST_F(ManifestReaderTest, BasicTest) {
  // Schema with a single int32 field named "order_ts_hour".
  SchemaField hour_field(1000, "order_ts_hour", iceberg::int32(), true);
  auto partition_schema =
      std::make_shared<Schema>(std::vector<SchemaField>{hour_field});

  const std::string manifest_path =
      GetResourcePath("56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro");
  auto maybe_reader = ManifestReader::MakeReader(manifest_path, file_io_, partition_schema);
  ASSERT_EQ(maybe_reader.has_value(), true) << maybe_reader.error().message;

  auto reader = std::move(maybe_reader.value());
  auto maybe_entries = reader->Entries();
  ASSERT_EQ(maybe_entries.has_value(), true) << maybe_entries.error().message;

  auto expected = prepare_manifest_entries();
  ASSERT_EQ(maybe_entries.value(), expected);
}

} // namespace iceberg
Binary file not shown.
44 changes: 44 additions & 0 deletions test/string_utils_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/util/string_utils.h"

#include <gtest/gtest.h>

namespace iceberg {

// ToLower lower-cases letters and leaves punctuation, whitespace, digits and
// the empty string unchanged.
TEST(StringUtilsTest, ToLower) {
  struct Case {
    const char* input;
    const char* expected;
  };
  const Case cases[] = {
      {"AbC", "abc"}, {"A-bC", "a-bc"}, {"A_bC", "a_bc"},
      {"", ""},       {" ", " "},       {"123", "123"},
  };
  for (const auto& tc : cases) {
    ASSERT_EQ(internal::StringUtils::ToLower(tc.input), tc.expected);
  }
}

// ToUpper upper-cases letters and leaves punctuation, whitespace, digits and
// the empty string unchanged.
TEST(StringUtilsTest, ToUpper) {
  struct Case {
    const char* input;
    const char* expected;
  };
  const Case cases[] = {
      {"abc", "ABC"}, {"A-bC", "A-BC"}, {"A_bC", "A_BC"},
      {"", ""},       {" ", " "},       {"123", "123"},
  };
  for (const auto& tc : cases) {
    ASSERT_EQ(internal::StringUtils::ToUpper(tc.input), tc.expected);
  }
}

} // namespace iceberg
Loading