diff --git a/be/src/util/deletion_vector.h b/be/src/util/deletion_vector.h deleted file mode 100644 index 63908352700e00..00000000000000 --- a/be/src/util/deletion_vector.h +++ /dev/null @@ -1,81 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include - -#include "common/status.h" -#include "roaring/roaring.hh" - -namespace doris { -class DeletionVector { -public: - const static uint32_t MAGIC_NUMBER = 1581511376; - DeletionVector(roaring::Roaring roaring_bitmap) : _roaring_bitmap(std::move(roaring_bitmap)) {}; - ~DeletionVector() = default; - - bool checked_delete(uint32_t postition) { return _roaring_bitmap.addChecked(postition); } - - bool is_delete(uint32_t postition) const { return _roaring_bitmap.contains(postition); } - - bool is_empty() const { return _roaring_bitmap.isEmpty(); } - - uint32_t maximum() const { return _roaring_bitmap.maximum(); } - - uint32_t minimum() const { return _roaring_bitmap.minimum(); } - - static Result deserialize(const char* buf, size_t length) { - uint32_t actual_length; - std::memcpy(reinterpret_cast(&actual_length), buf, 4); - // change byte order to big endian - std::reverse(reinterpret_cast(&actual_length), - reinterpret_cast(&actual_length) + 4); - buf += 4; - if (actual_length != length - 4) { - return ResultError( - Status::RuntimeError("DeletionVector deserialize error: length not match, " - "actual length: {}, expect length: {}", - actual_length, length - 4)); - } - uint32_t magic_number; - std::memcpy(reinterpret_cast(&magic_number), buf, 4); - // change byte order to big endian - std::reverse(reinterpret_cast(&magic_number), - reinterpret_cast(&magic_number) + 4); - buf += 4; - if (magic_number != MAGIC_NUMBER) { - return ResultError(Status::RuntimeError( - "DeletionVector deserialize error: invalid magic number {}", magic_number)); - } - roaring::Roaring roaring_bitmap; - try { - roaring_bitmap = roaring::Roaring::readSafe(buf, length); - } catch (std::runtime_error&) { - return ResultError(Status::RuntimeError( - "DeletionVector deserialize error: failed to deserialize roaring bitmap")); - } - return DeletionVector(roaring_bitmap); - } - -private: - roaring::Roaring _roaring_bitmap; -}; -} // namespace doris diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 3ca7a37f143f01..617b0b0d447565 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -182,7 +182,7 @@ class OrcReader : public GenericReader { Status get_parsed_schema(std::vector* col_names, std::vector* col_types) override; - void set_position_delete_rowids(std::vector* delete_rows) { + void set_position_delete_rowids(const std::vector* delete_rows) { 
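+        // The row ids now come from the scan node's ShardedKVCache and may be
+        // shared by multiple splits of the same data file, so the reader only
+        // holds a const pointer and must not modify or free them.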
_position_delete_ordered_rowids = delete_rows; } @@ -704,7 +704,7 @@ class OrcReader : public GenericReader { std::unordered_map> _converters; //support iceberg position delete . - std::vector* _position_delete_ordered_rowids = nullptr; + const std::vector* _position_delete_ordered_rowids = nullptr; std::unordered_map _vslot_ref_to_orc_predicate_data_type; std::unordered_map _vliteral_to_orc_literal; diff --git a/be/src/vec/exec/format/table/deletion_vector_reader.cpp b/be/src/vec/exec/format/table/deletion_vector_reader.cpp new file mode 100644 index 00000000000000..49d31479e6d8e9 --- /dev/null +++ b/be/src/vec/exec/format/table/deletion_vector_reader.cpp @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "deletion_vector_reader.h" + +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "util/block_compression.h" + +namespace doris { +namespace vectorized { +Status DeletionVectorReader::open() { + if (_is_opened) [[unlikely]] { + return Status::OK(); + } + + _init_system_properties(); + _init_file_description(); + RETURN_IF_ERROR(_create_file_reader()); + + _file_size = _file_reader->size(); + _is_opened = true; + return Status::OK(); +} + +Status DeletionVectorReader::read_at(size_t offset, Slice result) { + if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { + return Status::EndOfFile("stop read."); + } + size_t bytes_read = 0; + RETURN_IF_ERROR(_file_reader->read_at(offset, result, &bytes_read, _io_ctx)); + if (bytes_read != result.size) [[unlikely]] { + return Status::IOError("Failed to read fully at offset {}, expected {}, got {}", offset, + result.size, bytes_read); + } + return Status::OK(); +} + +Status DeletionVectorReader::_create_file_reader() { + if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { + return Status::EndOfFile("stop read."); + } + + _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; + io::FileReaderOptions reader_options = + FileFactory::get_reader_options(_state, _file_description); + _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( + _profile, _system_properties, _file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, _io_ctx)); + return Status::OK(); +} + +void DeletionVectorReader::_init_file_description() { + _file_description.path = _range.path; + _file_description.file_size = _range.__isset.file_size ? 
_range.file_size : -1; + if (_range.__isset.fs_name) { + _file_description.fs_name = _range.fs_name; + } +} + +void DeletionVectorReader::_init_system_properties() { + if (_range.__isset.file_type) { + // for compatibility + _system_properties.system_type = _range.file_type; + } else { + _system_properties.system_type = _params.file_type; + } + _system_properties.properties = _params.properties; + _system_properties.hdfs_params = _params.hdfs_params; + if (_params.__isset.broker_addresses) { + _system_properties.broker_addresses.assign(_params.broker_addresses.begin(), + _params.broker_addresses.end()); + } +} + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/format/table/deletion_vector_reader.h b/be/src/vec/exec/format/table/deletion_vector_reader.h new file mode 100644 index 00000000000000..96edb14a9f2e0d --- /dev/null +++ b/be/src/vec/exec/format/table/deletion_vector_reader.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
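+// DeletionVectorReader is a thin wrapper that resolves the file system from the
+// scan-range params, opens the delete/Puffin file, and reads the raw
+// deletion-vector bytes at the offset provided by FE.
+//
+// Rough usage sketch (variable names are illustrative; the real call sites are
+// in paimon_reader.cpp and iceberg_reader.cpp):
+//
+//     TFileRangeDesc dv_range;
+//     dv_range.__set_fs_name(range.fs_name);  // __set() keeps __isset in sync
+//     dv_range.path = deletion_file_path;
+//     dv_range.start_offset = blob_offset;
+//     dv_range.size = blob_length;
+//     dv_range.file_size = -1;
+//
+//     DeletionVectorReader dv_reader(state, profile, params, dv_range, io_ctx);
+//     RETURN_IF_ERROR(dv_reader.open());
+//     std::vector<char> buf(dv_range.size);
+//     RETURN_IF_ERROR(dv_reader.read_at(dv_range.start_offset, {buf.data(), buf.size()}));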
+ +#include +#include +#include +#include + +#include "common/status.h" +#include "io/file_factory.h" +#include "io/fs/buffered_reader.h" +#include "io/fs/file_reader.h" +#include "roaring/roaring64map.hh" +#include "util/profile_collector.h" +#include "util/slice.h" +#include "vec/exec/format/generic_reader.h" + +namespace io { +struct IOContext; +} // namespace io + +namespace doris { +namespace vectorized { +class DeletionVectorReader { + ENABLE_FACTORY_CREATOR(DeletionVectorReader); + +public: + DeletionVectorReader(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + io::IOContext* io_ctx) + : _state(state), _profile(profile), _range(range), _params(params), _io_ctx(io_ctx) {} + ~DeletionVectorReader() = default; + Status open(); + Status read_at(size_t offset, Slice result); + +private: + void _init_system_properties(); + void _init_file_description(); + Status _create_file_reader(); + +private: + RuntimeState* _state = nullptr; + RuntimeProfile* _profile = nullptr; + const TFileRangeDesc& _range; + const TFileScanRangeParams& _params; + io::IOContext* _io_ctx = nullptr; + + io::FileSystemProperties _system_properties; + io::FileDescription _file_description; + io::FileReaderSPtr _file_reader; + int64_t _file_size = 0; + bool _is_opened = false; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 8e15f7f6e4b13b..7bda260f4c9501 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -23,11 +23,9 @@ #include #include #include -#include #include #include -#include #include #include #include @@ -37,8 +35,7 @@ #include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" #include "runtime/runtime_state.h" -#include "runtime/types.h" -#include "util/string_util.h" +#include "util/coding.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column.h" #include "vec/columns/column_string.h" @@ -53,6 +50,7 @@ #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/schema_desc.h" #include "vec/exec/format/parquet/vparquet_column_chunk_reader.h" +#include "vec/exec/format/table/deletion_vector_reader.h" #include "vec/exec/format/table/iceberg/iceberg_orc_nested_column_utils.h" #include "vec/exec/format/table/iceberg/iceberg_parquet_nested_column_utils.h" #include "vec/exec/format/table/nested_column_access_helper.h" @@ -96,6 +94,8 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile); _iceberg_profile.delete_rows_sort_time = ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile); + _iceberg_profile.parse_delete_file_time = + ADD_CHILD_TIMER(_profile, "ParseDeleteFileTime", iceberg_profile); } Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { @@ -124,24 +124,42 @@ Status IcebergTableReader::init_row_filters() { std::vector position_delete_files; std::vector equality_delete_files; + std::vector deletion_vector_files; for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) { if (desc.content == POSITION_DELETE) { position_delete_files.emplace_back(desc); } else if (desc.content == EQUALITY_DELETE) { equality_delete_files.emplace_back(desc); + } else if (desc.content == DELETION_VECTOR) { + deletion_vector_files.emplace_back(desc); } } - if 
(!position_delete_files.empty()) { - RETURN_IF_ERROR( - _position_delete_base(table_desc.original_file_path, position_delete_files)); - _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); - } if (!equality_delete_files.empty()) { RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } + if (!deletion_vector_files.empty()) { + if (deletion_vector_files.size() != 1) [[unlikely]] { + /* + * Deletion vectors are a binary representation of deletes for a single data file that is more efficient + * at execution time than position delete files. Unlike equality or position delete files, there can be + * at most one deletion vector for a given data file in a snapshot. + */ + return Status::DataQualityError("This iceberg data file has multiple DVs."); + } + RETURN_IF_ERROR( + read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0])); + + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); + // Readers can safely ignore position delete files if there is a DV for a data file. + } else if (!position_delete_files.empty()) { + RETURN_IF_ERROR( + _position_delete_base(table_desc.original_file_path, position_delete_files)); + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); + } + COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); return Status::OK(); } @@ -307,10 +325,24 @@ Status IcebergTableReader::_position_delete_base( }; delete_file_map.if_contains(data_file_path, get_value); } + // Use a KV cache to store the delete rows corresponding to a data file path. + // The Parquet/ORC reader holds a reference (pointer) to this cached entry. + // This allows delete rows to be reused when a single data file is split into + // multiple splits, avoiding excessive memory usage when delete rows are large. if (num_delete_rows > 0) { SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time); - _sort_delete_rows(delete_rows_array, num_delete_rows); - this->set_delete_rows(); + _iceberg_delete_rows = + _kv_cache->get(data_file_path, + [&]() -> DeleteRows* { + auto* data_file_position_delete = new DeleteRows; + _sort_delete_rows(delete_rows_array, num_delete_rows, + *data_file_position_delete); + + return data_file_position_delete; + } + + ); + set_delete_rows(); COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows); } return Status::OK(); @@ -357,29 +389,35 @@ IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range( return range; } -void IcebergTableReader::_sort_delete_rows(std::vector*>& delete_rows_array, - int64_t num_delete_rows) { +/** + * https://iceberg.apache.org/spec/#position-delete-files + * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning. + * Sorting by file_path allows filter pushdown by file in columnar storage formats. + * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory. 
+ */ +void IcebergTableReader::_sort_delete_rows( + const std::vector*>& delete_rows_array, int64_t num_delete_rows, + std::vector& result) { if (delete_rows_array.empty()) { return; } if (delete_rows_array.size() == 1) { - _iceberg_delete_rows.resize(num_delete_rows); - memcpy(_iceberg_delete_rows.data(), delete_rows_array.front()->data(), - sizeof(int64_t) * num_delete_rows); + result.resize(num_delete_rows); + memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows); return; } if (delete_rows_array.size() == 2) { - _iceberg_delete_rows.resize(num_delete_rows); + result.resize(num_delete_rows); std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(), delete_rows_array.back()->begin(), delete_rows_array.back()->end(), - _iceberg_delete_rows.begin()); + result.begin()); return; } using vec_pair = std::pair::iterator, std::vector::iterator>; - _iceberg_delete_rows.resize(num_delete_rows); - auto row_id_iter = _iceberg_delete_rows.begin(); - auto iter_end = _iceberg_delete_rows.end(); + result.resize(num_delete_rows); + auto row_id_iter = result.begin(); + auto iter_end = result.end(); std::vector rows_array; for (auto* rows : delete_rows_array) { if (!rows->empty()) { @@ -408,6 +446,7 @@ void IcebergTableReader::_sort_delete_rows(std::vector*>& d void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete, size_t read_rows, bool file_path_column_dictionary_coded) { + SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); // todo: maybe do not need to build name to index map every time auto name_to_pos_map = block.get_name_to_pos_map(); ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column; @@ -744,5 +783,94 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete return Status::OK(); } +// Directly read the deletion vector using the `content_offset` and +// `content_size_in_bytes` provided by FE in `delete_file_desc`. +// These two fields indicate the location of a blob in storage. +// Since the current format is `deletion-vector-v1`, which does not +// compress any blobs, we can temporarily skip parsing the Puffin footer. +Status IcebergTableReader::read_deletion_vector(const std::string& data_file_path, + const TIcebergDeleteFileDesc& delete_file_desc) { + Status create_status = Status::OK(); + SCOPED_TIMER(_iceberg_profile.delete_files_read_time); + _iceberg_delete_rows = _kv_cache->get(data_file_path, [&]() -> DeleteRows* { + auto* delete_rows = new DeleteRows; + + TFileRangeDesc delete_range; + // must use __set() method to make sure __isset is true + delete_range.__set_fs_name(_range.fs_name); + delete_range.path = delete_file_desc.path; + delete_range.start_offset = delete_file_desc.content_offset; + delete_range.size = delete_file_desc.content_size_in_bytes; + delete_range.file_size = -1; + + // We may consider caching the DeletionVectorReader when reading Puffin files, + // where the underlying reader is an `InMemoryFileReader` and a single data file is + // split into multiple splits. However, we need to ensure that the underlying + // reader supports multi-threaded access. 
+ DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx); + create_status = dv_reader.open(); + if (!create_status.ok()) [[unlikely]] { + return nullptr; + } + + size_t buffer_size = delete_range.size; + std::vector buf(buffer_size); + if (buffer_size < 12) [[unlikely]] { + // Minimum size: 4 bytes length + 4 bytes magic + 4 bytes CRC32 + create_status = Status::DataQualityError("Deletion vector file size too small: {}", + buffer_size); + return nullptr; + } + + create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size}); + if (!create_status) [[unlikely]] { + return nullptr; + } + // The serialized blob contains: + // + // Combined length of the vector and magic bytes stored as 4 bytes, big-endian + // A 4-byte magic sequence, D1 D3 39 64 + // The vector, serialized as described below + // A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian + + auto total_length = BigEndian::Load32(buf.data()); + if (total_length + 8 != buffer_size) [[unlikely]] { + create_status = Status::DataQualityError( + "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8, + buffer_size); + return nullptr; + } + + constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'}; + if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] { + create_status = Status::DataQualityError("Deletion vector magic number mismatch"); + return nullptr; + } + + roaring::Roaring64Map bitmap; + SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); + try { + bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12); + } catch (const std::runtime_error& e) { + create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); + return nullptr; + } + // skip CRC-32 checksum + + delete_rows->reserve(bitmap.cardinality()); + for (auto it = bitmap.begin(); it != bitmap.end(); it++) { + delete_rows->push_back(*it); + } + COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size()); + return delete_rows; + }); + + RETURN_IF_ERROR(create_status); + if (!_iceberg_delete_rows->empty()) [[likely]] { + set_delete_rows(); + } + return Status::OK(); +} + #include "common/compile_check_end.h" } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 995b78915b5279..ffbb18d1255ea6 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -80,17 +80,21 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; - enum { DATA, POSITION_DELETE, EQUALITY_DELETE }; + enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR }; enum Fileformat { NONE, PARQUET, ORC, AVRO }; virtual void set_delete_rows() = 0; + Status read_deletion_vector(const std::string& data_file_path, + const TIcebergDeleteFileDesc& delete_file_desc); + protected: struct IcebergProfile { RuntimeProfile::Counter* num_delete_files; RuntimeProfile::Counter* num_delete_rows; RuntimeProfile::Counter* delete_files_read_time; RuntimeProfile::Counter* delete_rows_sort_time; + RuntimeProfile::Counter* parse_delete_file_time; }; using DeleteRows = std::vector; using DeleteFile = phmap::parallel_flat_hash_map< @@ -103,8 +107,8 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel * Sorting by file_path allows filter pushdown by file in 
columnar storage formats. * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory. */ - void _sort_delete_rows(std::vector*>& delete_rows_array, - int64_t num_delete_rows); + static void _sort_delete_rows(const std::vector*>& delete_rows_array, + int64_t num_delete_rows, std::vector& result); PositionDeleteRange _get_range(const ColumnDictI32& file_path_column); @@ -128,7 +132,8 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel // owned by scan node ShardedKVCache* _kv_cache; IcebergProfile _iceberg_profile; - std::vector _iceberg_delete_rows; + // _iceberg_delete_rows from kv_cache + const std::vector* _iceberg_delete_rows = nullptr; std::vector _expand_col_names; std::vector _expand_columns; std::vector _all_required_col_names; @@ -183,7 +188,7 @@ class IcebergParquetReader final : public IcebergTableReader { void set_delete_rows() final { auto* parquet_reader = (ParquetReader*)(_file_format_reader.get()); - parquet_reader->set_delete_rows(&_iceberg_delete_rows); + parquet_reader->set_delete_rows(_iceberg_delete_rows); } protected: @@ -217,7 +222,7 @@ class IcebergOrcReader final : public IcebergTableReader { void set_delete_rows() final { auto* orc_reader = (OrcReader*)_file_format_reader.get(); - orc_reader->set_position_delete_rowids(&_iceberg_delete_rows); + orc_reader->set_position_delete_rowids(_iceberg_delete_rows); } Status init_reader( diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index d5bb048ebf7e58..fef9643180bf5f 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -21,22 +21,26 @@ #include "common/status.h" #include "runtime/runtime_state.h" -#include "util/deletion_vector.h" +#include "vec/exec/format/table/deletion_vector_reader.h" namespace doris::vectorized { #include "common/compile_check_begin.h" PaimonReader::PaimonReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, - io::IOContext* io_ctx, FileMetaCache* meta_cache) + ShardedKVCache* kv_cache, io::IOContext* io_ctx, + FileMetaCache* meta_cache) : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx, - meta_cache) { + meta_cache), + _kv_cache(kv_cache) { static const char* paimon_profile = "PaimonProfile"; ADD_TIMER(_profile, paimon_profile); _paimon_profile.num_delete_rows = ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, paimon_profile); _paimon_profile.delete_files_read_time = ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile); + _paimon_profile.parse_deletion_vector_time = + ADD_CHILD_TIMER(_profile, "ParseDeletionVectorTime", paimon_profile); } Status PaimonReader::init_row_filters() { @@ -50,58 +54,91 @@ Status PaimonReader::init_row_filters() { if (!_range.table_format_params.paimon_params.__isset.row_count) { _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } - const auto& deletion_file = table_desc.deletion_file; - io::FileSystemProperties properties = { - .system_type = _params.file_type, - .properties = _params.properties, - .hdfs_params = _params.hdfs_params, - .broker_addresses {}, - }; - if (_range.__isset.file_type) { - // for compatibility - properties.system_type = _range.file_type; - } - if (_params.__isset.broker_addresses) { - properties.broker_addresses.assign(_params.broker_addresses.begin(), - _params.broker_addresses.end()); - } 
- io::FileDescription file_description = { - .path = deletion_file.path, - .file_size = -1, - .mtime = 0, - .fs_name = _range.fs_name, - }; + Status create_status = Status::OK(); - // TODO: cache the file in local - auto delete_file_reader = DORIS_TRY(FileFactory::create_file_reader( - properties, file_description, io::FileReaderOptions::DEFAULT)); - // the reason of adding 4: https://github.com/apache/paimon/issues/3313 - size_t bytes_read = deletion_file.length + 4; - // TODO: better way to alloc memeory - std::vector buf(bytes_read); - Slice result(buf.data(), bytes_read); - { - SCOPED_TIMER(_paimon_profile.delete_files_read_time); - RETURN_IF_ERROR( - delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, _io_ctx)); - } - if (bytes_read != deletion_file.length + 4) { - return Status::IOError( - "failed to read deletion vector, deletion file path: {}, offset: {}, expect " - "length: {}, real " - "length: {}", - deletion_file.path, deletion_file.offset, deletion_file.length + 4, bytes_read); - } - auto deletion_vector = DORIS_TRY(DeletionVector::deserialize(result.data, result.size)); - if (!deletion_vector.is_empty()) { - for (auto i = deletion_vector.minimum(); i <= deletion_vector.maximum(); i++) { - if (deletion_vector.is_delete(i)) { - _delete_rows.push_back(i); - } + std::string key; + key.resize(deletion_file.path.size() + sizeof(deletion_file.offset)); + memcpy(key.data(), deletion_file.path.data(), deletion_file.path.size()); + memcpy(key.data() + deletion_file.path.size(), &deletion_file.offset, + sizeof(deletion_file.offset)); + + SCOPED_TIMER(_paimon_profile.delete_files_read_time); + using DeleteRows = std::vector; + _delete_rows = _kv_cache->get(key, [&]() -> DeleteRows* { + auto* delete_rows = new DeleteRows; + + TFileRangeDesc delete_range; + // must use __set() method to make sure __isset is true + delete_range.__set_fs_name(_range.fs_name); + delete_range.path = deletion_file.path; + delete_range.start_offset = deletion_file.offset; + delete_range.size = deletion_file.length + 4; + delete_range.file_size = -1; + + DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx); + create_status = dv_reader.open(); + if (!create_status.ok()) [[unlikely]] { + return nullptr; + } + + // the reason of adding 4: https://github.com/apache/paimon/issues/3313 + size_t bytes_read = deletion_file.length + 4; + // TODO: better way to alloc memeory + std::vector buffer(bytes_read); + create_status = dv_reader.read_at(deletion_file.offset, {buffer.data(), bytes_read}); + if (!create_status.ok()) [[unlikely]] { + return nullptr; + } + + // parse deletion vector + const char* buf = buffer.data(); + uint32_t actual_length; + std::memcpy(reinterpret_cast(&actual_length), buf, 4); + // change byte order to big endian + std::reverse(reinterpret_cast(&actual_length), + reinterpret_cast(&actual_length) + 4); + buf += 4; + if (actual_length != bytes_read - 4) [[unlikely]] { + create_status = Status::RuntimeError( + "DeletionVector deserialize error: length not match, " + "actual length: {}, expect length: {}", + actual_length, bytes_read - 4); + return nullptr; + } + uint32_t magic_number; + std::memcpy(reinterpret_cast(&magic_number), buf, 4); + // change byte order to big endian + std::reverse(reinterpret_cast(&magic_number), + reinterpret_cast(&magic_number) + 4); + buf += 4; + const static uint32_t MAGIC_NUMBER = 1581511376; + if (magic_number != MAGIC_NUMBER) [[unlikely]] { + create_status = Status::RuntimeError( + "DeletionVector deserialize error: 
invalid magic number {}", magic_number); + return nullptr; + } + + roaring::Roaring roaring_bitmap; + SCOPED_TIMER(_paimon_profile.parse_deletion_vector_time); + try { + roaring_bitmap = roaring::Roaring::readSafe(buf, bytes_read - 4); + } catch (const std::runtime_error& e) { + create_status = Status::RuntimeError( + "DeletionVector deserialize error: failed to deserialize roaring bitmap, {}", + e.what()); + return nullptr; + } + delete_rows->reserve(roaring_bitmap.cardinality()); + for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); it++) { + delete_rows->push_back(*it); } - COUNTER_UPDATE(_paimon_profile.num_delete_rows, _delete_rows.size()); + COUNTER_UPDATE(_paimon_profile.num_delete_rows, delete_rows->size()); + return delete_rows; + }); + RETURN_IF_ERROR(create_status); + if (!_delete_rows->empty()) [[likely]] { set_delete_rows(); } return Status::OK(); diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h index 30cd788ce89163..9f43d858575ce6 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_reader.h @@ -30,7 +30,8 @@ class PaimonReader : public TableFormatReader, public TableSchemaChangeHelper { public: PaimonReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, - const TFileRangeDesc& range, io::IOContext* io_ctx, FileMetaCache* meta_cache); + const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, + FileMetaCache* meta_cache); ~PaimonReader() override = default; @@ -42,8 +43,12 @@ class PaimonReader : public TableFormatReader, public TableSchemaChangeHelper { struct PaimonProfile { RuntimeProfile::Counter* num_delete_rows; RuntimeProfile::Counter* delete_files_read_time; + RuntimeProfile::Counter* parse_deletion_vector_time; }; - std::vector _delete_rows; + // _delete_rows from kv_cache. 
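+    // Owned by the scan node's ShardedKVCache, keyed by deletion file path plus
+    // offset, so splits that reference the same deletion vector reuse the decoded
+    // row ids instead of re-reading and re-parsing the file.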
+ const std::vector* _delete_rows = nullptr; + // owned by scan node + ShardedKVCache* _kv_cache; PaimonProfile _paimon_profile; virtual void set_delete_rows() = 0; @@ -54,14 +59,15 @@ class PaimonOrcReader final : public PaimonReader { ENABLE_FACTORY_CREATOR(PaimonOrcReader); PaimonOrcReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, - const TFileRangeDesc& range, io::IOContext* io_ctx, FileMetaCache* meta_cache) - : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx, - meta_cache) {}; + const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, + FileMetaCache* meta_cache) + : PaimonReader(std::move(file_format_reader), profile, state, params, range, kv_cache, + io_ctx, meta_cache) {}; ~PaimonOrcReader() final = default; void set_delete_rows() final { (reinterpret_cast(_file_format_reader.get())) - ->set_position_delete_rowids(&_delete_rows); + ->set_position_delete_rowids(_delete_rows); } Status init_reader( @@ -90,15 +96,15 @@ class PaimonParquetReader final : public PaimonReader { ENABLE_FACTORY_CREATOR(PaimonParquetReader); PaimonParquetReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, - const TFileRangeDesc& range, io::IOContext* io_ctx, - FileMetaCache* meta_cache) - : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx, - meta_cache) {}; + const TFileRangeDesc& range, ShardedKVCache* kv_cache, + io::IOContext* io_ctx, FileMetaCache* meta_cache) + : PaimonReader(std::move(file_format_reader), profile, state, params, range, kv_cache, + io_ctx, meta_cache) {}; ~PaimonParquetReader() final = default; void set_delete_rows() final { (reinterpret_cast(_file_format_reader.get())) - ->set_delete_rows(&_delete_rows); + ->set_delete_rows(_delete_rows); } Status init_reader( diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 8da6c762137a8f..799280fed3faee 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -1232,8 +1232,8 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr&& parque } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "paimon") { std::unique_ptr paimon_reader = PaimonParquetReader::create_unique( - std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(), - file_meta_cache_ptr); + std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache, + _io_ctx.get(), file_meta_cache_ptr); init_status = paimon_reader->init_reader( _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(), @@ -1345,9 +1345,9 @@ Status FileScanner::_init_orc_reader(std::unique_ptr&& orc_reader, _cur_reader = std::move(iceberg_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "paimon") { - std::unique_ptr paimon_reader = - PaimonOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params, - range, _io_ctx.get(), file_meta_cache_ptr); + std::unique_ptr paimon_reader = PaimonOrcReader::create_unique( + std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(), + file_meta_cache_ptr); init_status = paimon_reader->init_reader( _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, diff --git 
a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl index dd0dcae18c460a..bde0dad21715f1 100644 --- a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl +++ b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl @@ -25,6 +25,10 @@ done set -ex +# remove /opt/spark/jars/iceberg-aws-bundle-1.5.0.jar\:/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar +rm /opt/spark/jars/iceberg-aws-bundle-1.5.0.jar +rm /opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar + start-master.sh -p 7077 start-worker.sh spark://doris--spark-iceberg:7077 start-history-server.sh @@ -50,6 +54,16 @@ END_TIME2=$(date +%s) EXECUTION_TIME2=$((END_TIME2 - START_TIME2)) echo "Script paimon total: {} executed in $EXECUTION_TIME2 seconds" + + +ls /mnt/scripts/create_preinstalled_scripts/iceberg_scala/*.scala | xargs -n 1 -I {} bash -c ' + START_TIME=$(date +%s) + spark-shell --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -I {} + END_TIME=$(date +%s) + EXECUTION_TIME=$((END_TIME - START_TIME)) + echo "Script: {} executed in $EXECUTION_TIME seconds" +' + touch /mnt/SUCCESS; tail -f /dev/null diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl index 4dd00504faa930..3c52bf691590ca 100644 --- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl +++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl @@ -36,6 +36,8 @@ services: - ./spark-defaults.conf:/opt/spark/conf/spark-defaults.conf - ./data/input/jars/paimon-spark-3.5-1.0.1.jar:/opt/spark/jars/paimon-spark-3.5-1.0.1.jar - ./data/input/jars/paimon-s3-1.0.1.jar:/opt/spark/jars/paimon-s3-1.0.1.jar + - ./data/input/jars/iceberg-aws-bundle-1.10.0.jar:/opt/spark/jars/iceberg-aws-bundle-1.10.0.jar + - ./data/input/jars/iceberg-spark-runtime-3.5_2.12-1.10.0.jar:/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.10.0.jar environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password @@ -57,7 +59,7 @@ services: POSTGRES_USER: root POSTGRES_DB: iceberg healthcheck: - test: [ "CMD-SHELL", "pg_isready -U root" ] + test: [ "CMD-SHELL", "pg_isready -U root -d iceberg" ] interval: 5s timeout: 60s retries: 120 @@ -67,12 +69,13 @@ services: - doris--iceberg rest: - image: tabulario/iceberg-rest:1.6.0 + image: apache/iceberg-rest-fixture:1.10.0 container_name: doris--iceberg-rest ports: - ${REST_CATALOG_PORT}:8181 volumes: - ./data:/mnt/data + - ./data/input/jars/postgresql-42.7.4.jar:/opt/jdbc/postgresql.jar depends_on: postgres: condition: service_healthy @@ -90,7 +93,11 @@ services: - CATALOG_JDBC_PASSWORD=123456 networks: - doris--iceberg - entrypoint: /bin/bash /mnt/data/input/script/rest_init.sh + command: + - java + - -cp + - /usr/lib/iceberg-rest/iceberg-rest-adapter.jar:/opt/jdbc/postgresql.jar + - org.apache.iceberg.rest.RESTCatalogServer minio: image: minio/minio:RELEASE.2025-01-20T14-49-07Z diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql index 8bf10ad7d58dcb..b8ad4c0970683e 100644 --- a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql @@ -6,6 +6,7 @@ CREATE TABLE test_iceberg_systable_unpartitioned ( ) USING ICEBERG 
TBLPROPERTIES ( + 'format-version'='2', 'primary-key' = 'id', 'write.upsert.enabled' = 'true' ); @@ -17,6 +18,7 @@ CREATE TABLE test_iceberg_systable_partitioned ( USING ICEBERG PARTITIONED BY (id) TBLPROPERTIES ( + 'format-version'='2', 'primary-key' = 'id', 'write.upsert.enabled' = 'true' ); diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg_scala/run01.scala b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg_scala/run01.scala new file mode 100644 index 00000000000000..f557ac9e2a7d7d --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg_scala/run01.scala @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +val dbName = "format_v3" +val tableName = "dv_test" +val fullTable = s"$dbName.$tableName" + +spark.sql(s"CREATE DATABASE IF NOT EXISTS $dbName") + + +spark.sql(s""" drop table if exists $fullTable """) +spark.sql(s""" +CREATE TABLE $fullTable ( + id INT, + batch INT, + data STRING +) +USING iceberg +TBLPROPERTIES ( + 'format-version' = '3', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' +) +""") + +import spark.implicits._ + +val batch1 = Seq( + (1, 1, "a"), (2, 1, "b"), (3, 1, "c"), (4, 1, "d"), + (5, 1, "e"), (6, 1, "f"), (7, 1, "g"), (8, 1, "h") +).toDF("id", "batch", "data") + .coalesce(1) + +batch1.writeTo(fullTable).append() + +spark.sql(s""" +DELETE FROM $fullTable +WHERE batch = 1 AND id IN (3, 4, 5) +""") + +val batch2 = Seq( + (9, 2, "i"), (10, 2, "j"), (11, 2, "k"), (12, 2, "l"), + (13, 2, "m"), (14, 2, "n"), (15, 2, "o"), (16, 2, "p") +).toDF("id", "batch", "data") + .coalesce(1) + +batch2.writeTo(fullTable).append() + +spark.sql(s""" +DELETE FROM $fullTable +WHERE batch = 2 AND id >= 14 +""") + +spark.sql(s""" +DELETE FROM $fullTable +WHERE id % 2 = 1 +""") + + +// spark.sql(s""" select count(*) from $fullTable """).show() + + +// v2 to v3. 
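+// v2 -> v3 upgrade case: dv_test_v2 starts at format-version 2, where the first
+// DELETE produces a position-delete file; after ALTER TABLE ... SET TBLPROPERTIES
+// ('format-version' = '3') the second DELETE is written as a Puffin deletion
+// vector, so the BE has to handle both delete representations for the same table.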
+ +val tableName = "dv_test_v2" +val fullTable = s"$dbName.$tableName" +spark.sql(s""" drop table if exists $fullTable """) + +spark.sql(s""" +CREATE TABLE $fullTable ( + id INT, + batch INT, + data STRING +) +USING iceberg +TBLPROPERTIES ( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' +) +""") + +batch1.writeTo(fullTable).append() + +spark.sql(s""" +DELETE FROM $fullTable +WHERE batch = 1 AND id IN (3, 4, 5) +""") + +spark.sql(s""" +ALTER TABLE $fullTable +SET TBLPROPERTIES ('format-version' = '3') +""") + +spark.sql(s""" +DELETE FROM $fullTable +WHERE id % 2 = 1 +""") + + +// spark.sql(s""" select * from $fullTable order by id """).show() + + +val tableName = "dv_test_1w" +val fullTable = s"$dbName.$tableName" +spark.sql(s""" drop table if exists $fullTable """) + +spark.sql(s""" +CREATE TABLE $fullTable ( + id BIGINT, + grp INT, + value INT, + ts TIMESTAMP +) +USING iceberg +TBLPROPERTIES ( + 'format-version'='3', + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read', + 'write.parquet.row-group-size-bytes'='10240' +) +""") + +import org.apache.spark.sql.functions._ + +val df = spark.range(0, 100000).select( + col("id"), + (col("id") % 100).cast("int").as("grp"), + (rand() * 1000).cast("int").as("value"), + current_timestamp().as("ts") + ) + +df.repartition(10).writeTo(fullTable).append() + + +spark.conf.set("spark.sql.shuffle.partitions", "1") +spark.conf.set("spark.sql.adaptive.enabled", "false") + + +spark.sql(s""" +DELETE FROM $fullTable +WHERE id%2 = 1 +""") + +spark.sql(s""" +DELETE FROM $fullTable +WHERE id%3 = 1 +""") + +// spark.sql(s""" select count(*) from $fullTable """).show() diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh index 313ca60ad3d07b..80b4a9da043898 100755 --- a/docker/thirdparties/run-thirdparties-docker.sh +++ b/docker/thirdparties/run-thirdparties-docker.sh @@ -458,6 +458,18 @@ start_iceberg() { echo "${ICEBERG_DIR}/data exist, continue !" fi + if [[ ! -f "${ICEBERG_DIR}/data/input/jars/iceberg-aws-bundle-1.10.0.jar" ]]; then + echo "iceberg 1.10.0 jars does not exist" + cd "${ICEBERG_DIR}" \ + && rm -f iceberg_1_10_0*.jars.tar.gz\ + && wget -P "${ROOT}"/docker-compose/iceberg https://"${s3BucketName}.${s3Endpoint}"/regression/datalake/pipeline_data/iceberg_1_10_0.jars.tar.gz \ + && sudo tar xzvf iceberg_1_10_0.jars.tar.gz -C "data/input/jars" \ + && sudo rm -rf iceberg_1_10_0.jars.tar.gz + cd - + else + echo "iceberg 1.10.0 jars exist, continue !" 
+ fi + sudo docker compose -f "${ROOT}"/docker-compose/iceberg/iceberg.yaml --env-file "${ROOT}"/docker-compose/iceberg/iceberg.env up -d --wait fi } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java index b876732ff3f4e3..32de4ebfdd9c0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java @@ -18,32 +18,60 @@ package org.apache.doris.datasource.iceberg.source; import lombok.Data; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.types.Conversions; import java.util.List; +import java.util.Optional; import java.util.OptionalLong; @Data public class IcebergDeleteFileFilter { private String deleteFilePath; private long filesize; + private FileFormat fileformat; - public IcebergDeleteFileFilter(String deleteFilePath, long filesize) { + + public static int type() { + return 0; + } + + public IcebergDeleteFileFilter(String deleteFilePath, long filesize, FileFormat fileformat) { this.deleteFilePath = deleteFilePath; this.filesize = filesize; + this.fileformat = fileformat; } - public static PositionDelete createPositionDelete(String deleteFilePath, Long positionLowerBound, - Long positionUpperBound, long filesize) { - return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound, filesize); + public static PositionDelete createPositionDelete(DeleteFile deleteFile) { + Optional positionLowerBound = Optional.ofNullable(deleteFile.lowerBounds()) + .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId())) + .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); + Optional positionUpperBound = Optional.ofNullable(deleteFile.upperBounds()) + .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId())) + .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); + String deleteFilePath = deleteFile.path().toString(); + + if (deleteFile.format() == FileFormat.PUFFIN) { + // The content_offset and content_size_in_bytes fields are used to reference + // a specific blob for direct access to a deletion vector. 
+ return new DeletionVector(deleteFilePath, positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L), + deleteFile.fileSizeInBytes(), deleteFile.contentOffset(), deleteFile.contentSizeInBytes()); + } else { + return new PositionDelete(deleteFilePath, positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L), + deleteFile.fileSizeInBytes(), deleteFile.format()); + } } - public static EqualityDelete createEqualityDelete(String deleteFilePath, List fieldIds, long fileSize) { + public static EqualityDelete createEqualityDelete(String deleteFilePath, List fieldIds, + long fileSize, FileFormat fileformat) { // todo: // Schema deleteSchema = TypeUtil.select(scan.schema(), new HashSet<>(fieldIds)); // StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct()); // pass deleteSet to BE // compare two StructLike value, if equals, filtered - return new EqualityDelete(deleteFilePath, fieldIds, fileSize); + return new EqualityDelete(deleteFilePath, fieldIds, fileSize, fileformat); } static class PositionDelete extends IcebergDeleteFileFilter { @@ -51,8 +79,8 @@ static class PositionDelete extends IcebergDeleteFileFilter { private final Long positionUpperBound; public PositionDelete(String deleteFilePath, Long positionLowerBound, - Long positionUpperBound, long fileSize) { - super(deleteFilePath, fileSize); + Long positionUpperBound, long fileSize, FileFormat fileformat) { + super(deleteFilePath, fileSize, fileformat); this.positionLowerBound = positionLowerBound; this.positionUpperBound = positionUpperBound; } @@ -64,13 +92,41 @@ public OptionalLong getPositionLowerBound() { public OptionalLong getPositionUpperBound() { return positionUpperBound == -1L ? OptionalLong.empty() : OptionalLong.of(positionUpperBound); } + + public static int type() { + return 1; + } + } + + static class DeletionVector extends PositionDelete { + private final long contentOffset; + private final long contentLength; + + public DeletionVector(String deleteFilePath, Long positionLowerBound, + Long positionUpperBound, long fileSize, long contentOffset, long contentLength) { + super(deleteFilePath, positionLowerBound, positionUpperBound, fileSize, FileFormat.PUFFIN); + this.contentOffset = contentOffset; + this.contentLength = contentLength; + } + + public long getContentOffset() { + return contentOffset; + } + + public long getContentLength() { + return contentLength; + } + + public static int type() { + return 3; + } } static class EqualityDelete extends IcebergDeleteFileFilter { private List fieldIds; - public EqualityDelete(String deleteFilePath, List fieldIds, long fileSize) { - super(deleteFilePath, fileSize); + public EqualityDelete(String deleteFilePath, List fieldIds, long fileSize, FileFormat fileFormat) { + super(deleteFilePath, fileSize, fileFormat); this.fieldIds = fieldIds; } @@ -81,5 +137,10 @@ public List getFieldIds() { public void setFieldIds(List fieldIds) { this.fieldIds = fieldIds; } + + + public static int type() { + return 2; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 99f9ab0ac5c213..f1dc6cd4eb9fd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -43,6 +43,7 @@ import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader; import 
org.apache.doris.datasource.iceberg.cache.ManifestCacheValue; import org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter; +import org.apache.doris.datasource.iceberg.source.IcebergDeleteFileFilter.EqualityDelete; import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.planner.PlanNodeId; @@ -70,7 +71,6 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -85,7 +85,6 @@ import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.types.Conversions; import org.apache.iceberg.util.ScanTaskUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.logging.log4j.LogManager; @@ -243,12 +242,19 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli if (upperBound.isPresent()) { deleteFileDesc.setPositionUpperBound(upperBound.getAsLong()); } - deleteFileDesc.setContent(FileContent.POSITION_DELETES.id()); + deleteFileDesc.setContent(IcebergDeleteFileFilter.PositionDelete.type()); + + if (filter instanceof IcebergDeleteFileFilter.DeletionVector) { + IcebergDeleteFileFilter.DeletionVector dv = (IcebergDeleteFileFilter.DeletionVector) filter; + deleteFileDesc.setContentOffset((int) dv.getContentOffset()); + deleteFileDesc.setContentSizeInBytes((int) dv.getContentLength()); + deleteFileDesc.setContent(IcebergDeleteFileFilter.DeletionVector.type()); + } } else { IcebergDeleteFileFilter.EqualityDelete equalityDelete = (IcebergDeleteFileFilter.EqualityDelete) filter; deleteFileDesc.setFieldIds(equalityDelete.getFieldIds()); - deleteFileDesc.setContent(FileContent.EQUALITY_DELETES.id()); + deleteFileDesc.setContent(EqualityDelete.type()); } fileDesc.addToDeleteFiles(deleteFileDesc); } @@ -799,18 +805,11 @@ private List getDeleteFileFilters(FileScanTask spitTask List filters = new ArrayList<>(); for (DeleteFile delete : spitTask.deletes()) { if (delete.content() == FileContent.POSITION_DELETES) { - Optional positionLowerBound = Optional.ofNullable(delete.lowerBounds()) - .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId())) - .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); - Optional positionUpperBound = Optional.ofNullable(delete.upperBounds()) - .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId())) - .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); - filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(), - positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L), - delete.fileSizeInBytes())); + filters.add(IcebergDeleteFileFilter.createPositionDelete(delete)); } else if (delete.content() == FileContent.EQUALITY_DELETES) { filters.add(IcebergDeleteFileFilter.createEqualityDelete( - delete.path().toString(), delete.equalityFieldIds(), delete.fileSizeInBytes())); + delete.path().toString(), delete.equalityFieldIds(), + delete.fileSizeInBytes(), delete.format())); } else { throw new IllegalStateException("Unknown delete content: " + delete.content()); } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index f16a1ad1d2d60a..f7a8e8d978d50c 
100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -290,8 +290,12 @@ struct TIcebergDeleteFileDesc { 2: optional i64 position_lower_bound; 3: optional i64 position_upper_bound; 4: optional list field_ids; - // Iceberg file type, 0: data, 1: position delete, 2: equality delete. + // Iceberg file type, 0: data, 1: position delete, 2: equality delete, 3: deletion vector. 5: optional i32 content; + // 6 & 7 : iceberg v3 deletion vector. + // The content_offset and content_size_in_bytes fields are used to reference a specific blob for direct access to a deletion vector. + 6: optional i32 content_offset; + 7: optional i32 content_size_in_bytes; } struct TIcebergFileDesc { diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_deletion_vector.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_deletion_vector.out new file mode 100644 index 00000000000000..41819b90bd731a --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_deletion_vector.out @@ -0,0 +1,65 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q1 -- +2 1 b +6 1 f +8 1 h +10 2 j +12 2 l + +-- !q2 -- +2 1 b +6 1 f +8 1 h + +-- !q4 -- +2 1 b + +-- !q5 -- +2 1 b + +-- !q6 -- + +-- !q7 -- + +-- !q8 -- +2 1 b + +-- !q9 -- +2 1 b + +-- !q10 -- +5 + +-- !q11 -- +3 + +-- !q12 -- +5 + +-- !q13 -- +3 + +-- !q14 -- +5 + +-- !q15 -- +3 + +-- !q16 -- +33334 + +-- !q17 -- +33334 + +-- !q18 -- +0 + +-- !q19 -- +0 + +-- !q20 -- +0 + +-- !q21 -- +33334 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_deletion_vector.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_deletion_vector.groovy new file mode 100644 index 00000000000000..b730761ac85950 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_deletion_vector.groovy @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
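+// Queries the deletion-vector tables created by iceberg_scala/run01.scala
+// (format_v3.dv_test, dv_test_v2 and dv_test_1w) through the REST catalog and
+// checks filters and counts after merge-on-read deletes backed by deletion vectors.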
+ +suite("test_iceberg_deletion_vector", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_deletion_vector" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + + sql """switch ${catalog_name};""" + sql """ use format_v3;""" + + + qt_q1 """ SELECT * FROM dv_test ORDER BY id; """ + qt_q2 """ SELECT * FROM dv_test_v2 ORDER BY id; """ + + qt_q4 """ SELECT * FROM dv_test where id = 2 ORDER BY id ; """ + qt_q5 """ SELECT * FROM dv_test_v2 where id = 2 ORDER BY id ; """ + + qt_q6 """ SELECT * FROM dv_test where id %2 =1 ORDER BY id; """ + qt_q7 """ SELECT * FROM dv_test_v2 where id %2 =1 ORDER BY id; """ + + qt_q8 """ SELECT * FROM dv_test where data < 'f' ORDER BY id; """ + qt_q9 """ SELECT * FROM dv_test_v2 where data < 'f' ORDER BY id; """ + + + qt_q10 """ SELECT count(*) FROM dv_test ; """ + qt_q11 """ SELECT count(*) FROM dv_test_v2 ; """ + + + qt_q12 """ SELECT count(id) FROM dv_test ; """ + qt_q13 """ SELECT count(id) FROM dv_test_v2 ; """ + + qt_q14 """ SELECT count(batch) FROM dv_test ; """ + qt_q15 """ SELECT count(batch) FROM dv_test_v2 ; """ + + + qt_q16 """ SELECT count(*) FROM dv_test_1w ; """ + qt_q17 """ SELECT count(id) FROM dv_test_1w ; """ + qt_q18 """ SELECT count(grp) FROM dv_test_1w where id = 1; """ + qt_q19 """ SELECT count(value) FROM dv_test_1w where id%2 = 1; """ + qt_q20 """ SELECT count(id) FROM dv_test_1w where id%3 = 1; """ + qt_q21 """ SELECT count(ts) FROM dv_test_1w where id%3 != 1; """ + + + + + +}
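
For reference, a minimal standalone sketch of the blob decoding that IcebergTableReader::read_deletion_vector performs above. It assumes only the CRoaring roaring64map.hh header already used by this patch, and it uses exceptions instead of Status for brevity; the byte layout (big-endian length, magic D1 D3 39 64, serialized roaring64 bitmap, trailing CRC-32 which is not verified) follows the Iceberg spec text quoted in the patch, and the function name is illustrative only.

    #include <cstdint>
    #include <cstring>
    #include <stdexcept>
    #include <vector>

    #include "roaring/roaring64map.hh"

    // Decode an Iceberg v3 deletion-vector blob:
    //   4 bytes  big-endian length of (magic + serialized bitmap)
    //   4 bytes  magic D1 D3 39 64
    //   N bytes  serialized roaring64 bitmap (the deleted row positions)
    //   4 bytes  big-endian CRC-32 of (magic + bitmap), skipped here
    std::vector<int64_t> decode_deletion_vector(const char* buf, size_t len) {
        if (len < 12) {
            throw std::runtime_error("deletion vector blob too small");
        }
        auto total_length = (uint32_t(uint8_t(buf[0])) << 24) | (uint32_t(uint8_t(buf[1])) << 16) |
                            (uint32_t(uint8_t(buf[2])) << 8) | uint32_t(uint8_t(buf[3]));
        if (total_length + 8 != len) {
            throw std::runtime_error("deletion vector length mismatch");
        }
        static constexpr unsigned char MAGIC[4] = {0xD1, 0xD3, 0x39, 0x64};
        if (std::memcmp(buf + 4, MAGIC, 4) != 0) {
            throw std::runtime_error("deletion vector magic mismatch");
        }
        // readSafe() stops at the end of the bitmap, so the trailing CRC-32 is ignored.
        roaring::Roaring64Map bitmap = roaring::Roaring64Map::readSafe(buf + 8, len - 12);
        std::vector<int64_t> rows;
        rows.reserve(bitmap.cardinality());
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
            rows.push_back(int64_t(*it));
        }
        return rows;
    }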