Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/iceberg/manifest_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ namespace iceberg {

bool ManifestEntry::operator==(const ManifestEntry& other) const {
return status == other.status && snapshot_id == other.snapshot_id &&
sequence_number == other.sequence_number &&
file_sequence_number == other.file_sequence_number &&
(data_file && other.data_file && *data_file == *other.data_file) ||
(!data_file && !other.data_file);
sequence_number == other.sequence_number &&
file_sequence_number == other.file_sequence_number &&
((data_file && other.data_file && *data_file == *other.data_file) ||
(!data_file && !other.data_file));
}

std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
if (!partition_type) {
partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
}
return std::make_shared<StructType>(std::vector<SchemaField>{
kContent,
kFilePath,
Expand Down
16 changes: 9 additions & 7 deletions src/iceberg/manifest_reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,17 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
break;
case 3: {
if (view_of_file_field->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
return InvalidManifest("Field:{} should be a list.", field_name);
return InvalidManifest("Field:{} should be a struct.", field_name);
}
auto view_of_partition = view_of_file_field->children[0];
for (size_t row_idx = 0; row_idx < view_of_partition->length; row_idx++) {
if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
break;
if (view_of_file_field->n_children > 0) {
auto view_of_partition = view_of_file_field->children[0];
for (int64_t row_idx = 0; row_idx < view_of_partition->length; row_idx++) {
if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
break;
}
ICEBERG_RETURN_UNEXPECTED(
ParseLiteral(view_of_partition, row_idx, manifest_entries));
}
ICEBERG_RETURN_UNEXPECTED(
ParseLiteral(view_of_partition, row_idx, manifest_entries));
}
} break;
case 4:
Expand Down
92 changes: 87 additions & 5 deletions test/manifest_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@

#include "iceberg/manifest_reader.h"

#include <cstddef>

#include <arrow/filesystem/localfs.h>
#include <gtest/gtest.h>

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_reader.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/schema.h"
#include "temp_file_test_base.h"
#include "test_common.h"

namespace iceberg {

class ManifestReaderTest : public TempFileTestBase {
class ManifestReaderV1Test : public TempFileTestBase {
protected:
static void SetUpTestSuite() { avro::AvroReader::Register(); }

Expand All @@ -45,7 +46,7 @@ class ManifestReaderTest : public TempFileTestBase {
avro::RegisterLogicalTypes();
}

std::vector<ManifestEntry> prepare_manifest_entries() {
std::vector<ManifestEntry> PrepareV1ManifestEntries() {
std::vector<ManifestEntry> manifest_entries;
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/data/";
std::vector<std::string> paths = {
Expand Down Expand Up @@ -102,7 +103,7 @@ class ManifestReaderTest : public TempFileTestBase {
std::shared_ptr<FileIO> file_io_;
};

TEST_F(ManifestReaderTest, BasicTest) {
TEST_F(ManifestReaderV1Test, V1PartitionedBasicTest) {
iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true);
auto partition_schema =
std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
Expand All @@ -115,7 +116,88 @@ TEST_F(ManifestReaderTest, BasicTest) {
auto read_result = manifest_reader->Entries();
ASSERT_EQ(read_result.has_value(), true) << read_result.error().message;

auto expected_entries = prepare_manifest_entries();
auto expected_entries = PrepareV1ManifestEntries();
ASSERT_EQ(read_result.value(), expected_entries);
}

class ManifestReaderV2Test : public TempFileTestBase {
protected:
static void SetUpTestSuite() { avro::AvroReader::Register(); }

void SetUp() override {
TempFileTestBase::SetUp();
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);

avro::RegisterLogicalTypes();
}

std::vector<ManifestEntry> prepareV2NonPartitionedManifestEntries() {
std::vector<ManifestEntry> manifest_entries;
std::string test_dir_prefix = "/tmp/db/db/v2_manifest_non_partitioned/data/";

std::vector<std::string> paths = {
"00000-0-b0f98903-6d21-45fd-9e0b-afbd4963e365-0-00001.parquet"};

std::vector<int64_t> file_sizes = {1344};
std::vector<int64_t> record_counts = {4};

std::vector<std::map<int32_t, std::vector<uint8_t>>> lower_bounds = {
{{1, {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{2, {'r', 'e', 'c', 'o', 'r', 'd', '_', 'f', 'o', 'u', 'r'}},
{3, {'d', 'a', 't', 'a', '_', 'c', 'o', 'n', 't', 'e', 'n', 't', '_', '1'}},
{4, {0xcd, 0xcc, 0xcc, 0xcc, 0xcc, 0xdc, 0x5e, 0x40}}}};

std::vector<std::map<int32_t, std::vector<uint8_t>>> upper_bounds = {
{{1, {0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
{2, {'r', 'e', 'c', 'o', 'r', 'd', '_', 't', 'w', 'o'}},
{3, {'d', 'a', 't', 'a', '_', 'c', 'o', 'n', 't', 'e', 'n', 't', '_', '4'}},
{4, {0x14, 0xae, 0x47, 0xe1, 0x7a, 0x8c, 0x7c, 0x40}}}};

manifest_entries.emplace_back(
ManifestEntry{.status = ManifestStatus::kAdded,
.snapshot_id = 679879563479918846LL,
.sequence_number = std::nullopt,
.file_sequence_number = std::nullopt,
.data_file = std::make_shared<DataFile>(
DataFile{.file_path = test_dir_prefix + paths[0],
.file_format = FileFormatType::kParquet,
.record_count = record_counts[0],
.file_size_in_bytes = file_sizes[0],
.column_sizes = {{1, 56}, {2, 73}, {3, 66}, {4, 67}},
.value_counts = {{1, 4}, {2, 4}, {3, 4}, {4, 4}},
.null_value_counts = {{1, 0}, {2, 0}, {3, 0}, {4, 0}},
.nan_value_counts = {{4, 0}},
.lower_bounds = lower_bounds[0],
.upper_bounds = upper_bounds[0],
.key_metadata = {},
.split_offsets = {4},
.equality_ids = {},
.sort_order_id = 0,
.first_row_id = std::nullopt,
.referenced_data_file = std::nullopt,
.content_offset = std::nullopt,
.content_size_in_bytes = std::nullopt})});
return manifest_entries;
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
};

TEST_F(ManifestReaderV2Test, V2NonPartitionedBasicTest) {
std::string path = GetResourcePath("2ddf1bc9-830b-4015-aced-c060df36f150-m0.avro");

auto manifest_reader_result = ManifestReader::MakeReader(path, file_io_, nullptr);
ASSERT_EQ(manifest_reader_result.has_value(), true)
<< manifest_reader_result.error().message;

auto manifest_reader = std::move(manifest_reader_result.value());
auto read_result = manifest_reader->Entries();
ASSERT_EQ(read_result.has_value(), true) << read_result.error().message;
ASSERT_EQ(read_result.value().size(), 1);

auto expected_entries = prepareV2NonPartitionedManifestEntries();
ASSERT_EQ(read_result.value(), expected_entries);
}

Expand Down
Binary file not shown.
Loading