Skip to content

Commit 23ed851

Browse files
wgtmaczhjwpku
andauthored
test: add manifest list test cases (#293)
- fix schema definition of manifest list & file with versions - port manifest list test cases from java - fix various issues found by new cases Co-authored-by: Junwang Zhao <[email protected]>
1 parent e759807 commit 23ed851

21 files changed

+888
-286
lines changed

src/iceberg/avro/avro_writer.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions
5555

5656
class AvroWriter::Impl {
5757
public:
58+
~Impl() {
59+
if (arrow_schema_.release != nullptr) {
60+
ArrowSchemaRelease(&arrow_schema_);
61+
}
62+
}
63+
5864
Status Open(const WriterOptions& options) {
5965
write_schema_ = options.schema;
6066

src/iceberg/manifest_adapter.cc

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
139139
return &array_;
140140
}
141141

142+
ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
143+
ManifestContent content)
144+
: partition_spec_(std::move(partition_spec)), content_(content) {
145+
if (!partition_spec_) {
146+
partition_spec_ = PartitionSpec::Unpartitioned();
147+
}
148+
}
149+
142150
ManifestEntryAdapter::~ManifestEntryAdapter() {
143151
if (array_.release != nullptr) {
144152
ArrowArrayRelease(&array_);
@@ -148,14 +156,6 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
148156
}
149157
}
150158

151-
Result<std::shared_ptr<StructType>> ManifestEntryAdapter::GetManifestEntryType() {
152-
if (partition_spec_ == nullptr) [[unlikely]] {
153-
return ManifestEntry::TypeFromPartitionType(nullptr);
154-
}
155-
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
156-
return ManifestEntry::TypeFromPartitionType(std::move(partition_type));
157-
}
158-
159159
Status ManifestEntryAdapter::AppendPartitionValues(
160160
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
161161
const std::vector<Literal>& partition_values) {
@@ -436,37 +436,6 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
436436
return {};
437437
}
438438

439-
Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
440-
ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_type, GetManifestEntryType())
441-
auto fields_span = manifest_entry_type->fields();
442-
std::vector<SchemaField> fields;
443-
// TODO(xiao.dong) Make this a common function to recursively handle
444-
// all nested fields in the schema
445-
for (const auto& field : fields_span) {
446-
if (field.field_id() == 2) {
447-
// handle data_file field
448-
auto data_file_struct = internal::checked_pointer_cast<StructType>(field.type());
449-
std::vector<SchemaField> data_file_fields;
450-
for (const auto& data_file_field : data_file_struct->fields()) {
451-
if (fields_ids.contains(data_file_field.field_id())) {
452-
data_file_fields.emplace_back(data_file_field);
453-
}
454-
}
455-
auto type = std::make_shared<StructType>(data_file_fields);
456-
auto data_file_field = SchemaField::MakeRequired(
457-
field.field_id(), std::string(field.name()), std::move(type));
458-
fields.emplace_back(std::move(data_file_field));
459-
} else {
460-
if (fields_ids.contains(field.field_id())) {
461-
fields.emplace_back(field);
462-
}
463-
}
464-
}
465-
manifest_schema_ = std::make_shared<Schema>(fields);
466-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_));
467-
return {};
468-
}
469-
470439
ManifestFileAdapter::~ManifestFileAdapter() {
471440
if (array_.release != nullptr) {
472441
ArrowArrayRelease(&array_);
@@ -671,16 +640,4 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) {
671640
return {};
672641
}
673642

674-
Status ManifestFileAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
675-
std::vector<SchemaField> fields;
676-
for (const auto& field : ManifestFile::Type().fields()) {
677-
if (fields_ids.contains(field.field_id())) {
678-
fields.emplace_back(field);
679-
}
680-
}
681-
manifest_list_schema_ = std::make_shared<Schema>(fields);
682-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_list_schema_, &schema_));
683-
return {};
684-
}
685-
686643
} // namespace iceberg

src/iceberg/manifest_adapter.h

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,22 +61,18 @@ class ICEBERG_EXPORT ManifestAdapter {
6161
/// Implemented by different versions with version-specific schemas.
6262
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6363
public:
64-
explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
65-
: partition_spec_(std::move(partition_spec)) {}
64+
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
65+
ManifestContent content);
66+
6667
~ManifestEntryAdapter() override;
6768

6869
virtual Status Append(const ManifestEntry& entry) = 0;
6970

7071
const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
7172

72-
protected:
73-
virtual Result<std::shared_ptr<StructType>> GetManifestEntryType();
73+
ManifestContent content() const { return content_; }
7474

