diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 013fa0007..f9e975946 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -44,6 +44,9 @@ set(ICEBERG_SOURCES transform.cc transform_function.cc type.cc + manifest_reader.cc + manifest_reader_internal.cc + arrow_c_data_guard_internal.cc util/murmurhash3_internal.cc util/timepoint.cc util/gzip_internal.cc) diff --git a/src/iceberg/arrow_c_data.h b/src/iceberg/arrow_c_data.h index 7a4618ecd..55d708b23 100644 --- a/src/iceberg/arrow_c_data.h +++ b/src/iceberg/arrow_c_data.h @@ -73,4 +73,45 @@ struct ArrowArray { #endif // ARROW_C_DATA_INTERFACE +#ifndef ARROW_C_STREAM_INTERFACE +# define ARROW_C_STREAM_INTERFACE + +struct ArrowArrayStream { + // Callback to get the stream type + // (will be the same for all arrays in the stream). + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowSchema must be released independently from the stream. + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + + // Callback to get the next array + // (if no error and the array is released, the stream has ended) + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowArray must be released independently from the stream. + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + + // Callback to get optional detailed error information. + // This must only be called if the last stream operation failed + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback: release the stream's own resources. 
+ // Note that arrays returned by `get_next` must be individually released. + void (*release)(struct ArrowArrayStream*); + + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_STREAM_INTERFACE + } // extern "C" diff --git a/src/iceberg/arrow_c_data_guard_internal.cc b/src/iceberg/arrow_c_data_guard_internal.cc new file mode 100644 index 000000000..5fb3f9fa5 --- /dev/null +++ b/src/iceberg/arrow_c_data_guard_internal.cc @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/arrow_c_data_guard_internal.h" + +namespace iceberg::internal { + +ArrowArrayGuard::~ArrowArrayGuard() { + if (array_ != nullptr && array_->release != nullptr) { + ArrowArrayRelease(array_); + } +} + +ArrowSchemaGuard::~ArrowSchemaGuard() { + if (schema_ != nullptr && schema_->release != nullptr) { + ArrowSchemaRelease(schema_); + } +} + +ArrowArrayViewGuard::~ArrowArrayViewGuard() { + if (view_ != nullptr) { + ArrowArrayViewReset(view_); + } +} + +ArrowArrayBufferGuard::~ArrowArrayBufferGuard() { + if (buffer_ != nullptr) { + ArrowBufferReset(buffer_); + } +} + +} // namespace iceberg::internal diff --git a/src/iceberg/arrow_c_data_guard_internal.h b/src/iceberg/arrow_c_data_guard_internal.h new file mode 100644 index 000000000..8bce14e57 --- /dev/null +++ b/src/iceberg/arrow_c_data_guard_internal.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#pragma once + +#include + +#include "iceberg/arrow_c_data.h" + +namespace iceberg::internal { + +/// \brief Releases the guarded ArrowArray on scope exit; a null pointer or an +/// already-released array (null release callback) is left untouched. Non-copyable. +class ArrowArrayGuard { + public: + explicit ArrowArrayGuard(ArrowArray* array) : array_(array) {} + ~ArrowArrayGuard(); + ArrowArrayGuard(const ArrowArrayGuard&) = delete; + ArrowArrayGuard& operator=(const ArrowArrayGuard&) = delete; + + private: + ArrowArray* array_; +}; + +/// \brief Releases the guarded ArrowSchema on scope exit; a null pointer or an +/// already-released schema (null release callback) is left untouched. Non-copyable. +class ArrowSchemaGuard { + public: + explicit ArrowSchemaGuard(ArrowSchema* schema) : schema_(schema) {} + ~ArrowSchemaGuard(); + ArrowSchemaGuard(const ArrowSchemaGuard&) = delete; + ArrowSchemaGuard& operator=(const ArrowSchemaGuard&) = delete; + + private: + ArrowSchema* schema_; +}; + +/// \brief Resets the guarded ArrowArrayView on scope exit. Non-copyable. +class ArrowArrayViewGuard { + public: + explicit ArrowArrayViewGuard(ArrowArrayView* view) : view_(view) {} + ~ArrowArrayViewGuard(); + ArrowArrayViewGuard(const ArrowArrayViewGuard&) = delete; + ArrowArrayViewGuard& operator=(const ArrowArrayViewGuard&) = delete; + + private: + ArrowArrayView* view_; +}; + +/// \brief Resets the guarded ArrowBuffer on scope exit. Non-copyable. +class ArrowArrayBufferGuard { + public: + explicit ArrowArrayBufferGuard(ArrowBuffer* buffer) : buffer_(buffer) {} + ~ArrowArrayBufferGuard(); + ArrowArrayBufferGuard(const ArrowArrayBufferGuard&) = delete; + ArrowArrayBufferGuard& operator=(const ArrowArrayBufferGuard&) = delete; + + private: + ArrowBuffer* buffer_; +}; + +} // namespace iceberg::internal diff --git a/src/iceberg/manifest_list.cc b/src/iceberg/manifest_list.cc index e85c42140..b9907604f 100644 --- a/src/iceberg/manifest_list.cc +++ b/src/iceberg/manifest_list.cc @@ -19,9 +19,7 @@ #include "iceberg/manifest_list.h" -#include - -#include "iceberg/type.h" +#include "iceberg/schema.h" namespace iceberg { diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc new file mode 100644 index 000000000..208263d57 --- /dev/null +++ b/src/iceberg/manifest_reader.cc @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest_reader.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_reader_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> ManifestReader::MakeReader( + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema) { + auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto reader, ReaderFactoryRegistry::Open(FileFormatType::kAvro, + {.path = std::string(manifest_location), + .io = std::move(file_io), + .projection = schema})); + return std::make_unique(std::move(reader), std::move(schema)); +} + +Result> ManifestListReader::MakeReader( + std::string_view manifest_list_location, std::shared_ptr file_io) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = std::string(manifest_list_location), + .io = std::move(file_io), + .projection = schema})); + return std::make_unique(std::move(reader), std::move(schema)); +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 5e1cb6c45..5d231de0f 100644 --- 
a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -23,7 +23,7 @@ /// Data reader interface for manifest files. #include -#include +#include #include "iceberg/file_reader.h" #include "iceberg/iceberg_export.h" @@ -35,35 +35,30 @@ namespace iceberg { class ICEBERG_EXPORT ManifestReader { public: virtual ~ManifestReader() = default; - virtual Result>> Entries() const = 0; - - private: - std::unique_ptr reader_; + virtual Result> Entries() const = 0; + + /// \brief Creates a reader for a manifest file. + /// \param manifest_location Path to the manifest file. + /// \param file_io File IO implementation to use. + /// \param partition_schema Partition schema used to build the manifest entry schema. + /// \return A Result containing the reader or an error. + static Result> MakeReader( + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema); }; /// \brief Read manifest files from a manifest list file. class ICEBERG_EXPORT ManifestListReader { public: virtual ~ManifestListReader() = default; - virtual Result>> Files() const = 0; - - private: - std::unique_ptr reader_; + virtual Result> Files() const = 0; + + /// \brief Creates a reader for the manifest list. + /// \param manifest_list_location Path to the manifest list file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the reader or an error. + static Result> MakeReader( + std::string_view manifest_list_location, std::shared_ptr file_io); }; -/// \brief Creates a reader for the manifest list. -/// \param file_path Path to the manifest list file. -/// \return A Result containing the reader or an error. -Result> CreateManifestListReader( - std::string_view file_path) { - return NotImplemented("CreateManifestListReader is not implemented yet."); -} - -/// \brief Creates a reader for a manifest file. -/// \param file_path Path to the manifest file. -/// \return A Result containing the reader or an error.
-Result> CreateManifestReader(std::string_view file_path) { - return NotImplemented("CreateManifestReader is not implemented yet."); -} - } // namespace iceberg diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc new file mode 100644 index 000000000..92d51c631 --- /dev/null +++ b/src/iceberg/manifest_reader_internal.cc @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "iceberg/manifest_reader_internal.h" + +#include + +#include + +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/type.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +// Early-returns InvalidArrowData from the enclosing function on a nanoarrow failure. +#define NANOARROW_RETURN_IF_NOT_OK(status, error) \ + if (status != NANOARROW_OK) [[unlikely]] { \ + return InvalidArrowData("Nanoarrow error: {}", error.message); \ + } + +/// \brief Appends the i-th row's partition summaries to manifest_files[i].partitions. +/// \param view_of_column View of the list<struct> partitions column. +/// \param manifest_files Output rows, one per row of the column (not resized here). +/// \return An InvalidManifestList error if the column layout is not as expected. +Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, + std::vector& manifest_files) { + auto manifest_count = view_of_column->length; + // view_of_column is list<struct>, one struct per PartitionFieldSummary + if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) { + return InvalidManifestList("partitions field should be a list."); + } + auto view_of_list_item = view_of_column->children[0]; + // view_of_list_item is struct + if (view_of_list_item->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { + return InvalidManifestList("partitions list element should be a struct."); + } + if (view_of_list_item->n_children != 4) { + return InvalidManifestList("PartitionFieldSummary should have 4 fields."); + } + if (view_of_list_item->children[0]->storage_type != ArrowType::NANOARROW_TYPE_BOOL) { + return InvalidManifestList("contains_null should be a bool type column."); + } + auto contains_null = view_of_list_item->children[0]; + if (view_of_list_item->children[1]->storage_type != ArrowType::NANOARROW_TYPE_BOOL) { + return InvalidManifestList("contains_nan should be a bool type column."); + } + auto contains_nan = view_of_list_item->children[1]; + if (view_of_list_item->children[2]->storage_type != ArrowType::NANOARROW_TYPE_BINARY) { + return InvalidManifestList("lower_bound should be a binary type column."); + } + auto lower_bound_list = view_of_list_item->children[2]; + if (view_of_list_item->children[3]->storage_type != ArrowType::NANOARROW_TYPE_BINARY) {
+ return InvalidManifestList("upper_bound should be a binary type column."); + } + auto upper_bound_list = view_of_list_item->children[3]; + for (int64_t manifest_idx = 0; manifest_idx < manifest_count; manifest_idx++) { + auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx); + auto next_offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx + 1); + // partitions from offset to next_offset belong to manifest_idx + auto& manifest_file = manifest_files[manifest_idx]; + for (int64_t partition_idx = offset; partition_idx < next_offset; partition_idx++) { + PartitionFieldSummary partition_field_summary; + if (!ArrowArrayViewIsNull(contains_null, partition_idx)) { + partition_field_summary.contains_null = + ArrowArrayViewGetIntUnsafe(contains_null, partition_idx); + } + if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) { + partition_field_summary.contains_nan = + ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx); + } + if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) { + auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx); + partition_field_summary.lower_bound = std::vector( + buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + } + if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) { + auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx); + partition_field_summary.upper_bound = std::vector( + buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + } + + manifest_file.partitions.emplace_back(partition_field_summary); + } + } + return {}; +} + +/// \brief Converts one Arrow batch of a manifest list into ManifestFile values. +/// \param schema Arrow schema describing array_in. +/// \param array_in Batch to decode; not released by this function. +/// \param iceberg_schema Iceberg schema whose field order matches the batch columns. +/// \return One ManifestFile per row, or an error on malformed input. +Result> ParseManifestListEntry(ArrowSchema* schema, + ArrowArray* array_in, + const Schema& iceberg_schema) { + if (schema->n_children != array_in->n_children) { + return InvalidManifestList("Columns size not match between schema:{} and array:{}", + schema->n_children, array_in->n_children); + } + if (iceberg_schema.fields().size() != array_in->n_children) { + return InvalidManifestList("Columns
size not match between schema:{} and array:{}", + iceberg_schema.fields().size(), array_in->n_children); + } + + ArrowError error; + ArrowArrayView array_view; + auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + internal::ArrowArrayViewGuard view_guard(&array_view); + status = ArrowArrayViewSetArray(&array_view, array_in, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); + NANOARROW_RETURN_IF_NOT_OK(status, error); + + std::vector manifest_files; + manifest_files.resize(array_in->length); + + for (int64_t idx = 0; idx < array_in->n_children; idx++) { + const auto& field = iceberg_schema.GetFieldByIndex(idx); + if (!field.has_value()) { + return InvalidSchema("Field index {} is not found in schema", idx); + } + auto field_name = field.value().get().name(); + bool required = !field.value().get().optional(); + auto view_of_column = array_view.children[idx]; + +#define PARSE_PRIMITIVE_FIELD(item, type) \ + for (int64_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \ + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ + auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \ + manifest_files[row_idx].item = static_cast(value); \ + } else if (required) { \ + return InvalidManifestList("Field {} is required but null at row {}", field_name, \ + row_idx); \ + } \ + } + + switch (idx) { + case 0: + for (int64_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx); + manifest_files[row_idx].manifest_path.assign(value.data, value.size_bytes); + } else if (required) { + return InvalidManifestList("Field {} is required but null at row {}", + field_name, row_idx); + } + } + break; + case 1: + PARSE_PRIMITIVE_FIELD(manifest_length, int64_t); + break; + case 2: + PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t); + break; + case 3: + for (int64_t row_idx
= 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); + manifest_files[row_idx].content = static_cast(value); + } + } + break; + case 4: + PARSE_PRIMITIVE_FIELD(sequence_number, int64_t); + break; + case 5: + PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t); + break; + case 6: + PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t); + break; + case 7: + PARSE_PRIMITIVE_FIELD(added_files_count, int32_t); + break; + case 8: + PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t); + break; + case 9: + PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t); + break; + case 10: + PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t); + break; + case 11: + PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t); + break; + case 12: + PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t); + break; + case 13: + ICEBERG_RETURN_UNEXPECTED( + ParsePartitionFieldSummaryList(view_of_column, manifest_files)); + break; + case 14: + for (int64_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx); + manifest_files[row_idx].key_metadata = std::vector( + buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + } + } + break; + case 15: + PARSE_PRIMITIVE_FIELD(first_row_id, int64_t); + break; + default: + return InvalidManifestList("Unsupported type: {}", field_name); + } + } + return manifest_files; +} + +// Placeholder: manifest entry parsing is not implemented; returns a default result. +Result> ManifestReaderImpl::Entries() const { return {}; } + +/// \brief Reads all batches from the reader and decodes them into manifest files. +Result> ManifestListReaderImpl::Files() const { + std::vector manifest_files; + ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema()); + internal::ArrowSchemaGuard schema_guard(&arrow_schema); + while (true) { + auto result = reader_->Next(); + if (!result.has_value()) { + return InvalidManifestList("Failed to read manifest list entry:{}", + result.error().message); + } +
if (result.value().has_value()) { + internal::ArrowArrayGuard array_guard(&result.value().value()); + ICEBERG_ASSIGN_OR_RAISE( + auto entries, + ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_)); + manifest_files.insert(manifest_files.end(), + std::make_move_iterator(entries.begin()), + std::make_move_iterator(entries.end())); + } else { + // Next() yielded no batch: end of file. + break; + } + } + return manifest_files; +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h new file mode 100644 index 000000000..a367ede2f --- /dev/null +++ b/src/iceberg/manifest_reader_internal.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest_reader_internal.h +/// Reader implementation for manifest list files and manifest files. + +#include "iceberg/manifest_reader.h" + +namespace iceberg { + +/// \brief Read manifest entries from a manifest file.
+class ManifestReaderImpl : public ManifestReader { + public: + explicit ManifestReaderImpl(std::unique_ptr reader, + std::shared_ptr schema) + : schema_(std::move(schema)), reader_(std::move(reader)) {} + + Result> Entries() const override; + + private: + std::shared_ptr schema_; + std::unique_ptr reader_; +}; + +/// \brief Read manifest files from a manifest list file. +class ManifestListReaderImpl : public ManifestListReader { + public: + explicit ManifestListReaderImpl(std::unique_ptr reader, + std::shared_ptr schema) + : schema_(std::move(schema)), reader_(std::move(reader)) {} + + Result> Files() const override; + + private: + std::shared_ptr schema_; + std::unique_ptr reader_; +}; + +} // namespace iceberg diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 281c8023f..d1aa4cedd 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -36,6 +36,8 @@ enum class ErrorKind { kInvalidArrowData, kInvalidExpression, kInvalidSchema, + kInvalidManifest, + kInvalidManifestList, kIOError, kJsonParseError, kNoSuchNamespace, @@ -81,6 +83,8 @@ DEFINE_ERROR_FUNCTION(InvalidArgument) DEFINE_ERROR_FUNCTION(InvalidArrowData) DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidSchema) +DEFINE_ERROR_FUNCTION(InvalidManifest) +DEFINE_ERROR_FUNCTION(InvalidManifestList) DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NoSuchNamespace) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 45539ef83..8bc0a363d 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -161,23 +161,26 @@ DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr f : TableScan(std::move(context), std::move(file_io)) {} Result>> DataTableScan::PlanFiles() const { - ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader, - CreateManifestListReader(context_.snapshot->manifest_list)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_list_reader, + 
ManifestListReader::MakeReader(context_.snapshot->manifest_list, file_io_)); ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); std::vector> tasks; for (const auto& manifest_file : manifest_files) { - ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader, - CreateManifestReader(manifest_file->manifest_path)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_reader, + ManifestReader::MakeReader(manifest_file.manifest_path, file_io_, + /* TODO(xiao.dong) partition schema*/ nullptr)); ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries()); // TODO(gty404): filter manifests using partition spec and filter expression for (auto& manifest_entry : manifests) { - const auto& data_file = manifest_entry->data_file; + const auto& data_file = manifest_entry.data_file; switch (data_file->content) { case DataFile::Content::kData: - tasks.emplace_back(std::make_shared(manifest_entry->data_file)); + tasks.emplace_back(std::make_shared(manifest_entry.data_file)); break; case DataFile::Content::kPositionDeletes: case DataFile::Content::kEqualityDeletes: diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a565cb64a..8d73961b6 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -77,10 +77,16 @@ add_test(NAME util_test COMMAND util_test) if(ICEBERG_BUILD_BUNDLE) add_executable(avro_test) - target_sources(avro_test PRIVATE avro_data_test.cc avro_test.cc avro_schema_test.cc - avro_stream_test.cc) + target_sources(avro_test + PRIVATE avro_data_test.cc + avro_test.cc + avro_schema_test.cc + avro_stream_test.cc + manifest_list_reader_test.cc + test_common.cc) target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) + target_include_directories(avro_test PRIVATE "${CMAKE_BINARY_DIR}") add_test(NAME avro_test COMMAND avro_test) add_executable(arrow_test) diff --git a/test/manifest_list_reader_test.cc b/test/manifest_list_reader_test.cc new file mode 100644 index 000000000..f825a589d --- /dev/null +++ 
b/test/manifest_list_reader_test.cc @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/avro/avro_reader.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_reader.h" +#include "iceberg/schema.h" +#include "matchers.h" +#include "temp_file_test_base.h" +#include "test_common.h" + +namespace iceberg { + +class ManifestListReaderTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { avro::AvroReader::Register(); } + + void SetUp() override { + TempFileTestBase::SetUp(); + local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); + file_io_ = std::make_shared(local_fs_); + } + + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; + std::shared_ptr file_io_; +}; + +TEST_F(ManifestListReaderTest, BasicTest) { + std::string path = GetResourcePath( + "snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro"); + auto manifest_reader_result = ManifestListReader::MakeReader(path, file_io_); + ASSERT_EQ(manifest_reader_result.has_value(), true); + auto manifest_reader = std::move(manifest_reader_result.value()); + auto read_result = 
manifest_reader->Files(); + ASSERT_EQ(read_result.has_value(), true); + ASSERT_EQ(read_result.value().size(), 4); + std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/"; + for (const auto& file : read_result.value()) { + auto manifest_path = file.manifest_path.substr(test_dir_prefix.size()); + if (manifest_path == "2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro") { + ASSERT_EQ(file.added_snapshot_id, 7412193043800610213); + ASSERT_EQ(file.manifest_length, 7433); + ASSERT_EQ(file.sequence_number, 4); + ASSERT_EQ(file.min_sequence_number, 4); + ASSERT_EQ(file.partitions.size(), 1); + const auto& partition = file.partitions[0]; + ASSERT_EQ(partition.contains_null, false); + ASSERT_EQ(partition.contains_nan.value(), false); + ASSERT_EQ(partition.lower_bound.value(), + std::vector({'x', ';', 0x07, 0x00})); + ASSERT_EQ(partition.upper_bound.value(), + std::vector({'x', ';', 0x07, 0x00})); + } else if (manifest_path == "9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro") { + ASSERT_EQ(file.added_snapshot_id, 5485972788975780755); + ASSERT_EQ(file.manifest_length, 7431); + ASSERT_EQ(file.sequence_number, 3); + ASSERT_EQ(file.min_sequence_number, 3); + ASSERT_EQ(file.partitions.size(), 1); + const auto& partition = file.partitions[0]; + ASSERT_EQ(partition.contains_null, false); + ASSERT_EQ(partition.contains_nan.value(), false); + ASSERT_EQ(partition.lower_bound.value(), + std::vector({'(', 0x19, 0x07, 0x00})); + ASSERT_EQ(partition.upper_bound.value(), + std::vector({'(', 0x19, 0x07, 0x00})); + } else if (manifest_path == "2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro") { + ASSERT_EQ(file.added_snapshot_id, 1679468743751242972); + ASSERT_EQ(file.manifest_length, 7433); + ASSERT_EQ(file.sequence_number, 2); + ASSERT_EQ(file.min_sequence_number, 2); + ASSERT_EQ(file.partitions.size(), 1); + const auto& partition = file.partitions[0]; + ASSERT_EQ(partition.contains_null, false); + ASSERT_EQ(partition.contains_nan.value(), false); + 
ASSERT_EQ(partition.lower_bound.value(), + std::vector({0xd0, 0xd4, 0x06, 0x00})); + ASSERT_EQ(partition.upper_bound.value(), + std::vector({0xd0, 0xd4, 0x06, 0x00})); + } else if (manifest_path == "3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro") { + ASSERT_EQ(file.added_snapshot_id, 1579605567338877265); + ASSERT_EQ(file.manifest_length, 7431); + ASSERT_EQ(file.sequence_number, 1); + ASSERT_EQ(file.min_sequence_number, 1); + ASSERT_EQ(file.partitions.size(), 1); + const auto& partition = file.partitions[0]; + ASSERT_EQ(partition.contains_null, false); + ASSERT_EQ(partition.contains_nan.value(), false); + ASSERT_EQ(partition.lower_bound.value(), + std::vector({0xb8, 0xd4, 0x06, 0x00})); + ASSERT_EQ(partition.upper_bound.value(), + std::vector({0xb8, 0xd4, 0x06, 0x00})); + } else { + ASSERT_TRUE(false) << "Unexpected manifest file: " << manifest_path; + } + ASSERT_EQ(file.partition_spec_id, 0); + ASSERT_EQ(file.content, ManifestFile::Content::kData); + ASSERT_EQ(file.added_files_count, 1); + ASSERT_EQ(file.existing_files_count, 0); + ASSERT_EQ(file.deleted_files_count, 0); + ASSERT_EQ(file.added_rows_count, 1); + ASSERT_EQ(file.existing_rows_count, 0); + ASSERT_EQ(file.deleted_rows_count, 0); + ASSERT_EQ(file.key_metadata.empty(), true); + } +} + +} // namespace iceberg diff --git a/test/resources/snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro b/test/resources/snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro new file mode 100644 index 000000000..c22993917 Binary files /dev/null and b/test/resources/snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro differ