Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/file_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum class ICEBERG_EXPORT FileFormatType {
kAvro,
kOrc,
kPuffin,
kUnknown = 99
};

/// \brief Convert a FileFormatType to a string
Expand Down
26 changes: 26 additions & 0 deletions src/iceberg/manifest_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,32 @@

namespace iceberg {

/// \brief Member-wise equality for DataFile.
///
/// Returns true only when every member compared below matches. The partition
/// tuple is compared through its three-way comparison operator; all other
/// members use operator==.
bool DataFile::operator==(const iceberg::DataFile& other) const {
  // File identity, partition tuple and basic sizing.
  const bool same_identity =
      content == other.content && file_path == other.file_path &&
      file_format == other.file_format && (partition <=> other.partition) == 0 &&
      record_count == other.record_count &&
      file_size_in_bytes == other.file_size_in_bytes;
  if (!same_identity) {
    return false;
  }

  // Column-level metrics.
  const bool same_metrics =
      column_sizes == other.column_sizes && value_counts == other.value_counts &&
      null_value_counts == other.null_value_counts &&
      nan_value_counts == other.nan_value_counts &&
      lower_bounds == other.lower_bounds && upper_bounds == other.upper_bounds;
  if (!same_metrics) {
    return false;
  }

  // Remaining bookkeeping members.
  return key_metadata == other.key_metadata && split_offsets == other.split_offsets &&
         equality_ids == other.equality_ids && sort_order_id == other.sort_order_id &&
         partition_spec_id == other.partition_spec_id &&
         first_row_id == other.first_row_id &&
         referenced_data_file == other.referenced_data_file &&
         content_offset == other.content_offset &&
         content_size_in_bytes == other.content_size_in_bytes;
}

/// \brief Member-wise equality for ManifestEntry.
///
/// Two entries are equal when status, snapshot_id, sequence_number and
/// file_sequence_number all match AND their data files agree: either both
/// data_file pointers are null, or both are set and the pointed-to DataFile
/// objects compare equal.
bool ManifestEntry::operator==(const iceberg::ManifestEntry& other) const {
  const bool same_metadata = status == other.status &&
                             snapshot_id == other.snapshot_id &&
                             sequence_number == other.sequence_number &&
                             file_sequence_number == other.file_sequence_number;
  if (!same_metadata) {
    return false;
  }

  // The data-file check is evaluated as a single term. Written inline as
  // `a && b && (x) || (y)`, `&&` binds tighter than `||`, so two entries with
  // null data_file would compare equal even if the metadata above differed.
  if (data_file && other.data_file) {
    return *data_file == *other.data_file;
  }
  return !data_file && !other.data_file;
}

std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
return std::make_shared<StructType>(std::vector<SchemaField>{
kContent,
Expand Down
12 changes: 8 additions & 4 deletions src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ struct ICEBERG_EXPORT DataFile {
/// Field id: 134
/// Type of content stored by the data file: data, equality deletes, or position
/// deletes (all v1 files are data files)
Content content;
Content content = Content::kData;
/// Field id: 100
/// Full URI for the file with FS scheme
std::string file_path;
/// Field id: 101
/// File format type, avro, orc, parquet, or puffin
FileFormatType file_format;
FileFormatType file_format = FileFormatType::kUnknown;
/// Field id: 102
/// Partition data tuple, schema based on the partition spec output using partition
/// field ids
Expand Down Expand Up @@ -146,7 +146,7 @@ struct ICEBERG_EXPORT DataFile {
std::optional<int32_t> sort_order_id;
/// This field is not part of the spec, so it is not serialized into the manifest file.
/// It is only kept in the in-memory representation used during processing.
int32_t partition_spec_id;
int32_t partition_spec_id = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int32_t partition_spec_id = 0;
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;

/// Field id: 142
/// The _row_id for the first row in the data file.
///
Expand Down Expand Up @@ -261,6 +261,8 @@ struct ICEBERG_EXPORT DataFile {
SchemaField::MakeOptional(145, "content_size_in_bytes", iceberg::int64(),
"The length of referenced content stored in the file");

bool operator==(const DataFile& other) const;

static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);
};

Expand All @@ -272,7 +274,7 @@ struct ICEBERG_EXPORT ManifestEntry {
/// Field id: 0
/// Used to track additions and deletions. Deletes are informational only and not used
/// in scans.
ManifestStatus status;
ManifestStatus status = ManifestStatus::kAdded;
/// Field id: 1
/// Snapshot id where the file was added, or deleted if status is 2. Inherited when
/// null.
Expand All @@ -297,6 +299,8 @@ struct ICEBERG_EXPORT ManifestEntry {
inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());

bool operator==(const ManifestEntry& other) const;

static std::shared_ptr<StructType> TypeFromPartitionType(
std::shared_ptr<StructType> partition_type);
static std::shared_ptr<StructType> TypeFromDataFileType(
Expand Down
23 changes: 23 additions & 0 deletions src/iceberg/manifest_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,29 @@

namespace iceberg {

/// \brief Member-wise equality for PartitionFieldSummary.
///
/// Compares the null/NaN flags first, then the lower and upper bounds.
bool PartitionFieldSummary::operator==(
    const iceberg::PartitionFieldSummary& other) const {
  const bool same_flags =
      contains_null == other.contains_null && contains_nan == other.contains_nan;
  if (!same_flags) {
    return false;
  }
  return lower_bound == other.lower_bound && upper_bound == other.upper_bound;
}

/// \brief Member-wise equality for ManifestFile.
///
/// Returns true only when every member compared below matches between the two
/// manifest files.
bool ManifestFile::operator==(const iceberg::ManifestFile& other) const {
  // Location, spec, content kind and sequencing.
  const bool same_header = manifest_path == other.manifest_path &&
                           manifest_length == other.manifest_length &&
                           partition_spec_id == other.partition_spec_id &&
                           content == other.content &&
                           sequence_number == other.sequence_number &&
                           min_sequence_number == other.min_sequence_number &&
                           added_snapshot_id == other.added_snapshot_id;
  if (!same_header) {
    return false;
  }

  // Per-status file and row counters.
  const bool same_counts = added_files_count == other.added_files_count &&
                           existing_files_count == other.existing_files_count &&
                           deleted_files_count == other.deleted_files_count &&
                           added_rows_count == other.added_rows_count &&
                           existing_rows_count == other.existing_rows_count &&
                           deleted_rows_count == other.deleted_rows_count;
  if (!same_counts) {
    return false;
  }

  // Partition summaries, key metadata and row-id start.
  return partitions == other.partitions && key_metadata == other.key_metadata &&
         first_row_id == other.first_row_id;
}

const StructType& PartitionFieldSummary::Type() {
static const StructType kInstance{{
PartitionFieldSummary::kContainsNull,
Expand Down
20 changes: 12 additions & 8 deletions src/iceberg/manifest_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace iceberg {
struct ICEBERG_EXPORT PartitionFieldSummary {
/// Field id: 509
/// Whether the manifest contains at least one partition with a null value for the field
bool contains_null;
bool contains_null = false;
/// Field id: 518
/// Whether the manifest contains at least one partition with a NaN value for the field
std::optional<bool> contains_nan;
Expand All @@ -64,6 +64,8 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
inline static const SchemaField kUpperBound = SchemaField::MakeOptional(
511, "upper_bound", iceberg::binary(), "Partition upper bound for all files");

bool operator==(const PartitionFieldSummary& other) const;

static const StructType& Type();
};

Expand All @@ -83,26 +85,26 @@ struct ICEBERG_EXPORT ManifestFile {
std::string manifest_path;
/// Field id: 501
/// Length of the manifest file in bytes
int64_t manifest_length;
int64_t manifest_length = 0;
/// Field id: 502
/// ID of a partition spec used to write the manifest; must be listed in table metadata
/// partition-specs
int32_t partition_spec_id;
int32_t partition_spec_id = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int32_t partition_spec_id = 0;
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;

/// Field id: 517
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
/// manifests
Content content;
Content content = Content::kData;
/// Field id: 515
/// The sequence number when the manifest was added to the table; use 0 when reading v1
/// manifest lists
int64_t sequence_number;
int64_t sequence_number = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int64_t sequence_number = 0;
int64_t sequence_number = TableMetadata::kInitialSequenceNumber;

/// Field id: 516
/// The minimum data sequence number of all live data or delete files in the manifest;
/// use 0 when reading v1 manifest lists
int64_t min_sequence_number;
int64_t min_sequence_number = 0;
/// Field id: 503
/// ID of the snapshot where the manifest file was added
int64_t added_snapshot_id;
int64_t added_snapshot_id = 0;
/// Field id: 504
/// Number of entries in the manifest that have status ADDED (1), when null this is
/// assumed to be non-zero
Expand Down Expand Up @@ -137,7 +139,7 @@ struct ICEBERG_EXPORT ManifestFile {
std::vector<uint8_t> key_metadata;
/// Field id: 520
/// The starting _row_id to assign to rows added by ADDED data files
int64_t first_row_id;
int64_t first_row_id = 0;

/// \brief Checks if this manifest file contains entries with ADDED status.
bool has_added_files() const { return added_files_count.value_or(1) > 0; }
Expand Down Expand Up @@ -188,6 +190,8 @@ struct ICEBERG_EXPORT ManifestFile {
520, "first_row_id", iceberg::int64(),
"Starting row ID to assign to new rows in ADDED data files");

bool operator==(const ManifestFile& other) const;

static const StructType& Type();
};

Expand Down
111 changes: 43 additions & 68 deletions test/manifest_list_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,46 @@ class ManifestListReaderTest : public TempFileTestBase {
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
}

std::vector<ManifestFile> prepare_test_manifest_list() {
std::vector<ManifestFile> manifest_files;
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/";
std::vector<std::string> paths = {"2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro",
"9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro",
"2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro",
"3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro"};
std::vector<int64_t> file_size = {7433, 7431, 7433, 7431};
std::vector<int64_t> snapshot_id = {7412193043800610213, 5485972788975780755,
1679468743751242972, 1579605567338877265};
std::vector<std::vector<uint8_t>> bounds = {{'x', ';', 0x07, 0x00},
{'(', 0x19, 0x07, 0x00},
{0xd0, 0xd4, 0x06, 0x00},
{0xb8, 0xd4, 0x06, 0x00}};
for (int i = 0; i < 4; ++i) {
ManifestFile manifest_file;
manifest_file.manifest_path = test_dir_prefix + paths[i];
manifest_file.manifest_length = file_size[i];
manifest_file.partition_spec_id = 0;
manifest_file.content = ManifestFile::Content::kData;
manifest_file.sequence_number = 4 - i;
manifest_file.min_sequence_number = 4 - i;
manifest_file.added_snapshot_id = snapshot_id[i];
manifest_file.added_files_count = 1;
manifest_file.existing_files_count = 0;
manifest_file.deleted_files_count = 0;
manifest_file.added_rows_count = 1;
manifest_file.existing_rows_count = 0;
manifest_file.deleted_rows_count = 0;
PartitionFieldSummary partition;
partition.contains_null = false;
partition.contains_nan = false;
partition.lower_bound = bounds[i];
partition.upper_bound = bounds[i];
manifest_file.partitions.emplace_back(partition);
manifest_files.emplace_back(manifest_file);
}
return manifest_files;
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
};
Expand All @@ -55,74 +95,9 @@ TEST_F(ManifestListReaderTest, BasicTest) {
auto read_result = manifest_reader->Files();
ASSERT_EQ(read_result.has_value(), true);
ASSERT_EQ(read_result.value().size(), 4);
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/";
for (const auto& file : read_result.value()) {
auto manifest_path = file.manifest_path.substr(test_dir_prefix.size());
if (manifest_path == "2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 7412193043800610213);
ASSERT_EQ(file.manifest_length, 7433);
ASSERT_EQ(file.sequence_number, 4);
ASSERT_EQ(file.min_sequence_number, 4);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({'x', ';', 0x07, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({'x', ';', 0x07, 0x00}));
} else if (manifest_path == "9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 5485972788975780755);
ASSERT_EQ(file.manifest_length, 7431);
ASSERT_EQ(file.sequence_number, 3);
ASSERT_EQ(file.min_sequence_number, 3);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({'(', 0x19, 0x07, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({'(', 0x19, 0x07, 0x00}));
} else if (manifest_path == "2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 1679468743751242972);
ASSERT_EQ(file.manifest_length, 7433);
ASSERT_EQ(file.sequence_number, 2);
ASSERT_EQ(file.min_sequence_number, 2);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({0xd0, 0xd4, 0x06, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({0xd0, 0xd4, 0x06, 0x00}));
} else if (manifest_path == "3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 1579605567338877265);
ASSERT_EQ(file.manifest_length, 7431);
ASSERT_EQ(file.sequence_number, 1);
ASSERT_EQ(file.min_sequence_number, 1);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({0xb8, 0xd4, 0x06, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({0xb8, 0xd4, 0x06, 0x00}));
} else {
ASSERT_TRUE(false) << "Unexpected manifest file: " << manifest_path;
}
ASSERT_EQ(file.partition_spec_id, 0);
ASSERT_EQ(file.content, ManifestFile::Content::kData);
ASSERT_EQ(file.added_files_count, 1);
ASSERT_EQ(file.existing_files_count, 0);
ASSERT_EQ(file.deleted_files_count, 0);
ASSERT_EQ(file.added_rows_count, 1);
ASSERT_EQ(file.existing_rows_count, 0);
ASSERT_EQ(file.deleted_rows_count, 0);
ASSERT_EQ(file.key_metadata.empty(), true);
}

auto expected_manifest_list = prepare_test_manifest_list();
ASSERT_EQ(read_result.value(), expected_manifest_list);
}

} // namespace iceberg
Loading