75-
/// \brief Initialize version-specific schema.
76-
///
77-
/// \param fields_ids Field IDs to include in the manifest schema. The schema will be
78-
/// initialized to include only the fields with these IDs.
79-
Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
75+
protected:
8076
Status AppendInternal(const ManifestEntry& entry);
8177
Status AppendDataFile(ArrowArray* array,
8278
const std::shared_ptr<StructType>& data_file_type,
@@ -97,6 +93,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
9793
protected:
9894
std::shared_ptr<PartitionSpec> partition_spec_;
9995
std::shared_ptr<Schema> manifest_schema_;
96+
const ManifestContent content_;
10097
};
10198

10299
/// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`.
@@ -110,12 +107,9 @@ class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
110107

111108
const std::shared_ptr<Schema>& schema() const { return manifest_list_schema_; }
112109

110+
virtual std::optional<int64_t> next_row_id() const { return std::nullopt; }
111+
113112
protected:
114-
/// \brief Initialize version-specific schema.
115-
///
116-
/// \param fields_ids Field IDs to include in the manifest list schema. The schema will
117-
/// be initialized to include only the fields with these IDs.
118-
Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
119113
Status AppendInternal(const ManifestFile& file);
120114
static Status AppendPartitionSummary(
121115
ArrowArray* array, const std::shared_ptr<ListType>& summary_type,

src/iceberg/manifest_entry.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ bool ManifestEntry::operator==(const ManifestEntry& other) const {
3838

3939
std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
4040
if (!partition_type) {
41-
partition_type = PartitionSpec::Unpartitioned()->schema();
41+
partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
4242
}
4343
return std::make_shared<StructType>(std::vector<SchemaField>{
4444
kContent,

src/iceberg/manifest_entry.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
5757
}
5858
}
5959

60+
enum class ManifestContent {
61+
kData = 0,
62+
kDeletes = 1,
63+
};
64+
65+
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content) noexcept;
66+
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
67+
std::string_view str) noexcept;
68+
6069
/// \brief DataFile carries data file path, partition tuple, metrics, ...
6170
struct ICEBERG_EXPORT DataFile {
6271
/// \brief Content of a data file
@@ -185,6 +194,8 @@ struct ICEBERG_EXPORT DataFile {
185194
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
186195
inline static const int32_t kPartitionFieldId = 102;
187196
inline static const std::string kPartitionField = "partition";
197+
inline static const std::string kPartitionDoc =
198+
"Partition data tuple, schema based on the partition spec";
188199
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
189200
103, "record_count", iceberg::int64(), "Number of records in the file");
190201
inline static const SchemaField kFileSize = SchemaField::MakeRequired(

src/iceberg/manifest_list.cc

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,25 @@ const StructType& PartitionFieldSummary::Type() {
3333
return kInstance;
3434
}
3535

36-
const StructType& ManifestFile::Type() {
37-
static const StructType kInstance(
38-
{kManifestPath, kManifestLength, kPartitionSpecId, kContent, kSequenceNumber,
39-
kMinSequenceNumber, kAddedSnapshotId, kAddedFilesCount, kExistingFilesCount,
40-
kDeletedFilesCount, kAddedRowsCount, kExistingRowsCount, kDeletedRowsCount,
41-
kPartitions, kKeyMetadata, kFirstRowId});
36+
const std::shared_ptr<Schema>& ManifestFile::Type() {
37+
static const auto kInstance = std::make_shared<Schema>(std::vector<SchemaField>{
38+
kManifestPath,
39+
kManifestLength,
40+
kPartitionSpecId,
41+
kContent,
42+
kSequenceNumber,
43+
kMinSequenceNumber,
44+
kAddedSnapshotId,
45+
kAddedFilesCount,
46+
kExistingFilesCount,
47+
kDeletedFilesCount,
48+
kAddedRowsCount,
49+
kExistingRowsCount,
50+
kDeletedRowsCount,
51+
kPartitions,
52+
kKeyMetadata,
53+
kFirstRowId,
54+
});
4255
return kInstance;
4356
}
4457

src/iceberg/manifest_list.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ struct ICEBERG_EXPORT ManifestFile {
197197

198198
bool operator==(const ManifestFile& other) const = default;
199199

200-
static const StructType& Type();
200+
static const std::shared_ptr<Schema>& Type();
201201
};
202202

203203
/// Snapshots are embedded in table metadata, but the list of manifests for a snapshot are

src/iceberg/manifest_reader.cc

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,15 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
7171

7272
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
7373
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
74-
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
75-
ManifestFile::Type().fields().end());
76-
auto schema = std::make_shared<Schema>(fields);
77-
ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open(
78-
FileFormatType::kAvro,
79-
{.path = std::string(manifest_list_location),
80-
.io = std::move(file_io),
81-
.projection = schema}));
74+
std::shared_ptr<Schema> schema = ManifestFile::Type();
75+
ICEBERG_ASSIGN_OR_RAISE(
76+
auto reader,
77+
ReaderFactoryRegistry::Open(FileFormatType::kAvro,
78+
{
79+
.path = std::string(manifest_list_location),
80+
.io = std::move(file_io),
81+
.projection = schema,
82+
}));
8283
return std::make_unique<ManifestListReaderImpl>(std::move(reader), std::move(schema));
8384
}
8485

