Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/iceberg/manifest/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,32 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
std::move(writer), std::move(adapter), manifest_location, first_row_id));
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
int8_t format_version, std::optional<int64_t> snapshot_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
std::optional<ManifestContent> content, std::optional<int64_t> first_row_id) {
switch (format_version) {
case 1:
return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec), std::move(current_schema));
case 2:
ICEBERG_PRECHECK(content.has_value(),
"ManifestContent is required for format version 2");
return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec), std::move(current_schema),
content.value());
case 3:
ICEBERG_PRECHECK(content.has_value(),
"ManifestContent is required for format version 3");
return MakeV3Writer(snapshot_id, first_row_id, manifest_location,
std::move(file_io), std::move(partition_spec),
std::move(current_schema), content.value());
default:
return NotSupported("Format version {} is not supported", format_version);
}
}

ManifestListWriter::ManifestListWriter(std::unique_ptr<Writer> writer,
std::unique_ptr<ManifestFileAdapter> adapter)
: writer_(std::move(writer)), adapter_(std::move(adapter)) {}
Expand Down Expand Up @@ -452,4 +478,30 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
new ManifestListWriter(std::move(writer), std::move(adapter)));
}

Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeWriter(
int8_t format_version, int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io,
std::optional<int64_t> sequence_number, std::optional<int64_t> first_row_id) {
switch (format_version) {
case 1:
return MakeV1Writer(snapshot_id, parent_snapshot_id, manifest_list_location,
std::move(file_io));
case 2:
ICEBERG_PRECHECK(sequence_number.has_value(),
"Sequence number is required for format version 2");
return MakeV2Writer(snapshot_id, parent_snapshot_id, sequence_number.value(),
manifest_list_location, std::move(file_io));
case 3:
ICEBERG_PRECHECK(sequence_number.has_value(),
"Sequence number is required for format version 3");
ICEBERG_PRECHECK(first_row_id.has_value(),
"First row ID is required for format version 3");
return MakeV3Writer(snapshot_id, parent_snapshot_id, sequence_number.value(),
first_row_id.value(), manifest_list_location,
std::move(file_io));
default:
return NotSupported("Format version {} is not supported", format_version);
}
}

} // namespace iceberg
38 changes: 38 additions & 0 deletions src/iceberg/manifest/manifest_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,26 @@ class ICEBERG_EXPORT ManifestWriter {
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content);

/// \brief Factory function to create a writer for a manifest file based on format
/// version.
/// \param format_version The format version (1, 2, 3, etc.).
/// \param snapshot_id ID of the snapshot.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_spec Partition spec for the manifest.
/// \param current_schema Schema containing the source fields referenced by partition
/// spec.
/// \param content Content of the manifest (required for format_version >= 2).
/// \param first_row_id First row ID of the snapshot (required for format_version >= 3).
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeWriter(
int8_t format_version, std::optional<int64_t> snapshot_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema,
std::optional<ManifestContent> content = std::nullopt,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that ManifestContent content = ManifestContent::kData is a better option?

std::optional<int64_t> first_row_id = std::nullopt);

private:
// Private constructor for internal use only, use the static Make*Writer methods
// instead.
Expand Down Expand Up @@ -240,6 +260,24 @@ class ICEBERG_EXPORT ManifestListWriter {
int64_t sequence_number, int64_t first_row_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);

/// \brief Factory function to create a writer for the manifest list based on format
/// version.
/// \param format_version The format version (1, 2, 3, etc.).
/// \param snapshot_id ID of the snapshot.
/// \param parent_snapshot_id ID of the parent snapshot.
/// \param manifest_list_location Path to the manifest list file.
/// \param file_io File IO implementation to use.
/// \param sequence_number Sequence number of the snapshot (required for format_version
/// >= 2).
/// \param first_row_id First row ID of the snapshot (required for format_version >= 3).
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestListWriter>> MakeWriter(
int8_t format_version, int64_t snapshot_id,
std::optional<int64_t> parent_snapshot_id, std::string_view manifest_list_location,
std::shared_ptr<FileIO> file_io,
std::optional<int64_t> sequence_number = std::nullopt,
std::optional<int64_t> first_row_id = std::nullopt);

