Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a7c4d14
feat: Add Fluss catalog layer and metadata operations
Shekharrajak Dec 28, 2025
05cb494
feat: Implement Fluss schema loading and type conversion
Shekharrajak Dec 28, 2025
3e96519
feat: Implement Fluss scan node for query planning
Shekharrajak Dec 28, 2025
66b04be
feat: Add Thrift definitions for Fluss table format
Shekharrajak Dec 28, 2025
baef921
feat: Implement Fluss table reader in BE
Shekharrajak Dec 28, 2025
3d71557
test: Add Fluss catalog and utils unit tests
Shekharrajak Dec 28, 2025
324f5da
test: Add Fluss metadata and table unit tests
Shekharrajak Dec 28, 2025
640c08c
test: Add Fluss database, source and split unit tests
Shekharrajak Dec 28, 2025
3d899b5
test: Fix FlussExternalTableTest and add factory test
Shekharrajak Dec 28, 2025
96ae86d
Merge branch 'master' into feature/fluss-table-integration
Shekharrajak Dec 28, 2025
bea158b
Merge branch 'master' into feature/fluss-table-integration
Shekharrajak Jan 3, 2026
e36029e
Merge branch 'master' into feature/fluss-table-integration
Shekharrajak Jan 5, 2026
54f7372
Add FLUSS to TableFormatType enum
Shekharrajak Jan 5, 2026
d68cb2f
Add Thrift definitions for Fluss
Shekharrajak Jan 5, 2026
215993e
Add bucket/partition fields to FlussSplit
Shekharrajak Jan 5, 2026
3ddeccd
Implement multi-split parallelism in FlussScanNode
Shekharrajak Jan 5, 2026
3c021a8
Add connection management with retry logic
Shekharrajak Jan 5, 2026
701da2a
Add cache synchronization with ReadWriteLock
Shekharrajak Jan 5, 2026
4bbbe45
Add tiered storage support to Fluss FE classes
Shekharrajak Jan 13, 2026
b9a579b
Add tier fields to Thrift definitions
Shekharrajak Jan 13, 2026
9863a2a
Add tier logging to FlussReader BE
Shekharrajak Jan 13, 2026
0683c76
Update Java unit tests for Fluss
Shekharrajak Jan 13, 2026
3f5dc80
Add Fluss Docker test environment with MinIO
Shekharrajak Jan 13, 2026
e959ed7
Add Fluss test config to regression-conf.groovy
Shekharrajak Jan 13, 2026
dd0f030
Update Fluss documentation with tiered storage
Shekharrajak Jan 13, 2026
7aed92e
Add Fluss regression test suites
Shekharrajak Jan 13, 2026
220eaaf
Update thirdparty build scripts
Shekharrajak Jan 13, 2026
a6bc8e9
Remove Fluss documentation files
Shekharrajak Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions be/src/vec/exec/format/table/fluss_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/exec/format/table/fluss_reader.h"

#include "common/logging.h"
#include "common/status.h"
#include "runtime/runtime_state.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

FlussReader::FlussReader(std::unique_ptr<GenericReader> file_format_reader,
                         RuntimeProfile* profile, RuntimeState* state,
                         const TFileScanRangeParams& params, const TFileRangeDesc& range,
                         io::IOContext* io_ctx, FileMetaCache* meta_cache)
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
                            meta_cache) {
    // Emit a one-line INFO trace of the Fluss-specific scan parameters so the
    // selected table/bucket/file set can be debugged from BE logs. Skipped
    // entirely when the range carries no Fluss metadata.
    if (!range.__isset.table_format_params ||
        !range.table_format_params.__isset.fluss_params) {
        return;
    }
    const auto& fluss = range.table_format_params.fluss_params;
    const auto lake_file_count = fluss.__isset.lake_file_paths ? fluss.lake_file_paths.size() : 0;
    LOG(INFO) << "FlussReader initialized for table: " << fluss.database_name << "."
              << fluss.table_name << ", bucket: " << fluss.bucket_id
              << ", format: " << fluss.file_format
              << ", lake_snapshot_id: " << fluss.lake_snapshot_id
              << ", lake_files: " << lake_file_count;
}

Status FlussReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
    // MVP path: Fluss lake-tier data is plain Parquet/ORC, so the wrapped
    // file-format reader does all of the work. The FE already resolved which
    // lake files belong to this scan range via the LakeSnapshot; LOG_ONLY and
    // HYBRID tiers (JNI bridge) are planned for later phases.
    return _file_format_reader->get_next_block(block, read_rows, eof);
}

#include "common/compile_check_end.h"
} // namespace doris::vectorized

99 changes: 99 additions & 0 deletions be/src/vec/exec/format/table/fluss_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <vector>

#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/table_format_reader.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

