Skip to content

Commit f41bd62

Browse files
author
xiao.dong
committed
refactor: support operator== for manifest&list
1 parent 84565b5 commit f41bd62

File tree

6 files changed

+113
-80
lines changed

6 files changed

+113
-80
lines changed

src/iceberg/file_format.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ enum class ICEBERG_EXPORT FileFormatType {
3636
kAvro,
3737
kOrc,
3838
kPuffin,
39+
kUnknown = 99
3940
};
4041

4142
/// \brief Convert a FileFormatType to a string

src/iceberg/manifest_entry.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,32 @@
2727

2828
namespace iceberg {
2929

30+
/// \brief Member-wise equality of two DataFile values.
///
/// Returns true only when every member compared below matches. The partition
/// tuple is compared through the result of its three-way comparison; all other
/// members use operator==.
bool DataFile::operator==(const iceberg::DataFile& other) const {
  // Basic description of the file.
  if (!(content == other.content && file_path == other.file_path &&
        file_format == other.file_format)) {
    return false;
  }

  // Partition tuple: equal when the three-way comparison yields "equivalent".
  if (!((partition <=> other.partition) == 0)) {
    return false;
  }

  // Record count and size.
  if (!(record_count == other.record_count &&
        file_size_in_bytes == other.file_size_in_bytes)) {
    return false;
  }

  // Per-column metrics.
  if (!(column_sizes == other.column_sizes && value_counts == other.value_counts &&
        null_value_counts == other.null_value_counts &&
        nan_value_counts == other.nan_value_counts &&
        lower_bounds == other.lower_bounds && upper_bounds == other.upper_bounds)) {
    return false;
  }

  // Encryption metadata, split points, equality-delete ids and ordering.
  if (!(key_metadata == other.key_metadata && split_offsets == other.split_offsets &&
        equality_ids == other.equality_ids && sort_order_id == other.sort_order_id &&
        partition_spec_id == other.partition_spec_id)) {
    return false;
  }

  // Row-lineage and referenced-content members.
  return first_row_id == other.first_row_id &&
         referenced_data_file == other.referenced_data_file &&
         content_offset == other.content_offset &&
         content_size_in_bytes == other.content_size_in_bytes;
}
47+
48+
/// \brief Member-wise equality of two ManifestEntry values.
///
/// Two entries are equal when status, snapshot_id, sequence_number and
/// file_sequence_number all match AND their data_file members are either both
/// null or both non-null with equal pointees.
///
/// The scalar members are checked first and separately: `&&` binds tighter
/// than `||`, so folding the "both null" alternative into one unparenthesized
/// expression would let two entries with null data_file compare equal even
/// when their other members differ.
bool ManifestEntry::operator==(const iceberg::ManifestEntry& other) const {
  if (!(status == other.status && snapshot_id == other.snapshot_id &&
        sequence_number == other.sequence_number &&
        file_sequence_number == other.file_sequence_number)) {
    return false;
  }

  // If either side has no data file, they match only when both have none.
  if (!data_file || !other.data_file) {
    return !data_file && !other.data_file;
  }

  // Both present: compare the pointed-to DataFile values, not the pointers.
  return *data_file == *other.data_file;
}
55+
3056
std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
3157
return std::make_shared<StructType>(std::vector<SchemaField>{
3258
kContent,

src/iceberg/manifest_entry.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,13 @@ struct ICEBERG_EXPORT DataFile {
6868
/// Field id: 134
6969
/// Type of content stored by the data file: data, equality deletes, or position
7070
/// deletes (all v1 files are data files)
71-
Content content;
71+
Content content = Content::kData;
7272
/// Field id: 100
7373
/// Full URI for the file with FS scheme
7474
std::string file_path;
7575
/// Field id: 101
7676
/// File format type, avro, orc, parquet, or puffin
77-
FileFormatType file_format;
77+
FileFormatType file_format = FileFormatType::kUnknown;
7878
/// Field id: 102
7979
/// Partition data tuple, schema based on the partition spec output using partition
8080
/// field ids
@@ -146,7 +146,7 @@ struct ICEBERG_EXPORT DataFile {
146146
std::optional<int32_t> sort_order_id;
147147
/// This field is not included in the spec, so it is not serialized into the manifest file.
148148
/// It is only kept in the in-memory representation used during processing.
149-
int32_t partition_spec_id;
149+
int32_t partition_spec_id = 0;
150150
/// Field id: 142
151151
/// The _row_id for the first row in the data file.
152152
///
@@ -261,6 +261,8 @@ struct ICEBERG_EXPORT DataFile {
261261
SchemaField::MakeOptional(145, "content_size_in_bytes", iceberg::int64(),
262262
"The length of referenced content stored in the file");
263263

264+
bool operator==(const DataFile& other) const;
265+
264266
static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);
265267
};
266268

@@ -272,7 +274,7 @@ struct ICEBERG_EXPORT ManifestEntry {
272274
/// Field id: 0
273275
/// Used to track additions and deletions. Deletes are informational only and not used
274276
/// in scans.
275-
ManifestStatus status;
277+
ManifestStatus status = ManifestStatus::kAdded;
276278
/// Field id: 1
277279
/// Snapshot id where the file was added, or deleted if status is 2. Inherited when
278280
/// null.
@@ -297,6 +299,8 @@ struct ICEBERG_EXPORT ManifestEntry {
297299
inline static const SchemaField kFileSequenceNumber =
298300
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
299301

302+
bool operator==(const ManifestEntry& other) const;
303+
300304
static std::shared_ptr<StructType> TypeFromPartitionType(
301305
std::shared_ptr<StructType> partition_type);
302306
static std::shared_ptr<StructType> TypeFromDataFileType(

src/iceberg/manifest_list.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,29 @@
2323

2424
namespace iceberg {
2525

26+
bool PartitionFieldSummary::operator==(
27+
const iceberg::PartitionFieldSummary& other) const {
28+
return contains_null == other.contains_null && contains_nan == other.contains_nan &&
29+
lower_bound == other.lower_bound && upper_bound == other.upper_bound;
30+
}
31+
32+
/// \brief Member-wise equality of two ManifestFile values.
///
/// Returns true only when every member compared below matches, including the
/// per-field partition summaries.
bool ManifestFile::operator==(const iceberg::ManifestFile& other) const {
  // Location, size, spec and content type of the manifest.
  if (!(manifest_path == other.manifest_path &&
        manifest_length == other.manifest_length &&
        partition_spec_id == other.partition_spec_id && content == other.content)) {
    return false;
  }

  // Sequence numbers and the snapshot that added the manifest.
  if (!(sequence_number == other.sequence_number &&
        min_sequence_number == other.min_sequence_number &&
        added_snapshot_id == other.added_snapshot_id)) {
    return false;
  }

  // File counts by status.
  if (!(added_files_count == other.added_files_count &&
        existing_files_count == other.existing_files_count &&
        deleted_files_count == other.deleted_files_count)) {
    return false;
  }

  // Row counts by status.
  if (!(added_rows_count == other.added_rows_count &&
        existing_rows_count == other.existing_rows_count &&
        deleted_rows_count == other.deleted_rows_count)) {
    return false;
  }

  // Partition summaries, key metadata and first row id.
  return partitions == other.partitions && key_metadata == other.key_metadata &&
         first_row_id == other.first_row_id;
}
48+
2649
const StructType& PartitionFieldSummary::Type() {
2750
static const StructType kInstance{{
2851
PartitionFieldSummary::kContainsNull,

src/iceberg/manifest_list.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace iceberg {
4040
struct ICEBERG_EXPORT PartitionFieldSummary {
4141
/// Field id: 509
4242
/// Whether the manifest contains at least one partition with a null value for the field
43-
bool contains_null;
43+
bool contains_null = false;
4444
/// Field id: 518
4545
/// Whether the manifest contains at least one partition with a NaN value for the field
4646
std::optional<bool> contains_nan;
@@ -64,6 +64,8 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
6464
inline static const SchemaField kUpperBound = SchemaField::MakeOptional(
6565
511, "upper_bound", iceberg::binary(), "Partition upper bound for all files");
6666

67+
bool operator==(const PartitionFieldSummary& other) const;
68+
6769
static const StructType& Type();
6870
};
6971

@@ -83,26 +85,26 @@ struct ICEBERG_EXPORT ManifestFile {
8385
std::string manifest_path;
8486
/// Field id: 501
8587
/// Length of the manifest file in bytes
86-
int64_t manifest_length;
88+
int64_t manifest_length = 0;
8789
/// Field id: 502
8890
/// ID of a partition spec used to write the manifest; must be listed in table metadata
8991
/// partition-specs
90-
int32_t partition_spec_id;
92+
int32_t partition_spec_id = 0;
9193
/// Field id: 517
9294
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
9395
/// manifests
94-
Content content;
96+
Content content = Content::kData;
9597
/// Field id: 515
9698
/// The sequence number when the manifest was added to the table; use 0 when reading v1
9799
/// manifest lists
98-
int64_t sequence_number;
100+
int64_t sequence_number = 0;
99101
/// Field id: 516
100102
/// The minimum data sequence number of all live data or delete files in the manifest;
101103
/// use 0 when reading v1 manifest lists
102-
int64_t min_sequence_number;
104+
int64_t min_sequence_number = 0;
103105
/// Field id: 503
104106
/// ID of the snapshot where the manifest file was added
105-
int64_t added_snapshot_id;
107+
int64_t added_snapshot_id = 0;
106108
/// Field id: 504
107109
/// Number of entries in the manifest that have status ADDED (1), when null this is
108110
/// assumed to be non-zero
@@ -137,7 +139,7 @@ struct ICEBERG_EXPORT ManifestFile {
137139
std::vector<uint8_t> key_metadata;
138140
/// Field id: 520
139141
/// The starting _row_id to assign to rows added by ADDED data files
140-
int64_t first_row_id;
142+
int64_t first_row_id = 0;
141143

142144
/// \brief Checks if this manifest file contains entries with ADDED status.
143145
bool has_added_files() const { return added_files_count.value_or(1) > 0; }
@@ -188,6 +190,8 @@ struct ICEBERG_EXPORT ManifestFile {
188190
520, "first_row_id", iceberg::int64(),
189191
"Starting row ID to assign to new rows in ADDED data files");
190192

193+
bool operator==(const ManifestFile& other) const;
194+
191195
static const StructType& Type();
192196
};
193197

test/manifest_list_reader_test.cc

Lines changed: 43 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,46 @@ class ManifestListReaderTest : public TempFileTestBase {
4242
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
4343
}
4444

45+
std::vector<ManifestFile> prepare_test_manifest_list() {
46+
std::vector<ManifestFile> manifest_files;
47+
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/";
48+
std::vector<std::string> paths = {"2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro",
49+
"9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro",
50+
"2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro",
51+
"3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro"};
52+
std::vector<int64_t> file_size = {7433, 7431, 7433, 7431};
53+
std::vector<int64_t> snapshot_id = {7412193043800610213, 5485972788975780755,
54+
1679468743751242972, 1579605567338877265};
55+
std::vector<std::vector<uint8_t>> bounds = {{'x', ';', 0x07, 0x00},
56+
{'(', 0x19, 0x07, 0x00},
57+
{0xd0, 0xd4, 0x06, 0x00},
58+
{0xb8, 0xd4, 0x06, 0x00}};
59+
for (int i = 0; i < 4; ++i) {
60+
ManifestFile manifest_file;
61+
manifest_file.manifest_path = test_dir_prefix + paths[i];
62+
manifest_file.manifest_length = file_size[i];
63+
manifest_file.partition_spec_id = 0;
64+
manifest_file.content = ManifestFile::Content::kData;
65+
manifest_file.sequence_number = 4 - i;
66+
manifest_file.min_sequence_number = 4 - i;
67+
manifest_file.added_snapshot_id = snapshot_id[i];
68+
manifest_file.added_files_count = 1;
69+
manifest_file.existing_files_count = 0;
70+
manifest_file.deleted_files_count = 0;
71+
manifest_file.added_rows_count = 1;
72+
manifest_file.existing_rows_count = 0;
73+
manifest_file.deleted_rows_count = 0;
74+
PartitionFieldSummary partition;
75+
partition.contains_null = false;
76+
partition.contains_nan = false;
77+
partition.lower_bound = bounds[i];
78+
partition.upper_bound = bounds[i];
79+
manifest_file.partitions.emplace_back(partition);
80+
manifest_files.emplace_back(manifest_file);
81+
}
82+
return manifest_files;
83+
}
84+
4585
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
4686
std::shared_ptr<FileIO> file_io_;
4787
};
@@ -55,74 +95,9 @@ TEST_F(ManifestListReaderTest, BasicTest) {
5595
auto read_result = manifest_reader->Files();
5696
ASSERT_EQ(read_result.has_value(), true);
5797
ASSERT_EQ(read_result.value().size(), 4);
58-
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/";
59-
for (const auto& file : read_result.value()) {
60-
auto manifest_path = file.manifest_path.substr(test_dir_prefix.size());
61-
if (manifest_path == "2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro") {
62-
ASSERT_EQ(file.added_snapshot_id, 7412193043800610213);
63-
ASSERT_EQ(file.manifest_length, 7433);
64-
ASSERT_EQ(file.sequence_number, 4);
65-
ASSERT_EQ(file.min_sequence_number, 4);
66-
ASSERT_EQ(file.partitions.size(), 1);
67-
const auto& partition = file.partitions[0];
68-
ASSERT_EQ(partition.contains_null, false);
69-
ASSERT_EQ(partition.contains_nan.value(), false);
70-
ASSERT_EQ(partition.lower_bound.value(),
71-
std::vector<uint8_t>({'x', ';', 0x07, 0x00}));
72-
ASSERT_EQ(partition.upper_bound.value(),
73-
std::vector<uint8_t>({'x', ';', 0x07, 0x00}));
74-
} else if (manifest_path == "9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro") {
75-
ASSERT_EQ(file.added_snapshot_id, 5485972788975780755);
76-
ASSERT_EQ(file.manifest_length, 7431);
77-
ASSERT_EQ(file.sequence_number, 3);
78-
ASSERT_EQ(file.min_sequence_number, 3);
79-
ASSERT_EQ(file.partitions.size(), 1);
80-
const auto& partition = file.partitions[0];
81-
ASSERT_EQ(partition.contains_null, false);
82-
ASSERT_EQ(partition.contains_nan.value(), false);
83-
ASSERT_EQ(partition.lower_bound.value(),
84-
std::vector<uint8_t>({'(', 0x19, 0x07, 0x00}));
85-
ASSERT_EQ(partition.upper_bound.value(),
86-
std::vector<uint8_t>({'(', 0x19, 0x07, 0x00}));
87-
} else if (manifest_path == "2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro") {
88-
ASSERT_EQ(file.added_snapshot_id, 1679468743751242972);
89-
ASSERT_EQ(file.manifest_length, 7433);
90-
ASSERT_EQ(file.sequence_number, 2);
91-
ASSERT_EQ(file.min_sequence_number, 2);
92-
ASSERT_EQ(file.partitions.size(), 1);
93-
const auto& partition = file.partitions[0];
94-
ASSERT_EQ(partition.contains_null, false);
95-
ASSERT_EQ(partition.contains_nan.value(), false);
96-
ASSERT_EQ(partition.lower_bound.value(),
97-
std::vector<uint8_t>({0xd0, 0xd4, 0x06, 0x00}));
98-
ASSERT_EQ(partition.upper_bound.value(),
99-
std::vector<uint8_t>({0xd0, 0xd4, 0x06, 0x00}));
100-
} else if (manifest_path == "3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro") {
101-
ASSERT_EQ(file.added_snapshot_id, 1579605567338877265);
102-
ASSERT_EQ(file.manifest_length, 7431);
103-
ASSERT_EQ(file.sequence_number, 1);
104-
ASSERT_EQ(file.min_sequence_number, 1);
105-
ASSERT_EQ(file.partitions.size(), 1);
106-
const auto& partition = file.partitions[0];
107-
ASSERT_EQ(partition.contains_null, false);
108-
ASSERT_EQ(partition.contains_nan.value(), false);
109-
ASSERT_EQ(partition.lower_bound.value(),
110-
std::vector<uint8_t>({0xb8, 0xd4, 0x06, 0x00}));
111-
ASSERT_EQ(partition.upper_bound.value(),
112-
std::vector<uint8_t>({0xb8, 0xd4, 0x06, 0x00}));
113-
} else {
114-
ASSERT_TRUE(false) << "Unexpected manifest file: " << manifest_path;
115-
}
116-
ASSERT_EQ(file.partition_spec_id, 0);
117-
ASSERT_EQ(file.content, ManifestFile::Content::kData);
118-
ASSERT_EQ(file.added_files_count, 1);
119-
ASSERT_EQ(file.existing_files_count, 0);
120-
ASSERT_EQ(file.deleted_files_count, 0);
121-
ASSERT_EQ(file.added_rows_count, 1);
122-
ASSERT_EQ(file.existing_rows_count, 0);
123-
ASSERT_EQ(file.deleted_rows_count, 0);
124-
ASSERT_EQ(file.key_metadata.empty(), true);
125-
}
98+
99+
auto expected_manifest_list = prepare_test_manifest_list();
100+
ASSERT_EQ(read_result.value(), expected_manifest_list);
126101
}
127102

128103
} // namespace iceberg

0 commit comments

Comments
 (0)