src/iceberg/manifest_writer.cc

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ Status ManifestWriter::Close() {
5353
return writer_->Close();
5454
}
5555

56+
ManifestContent ManifestWriter::content() const { return adapter_->content(); }
57+
5658
Result<std::unique_ptr<Writer>> OpenFileWriter(
5759
std::string_view location, std::shared_ptr<Schema> schema,
5860
std::shared_ptr<FileIO> file_io,
@@ -83,9 +85,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
8385

8486
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
8587
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
86-
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec) {
87-
auto adapter =
88-
std::make_unique<ManifestEntryAdapterV2>(snapshot_id, std::move(partition_spec));
88+
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
89+
ManifestContent content) {
90+
auto adapter = std::make_unique<ManifestEntryAdapterV2>(
91+
snapshot_id, std::move(partition_spec), content);
8992
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
9093
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
9194

@@ -99,9 +102,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
99102
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
100103
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
101104
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
102-
std::shared_ptr<PartitionSpec> partition_spec) {
103-
auto adapter = std::make_unique<ManifestEntryAdapterV3>(snapshot_id, first_row_id,
104-
std::move(partition_spec));
105+
std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content) {
106+
auto adapter = std::make_unique<ManifestEntryAdapterV3>(
107+
snapshot_id, first_row_id, std::move(partition_spec), content);
105108
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
106109
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
107110

@@ -136,6 +139,10 @@ Status ManifestListWriter::Close() {
136139
return writer_->Close();
137140
}
138141

142+
std::optional<int64_t> ManifestListWriter::next_row_id() const {
143+
return adapter_->next_row_id();
144+
}
145+
139146
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
140147
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
141148
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
@@ -169,7 +176,7 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
169176

170177
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
171178
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
172-
int64_t sequence_number, std::optional<int64_t> first_row_id,
179+
int64_t sequence_number, int64_t first_row_id,
173180
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
174181
auto adapter = std::make_unique<ManifestFileAdapterV3>(snapshot_id, parent_snapshot_id,
175182
sequence_number, first_row_id);

src/iceberg/manifest_writer.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class ICEBERG_EXPORT ManifestWriter {
5454
/// \brief Close writer and flush to storage.
5555
Status Close();
5656

57+
/// \brief Get the content of the manifest.
58+
ManifestContent content() const;
59+
5760
/// \brief Creates a writer for a manifest file.
5861
/// \param snapshot_id ID of the snapshot.
5962
/// \param manifest_location Path to the manifest file.
@@ -69,22 +72,25 @@ class ICEBERG_EXPORT ManifestWriter {
6972
/// \param manifest_location Path to the manifest file.
7073
/// \param file_io File IO implementation to use.
7174
/// \param partition_spec Partition spec for the manifest.
75+
/// \param content Content of the manifest.
7276
/// \return A Result containing the writer or an error.
7377
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
7478
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
75-
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec);
79+
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
80+
ManifestContent content);
7681

7782
/// \brief Creates a writer for a manifest file.
7883
/// \param snapshot_id ID of the snapshot.
7984
/// \param first_row_id First row ID of the snapshot.
8085
/// \param manifest_location Path to the manifest file.
8186
/// \param file_io File IO implementation to use.
8287
/// \param partition_spec Partition spec for the manifest.
88+
/// \param content Content of the manifest.
8389
/// \return A Result containing the writer or an error.
8490
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
8591
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
8692
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
87-
std::shared_ptr<PartitionSpec> partition_spec);
93+
std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content);
8894

8995
private:
9096
static constexpr int64_t kBatchSize = 1024;
@@ -114,6 +120,9 @@ class ICEBERG_EXPORT ManifestListWriter {
114120
/// \brief Close writer and flush to storage.
115121
Status Close();
116122

123+
/// \brief Get the next row id to assign.
124+
std::optional<int64_t> next_row_id() const;
125+
117126
/// \brief Creates a writer for the v1 manifest list.
118127
/// \param snapshot_id ID of the snapshot.
119128
/// \param parent_snapshot_id ID of the parent snapshot.
@@ -146,7 +155,7 @@ class ICEBERG_EXPORT ManifestListWriter {
146155
/// \return A Result containing the writer or an error.
147156
static Result<std::unique_ptr<ManifestListWriter>> MakeV3Writer(
148157
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
149-
int64_t sequence_number, std::optional<int64_t> first_row_id,
158+
int64_t sequence_number, int64_t first_row_id,
150159
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
151160

152161
private:

0 commit comments

Comments
 (0)