// FlussReader wraps Parquet/ORC readers for Fluss table format.
// For MVP it is a thin pass-through: get_next_block_inner simply delegates to
// the wrapped file-format reader, since lake-tier Fluss data is stored as
// ordinary Parquet/ORC files. Future enhancements will integrate Fluss Rust
// C++ bindings for direct (log-tier) data access.
class FlussReader : public TableFormatReader {
public:
    // Takes ownership of `file_format_reader` (the underlying Parquet/ORC
    // reader) and forwards all other arguments to TableFormatReader.
    FlussReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
                RuntimeState* state, const TFileScanRangeParams& params,
                const TFileRangeDesc& range, io::IOContext* io_ctx, FileMetaCache* meta_cache);

    ~FlussReader() override = default;

    // Reads the next batch by delegating to the wrapped file-format reader.
    // Sets *read_rows to the number of rows appended and *eof when exhausted.
    Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
};

// Fluss reader over lake-tier Parquet files: construction-only wrapper that
// forwards initialization to the wrapped ParquetReader.
class FlussParquetReader final : public FlussReader {
public:
    ENABLE_FACTORY_CREATOR(FlussParquetReader);
    FlussParquetReader(std::unique_ptr<GenericReader> file_format_reader,
                       RuntimeProfile* profile, RuntimeState* state,
                       const TFileScanRangeParams& params, const TFileRangeDesc& range,
                       io::IOContext* io_ctx, FileMetaCache* meta_cache)
            : FlussReader(std::move(file_format_reader), profile, state, params, range, io_ctx,
                          meta_cache) {}
    ~FlussParquetReader() final = default;

    // Initialize the wrapped ParquetReader for this scan.
    // `colname_to_slot_id` was added to match the FileScanner call site, which
    // passes it between `row_descriptor` and `not_single_slot_filter_conjuncts`
    // (without it the call does not compile). It is not yet consumed here;
    // it will be needed for slot-based filter pushdown in later phases.
    Status init_reader(
            const std::vector<std::string>& read_table_col_names,
            std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
            const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
            const RowDescriptor* row_descriptor,
            const std::unordered_map<std::string, int>* colname_to_slot_id,
            const VExprContextSPtrs* not_single_slot_filter_conjuncts,
            const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
        (void)colname_to_slot_id; // NOTE(review): reserved for future use; see doc above.
        auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
        return parquet_reader->init_reader(&read_table_col_names, col_name_to_block_idx, conjuncts,
                                           false, tuple_descriptor, row_descriptor,
                                           not_single_slot_filter_conjuncts,
                                           slot_id_to_filter_conjuncts, nullptr);
    }
};

// Fluss reader over lake-tier ORC files: construction-only wrapper that
// forwards initialization to the wrapped OrcReader.
class FlussOrcReader final : public FlussReader {
public:
    ENABLE_FACTORY_CREATOR(FlussOrcReader);
    FlussOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
                   RuntimeState* state, const TFileScanRangeParams& params,
                   const TFileRangeDesc& range, io::IOContext* io_ctx, FileMetaCache* meta_cache)
            : FlussReader(std::move(file_format_reader), profile, state, params, range, io_ctx,
                          meta_cache) {}
    ~FlussOrcReader() final = default;

    // Initialize the wrapped OrcReader for this scan. Any file-open or schema
    // errors surface from OrcReader::init_reader itself.
    // (A previous revision fetched the ORC file type here but never used it;
    // that dead call has been removed.)
    Status init_reader(
            const std::vector<std::string>& read_table_col_names,
            std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
            const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
            const RowDescriptor* row_descriptor,
            const VExprContextSPtrs* not_single_slot_filter_conjuncts,
            const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
        auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
        return orc_reader->init_reader(&read_table_col_names, col_name_to_block_idx, conjuncts,
                                       false, tuple_descriptor, row_descriptor,
                                       not_single_slot_filter_conjuncts,
                                       slot_id_to_filter_conjuncts, nullptr);
    }
};

#include "common/compile_check_end.h"
} // namespace doris::vectorized

22 changes: 22 additions & 0 deletions be/src/vec/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "vec/exec/format/table/hive_reader.h"
#include "vec/exec/format/table/hudi_jni_reader.h"
#include "vec/exec/format/table/hudi_reader.h"
#include "vec/exec/format/table/fluss_reader.h"
#include "vec/exec/format/table/iceberg_reader.h"
#include "vec/exec/format/table/lakesoul_jni_reader.h"
#include "vec/exec/format/table/max_compute_jni_reader.h"
Expand Down Expand Up @@ -1244,6 +1245,16 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parque
&_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "fluss") {
std::unique_ptr<FlussParquetReader> fluss_reader = FlussParquetReader::create_unique(
std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
file_meta_cache_ptr);
init_status = fluss_reader->init_reader(
_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
_cur_reader = std::move(fluss_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
std::unique_ptr<HudiParquetReader> hudi_reader = HudiParquetReader::create_unique(
Expand Down Expand Up @@ -1358,6 +1369,17 @@ Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
&_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "fluss") {
std::unique_ptr<FlussOrcReader> fluss_reader =
FlussOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
range, _io_ctx.get(), file_meta_cache_ptr);

init_status = fluss_reader->init_reader(
_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
_cur_reader = std::move(fluss_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
std::unique_ptr<HudiOrcReader> hudi_reader =
Expand Down
Loading