Skip to content

Commit 82e2aed

Browse files
committed
test: add manifest list test cases
- fix schema definition of manifest list & file with versions
- port manifest list test cases from Java
- fix various issues found by new cases
1 parent 407c3d1 commit 82e2aed

18 files changed

+820
-266
lines changed

src/iceberg/manifest_adapter.cc

Lines changed: 7 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,13 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
139139
return &array_;
140140
}
141141

142+
ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
143+
: partition_spec_(std::move(partition_spec)) {
144+
if (!partition_spec_) {
145+
partition_spec_ = PartitionSpec::Unpartitioned();
146+
}
147+
}
148+
142149
ManifestEntryAdapter::~ManifestEntryAdapter() {
143150
if (array_.release != nullptr) {
144151
ArrowArrayRelease(&array_);
@@ -148,14 +155,6 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
148155
}
149156
}
150157

151-
Result<std::shared_ptr<StructType>> ManifestEntryAdapter::GetManifestEntryType() {
152-
if (partition_spec_ == nullptr) [[unlikely]] {
153-
return ManifestEntry::TypeFromPartitionType(nullptr);
154-
}
155-
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
156-
return ManifestEntry::TypeFromPartitionType(std::move(partition_type));
157-
}
158-
159158
Status ManifestEntryAdapter::AppendPartitionValues(
160159
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
161160
const std::vector<Literal>& partition_values) {
@@ -436,37 +435,6 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
436435
return {};
437436
}
438437

439-
Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
440-
ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_type, GetManifestEntryType())
441-
auto fields_span = manifest_entry_type->fields();
442-
std::vector<SchemaField> fields;
443-
// TODO(xiao.dong) Make this a common function to recursively handle
444-
// all nested fields in the schema
445-
for (const auto& field : fields_span) {
446-
if (field.field_id() == 2) {
447-
// handle data_file field
448-
auto data_file_struct = internal::checked_pointer_cast<StructType>(field.type());
449-
std::vector<SchemaField> data_file_fields;
450-
for (const auto& data_file_field : data_file_struct->fields()) {
451-
if (fields_ids.contains(data_file_field.field_id())) {
452-
data_file_fields.emplace_back(data_file_field);
453-
}
454-
}
455-
auto type = std::make_shared<StructType>(data_file_fields);
456-
auto data_file_field = SchemaField::MakeRequired(
457-
field.field_id(), std::string(field.name()), std::move(type));
458-
fields.emplace_back(std::move(data_file_field));
459-
} else {
460-
if (fields_ids.contains(field.field_id())) {
461-
fields.emplace_back(field);
462-
}
463-
}
464-
}
465-
manifest_schema_ = std::make_shared<Schema>(fields);
466-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_));
467-
return {};
468-
}
469-
470438
ManifestFileAdapter::~ManifestFileAdapter() {
471439
if (array_.release != nullptr) {
472440
ArrowArrayRelease(&array_);
@@ -671,16 +639,4 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) {
671639
return {};
672640
}
673641

674-
Status ManifestFileAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
675-
std::vector<SchemaField> fields;
676-
for (const auto& field : ManifestFile::Type().fields()) {
677-
if (fields_ids.contains(field.field_id())) {
678-
fields.emplace_back(field);
679-
}
680-
}
681-
manifest_list_schema_ = std::make_shared<Schema>(fields);
682-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_list_schema_, &schema_));
683-
return {};
684-
}
685-
686642
} // namespace iceberg

src/iceberg/manifest_adapter.h

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,22 +61,15 @@ class ICEBERG_EXPORT ManifestAdapter {
6161
/// Implemented by different versions with version-specific schemas.
6262
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6363
public:
64-
explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
65-
: partition_spec_(std::move(partition_spec)) {}
64+
explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec);
65+
6666
~ManifestEntryAdapter() override;
6767

6868
virtual Status Append(const ManifestEntry& entry) = 0;
6969

7070
const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
7171