private:
// Private constructor for internal use only, use the static Make*Writer methods
// instead.
Expand Down
14 changes: 3 additions & 11 deletions src/iceberg/test/delete_file_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,9 @@ class DeleteFileIndexTest : public testing::TestWithParam<int> {
std::shared_ptr<PartitionSpec> spec) {
const std::string manifest_path = MakeManifestPath();

Result<std::unique_ptr<ManifestWriter>> writer_result =
NotSupported("Format version: {}", format_version);

if (format_version == 2) {
writer_result = ManifestWriter::MakeV2Writer(
snapshot_id, manifest_path, file_io_, spec, schema_, ManifestContent::kDeletes);
} else if (format_version == 3) {
writer_result = ManifestWriter::MakeV3Writer(
snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, spec,
schema_, ManifestContent::kDeletes);
}
auto writer_result = ManifestWriter::MakeWriter(
format_version, snapshot_id, manifest_path, file_io_, spec, schema_,
ManifestContent::kDeletes, /*first_row_id=*/std::nullopt);

EXPECT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());
Expand Down
53 changes: 11 additions & 42 deletions src/iceberg/test/manifest_group_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,10 @@ class ManifestGroupTest : public testing::TestWithParam<int> {
std::shared_ptr<PartitionSpec> spec) {
const std::string manifest_path = MakeManifestPath();

Result<std::unique_ptr<ManifestWriter>> writer_result =
NotSupported("Format version: {}", format_version);

if (format_version == 1) {
writer_result = ManifestWriter::MakeV1Writer(snapshot_id, manifest_path, file_io_,
spec, schema_);
} else if (format_version == 2) {
writer_result = ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, file_io_,
spec, schema_, ManifestContent::kData);
} else if (format_version == 3) {
writer_result =
ManifestWriter::MakeV3Writer(snapshot_id, /*first_row_id=*/0L, manifest_path,
file_io_, spec, schema_, ManifestContent::kData);
}

auto writer_result =
ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_,
spec, schema_, ManifestContent::kData,
/*first_row_id=*/0L);
EXPECT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());

Expand All @@ -168,18 +157,10 @@ class ManifestGroupTest : public testing::TestWithParam<int> {
std::shared_ptr<PartitionSpec> spec) {
const std::string manifest_path = MakeManifestPath();

Result<std::unique_ptr<ManifestWriter>> writer_result =
NotSupported("Format version: {}", format_version);

if (format_version == 2) {
writer_result = ManifestWriter::MakeV2Writer(
snapshot_id, manifest_path, file_io_, spec, schema_, ManifestContent::kDeletes);
} else if (format_version == 3) {
writer_result = ManifestWriter::MakeV3Writer(
snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, spec,
schema_, ManifestContent::kDeletes);
}

auto writer_result =
ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_,
spec, schema_, ManifestContent::kDeletes,
/*first_row_id=*/std::nullopt);
EXPECT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());

Expand Down Expand Up @@ -213,21 +194,9 @@ class ManifestGroupTest : public testing::TestWithParam<int> {
constexpr int64_t kParentSnapshotId = 0L;
constexpr int64_t kSnapshotFirstRowId = 0L;

Result<std::unique_ptr<ManifestListWriter>> writer_result =
NotSupported("Format version: {}", format_version);

if (format_version == 1) {
writer_result = ManifestListWriter::MakeV1Writer(snapshot_id, kParentSnapshotId,
manifest_list_path, file_io_);
} else if (format_version == 2) {
writer_result = ManifestListWriter::MakeV2Writer(
snapshot_id, kParentSnapshotId, sequence_number, manifest_list_path, file_io_);
} else if (format_version == 3) {
writer_result = ManifestListWriter::MakeV3Writer(
snapshot_id, kParentSnapshotId, sequence_number, kSnapshotFirstRowId,
manifest_list_path, file_io_);
}

auto writer_result = ManifestListWriter::MakeWriter(
format_version, snapshot_id, kParentSnapshotId, manifest_list_path, file_io_,
sequence_number, kSnapshotFirstRowId);
EXPECT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());
EXPECT_THAT(writer->Add(manifest), IsOk());
Expand Down
26 changes: 6 additions & 20 deletions src/iceberg/test/manifest_list_versions_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,9 @@ class TestManifestListVersions : public ::testing::Test {
const std::string manifest_list_path = CreateManifestListPath();
constexpr int64_t kParentSnapshotId = kSnapshotId - 1;

Result<std::unique_ptr<ManifestListWriter>> writer_result =
NotSupported("Format version: {}", format_version);

if (format_version == 1) {
writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, kParentSnapshotId,
manifest_list_path, file_io_);
} else if (format_version == 2) {
writer_result = ManifestListWriter::MakeV2Writer(
kSnapshotId, kParentSnapshotId, kSeqNum, manifest_list_path, file_io_);
} else if (format_version == 3) {
writer_result = ManifestListWriter::MakeV3Writer(kSnapshotId, kParentSnapshotId,
kSeqNum, kSnapshotFirstRowId,
manifest_list_path, file_io_);
}

