Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ include_directories(
${GENSRC_DIR}/
${THIRDPARTY_DIR}/include
${GPERFTOOLS_HOME}/include
${THIRDPARTY_DIR}/include/paimon # paimon-cpp headers
)

if ("${DORIS_JAVA_HOME}" STREQUAL "")
Expand Down
14 changes: 14 additions & 0 deletions be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,17 @@ add_thirdparty(icudata LIB64)


add_thirdparty(pugixml LIB64)

# paimon-cpp libraries
# Main paimon library
add_thirdparty(paimon_static LIB64)
# File format libraries
add_thirdparty(paimon_parquet_file_format_static LIB64)
add_thirdparty(paimon_orc_file_format_static LIB64)
add_thirdparty(paimon_avro_file_format_static LIB64)
add_thirdparty(paimon_blob_file_format_static LIB64)
# File system libraries
add_thirdparty(paimon_local_file_system_static LIB64)
# Index libraries
add_thirdparty(paimon_file_index_static LIB64)
add_thirdparty(paimon_global_index_static LIB64)
266 changes: 266 additions & 0 deletions be/src/vec/exec/format/table/paimon_cpp_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "paimon_cpp_reader.h"

#include <algorithm>
#include <utility>

#include "arrow/c/bridge.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "paimon/defs.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/read_context.h"
#include "paimon/table/source/table_read.h"
#include "vec/exec/format/table/paimon_doris_file_system.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "util/url_coding.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

namespace {
constexpr const char* VALUE_KIND_FIELD = "_VALUE_KIND";

} // namespace

PaimonCppReader::PaimonCppReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
const TFileRangeDesc& range,
const TFileScanRangeParams* range_params)
: _file_slot_descs(file_slot_descs),
_state(state),
_profile(profile),
_range(range),
_range_params(range_params) {
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
if (range.__isset.table_format_params &&
range.table_format_params.__isset.table_level_row_count) {
_remaining_table_level_row_count = range.table_format_params.table_level_row_count;
} else {
_remaining_table_level_row_count = -1;
}
}

PaimonCppReader::~PaimonCppReader() = default;

Status PaimonCppReader::init_reader() {
return _init_paimon_reader();
}

Status PaimonCppReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
auto rows = std::min(_remaining_table_level_row_count,
(int64_t)_state->query_options().batch_size);
_remaining_table_level_row_count -= rows;
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
*eof = false;
if (_remaining_table_level_row_count == 0) {
*eof = true;
}
return Status::OK();
}

if (!_batch_reader) {
return Status::InternalError("paimon-cpp reader is not initialized");
}

if (_col_name_to_block_idx.empty()) {
_col_name_to_block_idx = block->get_name_to_pos_map();
}

auto batch_result = _batch_reader->NextBatch();
if (!batch_result.ok()) {
return Status::InternalError("paimon-cpp read batch failed: {}",
batch_result.status().ToString());
}
auto batch = std::move(batch_result).value();
if (paimon::BatchReader::IsEofBatch(batch)) {
*read_rows = 0;
*eof = true;
return Status::OK();
}

arrow::Result<std::shared_ptr<arrow::RecordBatch>> import_result =
arrow::ImportRecordBatch(batch.first.get(), batch.second.get());
if (!import_result.ok()) {
return Status::InternalError("failed to import paimon-cpp arrow batch: {}",
import_result.status().message());
}

auto record_batch = std::move(import_result).ValueUnsafe();
const auto num_rows = record_batch->num_rows();
const auto num_columns = record_batch->num_columns();
for (int c = 0; c < num_columns; ++c) {
const auto& field = record_batch->schema()->field(c);
if (field->name() == VALUE_KIND_FIELD) {
continue;
}

auto it = _col_name_to_block_idx.find(field->name());
if (it == _col_name_to_block_idx.end()) {
return Status::InternalError("paimon-cpp column {} not found in block", field->name());
}
const vectorized::ColumnWithTypeAndName& column_with_name =
block->get_by_position(it->second);
try {
RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
column_with_name.column->assume_mutable_ref(), record_batch->column(c).get(), 0,
num_rows, _ctzz));
} catch (Exception& e) {
return Status::InternalError("Failed to convert from arrow to block: {}", e.what());
}
}

*read_rows = num_rows;
*eof = false;
return Status::OK();
}

Status PaimonCppReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
for (const auto& slot : _file_slot_descs) {
name_to_type->emplace(slot->col_name(), slot->type());
}
return Status::OK();
}

Status PaimonCppReader::close() {
if (_batch_reader) {
_batch_reader->Close();
}
return Status::OK();
}