7272
protected:
73-
virtual Result<std::shared_ptr<StructType>> GetManifestEntryType();
74-
75-
/// \brief Initialize version-specific schema.
76-
///
77-
/// \param fields_ids Field IDs to include in the manifest schema. The schema will be
78-
/// initialized to include only the fields with these IDs.
79-
Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
8073
Status AppendInternal(const ManifestEntry& entry);
8174
Status AppendDataFile(ArrowArray* array,
8275
const std::shared_ptr<StructType>& data_file_type,
@@ -110,12 +103,9 @@ class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
110103

111104
const std::shared_ptr<Schema>& schema() const { return manifest_list_schema_; }
112105

106+
virtual std::optional<int64_t> next_row_id() const { return std::nullopt; }
107+
113108
protected:
114-
/// \brief Initialize version-specific schema.
115-
///
116-
/// \param fields_ids Field IDs to include in the manifest list schema. The schema will
117-
/// be initialized to include only the fields with these IDs.
118-
Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
119109
Status AppendInternal(const ManifestFile& file);
120110
static Status AppendPartitionSummary(
121111
ArrowArray* array, const std::shared_ptr<ListType>& summary_type,

src/iceberg/manifest_entry.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ bool ManifestEntry::operator==(const ManifestEntry& other) const {
3838

3939
std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
4040
if (!partition_type) {
41-
partition_type = PartitionSpec::Unpartitioned()->schema();
41+
partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
4242
}
4343
return std::make_shared<StructType>(std::vector<SchemaField>{
4444
kContent,

src/iceberg/manifest_entry.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ struct ICEBERG_EXPORT DataFile {
185185
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
186186
inline static const int32_t kPartitionFieldId = 102;
187187
inline static const std::string kPartitionField = "partition";
188+
inline static const std::string kPartitionDoc =
189+
"Partition data tuple, schema based on the partition spec";
188190
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
189191
103, "record_count", iceberg::int64(), "Number of records in the file");
190192
inline static const SchemaField kFileSize = SchemaField::MakeRequired(

src/iceberg/manifest_list.cc

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,25 @@ const StructType& PartitionFieldSummary::Type() {
3333
return kInstance;
3434
}
3535

36-
const StructType& ManifestFile::Type() {
37-
static const StructType kInstance(
38-
{kManifestPath, kManifestLength, kPartitionSpecId, kContent, kSequenceNumber,
39-
kMinSequenceNumber, kAddedSnapshotId, kAddedFilesCount, kExistingFilesCount,
40-
kDeletedFilesCount, kAddedRowsCount, kExistingRowsCount, kDeletedRowsCount,
41-
kPartitions, kKeyMetadata, kFirstRowId});
36+
const std::shared_ptr<Schema>& ManifestFile::Type() {
37+
static const auto kInstance = std::make_shared<Schema>(std::vector<SchemaField>{
38+
kManifestPath,
39+
kManifestLength,
40+
kPartitionSpecId,
41+
kContent,
42+
kSequenceNumber,
43+
kMinSequenceNumber,
44+
kAddedSnapshotId,
45+
kAddedFilesCount,
46+
kExistingFilesCount,
47+
kDeletedFilesCount,
48+
kAddedRowsCount,
49+
kExistingRowsCount,
50+
kDeletedRowsCount,
51+
kPartitions,
52+
kKeyMetadata,
53+
kFirstRowId,
54+
});
4255
return kInstance;
4356
}
4457

src/iceberg/manifest_list.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ struct ICEBERG_EXPORT ManifestFile {
197197

198198
bool operator==(const ManifestFile& other) const = default;
199199

200-
static const StructType& Type();
200+
static const std::shared_ptr<Schema>& Type();
201201
};
202202

203203
/// Snapshots are embedded in table metadata, but the list of manifests for a snapshot are

src/iceberg/manifest_reader.cc

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,15 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
7171

7272
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
7373
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
74-
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
75-
ManifestFile::Type().fields().end());
76-
auto schema = std::make_shared<Schema>(fields);
77-
ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open(
78-
FileFormatType::kAvro,
79-
{.path = std::string(manifest_list_location),
80-
.io = std::move(file_io),
81-
.projection = schema}));
74+
std::shared_ptr<Schema> schema = ManifestFile::Type();
75+
ICEBERG_ASSIGN_OR_RAISE(
76+
auto reader,
77+
ReaderFactoryRegistry::Open(FileFormatType::kAvro,
78+
{
79+
.path = std::string(manifest_list_location),
80+
.io = std::move(file_io),
81+
.projection = schema,
82+
}));
8283
return std::make_unique<ManifestListReaderImpl>(std::move(reader), std::move(schema));
8384
}
8485

src/iceberg/manifest_writer.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ Status ManifestListWriter::Close() {
136136
return writer_->Close();
137137
}
138138

139+
std::optional<int64_t> ManifestListWriter::next_row_id() const {
140+
return adapter_->next_row_id();
141+
}
142+
139143
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
140144
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
141145
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
@@ -169,7 +173,7 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
169173

170174
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
171175
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
172-
int64_t sequence_number, std::optional<int64_t> first_row_id,
176+
int64_t sequence_number, int64_t first_row_id,
173177
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
174178
auto adapter = std::make_unique<ManifestFileAdapterV3>(snapshot_id, parent_snapshot_id,
175179
sequence_number, first_row_id);

src/iceberg/manifest_writer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ class ICEBERG_EXPORT ManifestListWriter {
114114
/// \brief Close writer and flush to storage.
115115
Status Close();
116116

117+
/// \brief Get the next row id to assign.
118+
std::optional<int64_t> next_row_id() const;
119+
117120
/// \brief Creates a writer for the v1 manifest list.
118121
/// \param snapshot_id ID of the snapshot.
119122
/// \param parent_snapshot_id ID of the parent snapshot.
@@ -146,7 +149,7 @@ class ICEBERG_EXPORT ManifestListWriter {
146149
/// \return A Result containing the writer or an error.
147150
static Result<std::unique_ptr<ManifestListWriter>> MakeV3Writer(
148151
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
149-
int64_t sequence_number, std::optional<int64_t> first_row_id,
152+
int64_t sequence_number, int64_t first_row_id,
150153
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
151154

152155
private:

src/iceberg/schema_field.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,18 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
7979
return lhs.Equals(rhs);
8080
}
8181

82+
SchemaField AsRequired() const {
83+
auto copy = *this;
84+
copy.optional_ = false;
85+
return copy;
86+
}
87+
88+
SchemaField AsOptional() const {
89+
auto copy = *this;
90+
copy.optional_ = true;
91+
return copy;
92+
}
93+
8294
private:
8395
/// \brief Compare two fields for equality.
8496
[[nodiscard]] bool Equals(const SchemaField& other) const;

0 commit comments

Comments
 (0)