Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/expression/literal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ std::strong_ordering CompareFloat(T lhs, T rhs) {
return lhs_is_negative <=> rhs_is_negative;
}

bool Literal::operator==(const Literal& other) const { return (*this <=> other) == 0; }

// Three-way comparison operator
std::partial_ordering Literal::operator<=>(const Literal& other) const {
// If types are different, comparison is unordered
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/expression/literal.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class ICEBERG_EXPORT Literal {
/// was not valid
Result<Literal> CastTo(const std::shared_ptr<PrimitiveType>& target_type) const;

bool operator==(const Literal& other) const;

/// \brief Compare two PrimitiveLiterals. Both literals must have the same type
/// and should not be AboveMax or BelowMin.
std::partial_ordering operator<=>(const Literal& other) const;
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/manifest_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@

namespace iceberg {

bool ManifestEntry::operator==(const ManifestEntry& other) const {
return status == other.status && snapshot_id == other.snapshot_id &&
sequence_number == other.sequence_number &&
file_sequence_number == other.file_sequence_number &&
(data_file && other.data_file && *data_file == *other.data_file) ||
(!data_file && !other.data_file);
}

std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
return std::make_shared<StructType>(std::vector<SchemaField>{
kContent,
Expand Down
13 changes: 9 additions & 4 deletions src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "iceberg/expression/literal.h"
#include "iceberg/file_format.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema_field.h"
#include "iceberg/type.h"
Expand Down Expand Up @@ -68,13 +69,13 @@ struct ICEBERG_EXPORT DataFile {
/// Field id: 134
/// Type of content stored by the data file: data, equality deletes, or position
/// deletes (all v1 files are data files)
Content content;
Content content = Content::kData;
/// Field id: 100
/// Full URI for the file with FS scheme
std::string file_path;
/// Field id: 101
/// File format type, avro, orc, parquet, or puffin
FileFormatType file_format;
FileFormatType file_format = FileFormatType::kParquet;
/// Field id: 102
/// Partition data tuple, schema based on the partition spec output using partition
/// field ids
Expand Down Expand Up @@ -146,7 +147,7 @@ struct ICEBERG_EXPORT DataFile {
std::optional<int32_t> sort_order_id;
/// This field is not included in spec, so it is not serialized into the manifest file.
/// It is just store in memory representation used in process.
int32_t partition_spec_id;
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;
/// Field id: 142
/// The _row_id for the first row in the data file.
///
Expand Down Expand Up @@ -261,6 +262,8 @@ struct ICEBERG_EXPORT DataFile {
SchemaField::MakeOptional(145, "content_size_in_bytes", iceberg::int64(),
"The length of referenced content stored in the file");

bool operator==(const DataFile& other) const = default;

static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);
};

Expand All @@ -272,7 +275,7 @@ struct ICEBERG_EXPORT ManifestEntry {
/// Field id: 0
/// Used to track additions and deletions. Deletes are informational only and not used
/// in scans.
ManifestStatus status;
ManifestStatus status = ManifestStatus::kAdded;
/// Field id: 1
/// Snapshot id where the file was added, or deleted if status is 2. Inherited when
/// null.
Expand All @@ -297,6 +300,8 @@ struct ICEBERG_EXPORT ManifestEntry {
inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());

bool operator==(const ManifestEntry& other) const;

static std::shared_ptr<StructType> TypeFromPartitionType(
std::shared_ptr<StructType> partition_type);
static std::shared_ptr<StructType> TypeFromDataFileType(
Expand Down
23 changes: 15 additions & 8 deletions src/iceberg/manifest_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
#include <utility>

#include "iceberg/iceberg_export.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema_field.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/type.h"

namespace iceberg {
Expand All @@ -40,7 +43,7 @@ namespace iceberg {
struct ICEBERG_EXPORT PartitionFieldSummary {
/// Field id: 509
/// Whether the manifest contains at least one partition with a null value for the field
bool contains_null;
bool contains_null = true;
/// Field id: 518
/// Whether the manifest contains at least one partition with a NaN value for the field
std::optional<bool> contains_nan;
Expand All @@ -64,6 +67,8 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
inline static const SchemaField kUpperBound = SchemaField::MakeOptional(
511, "upper_bound", iceberg::binary(), "Partition upper bound for all files");

bool operator==(const PartitionFieldSummary& other) const = default;

static const StructType& Type();
};

Expand All @@ -83,26 +88,26 @@ struct ICEBERG_EXPORT ManifestFile {
std::string manifest_path;
/// Field id: 501
/// Length of the manifest file in bytes
int64_t manifest_length;
int64_t manifest_length = 0;
/// Field id: 502
/// ID of a partition spec used to write the manifest; must be listed in table metadata
/// partition-specs
int32_t partition_spec_id;
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;
/// Field id: 517
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
/// manifests
Content content;
Content content = Content::kData;
/// Field id: 515
/// The sequence number when the manifest was added to the table; use 0 when reading v1
/// manifest lists
int64_t sequence_number;
int64_t sequence_number = TableMetadata::kInitialSequenceNumber;
/// Field id: 516
/// The minimum data sequence number of all live data or delete files in the manifest;
/// use 0 when reading v1 manifest lists
int64_t min_sequence_number;
int64_t min_sequence_number = TableMetadata::kInitialSequenceNumber;
/// Field id: 503
/// ID of the snapshot where the manifest file was added
int64_t added_snapshot_id;
int64_t added_snapshot_id = Snapshot::kInvalidSnapshotId;
/// Field id: 504
/// Number of entries in the manifest that have status ADDED (1), when null this is
/// assumed to be non-zero
Expand Down Expand Up @@ -137,7 +142,7 @@ struct ICEBERG_EXPORT ManifestFile {
std::vector<uint8_t> key_metadata;
/// Field id: 520
/// The starting _row_id to assign to rows added by ADDED data files
int64_t first_row_id;
std::optional<int64_t> first_row_id;

/// \brief Checks if this manifest file contains entries with ADDED status.
bool has_added_files() const { return added_files_count.value_or(1) > 0; }
Expand Down Expand Up @@ -188,6 +193,8 @@ struct ICEBERG_EXPORT ManifestFile {
520, "first_row_id", iceberg::int64(),
"Starting row ID to assign to new rows in ADDED data files");

bool operator==(const ManifestFile& other) const = default;

static const StructType& Type();
};

Expand Down
111 changes: 43 additions & 68 deletions test/manifest_list_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,46 @@ class ManifestListReaderTest : public TempFileTestBase {
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
}

std::vector<ManifestFile> PrepareTestManifestList() {
std::vector<ManifestFile> manifest_files;
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/";
std::vector<std::string> paths = {"2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro",
"9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro",
"2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro",
"3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro"};
std::vector<int64_t> file_size = {7433, 7431, 7433, 7431};
std::vector<int64_t> snapshot_id = {7412193043800610213, 5485972788975780755,
1679468743751242972, 1579605567338877265};
std::vector<std::vector<uint8_t>> bounds = {{'x', ';', 0x07, 0x00},
{'(', 0x19, 0x07, 0x00},
{0xd0, 0xd4, 0x06, 0x00},
{0xb8, 0xd4, 0x06, 0x00}};
for (int i = 0; i < 4; ++i) {
ManifestFile manifest_file;
manifest_file.manifest_path = test_dir_prefix + paths[i];
manifest_file.manifest_length = file_size[i];
manifest_file.partition_spec_id = 0;
manifest_file.content = ManifestFile::Content::kData;
manifest_file.sequence_number = 4 - i;
manifest_file.min_sequence_number = 4 - i;
manifest_file.added_snapshot_id = snapshot_id[i];
manifest_file.added_files_count = 1;
manifest_file.existing_files_count = 0;
manifest_file.deleted_files_count = 0;
manifest_file.added_rows_count = 1;
manifest_file.existing_rows_count = 0;
manifest_file.deleted_rows_count = 0;
PartitionFieldSummary partition;
partition.contains_null = false;
partition.contains_nan = false;
partition.lower_bound = bounds[i];
partition.upper_bound = bounds[i];
manifest_file.partitions.emplace_back(partition);
manifest_files.emplace_back(manifest_file);
}
return manifest_files;
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
};
Expand All @@ -55,74 +95,9 @@ TEST_F(ManifestListReaderTest, BasicTest) {
auto read_result = manifest_reader->Files();
ASSERT_EQ(read_result.has_value(), true);
ASSERT_EQ(read_result.value().size(), 4);
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/";
for (const auto& file : read_result.value()) {
auto manifest_path = file.manifest_path.substr(test_dir_prefix.size());
if (manifest_path == "2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 7412193043800610213);
ASSERT_EQ(file.manifest_length, 7433);
ASSERT_EQ(file.sequence_number, 4);
ASSERT_EQ(file.min_sequence_number, 4);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({'x', ';', 0x07, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({'x', ';', 0x07, 0x00}));
} else if (manifest_path == "9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 5485972788975780755);
ASSERT_EQ(file.manifest_length, 7431);
ASSERT_EQ(file.sequence_number, 3);
ASSERT_EQ(file.min_sequence_number, 3);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({'(', 0x19, 0x07, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({'(', 0x19, 0x07, 0x00}));
} else if (manifest_path == "2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 1679468743751242972);
ASSERT_EQ(file.manifest_length, 7433);
ASSERT_EQ(file.sequence_number, 2);
ASSERT_EQ(file.min_sequence_number, 2);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({0xd0, 0xd4, 0x06, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({0xd0, 0xd4, 0x06, 0x00}));
} else if (manifest_path == "3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 1579605567338877265);
ASSERT_EQ(file.manifest_length, 7431);
ASSERT_EQ(file.sequence_number, 1);
ASSERT_EQ(file.min_sequence_number, 1);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({0xb8, 0xd4, 0x06, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({0xb8, 0xd4, 0x06, 0x00}));
} else {
ASSERT_TRUE(false) << "Unexpected manifest file: " << manifest_path;
}
ASSERT_EQ(file.partition_spec_id, 0);
ASSERT_EQ(file.content, ManifestFile::Content::kData);
ASSERT_EQ(file.added_files_count, 1);
ASSERT_EQ(file.existing_files_count, 0);
ASSERT_EQ(file.deleted_files_count, 0);
ASSERT_EQ(file.added_rows_count, 1);
ASSERT_EQ(file.existing_rows_count, 0);
ASSERT_EQ(file.deleted_rows_count, 0);
ASSERT_EQ(file.key_metadata.empty(), true);
}

auto expected_manifest_list = PrepareTestManifestList();
ASSERT_EQ(read_result.value(), expected_manifest_list);
}

} // namespace iceberg
Loading