Status PaimonCppReader::_init_paimon_reader() {
register_paimon_doris_file_system();
RETURN_IF_ERROR(_decode_split(&_split));

auto table_path_opt = _resolve_table_path();
if (!table_path_opt.has_value()) {
return Status::InternalError(
"paimon-cpp missing paimon_table; cannot resolve paimon table root path");
}
auto options = _build_options();
auto read_columns = _build_read_columns();

std::string table_path = std::move(table_path_opt.value());
paimon::ReadContextBuilder builder(table_path);
if (!read_columns.empty()) {
builder.SetReadSchema(read_columns);
}
if (!options.empty()) {
builder.SetOptions(options);
}

auto context_result = builder.Finish();
if (!context_result.ok()) {
return Status::InternalError("paimon-cpp build read context failed: {}",
context_result.status().ToString());
}
auto context = std::move(context_result).value();

auto table_read_result = paimon::TableRead::Create(std::move(context));
if (!table_read_result.ok()) {
return Status::InternalError("paimon-cpp create table read failed: {}",
table_read_result.status().ToString());
}
auto table_read = std::move(table_read_result).value();
auto reader_result = table_read->CreateReader(_split);
if (!reader_result.ok()) {
return Status::InternalError("paimon-cpp create reader failed: {}",
reader_result.status().ToString());
}
_table_read = std::move(table_read);
_batch_reader = std::move(reader_result).value();
return Status::OK();
}

Status PaimonCppReader::_decode_split(std::shared_ptr<paimon::Split>* split) {
if (!_range.__isset.table_format_params || !_range.table_format_params.__isset.paimon_params ||
!_range.table_format_params.paimon_params.__isset.paimon_split) {
return Status::InternalError("paimon-cpp missing paimon_split in scan range");
}
const auto& encoded_split = _range.table_format_params.paimon_params.paimon_split;
std::string decoded_split;
if (!base64_decode(encoded_split, &decoded_split)) {
return Status::InternalError("paimon-cpp base64 decode paimon_split failed");
}
auto pool = paimon::GetDefaultPool();
auto split_result =
paimon::Split::Deserialize(decoded_split.data(), decoded_split.size(), pool);
if (!split_result.ok()) {
return Status::InternalError("paimon-cpp deserialize split failed: {}",
split_result.status().ToString());
}
*split = std::move(split_result).value();
return Status::OK();
}

std::optional<std::string> PaimonCppReader::_resolve_table_path() const {
if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
_range.table_format_params.paimon_params.__isset.paimon_table &&
!_range.table_format_params.paimon_params.paimon_table.empty()) {
return _range.table_format_params.paimon_params.paimon_table;
}
return std::nullopt;
}

std::vector<std::string> PaimonCppReader::_build_read_columns() const {
std::vector<std::string> columns;
columns.reserve(_file_slot_descs.size());
for (const auto& slot : _file_slot_descs) {
columns.emplace_back(slot->col_name());
}
return columns;
}

std::map<std::string, std::string> PaimonCppReader::_build_options() const {
std::map<std::string, std::string> options;
if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
_range.table_format_params.paimon_params.__isset.paimon_options) {
options.insert(_range.table_format_params.paimon_params.paimon_options.begin(),
_range.table_format_params.paimon_params.paimon_options.end());
}

if (_range_params && _range_params->__isset.properties && !_range_params->properties.empty()) {
for (const auto& kv : _range_params->properties) {
options[kv.first] = kv.second;
}
} else if (_range.__isset.table_format_params &&
_range.table_format_params.__isset.paimon_params &&
_range.table_format_params.paimon_params.__isset.hadoop_conf) {
for (const auto& kv : _range.table_format_params.paimon_params.hadoop_conf) {
options[kv.first] = kv.second;
}
}

options[paimon::Options::FILE_SYSTEM] = "doris";
return options;
}

#include "common/compile_check_end.h"
} // namespace doris::vectorized
90 changes: 90 additions & 0 deletions be/src/vec/exec/format/table/paimon_cpp_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstddef>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "cctz/time_zone.h"
#include "common/status.h"
#include "exec/olap_common.h"
#include "paimon/reader/batch_reader.h"
#include "paimon/table/source/split.h"
#include "vec/exec/format/generic_reader.h"

namespace paimon {
class TableRead;
} // namespace paimon

namespace doris {
class RuntimeProfile;
class RuntimeState;
class SlotDescriptor;
} // namespace doris

namespace doris::vectorized {
#include "common/compile_check_begin.h"

class Block;

class PaimonCppReader : public GenericReader {
ENABLE_FACTORY_CREATOR(PaimonCppReader);

public:
PaimonCppReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile, const TFileRangeDesc& range,
const TFileScanRangeParams* range_params);
~PaimonCppReader() override;

Status init_reader();
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
Status close() override;

private:
Status _init_paimon_reader();
Status _decode_split(std::shared_ptr<paimon::Split>* split);
// Resolve paimon table root path for schema/manifest lookup.
std::optional<std::string> _resolve_table_path() const;
std::vector<std::string> _build_read_columns() const;
std::map<std::string, std::string> _build_options() const;

const std::vector<SlotDescriptor*>& _file_slot_descs;
RuntimeState* _state = nullptr;
[[maybe_unused]] RuntimeProfile* _profile = nullptr;
const TFileRangeDesc& _range;
const TFileScanRangeParams* _range_params = nullptr;

std::shared_ptr<paimon::Split> _split;
std::unique_ptr<paimon::TableRead> _table_read;
std::unique_ptr<paimon::BatchReader> _batch_reader;

std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
int64_t _remaining_table_level_row_count = -1;
cctz::time_zone _ctzz;
};

#include "common/compile_check_end.h"
} // namespace doris::vectorized
Loading