auto writer_result = ManifestListWriter::MakeWriter(
format_version, kSnapshotId, kParentSnapshotId, manifest_list_path, file_io_,
kSeqNum, kSnapshotFirstRowId);
EXPECT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());

Expand Down Expand Up @@ -202,11 +190,9 @@ class TestManifestListVersions : public ::testing::Test {
TEST_F(TestManifestListVersions, TestV1WriteDeleteManifest) {
const std::string manifest_list_path = CreateManifestListPath();

auto writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, kSnapshotId - 1,
manifest_list_path, file_io_);
EXPECT_THAT(writer_result, IsOk());

auto writer = std::move(writer_result.value());
ICEBERG_UNWRAP_OR_FAIL(auto writer,
ManifestListWriter::MakeV1Writer(kSnapshotId, kSnapshotId - 1,
manifest_list_path, file_io_));
auto status = writer->Add(kDeleteManifest);

EXPECT_THAT(status, IsError(ErrorKind::kInvalidManifestList));
Expand Down
23 changes: 3 additions & 20 deletions src/iceberg/test/manifest_reader_stats_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,9 @@ class TestManifestReaderStats : public testing::TestWithParam<int> {
ManifestFile WriteManifest(int format_version, std::unique_ptr<DataFile> data_file) {
const std::string manifest_path = MakeManifestPath();

Result<std::unique_ptr<ManifestWriter>> writer_result =
NotSupported("Format version: {}", format_version);

switch (format_version) {
case 1:
writer_result = ManifestWriter::MakeV1Writer(/*snapshot_id=*/1000L, manifest_path,
file_io_, spec_, schema_);
break;
case 2:
writer_result =
ManifestWriter::MakeV2Writer(/*snapshot_id=*/1000L, manifest_path, file_io_,
spec_, schema_, ManifestContent::kData);
break;
case 3:
writer_result = ManifestWriter::MakeV3Writer(
/*snapshot_id=*/1000L, /*sequence_number=*/0L, manifest_path, file_io_, spec_,
schema_, ManifestContent::kData);
break;
}

auto writer_result = ManifestWriter::MakeWriter(
format_version, /*snapshot_id=*/1000L, manifest_path, file_io_, spec_, schema_,
ManifestContent::kData, /*first_row_id=*/0L);
EXPECT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());

Expand Down
38 changes: 8 additions & 30 deletions src/iceberg/test/manifest_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,10 @@ class TestManifestReader : public testing::TestWithParam<int> {
const std::vector<ManifestEntry>& entries) {
const std::string manifest_path = MakeManifestPath();

Result<std::unique_ptr<ManifestWriter>> writer_result =
NotSupported("Format version: {}", format_version);

switch (format_version) {
case 1:
writer_result = ManifestWriter::MakeV1Writer(snapshot_id, manifest_path, file_io_,
spec_, schema_);
break;
case 2:
writer_result = ManifestWriter::MakeV2Writer(
snapshot_id, manifest_path, file_io_, spec_, schema_, ManifestContent::kData);
break;
case 3:
writer_result = ManifestWriter::MakeV3Writer(snapshot_id, /*first_row_id=*/0L,
manifest_path, file_io_, spec_,
schema_, ManifestContent::kData);
break;
}
auto writer_result =
ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_,
spec_, schema_, ManifestContent::kData,
/*first_row_id=*/0L);

EXPECT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());
Expand Down Expand Up @@ -152,18 +138,10 @@ class TestManifestReader : public testing::TestWithParam<int> {
std::vector<ManifestEntry> entries) {
const std::string manifest_path = MakeManifestPath();

Result<std::unique_ptr<ManifestWriter>> writer_result =
NotSupported("Format version: {}", format_version);

if (format_version == 2) {
writer_result =
ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, file_io_, spec_,
schema_, ManifestContent::kDeletes);
} else if (format_version == 3) {
writer_result = ManifestWriter::MakeV3Writer(
snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, spec_,
schema_, ManifestContent::kDeletes);
}
auto writer_result =
ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_,
spec_, schema_, ManifestContent::kDeletes,
/*first_row_id=*/std::nullopt);

EXPECT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());
Expand Down
Loading
Loading