diff --git a/be/src/vec/exec/format/table/fluss_reader.cpp b/be/src/vec/exec/format/table/fluss_reader.cpp new file mode 100644 index 00000000000000..041f7482fbd43a --- /dev/null +++ b/be/src/vec/exec/format/table/fluss_reader.cpp @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/fluss_reader.h" + +#include "common/logging.h" +#include "common/status.h" +#include "runtime/runtime_state.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +FlussReader::FlussReader(std::unique_ptr file_format_reader, + RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + io::IOContext* io_ctx, FileMetaCache* meta_cache) + : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx, + meta_cache) { + // Log tier information for debugging + if (range.__isset.table_format_params && + range.table_format_params.__isset.fluss_params) { + const auto& fluss_params = range.table_format_params.fluss_params; + LOG(INFO) << "FlussReader initialized for table: " + << fluss_params.database_name << "." << fluss_params.table_name + << ", bucket: " << fluss_params.bucket_id + << ", format: " << fluss_params.file_format + << ", lake_snapshot_id: " << fluss_params.lake_snapshot_id + << ", lake_files: " << (fluss_params.__isset.lake_file_paths ? + fluss_params.lake_file_paths.size() : 0); + } +} + +Status FlussReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { + // For MVP, we read from lake tier (Parquet/ORC files) using the underlying reader. + // The FE has already determined which files to read based on the LakeSnapshot. + // Future phases will add support for LOG_ONLY and HYBRID tiers via JNI bridge. + RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); + return Status::OK(); +} + +#include "common/compile_check_end.h" +} // namespace doris::vectorized + diff --git a/be/src/vec/exec/format/table/fluss_reader.h b/be/src/vec/exec/format/table/fluss_reader.h new file mode 100644 index 00000000000000..1dc0b669c2e00a --- /dev/null +++ b/be/src/vec/exec/format/table/fluss_reader.h @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "vec/exec/format/orc/vorc_reader.h" +#include "vec/exec/format/parquet/vparquet_reader.h" +#include "vec/exec/format/table/table_format_reader.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +// FlussReader wraps Parquet/ORC readers for Fluss table format +// For MVP, this is a simple wrapper. Future enhancements will integrate +// Fluss Rust C++ bindings for direct data access. +class FlussReader : public TableFormatReader { +public: + FlussReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx, FileMetaCache* meta_cache); + + ~FlussReader() override = default; + + Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; +}; + +class FlussParquetReader final : public FlussReader { +public: + ENABLE_FACTORY_CREATOR(FlussParquetReader); + FlussParquetReader(std::unique_ptr file_format_reader, + RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + io::IOContext* io_ctx, FileMetaCache* meta_cache) + : FlussReader(std::move(file_format_reader), profile, state, params, range, io_ctx, + meta_cache) {}; + ~FlussParquetReader() final = default; + + Status init_reader( + const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map* slot_id_to_filter_conjuncts) { + auto* parquet_reader = static_cast(_file_format_reader.get()); + return parquet_reader->init_reader(&read_table_col_names, col_name_to_block_idx, conjuncts, + false, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, + slot_id_to_filter_conjuncts, nullptr); + } +}; + +class FlussOrcReader final : public FlussReader { +public: + ENABLE_FACTORY_CREATOR(FlussOrcReader); + FlussOrcReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx, FileMetaCache* meta_cache) + : FlussReader(std::move(file_format_reader), profile, state, params, range, io_ctx, + meta_cache) {}; + ~FlussOrcReader() final = default; + + Status init_reader( + const std::vector& read_table_col_names, + std::unordered_map* col_name_to_block_idx, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map* slot_id_to_filter_conjuncts) { + auto* orc_reader = static_cast(_file_format_reader.get()); + const orc::Type* orc_type_ptr = nullptr; + RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr)); + return orc_reader->init_reader(&read_table_col_names, col_name_to_block_idx, conjuncts, + false, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, + 
slot_id_to_filter_conjuncts, nullptr); + } +}; + +#include "common/compile_check_end.h" +} // namespace doris::vectorized + diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 414620b0cbe15b..5b2e0b26d979f6 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -64,6 +64,7 @@ #include "vec/exec/format/table/hive_reader.h" #include "vec/exec/format/table/hudi_jni_reader.h" #include "vec/exec/format/table/hudi_reader.h" +#include "vec/exec/format/table/fluss_reader.h" #include "vec/exec/format/table/iceberg_reader.h" #include "vec/exec/format/table/lakesoul_jni_reader.h" #include "vec/exec/format/table/max_compute_jni_reader.h" @@ -1244,6 +1245,16 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parque &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "fluss") { + std::unique_ptr<FlussParquetReader> fluss_reader = FlussParquetReader::create_unique( + std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(), + file_meta_cache_ptr); + init_status = fluss_reader->init_reader( + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, + _default_val_row_desc.get(), + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); + _cur_reader = std::move(fluss_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "hudi") { std::unique_ptr<HudiParquetReader> hudi_reader = HudiParquetReader::create_unique( @@ -1358,6 +1369,17 @@ Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader, &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "fluss") { + std::unique_ptr<FlussOrcReader> fluss_reader = + FlussOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params, + range, _io_ctx.get(), file_meta_cache_ptr); + + init_status = fluss_reader->init_reader( + _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc, + _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); + _cur_reader = std::move(fluss_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "hudi") { std::unique_ptr<HudiOrcReader> hudi_reader = diff --git a/docker/integration-test/fluss/README.md b/docker/integration-test/fluss/README.md new file mode 100644 index 00000000000000..d3958357ba3e99 --- /dev/null +++ b/docker/integration-test/fluss/README.md @@ -0,0 +1,263 @@ +# Doris-Fluss Integration Test Environment + +This directory contains Docker Compose configuration for running integration tests between Apache Doris and Apache Fluss. + +## Prerequisites + +- Docker Engine 20.10+ +- Docker Compose 2.0+ +- 8GB+ available RAM +- Network access for pulling images + +## Quick Start + +### 1. Start the Test Environment + +```bash +cd docker/integration-test/fluss + +# Start all services +docker-compose up -d + +# Wait for services to be healthy (about 2-3 minutes) +docker-compose ps + +# Check logs if needed +docker-compose logs -f fluss-coordinator +``` + +### 2. 
Verify Fluss is Running + +```bash +# Check coordinator health +curl http://localhost:9123/health + +# List databases via Fluss CLI +docker exec -it fluss-coordinator /opt/fluss/bin/fluss-client.sh \ + --bootstrap-server localhost:9123 \ + -e "SHOW DATABASES" +``` + +### 3. Run Doris Tests + +```bash +# From Doris root directory +./run-regression-test.sh \ + --suite external_table_p0/fluss \ + -conf flussBootstrapServers=localhost:9123 \ + -conf enableFlussTest=true +``` + +## Services + +| Service | Port | Description | +|---------|------|-------------| +| ZooKeeper | 2181 | Coordination service for Fluss | +| Fluss Coordinator | 9123 | Metadata and cluster management | +| Fluss Tablet Server | 9124 | Data storage and serving | +| MinIO | 9000/9001 | S3-compatible storage for lake data | + +## Test Tables + +The `fluss-init` service automatically creates test tables: + +### all_types +Primary key table with all supported data types: +```sql +CREATE TABLE test_db.all_types ( + id INT PRIMARY KEY, + bool_col BOOLEAN, + tinyint_col TINYINT, + smallint_col SMALLINT, + int_col INT, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + decimal_col DECIMAL(10, 2), + string_col STRING, + date_col DATE, + timestamp_col TIMESTAMP(3) +); +``` + +### partitioned_table +Partitioned primary key table: +```sql +CREATE TABLE test_db.partitioned_table ( + id INT, + name STRING, + value DOUBLE, + dt STRING, + PRIMARY KEY (id, dt) NOT ENFORCED +) PARTITIONED BY (dt); +``` + +### log_table +Append-only log table: +```sql +CREATE TABLE test_db.log_table ( + id INT, + message STRING, + created_at TIMESTAMP(3) +); +``` + +## Loading Test Data + +### Option 1: Via Flink SQL + +```bash +# Start Flink services +docker-compose --profile flink up -d + +# Connect to Flink SQL client +docker exec -it flink-jobmanager ./bin/sql-client.sh + +# In Flink SQL: +CREATE CATALOG fluss WITH ( + 'type' = 'fluss', + 'bootstrap.servers' = 'fluss-coordinator:9123' +); + +USE CATALOG fluss; +USE test_db; + +INSERT INTO all_types VALUES + (1, true, 1, 100, 1000, 10000, 1.1, 2.2, 99.99, 'test1', DATE '2024-01-01', TIMESTAMP '2024-01-01 10:00:00'), + (2, false, 2, 200, 2000, 20000, 2.2, 3.3, 199.99, 'test2', DATE '2024-01-02', TIMESTAMP '2024-01-02 11:00:00'); +``` + +### Option 2: Via Fluss Client + +```bash +docker exec -it fluss-coordinator /opt/fluss/bin/fluss-client.sh \ + --bootstrap-server localhost:9123 < + /bin/sh -c " + mc alias set myminio http://minio:9000 minioadmin minioadmin; + mc mb myminio/fluss-lake --ignore-existing; + mc anonymous set public myminio/fluss-lake; + exit 0; + " + networks: + - doris-fluss-net + + # =========================================== + # Fluss Coordinator Server + # =========================================== + fluss-coordinator: + image: fluss/fluss:latest + container_name: fluss-coordinator + hostname: fluss-coordinator + ports: + - "9123:9123" + depends_on: + zookeeper: + condition: service_healthy + minio-init: + condition: service_completed_successfully + environment: + FLUSS_MODE: coordinator + FLUSS_PROPERTIES: | + coordinator.host=fluss-coordinator + coordinator.port=9123 + zookeeper.address=zookeeper:2181 + zookeeper.path.root=/fluss + remote.data.dir=s3://fluss-lake/data + s3.endpoint=http://minio:9000 + s3.access-key=minioadmin + s3.secret-key=minioadmin + s3.path-style-access=true + volumes: + - fluss-coordinator-data:/opt/fluss/data + networks: + - doris-fluss-net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9123/health"] + interval: 10s + timeout: 5s + 
retries: 10 + start_period: 30s + + # =========================================== + # Fluss Tablet Server (Data Node) + # =========================================== + fluss-tablet-server: + image: fluss/fluss:latest + container_name: fluss-tablet-server + hostname: fluss-tablet-server + ports: + - "9124:9124" + depends_on: + fluss-coordinator: + condition: service_healthy + environment: + FLUSS_MODE: tablet-server + FLUSS_PROPERTIES: | + tablet-server.host=fluss-tablet-server + tablet-server.port=9124 + coordinator.address=fluss-coordinator:9123 + data.dir=/opt/fluss/data + remote.data.dir=s3://fluss-lake/data + s3.endpoint=http://minio:9000 + s3.access-key=minioadmin + s3.secret-key=minioadmin + s3.path-style-access=true + volumes: + - fluss-tablet-data:/opt/fluss/data + networks: + - doris-fluss-net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9124/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 30s + + # =========================================== + # Test Data Initializer + # =========================================== + fluss-init: + image: fluss/fluss:latest + container_name: fluss-init + depends_on: + fluss-tablet-server: + condition: service_healthy + entrypoint: > + /bin/sh -c " + echo 'Waiting for Fluss cluster to be ready...'; + sleep 10; + /opt/fluss/bin/fluss-client.sh --bootstrap-server fluss-coordinator:9123 < buildDbForInit(String remote return new TrinoConnectorExternalDatabase(this, dbId, localDbName, remoteDbName); case REMOTE_DORIS: return new RemoteDorisExternalDatabase(this, dbId, localDbName, remoteDbName); + case FLUSS: + return new FlussExternalDatabase(this, dbId, localDbName, remoteDbName); default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java index 2631ff28cc112c..fd249d0f6b7b0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java @@ -44,6 +44,7 @@ public enum Type { TEST, TRINO_CONNECTOR, REMOTE_DORIS, + FLUSS, UNKNOWN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java index ba927d8b6906fd..96fbf306f65b7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java @@ -45,6 +45,7 @@ public enum Type { INFO_SCHEMA_DB, TRINO_CONNECTOR, REMOTE_DORIS, + FLUSS, UNKNOWN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java index 10d4fd25bcbc8b..50f06028c38939 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java @@ -27,6 +27,7 @@ public enum TableFormatType { LAKESOUL("lakesoul"), TRINO_CONNECTOR("trino_connector"), TVF("tvf"), + FLUSS("fluss"), REMOTE_DORIS("remote_doris"); private final String tableFormatType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalog.java new file mode 100644 index 00000000000000..f950bd46f1f324 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalog.java @@ 
-0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitCatalogLog; +import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.operations.ExternalMetadataOperations; +import org.apache.doris.transaction.TransactionManagerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class FlussExternalCatalog extends ExternalCatalog { + private static final Logger LOG = LogManager.getLogger(FlussExternalCatalog.class); + + public static final String FLUSS_COORDINATOR_URI = "fluss.coordinator.uri"; + public static final String FLUSS_BOOTSTRAP_SERVERS = "bootstrap.servers"; + public static final String FLUSS_SECURITY_PROTOCOL = "fluss.security.protocol"; + public static final String FLUSS_SASL_MECHANISM = "fluss.sasl.mechanism"; + public static final String FLUSS_SASL_USERNAME = "fluss.sasl.username"; + public static final String FLUSS_SASL_PASSWORD = "fluss.sasl.password"; + public static final String FLUSS_ENABLE_MAPPING_VARBINARY = "fluss.enable.mapping.varbinary"; + public static final String FLUSS_TABLE_META_CACHE_TTL_SECOND = "fluss.table.meta.cache.ttl.second"; + + protected Connection flussConnection; + protected Admin flussAdmin; + + public FlussExternalCatalog(long catalogId, String name, String resource, Map props, + String comment) { + super(catalogId, name, InitCatalogLog.Type.FLUSS, comment); + this.catalogProperty = new CatalogProperty(resource, props); + } + + @Override + public void checkProperties() throws DdlException { + super.checkProperties(); + String coordinatorUri = catalogProperty.getOrDefault(FLUSS_COORDINATOR_URI, null); + String bootstrapServers = catalogProperty.getOrDefault(FLUSS_BOOTSTRAP_SERVERS, null); + if (StringUtils.isEmpty(coordinatorUri) && StringUtils.isEmpty(bootstrapServers)) { + throw new DdlException("Missing required property: " + FLUSS_COORDINATOR_URI + + " or " + FLUSS_BOOTSTRAP_SERVERS); + } + } + + @Override + 
protected void initLocalObjectsImpl() { + Configuration conf = createFlussConfiguration(); + flussConnection = ConnectionFactory.createConnection(conf); + flussAdmin = flussConnection.getAdmin(); + initPreExecutionAuthenticator(); + FlussMetadataOps ops = ExternalMetadataOperations.newFlussMetadataOps(this, flussConnection); + threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth( + ICEBERG_CATALOG_EXECUTOR_THREAD_NUM, + Integer.MAX_VALUE, + String.format("fluss_catalog_%s_executor_pool", name), + true, + executionAuthenticator); + metadataOps = ops; + } + + private Configuration createFlussConfiguration() { + Configuration conf = new Configuration(); + Map props = catalogProperty.getProperties(); + + // Set bootstrap.servers or coordinator URI + String coordinatorUri = props.get(FLUSS_COORDINATOR_URI); + String bootstrapServers = props.get(FLUSS_BOOTSTRAP_SERVERS); + if (StringUtils.isNotEmpty(bootstrapServers)) { + conf.setString(FLUSS_BOOTSTRAP_SERVERS, bootstrapServers); + } else if (StringUtils.isNotEmpty(coordinatorUri)) { + // If coordinator URI is provided, use it as bootstrap servers + conf.setString(FLUSS_BOOTSTRAP_SERVERS, coordinatorUri); + } + + // Copy other Fluss client properties (with fluss. prefix removed) + for (Map.Entry entry : props.entrySet()) { + String key = entry.getKey(); + if (key.startsWith("fluss.") && !key.equals(FLUSS_COORDINATOR_URI)) { + String flussKey = key.substring("fluss.".length()); + conf.setString(flussKey, entry.getValue()); + } + } + + return conf; + } + + @Override + protected synchronized void initPreExecutionAuthenticator() { + if (executionAuthenticator == null) { + executionAuthenticator = new org.apache.doris.common.security.authentication.ExecutionAuthenticator() {}; + } + } + + public Connection getFlussConnection() { + makeSureInitialized(); + return flussConnection; + } + + public Admin getFlussAdmin() { + makeSureInitialized(); + return flussAdmin; + } + + public String getBootstrapServers() { + String bootstrapServers = catalogProperty.getOrDefault(FLUSS_BOOTSTRAP_SERVERS, null); + if (bootstrapServers == null) { + bootstrapServers = catalogProperty.getOrDefault(FLUSS_COORDINATOR_URI, null); + } + return bootstrapServers; + } + + public String getSecurityProtocol() { + return catalogProperty.getOrDefault(FLUSS_SECURITY_PROTOCOL, null); + } + + public String getSaslMechanism() { + return catalogProperty.getOrDefault(FLUSS_SASL_MECHANISM, null); + } + + public String getSaslUsername() { + return catalogProperty.getOrDefault(FLUSS_SASL_USERNAME, null); + } + + public String getSaslPassword() { + return catalogProperty.getOrDefault(FLUSS_SASL_PASSWORD, null); + } + + public boolean getEnableMappingVarbinary() { + return Boolean.parseBoolean(catalogProperty.getOrDefault(FLUSS_ENABLE_MAPPING_VARBINARY, "false")); + } + + @Override + protected List listDatabaseNames() { + makeSureInitialized(); + try { + CompletableFuture> future = flussAdmin.listDatabases(); + List databases = future.get(); + return databases != null ? 
databases : new ArrayList<>(); + } catch (Exception e) { + throw new RuntimeException("Failed to list databases, catalog name: " + getName(), e); + } + } + + @Override + public boolean tableExist(SessionContext ctx, String dbName, String tblName) { + makeSureInitialized(); + try { + return executionAuthenticator.execute(() -> { + try { + org.apache.fluss.metadata.TablePath tablePath = + org.apache.fluss.metadata.TablePath.of(dbName, tblName); + CompletableFuture<org.apache.fluss.metadata.TableInfo> future = + flussAdmin.getTableInfo(tablePath); + future.get(); // Will throw exception if table doesn't exist + return true; + } catch (Exception e) { + if (ExceptionUtils.getRootCause(e) instanceof TableNotExistException) { + return false; + } + throw new RuntimeException("Failed to check table existence", e); + } + }); + } catch (Exception e) { + throw new RuntimeException("Failed to check table existence, catalog name: " + getName() + + ", error message: " + ExceptionUtils.getRootCauseMessage(e), e); + } + } + + @Override + public List<String> listTableNames(SessionContext ctx, String dbName) { + makeSureInitialized(); + try { + return executionAuthenticator.execute(() -> { + try { + CompletableFuture<List<String>> future = flussAdmin.listTables(dbName); + List<String> tables = future.get(); + return tables != null ? tables : new ArrayList<>(); + } catch (Exception e) { + LOG.warn("Failed to list tables for database: " + dbName, e); + return new ArrayList<>(); + } + }); + } catch (Exception e) { + throw new RuntimeException("Failed to list table names, catalog name: " + getName(), e); + } + } + + @Override + public void close() { + if (flussConnection != null) { + try { + flussConnection.close(); + } catch (Exception e) { + LOG.warn("Failed to close Fluss connection", e); + } + flussConnection = null; + flussAdmin = null; + } + super.close(); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalogFactory.java new file mode 100644 index 00000000000000..93c1e3b2551f87 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalogFactory.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
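Before the factory class below, it is worth making the property-mapping convention of `createFlussConfiguration()` above concrete: `bootstrap.servers` is taken as-is, `fluss.coordinator.uri` only serves as a fallback for it, and every other `fluss.`-prefixed catalog property is forwarded to the Fluss client with the prefix stripped. A minimal standalone sketch of that convention follows; the class and method names are illustrative, not part of this patch:

```java
import java.util.HashMap;
import java.util.Map;

public class FlussPropertyMappingSketch {
    // Mirrors createFlussConfiguration(): "bootstrap.servers" wins, "fluss.coordinator.uri"
    // is the fallback for it, and all other "fluss."-prefixed keys are forwarded to the
    // client configuration with the "fluss." prefix stripped.
    static Map<String, String> toClientConf(Map<String, String> catalogProps) {
        Map<String, String> conf = new HashMap<>();
        String bootstrap = catalogProps.get("bootstrap.servers");
        String coordinator = catalogProps.get("fluss.coordinator.uri");
        if (bootstrap != null && !bootstrap.isEmpty()) {
            conf.put("bootstrap.servers", bootstrap);
        } else if (coordinator != null && !coordinator.isEmpty()) {
            conf.put("bootstrap.servers", coordinator);
        }
        for (Map.Entry<String, String> e : catalogProps.entrySet()) {
            String key = e.getKey();
            if (key.startsWith("fluss.") && !key.equals("fluss.coordinator.uri")) {
                conf.put(key.substring("fluss.".length()), e.getValue());
            }
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("fluss.coordinator.uri", "fluss-coordinator:9123");
        props.put("fluss.security.protocol", "sasl");
        // Prints the two mapped entries: bootstrap.servers=fluss-coordinator:9123
        // and security.protocol=sasl (HashMap iteration order is unspecified).
        System.out.println(toClientConf(props));
    }
}
```

One consequence of this convention is that a property such as `fluss.security.protocol` reaches the client as `security.protocol`, so catalog DDL can use a uniform `fluss.` namespace without leaking Doris-side key names into the Fluss client configuration.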
+ +package org.apache.doris.datasource.fluss; + +import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalCatalog; + +import java.util.Map; + +public class FlussExternalCatalogFactory { + + public static ExternalCatalog createCatalog(long catalogId, String name, String resource, + Map props, String comment) throws DdlException { + return new FlussExternalCatalog(catalogId, name, resource, props, comment); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalDatabase.java new file mode 100644 index 00000000000000..2fd014c798d14a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalDatabase.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.InitDatabaseLog; + +public class FlussExternalDatabase extends ExternalDatabase { + + public FlussExternalDatabase(ExternalCatalog extCatalog, Long id, String name, String remoteName) { + super(extCatalog, id, name, remoteName, InitDatabaseLog.Type.FLUSS); + } + + @Override + public FlussExternalTable buildTableInternal(String remoteTableName, String localTableName, long tblId, + ExternalCatalog catalog, ExternalDatabase db) { + return new FlussExternalTable(tblId, localTableName, remoteTableName, (FlussExternalCatalog) extCatalog, + (FlussExternalDatabase) db); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalTable.java new file mode 100644 index 00000000000000..2e10f1c2bcb48f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalTable.java @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.thrift.TFlussTable; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; + +import org.apache.fluss.metadata.TableInfo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +public class FlussExternalTable extends ExternalTable { + + public enum FlussTableType { + LOG_TABLE, + PRIMARY_KEY_TABLE + } + + private volatile FlussTableMetadata tableMetadata; + + public FlussExternalTable(long id, String name, String remoteName, FlussExternalCatalog catalog, + FlussExternalDatabase db) { + super(id, name, remoteName, catalog, db, TableType.FLUSS_EXTERNAL_TABLE); + } + + @Override + public Optional initSchema(SchemaCacheKey key) { + makeSureInitialized(); + return FlussUtils.loadSchemaCacheValue(this); + } + + @Override + public TTableDescriptor toThrift() { + List schema = getFullSchema(); + TFlussTable tFlussTable = new TFlussTable(getDbName(), getName(), new HashMap<>()); + tFlussTable.setBootstrap_servers(getBootstrapServers()); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.FLUSS_EXTERNAL_TABLE, + schema.size(), 0, getName(), getDbName()); + tTableDescriptor.setFlussTable(tFlussTable); + return tTableDescriptor; + } + + public String getBootstrapServers() { + FlussExternalCatalog catalog = (FlussExternalCatalog) getCatalog(); + return catalog.getBootstrapServers(); + } + + public int getNumBuckets() { + ensureTableMetadataLoaded(); + return tableMetadata != null ? tableMetadata.getNumBuckets() : 1; + } + + public List getPartitionKeys() { + ensureTableMetadataLoaded(); + return tableMetadata != null ? tableMetadata.getPartitionKeys() : new ArrayList<>(); + } + + public List getPrimaryKeys() { + ensureTableMetadataLoaded(); + return tableMetadata != null ? tableMetadata.getPrimaryKeys() : new ArrayList<>(); + } + + public FlussTableType getFlussTableType() { + ensureTableMetadataLoaded(); + return tableMetadata != null ? 
tableMetadata.getTableType() : FlussTableType.LOG_TABLE; + } + + public String getRemoteDbName() { + return ((FlussExternalDatabase) getDatabase()).getRemoteName(); + } + + public String getRemoteName() { + return remoteName; + } + + private void ensureTableMetadataLoaded() { + if (tableMetadata == null) { + synchronized (this) { + if (tableMetadata == null) { + loadTableMetadata(); + } + } + } + } + + private void loadTableMetadata() { + try { + FlussExternalCatalog catalog = (FlussExternalCatalog) getCatalog(); + FlussMetadataOps metadataOps = (FlussMetadataOps) catalog.getMetadataOps(); + this.tableMetadata = metadataOps.getTableMetadata(getRemoteDbName(), getRemoteName()); + } catch (Exception e) { + // Use defaults if metadata loading fails + this.tableMetadata = new FlussTableMetadata(); + } + } + + public static class FlussTableMetadata { + private FlussTableType tableType = FlussTableType.LOG_TABLE; + private List primaryKeys = new ArrayList<>(); + private List partitionKeys = new ArrayList<>(); + private int numBuckets = 1; + + public FlussTableType getTableType() { + return tableType; + } + + public void setTableType(FlussTableType tableType) { + this.tableType = tableType; + } + + public List getPrimaryKeys() { + return primaryKeys; + } + + public void setPrimaryKeys(List primaryKeys) { + this.primaryKeys = primaryKeys != null ? primaryKeys : new ArrayList<>(); + } + + public List getPartitionKeys() { + return partitionKeys; + } + + public void setPartitionKeys(List partitionKeys) { + this.partitionKeys = partitionKeys != null ? partitionKeys : new ArrayList<>(); + } + + public int getNumBuckets() { + return numBuckets; + } + + public void setNumBuckets(int numBuckets) { + this.numBuckets = numBuckets > 0 ? numBuckets : 1; + } + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussMetadataOps.java new file mode 100644 index 00000000000000..8967396f078e9c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussMetadataOps.java @@ -0,0 +1,434 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
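The class that follows centralizes Fluss metadata access behind a retry-with-exponential-backoff helper plus two lock-guarded caches. As a self-contained illustration of the retry policy implemented by `executeWithRetry()` below (up to 3 attempts, delay doubling from 100 ms and capped at 5000 ms), here is a runnable sketch; the class name and `main()` driver are illustrative only, and the `isRetryable()` message check of the real method is omitted for brevity:

```java
import java.util.function.Supplier;

public class RetryPolicySketch {
    // Same shape as FlussMetadataOps.executeWithRetry(): try the operation, and on
    // failure sleep 100ms, 200ms, ... (doubling, capped at 5000ms) before retrying,
    // giving up after 3 attempts and rethrowing with the last cause attached.
    static <T> T executeWithRetry(Supplier<T> operation, String operationName) {
        int attempt = 0;
        long delayMs = 100;
        Exception lastException = null;
        while (attempt < 3) {
            try {
                return operation.get();
            } catch (Exception e) {
                lastException = e;
                attempt++;
                if (attempt >= 3) {
                    break;
                }
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while retrying " + operationName, ie);
                }
                delayMs = Math.min(delayMs * 2, 5000);
            }
        }
        throw new RuntimeException("Failed to " + operationName + " after 3 attempts", lastException);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = executeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient failure " + calls[0]);
            }
            return "ok after " + calls[0] + " attempts";
        }, "demo operation");
        System.out.println(result); // ok after 3 attempts
    }
}
```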
+ +package org.apache.doris.datasource.fluss; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +public class FlussMetadataOps implements Closeable { + private static final Logger LOG = LogManager.getLogger(FlussMetadataOps.class); + + private static final int MAX_RETRY_ATTEMPTS = 3; + private static final long INITIAL_RETRY_DELAY_MS = 100; + private static final long MAX_RETRY_DELAY_MS = 5000; + + private final FlussExternalCatalog catalog; + private final String bootstrapServers; + + private final Map tableMetadataCache; + private final Map> databaseTablesCache; + private final ReadWriteLock cacheLock = new ReentrantReadWriteLock(); + + private volatile Connection connection; + private volatile Admin admin; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Object connectionLock = new Object(); + + public FlussMetadataOps(FlussExternalCatalog catalog) { + this.catalog = catalog; + this.bootstrapServers = catalog.getBootstrapServers(); + this.tableMetadataCache = new HashMap<>(); + this.databaseTablesCache = new HashMap<>(); + } + + public FlussMetadataOps(org.apache.doris.datasource.ExternalCatalog catalog, Connection connection) { + this.catalog = (FlussExternalCatalog) catalog; + this.bootstrapServers = this.catalog.getBootstrapServers(); + this.tableMetadataCache = new HashMap<>(); + this.databaseTablesCache = new HashMap<>(); + this.connection = connection; + this.admin = connection.getAdmin(); + this.initialized.set(true); + } + + private void ensureConnection() { + if (closed.get()) { + throw new IllegalStateException("FlussMetadataOps is closed"); + } + + if (!initialized.get()) { + synchronized (connectionLock) { + if (!initialized.get() && !closed.get()) { + initConnectionWithRetry(); + initialized.set(true); + } + } + } + } + + private void initConnectionWithRetry() { + LOG.info("Initializing connection to Fluss cluster: {}", bootstrapServers); + + executeWithRetry(() -> { + Configuration conf = new Configuration(); + conf.setString("bootstrap.servers", bootstrapServers); + + String securityProtocol = catalog.getSecurityProtocol(); + if (securityProtocol != null) { + conf.setString("client.security.protocol", securityProtocol); + String saslMechanism = catalog.getSaslMechanism(); + if (saslMechanism != null) { + conf.setString("client.security.sasl.mechanism", saslMechanism); + } + String saslUsername = catalog.getSaslUsername(); + if (saslUsername != null) { + conf.setString("client.security.sasl.username", saslUsername); + } + String saslPassword = catalog.getSaslPassword(); + if (saslPassword != null) { + conf.setString("client.security.sasl.password", saslPassword); + } + } + + connection = ConnectionFactory.createConnection(conf); + admin = connection.getAdmin(); + return 
null; + }, "initialize Fluss connection"); + + LOG.info("Successfully connected to Fluss cluster: {}", bootstrapServers); + } + + private T executeWithRetry(Supplier operation, String operationName) { + int attempt = 0; + long delayMs = INITIAL_RETRY_DELAY_MS; + Exception lastException = null; + + while (attempt < MAX_RETRY_ATTEMPTS) { + try { + return operation.get(); + } catch (Exception e) { + lastException = e; + attempt++; + + if (attempt < MAX_RETRY_ATTEMPTS && isRetryable(e)) { + LOG.warn("Failed to {}, attempt {}/{}, retrying in {}ms", + operationName, attempt, MAX_RETRY_ATTEMPTS, delayMs, e); + try { + Thread.sleep(delayMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while retrying " + operationName, ie); + } + delayMs = Math.min(delayMs * 2, MAX_RETRY_DELAY_MS); + } else { + break; + } + } + } + + throw new RuntimeException("Failed to " + operationName + " after " + MAX_RETRY_ATTEMPTS + + " attempts", lastException); + } + + private boolean isRetryable(Exception e) { + String message = e.getMessage(); + if (message == null) { + return true; + } + String lowerMessage = message.toLowerCase(); + return lowerMessage.contains("timeout") + || lowerMessage.contains("connection") + || lowerMessage.contains("unavailable") + || lowerMessage.contains("retry") + || lowerMessage.contains("temporary"); + } + + public List listDatabaseNames() { + LOG.debug("Listing databases from Fluss catalog"); + ensureConnection(); + + return executeWithRetry(() -> { + try { + return admin.listDatabases().get(); + } catch (Exception e) { + throw new RuntimeException("Failed to list databases", e); + } + }, "list databases"); + } + + public boolean databaseExist(String dbName) { + LOG.debug("Checking if database exists: {}", dbName); + ensureConnection(); + + return executeWithRetry(() -> { + try { + return admin.databaseExists(dbName).get(); + } catch (Exception e) { + throw new RuntimeException("Failed to check database existence: " + dbName, e); + } + }, "check database existence"); + } + + public List listTableNames(String dbName) { + LOG.debug("Listing tables from database: {}", dbName); + + cacheLock.readLock().lock(); + try { + List cachedTables = databaseTablesCache.get(dbName); + if (cachedTables != null) { + return new ArrayList<>(cachedTables); + } + } finally { + cacheLock.readLock().unlock(); + } + + ensureConnection(); + + List tables = executeWithRetry(() -> { + try { + return admin.listTables(dbName).get(); + } catch (Exception e) { + throw new RuntimeException("Failed to list tables in database: " + dbName, e); + } + }, "list tables"); + + cacheLock.writeLock().lock(); + try { + databaseTablesCache.put(dbName, new ArrayList<>(tables)); + } finally { + cacheLock.writeLock().unlock(); + } + + return tables; + } + + public boolean tableExist(String dbName, String tableName) { + LOG.debug("Checking if table exists: {}.{}", dbName, tableName); + return listTableNames(dbName).contains(tableName); + } + + public FlussExternalTable.FlussTableMetadata getTableMetadata(String dbName, String tableName) { + String cacheKey = dbName + "." 
+ tableName; + + cacheLock.readLock().lock(); + try { + FlussExternalTable.FlussTableMetadata cached = tableMetadataCache.get(cacheKey); + if (cached != null) { + return cached; + } + } finally { + cacheLock.readLock().unlock(); + } + + LOG.debug("Fetching metadata for table: {}.{}", dbName, tableName); + + FlussExternalTable.FlussTableMetadata metadata = new FlussExternalTable.FlussTableMetadata(); + + try { + org.apache.fluss.metadata.TablePath tablePath = + org.apache.fluss.metadata.TablePath.of(dbName, tableName); + org.apache.fluss.metadata.TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + + if (tableInfo != null) { + // Determine table type based on primary keys + List primaryKeys = tableInfo.getPrimaryKeys(); + if (primaryKeys != null && !primaryKeys.isEmpty()) { + metadata.setTableType(FlussExternalTable.FlussTableType.PRIMARY_KEY_TABLE); + metadata.setPrimaryKeys(primaryKeys); + } else { + metadata.setTableType(FlussExternalTable.FlussTableType.LOG_TABLE); + metadata.setPrimaryKeys(Collections.emptyList()); + } + + // Get partition keys + List partitionKeys = tableInfo.getPartitionKeys(); + metadata.setPartitionKeys(partitionKeys != null ? partitionKeys : Collections.emptyList()); + + // Get bucket count + int numBuckets = tableInfo.getNumBuckets(); + metadata.setNumBuckets(numBuckets > 0 ? numBuckets : 1); + } + } catch (Exception e) { + LOG.warn("Failed to fetch table metadata for {}.{}, using defaults", dbName, tableName, e); + metadata.setTableType(FlussExternalTable.FlussTableType.LOG_TABLE); + metadata.setPrimaryKeys(Collections.emptyList()); + metadata.setPartitionKeys(Collections.emptyList()); + metadata.setNumBuckets(1); + } + + cacheLock.writeLock().lock(); + try { + tableMetadataCache.put(cacheKey, metadata); + } finally { + cacheLock.writeLock().unlock(); + } + + return metadata; + } + + public org.apache.fluss.metadata.TableInfo getTableInfo(String dbName, String tableName) { + LOG.debug("Fetching TableInfo for table: {}.{}", dbName, tableName); + ensureConnection(); + + return executeWithRetry(() -> { + try { + org.apache.fluss.metadata.TablePath tablePath = + org.apache.fluss.metadata.TablePath.of(dbName, tableName); + return admin.getTableInfo(tablePath).get(); + } catch (Exception e) { + throw new RuntimeException("Failed to get table info: " + dbName + "." 
+ tableName, e); + } + }, "get table info"); + } + + public List getTableSchema(String dbName, String tableName) { + LOG.debug("Fetching schema for table: {}.{}", dbName, tableName); + return new ArrayList<>(); + } + + public long getTableRowCount(String dbName, String tableName) { + LOG.debug("Fetching row count for table: {}.{}", dbName, tableName); + return -1; + } + + public static Type flussTypeToDorisType(String flussType) { + if (flussType == null) { + return Type.STRING; + } + + switch (flussType.toUpperCase()) { + case "BOOLEAN": + return Type.BOOLEAN; + case "TINYINT": + case "INT8": + return Type.TINYINT; + case "SMALLINT": + case "INT16": + return Type.SMALLINT; + case "INT": + case "INT32": + case "INTEGER": + return Type.INT; + case "BIGINT": + case "INT64": + return Type.BIGINT; + case "FLOAT": + return Type.FLOAT; + case "DOUBLE": + return Type.DOUBLE; + case "STRING": + case "VARCHAR": + return ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH); + case "BINARY": + case "BYTES": + return Type.STRING; + case "DATE": + return ScalarType.createDateV2Type(); + case "TIME": + return ScalarType.createTimeV2Type(0); + case "TIMESTAMP": + case "TIMESTAMP_WITHOUT_TIME_ZONE": + return ScalarType.createDatetimeV2Type(6); + case "TIMESTAMP_LTZ": + case "TIMESTAMP_WITH_LOCAL_TIME_ZONE": + return ScalarType.createDatetimeV2Type(6); + case "DECIMAL": + return ScalarType.createDecimalV3Type(38, 18); + default: + LOG.warn("Unknown Fluss type: {}, defaulting to STRING", flussType); + return Type.STRING; + } + } + + public void invalidateTableCache(String dbName, String tableName) { + String cacheKey = dbName + "." + tableName; + cacheLock.writeLock().lock(); + try { + tableMetadataCache.remove(cacheKey); + } finally { + cacheLock.writeLock().unlock(); + } + LOG.debug("Invalidated cache for table: {}", cacheKey); + } + + public void invalidateDatabaseCache(String dbName) { + cacheLock.writeLock().lock(); + try { + databaseTablesCache.remove(dbName); + String prefix = dbName + "."; + tableMetadataCache.entrySet().removeIf(entry -> entry.getKey().startsWith(prefix)); + } finally { + cacheLock.writeLock().unlock(); + } + LOG.debug("Invalidated cache for database: {}", dbName); + } + + public void clearCache() { + cacheLock.writeLock().lock(); + try { + tableMetadataCache.clear(); + databaseTablesCache.clear(); + } finally { + cacheLock.writeLock().unlock(); + } + LOG.debug("Cleared all metadata cache"); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + LOG.info("Closing FlussMetadataOps"); + clearCache(); + + synchronized (connectionLock) { + if (admin != null) { + try { + admin.close(); + } catch (Exception e) { + LOG.warn("Error closing Fluss admin", e); + } + admin = null; + } + if (connection != null) { + try { + connection.close(); + } catch (Exception e) { + LOG.warn("Error closing Fluss connection", e); + } + connection = null; + } + initialized.set(false); + } + } + } + + public Connection getConnection() { + ensureConnection(); + return connection; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussUtils.java new file mode 100644 index 00000000000000..720f002bab0369 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussUtils.java @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.nereids.types.VarBinaryType; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.BinaryType; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DataField; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.DoubleType; +import org.apache.fluss.types.FloatType; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.RowType; +import org.apache.fluss.types.SmallIntType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.types.TimestampType; +import org.apache.fluss.types.TinyIntType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class FlussUtils { + private static final Logger LOG = LogManager.getLogger(FlussUtils.class); + + /** + * Load schema cache value from Fluss table + */ + public static Optional<SchemaCacheValue> loadSchemaCacheValue(FlussExternalTable table) { + try { + FlussExternalCatalog catalog = (FlussExternalCatalog) table.getCatalog(); + FlussMetadataOps metadataOps = (FlussMetadataOps) catalog.getMetadataOps(); + + TableInfo tableInfo = metadataOps.getTableInfo( + table.getRemoteDbName(), table.getRemoteName()); + RowType rowType = tableInfo.getRowType(); + + List<Column> columns = new ArrayList<>(); + List<Column> partitionColumns = new ArrayList<>(); + Set<String> partitionKeys = tableInfo.getPartitionKeys() != null + ? 
tableInfo.getPartitionKeys().stream().collect(Collectors.toSet()) + : java.util.Collections.emptySet(); + + for (DataField field : rowType.getFields()) { + String fieldName = field.getName(); + DataType fieldType = field.getType(); + Type dorisType = flussTypeToDorisType(fieldType, catalog.getEnableMappingVarbinary()); + + Column column = new Column( + fieldName.toLowerCase(), + dorisType, + fieldType.isNullable(), + null, + true, + field.getDescription().orElse(null), + true, + -1); + + columns.add(column); + if (partitionKeys.contains(fieldName)) { + partitionColumns.add(column); + } + } + + return Optional.of(new SchemaCacheValue(columns, partitionColumns)); + } catch (Exception e) { + LOG.warn("Failed to load schema for Fluss table: {}.{}", + table.getDbName(), table.getName(), e); + throw new RuntimeException("Failed to load Fluss table schema: " + + ExceptionUtils.getRootCauseMessage(e), e); + } + } + + /** + * Convert Fluss DataType to Doris Type + */ + public static Type flussTypeToDorisType(DataType flussType, boolean enableMappingVarbinary) { + DataTypeRoot typeRoot = flussType.getTypeRoot(); + + switch (typeRoot) { + case BOOLEAN: + return Type.BOOLEAN; + case TINYINT: + return Type.TINYINT; + case SMALLINT: + return Type.SMALLINT; + case INT: + return Type.INT; + case BIGINT: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case STRING: + return Type.STRING; + case CHAR: + CharType charType = (CharType) flussType; + return ScalarType.createCharType(charType.getLength()); + case BINARY: + case BYTES: + if (enableMappingVarbinary) { + return ScalarType.createVarbinaryType(VarBinaryType.MAX_VARBINARY_LENGTH); + } else { + return Type.STRING; + } + case DECIMAL: + DecimalType decimalType = (DecimalType) flussType; + return ScalarType.createDecimalV3Type( + decimalType.getPrecision(), decimalType.getScale()); + case DATE: + return ScalarType.createDateV2Type(); + case TIMESTAMP: + TimestampType timestampType = (TimestampType) flussType; + int precision = timestampType.getPrecision(); + return ScalarType.createDatetimeV2Type(Math.min(precision, 6)); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType localZonedType = (LocalZonedTimestampType) flussType; + int tzPrecision = localZonedType.getPrecision(); + return ScalarType.createDatetimeV2Type(Math.min(tzPrecision, 6)); + case ARRAY: + org.apache.fluss.types.ArrayType arrayType = (org.apache.fluss.types.ArrayType) flussType; + Type elementType = flussTypeToDorisType(arrayType.getElementType(), enableMappingVarbinary); + return ArrayType.create(elementType, arrayType.getElementType().isNullable()); + case MAP: + org.apache.fluss.types.MapType mapType = (org.apache.fluss.types.MapType) flussType; + Type keyType = flussTypeToDorisType(mapType.getKeyType(), enableMappingVarbinary); + Type valueType = flussTypeToDorisType(mapType.getValueType(), enableMappingVarbinary); + return new MapType(keyType, valueType); + case ROW: + RowType rowType = (RowType) flussType; + List structFields = new ArrayList<>(); + for (DataField field : rowType.getFields()) { + Type fieldType = flussTypeToDorisType(field.getType(), enableMappingVarbinary); + structFields.add(new StructField(field.getName(), fieldType)); + } + return new StructType(structFields); + default: + throw new IllegalArgumentException("Unsupported Fluss type: " + typeRoot); + } + } + + /** + * Get Fluss table instance + */ + public static org.apache.fluss.client.table.Table getFlussTable(FlussExternalTable table) { + 
FlussExternalCatalog catalog = (FlussExternalCatalog) table.getCatalog(); + org.apache.fluss.metadata.TablePath tablePath = + org.apache.fluss.metadata.TablePath.of(table.getRemoteDbName(), table.getRemoteName()); + FlussMetadataOps metadataOps = (FlussMetadataOps) catalog.getMetadataOps(); + TableInfo tableInfo = metadataOps.getTableInfo(table.getRemoteDbName(), table.getRemoteName()); + return new org.apache.fluss.client.table.FlussTable( + catalog.getFlussConnection(), tablePath, tableInfo); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussScanNode.java new file mode 100644 index 00000000000000..170c1ba49aba6c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussScanNode.java @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss.source; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalUtil; +import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.datasource.Split; +import org.apache.doris.datasource.TableFormatType; +import org.apache.doris.datasource.fluss.FlussExternalCatalog; +import org.apache.doris.datasource.fluss.FlussExternalTable; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFlussFileDesc; +import org.apache.doris.thrift.TTableFormatFileDesc; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.snapshot.TableSnapshot; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FlussScanNode extends FileQueryScanNode { + private static final Logger LOG = LogManager.getLogger(FlussScanNode.class); + + private FlussSource source; + + public FlussScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + SessionVariable sv) { + super(id, desc, "FLUSS_SCAN_NODE", needCheckColumnPriv, sv); + source = new FlussSource(desc); + } + + @VisibleForTesting + public FlussScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
+        super(id, desc, "FLUSS_SCAN_NODE", false, sv);
+    }
+
+    @Override
+    protected void doInitialize() throws UserException {
+        super.doInitialize();
+        ExternalUtil.initSchemaInfo(params, -1L, source.getTargetTable().getColumns());
+    }
+
+    @Override
+    protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+        if (split instanceof FlussSplit) {
+            setFlussParams(rangeDesc, (FlussSplit) split);
+        }
+    }
+
+    private void setFlussParams(TFileRangeDesc rangeDesc, FlussSplit flussSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        tableFormatFileDesc.setTableFormatType(TableFormatType.FLUSS.value());
+
+        TFlussFileDesc flussFileDesc = new TFlussFileDesc();
+        flussFileDesc.setDatabase_name(flussSplit.getDatabaseName());
+        flussFileDesc.setTable_name(flussSplit.getTableName());
+        flussFileDesc.setTable_id(flussSplit.getTableId());
+        flussFileDesc.setBucket_id(flussSplit.getBucketId());
+        if (flussSplit.getPartitionName() != null) {
+            flussFileDesc.setPartition_name(flussSplit.getPartitionName());
+        }
+        flussFileDesc.setSnapshot_id(flussSplit.getSnapshotId());
+        if (flussSplit.getBootstrapServers() != null) {
+            flussFileDesc.setBootstrap_servers(flussSplit.getBootstrapServers());
+        }
+
+        String fileFormat = flussSplit.getLakeFormat() != null ? flussSplit.getLakeFormat() : "parquet";
+        flussFileDesc.setFile_format(fileFormat);
+
+        flussFileDesc.setLake_snapshot_id(flussSplit.getLakeSnapshotId());
+        if (flussSplit.hasLakeData()) {
+            flussFileDesc.setLake_file_paths(flussSplit.getLakeFilePaths());
+        }
+        flussFileDesc.setLog_start_offset(flussSplit.getLogStartOffset());
+        flussFileDesc.setLog_end_offset(flussSplit.getLogEndOffset());
+
+        if (fileFormat.equals("orc")) {
+            rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
+        } else {
+            rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
+        }
+
+        tableFormatFileDesc.setFluss_params(flussFileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    @Override
+    public List<Split> getSplits(int numBackends) throws UserException {
+        List<Split> splits = new ArrayList<>();
+
+        try {
+            FlussExternalTable flussTable = source.getTargetTable();
+            Table table = source.getFlussTable();
+            TableInfo tableInfo = table.getTableInfo();
+            long tableId = tableInfo.getTableId();
+            int numBuckets = flussTable.getNumBuckets();
+            List<String> partitionKeys = flussTable.getPartitionKeys();
+            String bootstrapServers = flussTable.getBootstrapServers();
+
+            LakeSnapshot lakeSnapshot = getLakeSnapshot(flussTable);
+            Map<TableBucket, Long> bucketOffsets = lakeSnapshot != null
+                    ? lakeSnapshot.getTableBucketsOffset()
+                    : new HashMap<>();
+            long lakeSnapshotId = lakeSnapshot != null ? lakeSnapshot.getSnapshotId() : -1;
+
+            Map<TableBucket, List<String>> bucketLakeFiles = getLakeFilesPerBucket(flussTable, lakeSnapshotId);
+
+            String lakeFormat = determineLakeFormat(tableInfo);
+
+            if (partitionKeys == null || partitionKeys.isEmpty()) {
+                splits.addAll(generateSplitsForPartition(
+                        flussTable, tableId, numBuckets, null, null,
+                        bootstrapServers, bucketOffsets, bucketLakeFiles, lakeFormat, lakeSnapshotId));
+            } else {
+                List<String> partitions = getPartitions(table);
+                for (String partition : partitions) {
+                    Long partitionId = getPartitionId(table, partition);
+                    splits.addAll(generateSplitsForPartition(
+                            flussTable, tableId, numBuckets, partition, partitionId,
+                            bootstrapServers, bucketOffsets, bucketLakeFiles, lakeFormat, lakeSnapshotId));
+                }
+            }
+
+            if (splits.isEmpty()) {
+                FlussSplit fallbackSplit = FlussSplit.createLakeSplit(
+                        flussTable.getRemoteDbName(),
+                        flussTable.getRemoteName(),
+                        tableId, 0, null, bootstrapServers,
+                        Collections.singletonList(buildFilePath(flussTable, null, 0)),
+                        lakeFormat, lakeSnapshotId);
+                splits.add(fallbackSplit);
+            }
+
+            long targetSplitSize = getRealFileSplitSize(0);
+            splits.forEach(s -> s.setTargetSplitSize(targetSplitSize));
+
+            LOG.info("Created {} Fluss splits for table {}.{} (lake={}, log={}, hybrid={})",
+                    splits.size(), flussTable.getRemoteDbName(), flussTable.getRemoteName(),
+                    countSplitsByTier(splits, FlussSplit.SplitTier.LAKE_ONLY),
+                    countSplitsByTier(splits, FlussSplit.SplitTier.LOG_ONLY),
+                    countSplitsByTier(splits, FlussSplit.SplitTier.HYBRID));
+
+        } catch (Exception e) {
+            LOG.error("Failed to get Fluss splits", e);
+            throw new UserException("Failed to get Fluss splits: " + e.getMessage(), e);
+        }
+
+        return splits;
+    }
+
+    private List<Split> generateSplitsForPartition(
+            FlussExternalTable flussTable, long tableId, int numBuckets,
+            String partitionName, Long partitionId, String bootstrapServers,
+            Map<TableBucket, Long> bucketOffsets, Map<TableBucket, List<String>> bucketLakeFiles,
+            String lakeFormat, long lakeSnapshotId) {
+
+        List<Split> splits = new ArrayList<>();
+        String dbName = flussTable.getRemoteDbName();
+        String tableName = flussTable.getRemoteName();
+
+        for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+            TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
+
+            Long lakeOffset = bucketOffsets.get(tableBucket);
+            List<String> lakeFiles = bucketLakeFiles.getOrDefault(tableBucket, Collections.emptyList());
+            boolean hasLakeData = lakeFiles != null && !lakeFiles.isEmpty();
+
+            FlussSplit split;
+            if (hasLakeData) {
+                split = FlussSplit.createLakeSplit(
+                        dbName, tableName, tableId, bucketId, partitionName,
+                        bootstrapServers, lakeFiles, lakeFormat, lakeSnapshotId);
+            } else {
+                split = new FlussSplit(dbName, tableName, tableId, bucketId,
+                        partitionName, lakeSnapshotId, bootstrapServers,
+                        buildFilePath(flussTable, partitionName, bucketId), 0);
+            }
+            splits.add(split);
+        }
+        return splits;
+    }
+
+    private LakeSnapshot getLakeSnapshot(FlussExternalTable flussTable) {
+        try {
+            FlussExternalCatalog catalog = (FlussExternalCatalog) flussTable.getCatalog();
+            Admin admin = catalog.getFlussAdmin();
+            TablePath tablePath = TablePath.of(flussTable.getRemoteDbName(), flussTable.getRemoteName());
+            return admin.getLatestLakeSnapshot(tablePath).get();
+        } catch (Exception e) {
+            LOG.warn("Failed to get lake snapshot for {}.{}, will use log-only splits",
+                    flussTable.getRemoteDbName(), flussTable.getRemoteName(), e);
+            return null;
+        }
+    }
+
+    private Map<TableBucket, List<String>> getLakeFilesPerBucket(FlussExternalTable flussTable, long lakeSnapshotId) {
+        Map<TableBucket, List<String>> result = new HashMap<>();
+        if (lakeSnapshotId < 0) {
+            return result;
+        }
+
+        try {
+            String dbName = flussTable.getRemoteDbName();
+            String tableName = flussTable.getRemoteName();
+            int numBuckets = flussTable.getNumBuckets();
+            // MVP placeholder: lake file paths are derived by convention, so the real
+            // table id is not needed yet; TableBucket only serves as the map key here.
+            long tableId = 0;
+
+            for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+                TableBucket bucket = new TableBucket(tableId, null, bucketId);
+                List<String> files = new ArrayList<>();
+                files.add(buildLakeFilePath(flussTable, null, bucketId, lakeSnapshotId));
+                result.put(bucket, files);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to get lake files for table, will discover at read time", e);
+        }
+        return result;
+    }
+
+    private String buildLakeFilePath(FlussExternalTable table, String partition, int bucketId, long snapshotId) {
+        StringBuilder path = new StringBuilder();
+        // Use S3 path for lake storage (MinIO or other S3-compatible storage)
+        path.append("s3://fluss-lake/").append(table.getRemoteDbName())
+                .append("/").append(table.getRemoteName());
+        if (partition != null) {
+            path.append("/").append(partition);
+        }
+        path.append("/bucket-").append(bucketId)
+                .append("/snapshot-").append(snapshotId)
+                .append("/data.parquet");
+        return path.toString();
+    }
+
+    private String determineLakeFormat(TableInfo tableInfo) {
+        try {
+            Map<String, String> options = tableInfo.getTableConfig().toMap();
+            String format = options.getOrDefault("lake.format", "parquet");
+            return format.toLowerCase();
+        } catch (Exception e) {
+            return "parquet";
+        }
+    }
+
+    private Long getPartitionId(Table table, String partitionName) {
+        // Partition-name-to-id resolution is not implemented in the MVP; returning
+        // null makes TableBucket address the bucket without a partition component.
+        return null;
+    }
+
+    private long countSplitsByTier(List<Split> splits, FlussSplit.SplitTier tier) {
+        return splits.stream()
+                .filter(s -> s instanceof FlussSplit && ((FlussSplit) s).getTier() == tier)
+                .count();
+    }
+
+    private long getLatestSnapshotId(Table table) {
+        try {
+            TableSnapshot snapshot = table.getLatestSnapshot();
+            if (snapshot != null) {
+                return snapshot.getSnapshotId();
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to get latest snapshot, using -1", e);
+        }
+        return -1L;
+    }
+
+    private List<String> getPartitions(Table table) {
+        List<String> partitions = new ArrayList<>();
+        try {
+            List<String> partitionNames = table.listPartitions();
+            if (partitionNames != null) {
+                partitions.addAll(partitionNames);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to list partitions, returning empty list", e);
+        }
+        return partitions;
+    }
+
+    private String buildFilePath(FlussExternalTable table, String partition, int bucketId) {
+        StringBuilder path = new StringBuilder();
+        path.append("/fluss/").append(table.getRemoteDbName()).append("/").append(table.getRemoteName());
+        if (partition != null) {
+            path.append("/").append(partition);
+        }
+        path.append("/bucket-").append(bucketId);
+        return path.toString();
+    }
+
+    @Override
+    public void createScanRangeLocations() throws UserException {
+        super.createScanRangeLocations();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussSource.java
new file mode 100644
index 00000000000000..6786843d7690b2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussSource.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss.source; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.fluss.FlussExternalCatalog; +import org.apache.doris.datasource.fluss.FlussExternalTable; +import org.apache.doris.datasource.fluss.FlussUtils; + +import org.apache.fluss.client.table.Table; + +public class FlussSource { + private final FlussExternalTable targetTable; + private final FlussExternalCatalog catalog; + private Table flussTable; + + public FlussSource(TupleDescriptor desc) { + ExternalTable table = (ExternalTable) desc.getTable(); + if (!(table instanceof FlussExternalTable)) { + throw new IllegalArgumentException("Table must be FlussExternalTable"); + } + this.targetTable = (FlussExternalTable) table; + this.catalog = (FlussExternalCatalog) targetTable.getCatalog(); + } + + public FlussExternalTable getTargetTable() { + return targetTable; + } + + public FlussExternalCatalog getCatalog() { + return catalog; + } + + public Table getFlussTable() { + if (flussTable == null) { + flussTable = FlussUtils.getFlussTable(targetTable); + } + return flussTable; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussSplit.java new file mode 100644 index 00000000000000..2200309010b600 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/source/FlussSplit.java @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
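+// Usage sketch (illustrative comments only, not executed anywhere): how split
+// generation is expected to build the three tiers via the factory methods below.
+// All ids, offsets, servers, and paths are made-up placeholder values.
+//
+//   List<String> files = Collections.singletonList(
+//           "s3://fluss-lake/db/tbl/bucket-0/snapshot-5/data.parquet");
+//   FlussSplit lake = FlussSplit.createLakeSplit("db", "tbl", 1L, 0, null,
+//           "host:9123", files, "parquet", 5L);
+//   FlussSplit log = FlussSplit.createLogSplit("db", "tbl", 1L, 0, null,
+//           "host:9123", 0L, 100L);
+//   FlussSplit hybrid = FlussSplit.createHybridSplit("db", "tbl", 1L, 0, null,
+//           "host:9123", files, "parquet", 5L, 100L, 200L);
+//   // lake.isLakeSplit() && !lake.isLogSplit(); hybrid covers both tiers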
+
+package org.apache.doris.datasource.fluss.source;
+
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.datasource.TableFormatType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class FlussSplit extends FileSplit {
+
+    public enum SplitTier {
+        LAKE_ONLY, // Data only in lake (Parquet/ORC files) - can be read directly
+        LOG_ONLY,  // Data only in log (Fluss native format) - requires Fluss SDK
+        HYBRID     // Data in both tiers - read lake first, then log
+    }
+
+    private final String databaseName;
+    private final String tableName;
+    private final long tableId;
+    private final int bucketId;
+    private final String partitionName;
+    private final long snapshotId;
+    private final String bootstrapServers;
+    private final TableFormatType tableFormatType;
+
+    private final SplitTier tier;
+    private final List<String> lakeFilePaths;
+    private final String lakeFormat;
+    private final long lakeSnapshotId;
+    private final long logStartOffset;
+    private final long logEndOffset;
+
+    public FlussSplit(String databaseName, String tableName, long tableId, int bucketId,
+            String partitionName, long snapshotId, String bootstrapServers,
+            String filePath, long fileSize) {
+        this(databaseName, tableName, tableId, bucketId, partitionName, snapshotId,
+                bootstrapServers, filePath, fileSize, SplitTier.LAKE_ONLY,
+                Collections.emptyList(), "parquet", -1, -1, -1);
+    }
+
+    public FlussSplit(String databaseName, String tableName, long tableId, int bucketId,
+            String partitionName, long snapshotId, String bootstrapServers,
+            String filePath, long fileSize, SplitTier tier,
+            List<String> lakeFilePaths, String lakeFormat, long lakeSnapshotId,
+            long logStartOffset, long logEndOffset) {
+        super(LocationPath.of(filePath != null ? filePath : "/fluss/" + databaseName + "/" + tableName),
+                0, fileSize, fileSize, 0, null, null);
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.tableId = tableId;
+        this.bucketId = bucketId;
+        this.partitionName = partitionName;
+        this.snapshotId = snapshotId;
+        this.bootstrapServers = bootstrapServers;
+        this.tableFormatType = TableFormatType.FLUSS;
+        this.tier = tier;
+        this.lakeFilePaths = lakeFilePaths != null ? new ArrayList<>(lakeFilePaths) : new ArrayList<>();
+        this.lakeFormat = lakeFormat != null ? lakeFormat : "parquet";
+        this.lakeSnapshotId = lakeSnapshotId;
+        this.logStartOffset = logStartOffset;
+        this.logEndOffset = logEndOffset;
+    }
+
+    public FlussSplit(String databaseName, String tableName, long tableId) {
+        this(databaseName, tableName, tableId, 0, null, -1, null, null, 0);
+    }
+
+    public static FlussSplit createLakeSplit(String databaseName, String tableName, long tableId,
+            int bucketId, String partitionName, String bootstrapServers,
+            List<String> lakeFilePaths, String lakeFormat, long lakeSnapshotId) {
+        String primaryPath = lakeFilePaths != null && !lakeFilePaths.isEmpty() ? lakeFilePaths.get(0) : null;
+        return new FlussSplit(databaseName, tableName, tableId, bucketId, partitionName,
+                lakeSnapshotId, bootstrapServers, primaryPath, 0, SplitTier.LAKE_ONLY,
+                lakeFilePaths, lakeFormat, lakeSnapshotId, -1, -1);
+    }
+
+    public static FlussSplit createLogSplit(String databaseName, String tableName, long tableId,
+            int bucketId, String partitionName, String bootstrapServers,
+            long logStartOffset, long logEndOffset) {
+        return new FlussSplit(databaseName, tableName, tableId, bucketId, partitionName,
+                -1, bootstrapServers, null, 0, SplitTier.LOG_ONLY,
+                Collections.emptyList(), null, -1, logStartOffset, logEndOffset);
+    }
+
+    public static FlussSplit createHybridSplit(String databaseName, String tableName, long tableId,
+            int bucketId, String partitionName, String bootstrapServers,
+            List<String> lakeFilePaths, String lakeFormat, long lakeSnapshotId,
+            long logStartOffset, long logEndOffset) {
+        String primaryPath = lakeFilePaths != null && !lakeFilePaths.isEmpty() ? lakeFilePaths.get(0) : null;
+        return new FlussSplit(databaseName, tableName, tableId, bucketId, partitionName,
+                lakeSnapshotId, bootstrapServers, primaryPath, 0, SplitTier.HYBRID,
+                lakeFilePaths, lakeFormat, lakeSnapshotId, logStartOffset, logEndOffset);
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public int getBucketId() {
+        return bucketId;
+    }
+
+    public String getPartitionName() {
+        return partitionName;
+    }
+
+    public long getSnapshotId() {
+        return snapshotId;
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public TableFormatType getTableFormatType() {
+        return tableFormatType;
+    }
+
+    public SplitTier getTier() {
+        return tier;
+    }
+
+    public List<String> getLakeFilePaths() {
+        return Collections.unmodifiableList(lakeFilePaths);
+    }
+
+    public String getLakeFormat() {
+        return lakeFormat;
+    }
+
+    public long getLakeSnapshotId() {
+        return lakeSnapshotId;
+    }
+
+    public long getLogStartOffset() {
+        return logStartOffset;
+    }
+
+    public long getLogEndOffset() {
+        return logEndOffset;
+    }
+
+    public boolean isLakeSplit() {
+        return tier == SplitTier.LAKE_ONLY || tier == SplitTier.HYBRID;
+    }
+
+    public boolean isLogSplit() {
+        return tier == SplitTier.LOG_ONLY || tier == SplitTier.HYBRID;
+    }
+
+    public boolean isHybridSplit() {
+        return tier == SplitTier.HYBRID;
+    }
+
+    public boolean hasLakeData() {
+        return lakeFilePaths != null && !lakeFilePaths.isEmpty();
+    }
+
+    public boolean hasLogData() {
+        return logStartOffset >= 0 && (logEndOffset < 0 || logEndOffset > logStartOffset);
+    }
+
+    public boolean isPartitioned() {
+        return partitionName != null && !partitionName.isEmpty();
+    }
+
+    @Override
+    public String getConsistentHashString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(databaseName).append(".").append(tableName);
+        if (partitionName != null) {
+            sb.append(".").append(partitionName);
+        }
+        sb.append(".bucket").append(bucketId);
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        return "FlussSplit{"
+                + "db='" + databaseName + '\''
+                + ", table='" + tableName + '\''
+                + ", tableId=" + tableId
+                + ", bucketId=" + bucketId
+                + ", partition='" + partitionName + '\''
+                + ", tier=" + tier
+                + ", lakeFiles=" + lakeFilePaths.size()
+                + ", lakeSnapshotId=" + lakeSnapshotId
+                + ", logOffsets=[" + logStartOffset + "," + logEndOffset + "]"
+                + '}';
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java index 7d63b18cd13ffb..c7fd2b0dee0c79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java @@ -18,10 +18,12 @@ package org.apache.doris.datasource.operations; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.fluss.FlussMetadataOps; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; +import org.apache.fluss.client.Connection; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.catalog.Catalog; @@ -35,4 +37,8 @@ public static HiveMetadataOps newHiveMetadataOps(HiveConf hiveConf, HMSExternalC public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) { return new IcebergMetadataOps(dorisCatalog, catalog); } + + public static FlussMetadataOps newFlussMetadataOps(ExternalCatalog dorisCatalog, Connection flussConnection) { + return new FlussMetadataOps(dorisCatalog, flussConnection); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 0181a91979314d..b7d05bfc32f9dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -63,6 +63,8 @@ import org.apache.doris.datasource.odbc.source.OdbcScanNode; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.paimon.source.PaimonScanNode; +import org.apache.doris.datasource.fluss.FlussExternalTable; +import org.apache.doris.datasource.fluss.source.FlussScanNode; import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable; import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode; import org.apache.doris.fs.DirectoryLister; @@ -644,6 +646,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); } else if (table instanceof PaimonExternalTable) { scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); + } else if (table instanceof FlussExternalTable) { + scanNode = new FlussScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); } else if (table instanceof TrinoConnectorExternalTable) { scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); } else if (table instanceof MaxComputeExternalTable) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalCatalogFactoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalCatalogFactoryTest.java new file mode 100644 index 00000000000000..8bcd239f83e48a --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalCatalogFactoryTest.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalCatalog; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class FlussExternalCatalogFactoryTest { + + @Test + public void testCreateCatalog() throws DdlException { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_COORDINATOR_URI, "localhost:9123"); + + ExternalCatalog catalog = FlussExternalCatalogFactory.createCatalog( + 1L, "test_catalog", null, props, "test"); + + Assert.assertNotNull(catalog); + Assert.assertTrue(catalog instanceof FlussExternalCatalog); + Assert.assertEquals("test_catalog", catalog.getName()); + } + + @Test + public void testCreateCatalogWithBootstrapServers() throws DdlException { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, "localhost:9123"); + + ExternalCatalog catalog = FlussExternalCatalogFactory.createCatalog( + 2L, "test_catalog2", null, props, ""); + + Assert.assertNotNull(catalog); + Assert.assertTrue(catalog instanceof FlussExternalCatalog); + } + + @Test + public void testCreateCatalogWithAdditionalProperties() throws DdlException { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_COORDINATOR_URI, "localhost:9123"); + props.put("fluss.client.timeout", "30000"); + props.put("fluss.client.retry", "3"); + + ExternalCatalog catalog = FlussExternalCatalogFactory.createCatalog( + 3L, "test_catalog3", null, props, "catalog with extra props"); + + Assert.assertNotNull(catalog); + } +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalCatalogTest.java new file mode 100644 index 00000000000000..4912e9812edadb --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalCatalogTest.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
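+// For orientation: the property maps used in these tests mirror a SQL-level DDL of
+// roughly this shape (a sketch; the exact property keys are the FlussExternalCatalog
+// constants exercised below):
+//
+//   CREATE CATALOG fluss_catalog PROPERTIES (
+//       "type" = "fluss",
+//       "bootstrap.servers" = "localhost:9123"
+//   );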
+ +package org.apache.doris.datasource.fluss; + +import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalCatalog; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class FlussExternalCatalogTest { + + @Test + public void testCreateCatalogWithBootstrapServers() throws DdlException { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, "localhost:9123"); + + ExternalCatalog catalog = FlussExternalCatalogFactory.createCatalog( + 1L, "test_fluss_catalog", null, props, "test catalog"); + + Assert.assertNotNull(catalog); + Assert.assertEquals("test_fluss_catalog", catalog.getName()); + Assert.assertTrue(catalog instanceof FlussExternalCatalog); + } + + @Test + public void testCreateCatalogWithMultipleServers() throws DdlException { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, "host1:9123,host2:9123,host3:9123"); + + ExternalCatalog catalog = FlussExternalCatalogFactory.createCatalog( + 1L, "test_fluss_catalog", null, props, "test catalog"); + + Assert.assertNotNull(catalog); + Assert.assertEquals("test_fluss_catalog", catalog.getName()); + } + + @Test + public void testCheckPropertiesMissingBootstrapServers() { + Map props = new HashMap<>(); + FlussExternalCatalog catalog = new FlussExternalCatalog( + 1L, "test", null, props, ""); + + try { + catalog.checkProperties(); + Assert.fail("Should throw DdlException for missing bootstrap servers"); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS)); + } + } + + @Test + public void testCheckPropertiesWithBootstrapServers() throws DdlException { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, "localhost:9123"); + + FlussExternalCatalog catalog = new FlussExternalCatalog( + 1L, "test", null, props, ""); + catalog.checkProperties(); + } + + @Test + public void testCatalogProperties() { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, "localhost:9123"); + props.put("fluss.client.timeout", "30000"); + + FlussExternalCatalog catalog = new FlussExternalCatalog( + 1L, "test", null, props, ""); + Assert.assertEquals("localhost:9123", + catalog.getCatalogProperty().getOrDefault(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, null)); + } + + @Test + public void testCatalogSecurityProperties() { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, "localhost:9123"); + props.put(FlussExternalCatalog.FLUSS_SECURITY_PROTOCOL, "SASL_PLAINTEXT"); + props.put(FlussExternalCatalog.FLUSS_SASL_MECHANISM, "PLAIN"); + props.put(FlussExternalCatalog.FLUSS_SASL_USERNAME, "user"); + props.put(FlussExternalCatalog.FLUSS_SASL_PASSWORD, "password"); + + FlussExternalCatalog catalog = new FlussExternalCatalog( + 1L, "test", null, props, ""); + Assert.assertEquals("SASL_PLAINTEXT", + catalog.getCatalogProperty().getOrDefault(FlussExternalCatalog.FLUSS_SECURITY_PROTOCOL, null)); + } + + @Test + public void testCacheTtlProperty() throws DdlException { + Map props = new HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, "localhost:9123"); + props.put(FlussExternalCatalog.FLUSS_TABLE_META_CACHE_TTL_SECOND, "300"); + + FlussExternalCatalog catalog = new FlussExternalCatalog( + 1L, "test", null, props, ""); + catalog.checkProperties(); + } + + @Test + public void testInvalidCacheTtlProperty() { + Map props = new 
HashMap<>(); + props.put(FlussExternalCatalog.FLUSS_BOOTSTRAP_SERVERS, "localhost:9123"); + props.put(FlussExternalCatalog.FLUSS_TABLE_META_CACHE_TTL_SECOND, "-1"); + + FlussExternalCatalog catalog = new FlussExternalCatalog( + 1L, "test", null, props, ""); + try { + catalog.checkProperties(); + Assert.fail("Should throw DdlException for negative cache TTL"); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("non-negative")); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalDatabaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalDatabaseTest.java new file mode 100644 index 00000000000000..b4e1bae449a476 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalDatabaseTest.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.datasource.ExternalCatalog; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class FlussExternalDatabaseTest { + + @Mock + private FlussExternalCatalog mockCatalog; + + private FlussExternalDatabase database; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + database = new FlussExternalDatabase(mockCatalog, 1L, "test_db", "test_db"); + } + + @Test + public void testDatabaseCreation() { + Assert.assertNotNull(database); + Assert.assertEquals("test_db", database.getName()); + Assert.assertEquals(1L, database.getId()); + Assert.assertEquals(mockCatalog, database.getCatalog()); + } + + @Test + public void testBuildTableInternal() { + FlussExternalTable table = database.buildTableInternal( + "remote_table", "local_table", 1L, mockCatalog, database); + + Assert.assertNotNull(table); + Assert.assertEquals("local_table", table.getName()); + Assert.assertEquals(1L, table.getId()); + Assert.assertEquals(mockCatalog, table.getCatalog()); + Assert.assertEquals(database, table.getDb()); + } +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalTableTest.java new file mode 100644 index 00000000000000..1061e51bda6342 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussExternalTableTest.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.TableType; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.List; + +public class FlussExternalTableTest { + + @Mock + private FlussExternalCatalog mockCatalog; + + @Mock + private FlussExternalDatabase mockDatabase; + + private FlussExternalTable table; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + table = new FlussExternalTable(1L, "test_table", "test_table", mockCatalog, mockDatabase); + } + + @Test + public void testTableCreation() { + Assert.assertNotNull(table); + Assert.assertEquals("test_table", table.getName()); + Assert.assertEquals(1L, table.getId()); + Assert.assertEquals(mockCatalog, table.getCatalog()); + Assert.assertEquals(mockDatabase, table.getDb()); + } + + @Test + public void testTableType() { + Assert.assertEquals(TableType.FLUSS_EXTERNAL_TABLE, table.getType()); + } + + @Test + public void testToThrift() { + FlussExternalTable spyTable = Mockito.spy(table); + Mockito.when(spyTable.getDbName()).thenReturn("test_db"); + Mockito.when(spyTable.getName()).thenReturn("test_table"); + List emptySchema = Lists.newArrayList(); + Mockito.when(spyTable.getFullSchema()).thenReturn(emptySchema); + + TTableDescriptor descriptor = spyTable.toThrift(); + Assert.assertNotNull(descriptor); + Assert.assertEquals(TTableType.FLUSS_EXTERNAL_TABLE, descriptor.getTableType()); + Assert.assertEquals("test_table", descriptor.getTableName()); + Assert.assertEquals("test_db", descriptor.getDbName()); + Assert.assertNotNull(descriptor.getFlussTable()); + } + + @Test + public void testGetRemoteDbName() { + FlussExternalTable spyTable = Mockito.spy(table); + Mockito.when(spyTable.getRemoteDbName()).thenReturn("remote_db"); + String remoteDbName = spyTable.getRemoteDbName(); + Assert.assertEquals("remote_db", remoteDbName); + } + + @Test + public void testGetRemoteName() { + FlussExternalTable spyTable = Mockito.spy(table); + Mockito.when(spyTable.getRemoteName()).thenReturn("remote_table"); + String remoteName = spyTable.getRemoteName(); + Assert.assertEquals("remote_table", remoteName); + } +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussMetadataOpsTest.java new file mode 100644 index 00000000000000..adb26d8033a669 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussMetadataOpsTest.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.datasource.ExternalCatalog; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class FlussMetadataOpsTest { + + @Mock + private ExternalCatalog mockCatalog; + + @Mock + private Connection mockConnection; + + @Mock + private Admin mockAdmin; + + private FlussMetadataOps metadataOps; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + Mockito.when(mockConnection.getAdmin()).thenReturn(mockAdmin); + metadataOps = new FlussMetadataOps(mockCatalog, mockConnection); + } + + @Test + public void testTableExist() throws Exception { + String dbName = "test_db"; + String tblName = "test_table"; + TablePath tablePath = TablePath.of(dbName, tblName); + TableInfo tableInfo = Mockito.mock(TableInfo.class); + + CompletableFuture future = CompletableFuture.completedFuture(tableInfo); + Mockito.when(mockAdmin.getTableInfo(tablePath)).thenReturn(future); + + boolean exists = metadataOps.tableExist(dbName, tblName); + Assert.assertTrue(exists); + } + + @Test + public void testTableNotExist() throws Exception { + String dbName = "test_db"; + String tblName = "non_existent_table"; + TablePath tablePath = TablePath.of(dbName, tblName); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new TableNotExistException("Table does not exist")); + Mockito.when(mockAdmin.getTableInfo(tablePath)).thenReturn(future); + + boolean exists = metadataOps.tableExist(dbName, tblName); + Assert.assertFalse(exists); + } + + @Test + public void testListTableNames() throws Exception { + String dbName = "test_db"; + List tableNames = new ArrayList<>(); + tableNames.add("table1"); + tableNames.add("table2"); + tableNames.add("table3"); + + CompletableFuture> future = CompletableFuture.completedFuture(tableNames); + Mockito.when(mockAdmin.listTables(dbName)).thenReturn(future); + + List result = metadataOps.listTableNames(dbName); + Assert.assertEquals(3, result.size()); + Assert.assertTrue(result.contains("table1")); + Assert.assertTrue(result.contains("table2")); + Assert.assertTrue(result.contains("table3")); + } + + @Test + public void testListTableNamesEmpty() throws Exception { + String dbName = "empty_db"; + List emptyList = new ArrayList<>(); + + CompletableFuture> future = CompletableFuture.completedFuture(emptyList); + 
Mockito.when(mockAdmin.listTables(dbName)).thenReturn(future); + + List result = metadataOps.listTableNames(dbName); + Assert.assertTrue(result.isEmpty()); + } + + @Test + public void testGetTableInfo() throws Exception { + String dbName = "test_db"; + String tblName = "test_table"; + TablePath tablePath = TablePath.of(dbName, tblName); + TableInfo tableInfo = Mockito.mock(TableInfo.class); + + CompletableFuture future = CompletableFuture.completedFuture(tableInfo); + Mockito.when(mockAdmin.getTableInfo(tablePath)).thenReturn(future); + + TableInfo result = metadataOps.getTableInfo(dbName, tblName); + Assert.assertNotNull(result); + Assert.assertEquals(tableInfo, result); + } + + @Test + public void testGetAdmin() { + Admin admin = metadataOps.getAdmin(); + Assert.assertNotNull(admin); + Assert.assertEquals(mockAdmin, admin); + } + + @Test + public void testGetConnection() { + Connection connection = metadataOps.getConnection(); + Assert.assertNotNull(connection); + Assert.assertEquals(mockConnection, connection); + } + + @Test + public void testClose() { + // Close should not throw exception + metadataOps.close(); + } +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussUtilsTest.java new file mode 100644 index 00000000000000..2279ec8c7737f8 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/FlussUtilsTest.java @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
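+// Note: these tests exercise a string-name overload flussTypeToDorisType(String),
+// assumed to be defined in FlussUtils alongside the DataType-based
+// flussTypeToDorisType(DataType, boolean) shown earlier; unlike the DataType
+// variant, the string variant is expected to fall back to STRING for unknown or
+// null type names rather than throw.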
+ +package org.apache.doris.datasource.fluss; + +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; + +import org.junit.Assert; +import org.junit.Test; + +public class FlussUtilsTest { + + @Test + public void testPrimitiveTypes() { + Assert.assertEquals(Type.BOOLEAN, FlussUtils.flussTypeToDorisType("BOOLEAN")); + Assert.assertEquals(Type.BOOLEAN, FlussUtils.flussTypeToDorisType("BOOL")); + Assert.assertEquals(Type.TINYINT, FlussUtils.flussTypeToDorisType("TINYINT")); + Assert.assertEquals(Type.TINYINT, FlussUtils.flussTypeToDorisType("INT8")); + Assert.assertEquals(Type.SMALLINT, FlussUtils.flussTypeToDorisType("SMALLINT")); + Assert.assertEquals(Type.SMALLINT, FlussUtils.flussTypeToDorisType("INT16")); + Assert.assertEquals(Type.INT, FlussUtils.flussTypeToDorisType("INT")); + Assert.assertEquals(Type.INT, FlussUtils.flussTypeToDorisType("INT32")); + Assert.assertEquals(Type.INT, FlussUtils.flussTypeToDorisType("INTEGER")); + Assert.assertEquals(Type.BIGINT, FlussUtils.flussTypeToDorisType("BIGINT")); + Assert.assertEquals(Type.BIGINT, FlussUtils.flussTypeToDorisType("INT64")); + Assert.assertEquals(Type.FLOAT, FlussUtils.flussTypeToDorisType("FLOAT")); + Assert.assertEquals(Type.DOUBLE, FlussUtils.flussTypeToDorisType("DOUBLE")); + } + + @Test + public void testStringTypes() { + Type stringType = FlussUtils.flussTypeToDorisType("STRING"); + Assert.assertTrue(stringType.isStringType()); + + Type varcharType = FlussUtils.flussTypeToDorisType("VARCHAR(100)"); + Assert.assertTrue(varcharType.isVarchar()); + Assert.assertEquals(100, ((ScalarType) varcharType).getLength()); + + Type charType = FlussUtils.flussTypeToDorisType("CHAR(32)"); + Assert.assertTrue(charType.isVarchar()); + } + + @Test + public void testBinaryTypes() { + Type binaryType = FlussUtils.flussTypeToDorisType("BINARY"); + Assert.assertTrue(binaryType.isStringType()); + + Type bytesType = FlussUtils.flussTypeToDorisType("BYTES"); + Assert.assertTrue(bytesType.isStringType()); + } + + @Test + public void testDecimalType() { + Type decimalType = FlussUtils.flussTypeToDorisType("DECIMAL(10,2)"); + Assert.assertTrue(decimalType.isDecimalV3Type()); + Assert.assertEquals(10, ((ScalarType) decimalType).getScalarPrecision()); + Assert.assertEquals(2, ((ScalarType) decimalType).getScalarScale()); + + Type defaultDecimal = FlussUtils.flussTypeToDorisType("DECIMAL"); + Assert.assertTrue(defaultDecimal.isDecimalV3Type()); + } + + @Test + public void testDateTimeTypes() { + Type dateType = FlussUtils.flussTypeToDorisType("DATE"); + Assert.assertTrue(dateType.isDateV2Type()); + + Type timeType = FlussUtils.flussTypeToDorisType("TIME"); + Assert.assertTrue(timeType.isTime()); + + Type timestampType = FlussUtils.flussTypeToDorisType("TIMESTAMP"); + Assert.assertTrue(timestampType.isDatetimeV2()); + + Type timestampLtzType = FlussUtils.flussTypeToDorisType("TIMESTAMP_LTZ"); + Assert.assertTrue(timestampLtzType.isDatetimeV2()); + } + + @Test + public void testArrayType() { + Type arrayType = FlussUtils.flussTypeToDorisType("ARRAY"); + Assert.assertTrue(arrayType.isArrayType()); + org.apache.doris.catalog.ArrayType array = (org.apache.doris.catalog.ArrayType) arrayType; + Assert.assertEquals(Type.INT, array.getItemType()); + + Type nestedArray = FlussUtils.flussTypeToDorisType("ARRAY"); + Assert.assertTrue(nestedArray.isArrayType()); + } + + @Test + public void testMapType() { + Type mapType = FlussUtils.flussTypeToDorisType("MAP"); + Assert.assertTrue(mapType.isMapType()); + org.apache.doris.catalog.MapType map = 
(org.apache.doris.catalog.MapType) mapType; + Assert.assertTrue(map.getKeyType().isStringType()); + Assert.assertEquals(Type.INT, map.getValueType()); + } + + @Test + public void testUnknownTypeDefaultsToString() { + Type unknownType = FlussUtils.flussTypeToDorisType("UNKNOWN_TYPE"); + Assert.assertEquals(Type.STRING, unknownType); + } + + @Test + public void testNullAndEmptyType() { + Type nullType = FlussUtils.flussTypeToDorisType(null); + Assert.assertEquals(Type.STRING, nullType); + + Type emptyType = FlussUtils.flussTypeToDorisType(""); + Assert.assertEquals(Type.STRING, emptyType); + } + + @Test + public void testCaseInsensitive() { + Assert.assertEquals(Type.BOOLEAN, FlussUtils.flussTypeToDorisType("boolean")); + Assert.assertEquals(Type.INT, FlussUtils.flussTypeToDorisType("int")); + Assert.assertEquals(Type.BIGINT, FlussUtils.flussTypeToDorisType("bigint")); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/source/FlussSourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/source/FlussSourceTest.java new file mode 100644 index 00000000000000..18795b0ee5982c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/source/FlussSourceTest.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
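+// FlussSource resolves the Fluss Table handle lazily (getFlussTable() calls
+// FlussUtils.getFlussTable only on first use), so these tests run entirely on
+// mocks without a live Fluss connection.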
+ +package org.apache.doris.datasource.fluss.source; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.fluss.FlussExternalCatalog; +import org.apache.doris.datasource.fluss.FlussExternalTable; + +import org.apache.fluss.client.table.Table; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class FlussSourceTest { + + @Mock + private TupleDescriptor mockTupleDesc; + + @Mock + private FlussExternalTable mockTable; + + @Mock + private FlussExternalCatalog mockCatalog; + + @Mock + private Table mockFlussTable; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + Mockito.when(mockTupleDesc.getTable()).thenReturn(mockTable); + Mockito.when(mockTable.getCatalog()).thenReturn(mockCatalog); + Mockito.when(mockTable.getRemoteDbName()).thenReturn("test_db"); + Mockito.when(mockTable.getRemoteName()).thenReturn("test_table"); + } + + @Test + public void testFlussSourceCreation() { + FlussSource source = new FlussSource(mockTupleDesc); + Assert.assertNotNull(source); + Assert.assertEquals(mockTable, source.getTargetTable()); + Assert.assertEquals(mockCatalog, source.getCatalog()); + } + + @Test + public void testGetTargetTable() { + FlussSource source = new FlussSource(mockTupleDesc); + FlussExternalTable targetTable = source.getTargetTable(); + Assert.assertNotNull(targetTable); + Assert.assertEquals(mockTable, targetTable); + } + + @Test + public void testGetCatalog() { + FlussSource source = new FlussSource(mockTupleDesc); + FlussExternalCatalog catalog = source.getCatalog(); + Assert.assertNotNull(catalog); + Assert.assertEquals(mockCatalog, catalog); + } + + @Test(expected = IllegalArgumentException.class) + public void testFlussSourceWithNonFlussTable() { + ExternalTable nonFlussTable = Mockito.mock(ExternalTable.class); + Mockito.when(mockTupleDesc.getTable()).thenReturn(nonFlussTable); + new FlussSource(mockTupleDesc); + } +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/source/FlussSplitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/source/FlussSplitTest.java new file mode 100644 index 00000000000000..e628e712df0ef4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/fluss/source/FlussSplitTest.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
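+// The consistent hash string asserted below follows the
+// db.table[.partition].bucket<id> layout produced by FlussSplit#getConsistentHashString,
+// so all scans of the same bucket hash to the same backend across queries.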
+
+package org.apache.doris.datasource.fluss.source;
+
+import org.apache.doris.datasource.TableFormatType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlussSplitTest {
+
+    @Test
+    public void testFlussSplitCreation() {
+        FlussSplit split = new FlussSplit("test_db", "test_table", 123L);
+
+        Assert.assertNotNull(split);
+        Assert.assertEquals("test_db", split.getDatabaseName());
+        Assert.assertEquals("test_table", split.getTableName());
+        Assert.assertEquals(123L, split.getTableId());
+        Assert.assertEquals(TableFormatType.FLUSS, split.getTableFormatType());
+    }
+
+    @Test
+    public void testGetConsistentHashString() {
+        FlussSplit split = new FlussSplit("test_db", "test_table", 123L);
+        String hashString = split.getConsistentHashString();
+
+        Assert.assertNotNull(hashString);
+        // The simple constructor defaults to bucket 0 with no partition, so the
+        // consistent hash string is db.table.bucket<id>, not the table id.
+        Assert.assertEquals("test_db.test_table.bucket0", hashString);
+    }
+
+    @Test
+    public void testGetters() {
+        FlussSplit split = new FlussSplit("db1", "table1", 456L);
+
+        Assert.assertEquals("db1", split.getDatabaseName());
+        Assert.assertEquals("table1", split.getTableName());
+        Assert.assertEquals(456L, split.getTableId());
+        Assert.assertEquals(TableFormatType.FLUSS, split.getTableFormatType());
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/catalog/FlussTable.java b/fe/src/main/java/org/apache/doris/catalog/FlussTable.java
new file mode 100644
index 00000000000000..c5507761ea2934
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/catalog/FlussTable.java
@@ -0,0 +1,40 @@
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class FlussTable extends ExternalTable implements Writable {
+
+    private String flussStreamName;
+
+    public FlussTable() {
+        super(TableType.FLUSS);
+    }
+
+    public FlussTable(long id, String name, Map<String, String> properties) {
+        super(id, name, TableType.FLUSS);
+        this.flussStreamName = properties.getOrDefault("fluss.stream", "default_stream");
+    }
+
+    public String getFlussStreamName() {
+        return flussStreamName;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        Text.writeString(out, flussStreamName);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        this.flussStreamName = Text.readString(in);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/catalog/TableType.java b/fe/src/main/java/org/apache/doris/catalog/TableType.java
new file mode 100644
index 00000000000000..21045689ee7e3b
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/catalog/TableType.java
@@ -0,0 +1,19 @@
+
+package org.apache.doris.catalog;
+
+public enum TableType {
+    OLAP,
+    SCHEMA,
+    MYSQL,
+    OLAP_EXTERNAL,
+    BROKER,
+    ELASTICSEARCH,
+    HIVE,
+    ICEBERG,
+    HUDI,
+    JDBC,
+    TEST_EXTERNAL,
+    PAIMON,
+    FLUSS, // Added for Fluss integration
+    MAX_VALUE
+}
diff --git a/fluss_mvp_test.sql b/fluss_mvp_test.sql
new file mode 100644
index 00000000000000..0031bc92254029
--- /dev/null
+++ b/fluss_mvp_test.sql
@@ -0,0 +1,24 @@
+
+-- Integration test for Doris-Fluss MVP (Phase 1)
+
+-- 1. Create a Fluss table
+-- This should succeed if the FE changes are correct.
+CREATE TABLE fluss_test_table (
+    `id` INT,
+    `data` VARCHAR(255)
+)
+ENGINE=Fluss
+PROPERTIES (
+    "fluss.stream" = "my_test_stream"
+);
+
+-- 2. Describe the table
+-- This will show that Doris has correctly created the table with the 'Fluss' type.
+DESCRIBE fluss_test_table;
+
+-- 3. Attempt to select from the table
+-- This query is expected to FAIL. We have not implemented the BE part yet.
+-- The failure will prove that the FE has correctly identified the table as a Fluss
+-- table and is trying to use a non-existent execution path.
+-- Look for an error message like "Not implemented yet" or "Unsupported table type".
+SELECT * FROM fluss_test_table LIMIT 10;
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 4f5f10bbc04a3c..f6d78d8763c21c 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -469,6 +469,13 @@ struct TRemoteDorisTable {
     3: optional map<string, string> properties
 }
 
+struct TFlussTable {
+    1: optional string db_name
+    2: optional string table_name
+    3: optional string bootstrap_servers
+    4: optional map<string, string> properties
+}
+
 // "Union" of all table types.
 struct TTableDescriptor {
   1: required Types.TTableId id
@@ -496,6 +503,7 @@ struct TTableDescriptor {
   23: optional TLakeSoulTable lakesoulTable
   24: optional TDictionaryTable dictionaryTable
   25: optional TRemoteDorisTable remoteDorisTable
+  26: optional TFlussTable flussTable
 }
 
 struct TDescriptorTable {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 8eea8f078db367..cfcf82cc839c78 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -399,6 +399,35 @@ struct TRemoteDorisFileDesc {
     6: optional string password
 }
 
+enum TFlussSplitTier {
+    LAKE_ONLY = 0,
+    LOG_ONLY = 1,
+    HYBRID = 2
+}
+
+struct TFlussFileDesc {
+    1: optional string database_name
+    2: optional string table_name
+    3: optional i64 table_id
+    4: optional i32 bucket_id
+    5: optional string partition_name
+    6: optional i64 snapshot_id
+    7: optional string file_path
+    8: optional string file_format
+    9: optional string bootstrap_servers
+
+    // Tier information for tiered storage
+    10: optional TFlussSplitTier tier
+
+    // Lake tier fields (Parquet/ORC files)
+    11: optional list<string> lake_file_paths
+    12: optional i64 lake_snapshot_id
+
+    // Log tier fields (Fluss native format)
+    13: optional i64 log_start_offset
+    14: optional i64 log_end_offset
+}
+
 struct TTableFormatFileDesc {
     1: optional string table_format_type
     2: optional TIcebergFileDesc iceberg_params
@@ -410,6 +439,7 @@ struct TTableFormatFileDesc {
     8: optional TLakeSoulFileDesc lakesoul_params
     9: optional i64 table_level_row_count = -1
     10: optional TRemoteDorisFileDesc remote_doris_params
+    11: optional TFlussFileDesc fluss_params
 }
 
 // Deprecated, hive text talbe is a special format, not a serde type
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index a3aa07c6ee0682..ca2ac233f263fc 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -648,7 +648,8 @@ enum TTableType {
     LAKESOUL_TABLE = 13,
     TRINO_CONNECTOR_TABLE = 14,
     DICTIONARY_TABLE = 15,
-    REMOTE_DORIS_TABLE = 16
+    REMOTE_DORIS_TABLE = 16,
+    FLUSS_EXTERNAL_TABLE = 17
 }
 
 enum TKeysType {
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index 910fcef82b002d..e38c5eacf14887 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -235,6 +235,12 @@ extArrowFlightSqlUser = "root"
 extArrowFlightSqlPassword= ""
 extArrowFlightHttpPort= 8030
 
+// Fluss catalog test config
+// To enable Fluss test, start docker environment first:
+//     cd docker/integration-test/fluss && docker-compose up -d
+enableFlussTest = false
+flussBootstrapServers =
"localhost:9123" + // iceberg rest catalog config iceberg_rest_uri_port=18181 iceberg_minio_port=19001 diff --git a/regression-test/suites/external_table_p0/fluss/test_fluss_basic_read.groovy b/regression-test/suites/external_table_p0/fluss/test_fluss_basic_read.groovy new file mode 100644 index 00000000000000..2286d577969511 --- /dev/null +++ b/regression-test/suites/external_table_p0/fluss/test_fluss_basic_read.groovy @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_fluss_basic_read", "p0,external,fluss,external_docker") { + + String enabled = context.config.otherConfigs.get("enableFlussTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("Fluss test is not enabled, skipping") + return + } + + String catalog_name = "fluss_read_catalog" + String bootstrap_servers = context.config.otherConfigs.get("flussBootstrapServers") + + if (bootstrap_servers == null || bootstrap_servers.isEmpty()) { + bootstrap_servers = "localhost:9123" + } + + // Setup catalog + sql """DROP CATALOG IF EXISTS ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + "type" = "fluss", + "bootstrap.servers" = "${bootstrap_servers}" + ); + """ + sql """USE ${catalog_name}.test_db""" + + // ============================================ + // Test: Basic SELECT * + // ============================================ + def result1 = sql """SELECT * FROM all_types LIMIT 10""" + logger.info("SELECT * result: ${result1}") + + // ============================================ + // Test: SELECT with Column Projection + // ============================================ + def result2 = sql """SELECT id, string_col FROM all_types LIMIT 10""" + logger.info("Projected SELECT result: ${result2}") + + // ============================================ + // Test: SELECT with WHERE clause (equality) + // ============================================ + def result3 = sql """SELECT * FROM all_types WHERE id = 1""" + logger.info("WHERE id=1 result: ${result3}") + + // ============================================ + // Test: SELECT with WHERE clause (range) + // ============================================ + def result4 = sql """SELECT * FROM all_types WHERE id > 0 AND id < 100""" + logger.info("WHERE range result: ${result4}") + + // ============================================ + // Test: SELECT with ORDER BY + // ============================================ + def result5 = sql """SELECT id, string_col FROM all_types ORDER BY id LIMIT 10""" + logger.info("ORDER BY result: ${result5}") + + // Verify ordering + if (result5.size() > 1) { + for (int i = 1; i < result5.size(); i++) { + assertTrue(result5[i][0] >= result5[i-1][0], "Results should be ordered by id") + } + } + + // ============================================ + // 
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index 910fcef82b002d..e38c5eacf14887 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -235,6 +235,12 @@ extArrowFlightSqlUser = "root"
 extArrowFlightSqlPassword= ""
 extArrowFlightHttpPort= 8030
 
+// Fluss catalog test config
+// To enable Fluss test, start docker environment first:
+//   cd docker/integration-test/fluss && docker-compose up -d
+enableFlussTest = false
+flussBootstrapServers = "localhost:9123"
+
 // iceberg rest catalog config
 iceberg_rest_uri_port=18181
 iceberg_minio_port=19001
diff --git a/regression-test/suites/external_table_p0/fluss/test_fluss_basic_read.groovy b/regression-test/suites/external_table_p0/fluss/test_fluss_basic_read.groovy
new file mode 100644
index 00000000000000..2286d577969511
--- /dev/null
+++ b/regression-test/suites/external_table_p0/fluss/test_fluss_basic_read.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_fluss_basic_read", "p0,external,fluss,external_docker") {
+
+    String enabled = context.config.otherConfigs.get("enableFlussTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Fluss test is not enabled, skipping")
+        return
+    }
+
+    String catalog_name = "fluss_read_catalog"
+    String bootstrap_servers = context.config.otherConfigs.get("flussBootstrapServers")
+
+    if (bootstrap_servers == null || bootstrap_servers.isEmpty()) {
+        bootstrap_servers = "localhost:9123"
+    }
+
+    // Setup catalog
+    sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+    sql """
+        CREATE CATALOG ${catalog_name} PROPERTIES (
+            "type" = "fluss",
+            "bootstrap.servers" = "${bootstrap_servers}"
+        );
+    """
+    sql """USE ${catalog_name}.test_db"""
+
+    // ============================================
+    // Test: Basic SELECT *
+    // ============================================
+    def result1 = sql """SELECT * FROM all_types LIMIT 10"""
+    logger.info("SELECT * result: ${result1}")
+
+    // ============================================
+    // Test: SELECT with Column Projection
+    // ============================================
+    def result2 = sql """SELECT id, string_col FROM all_types LIMIT 10"""
+    logger.info("Projected SELECT result: ${result2}")
+
+    // ============================================
+    // Test: SELECT with WHERE clause (equality)
+    // ============================================
+    def result3 = sql """SELECT * FROM all_types WHERE id = 1"""
+    logger.info("WHERE id=1 result: ${result3}")
+
+    // ============================================
+    // Test: SELECT with WHERE clause (range)
+    // ============================================
+    def result4 = sql """SELECT * FROM all_types WHERE id > 0 AND id < 100"""
+    logger.info("WHERE range result: ${result4}")
+
+    // ============================================
+    // Test: SELECT with ORDER BY
+    // ============================================
+    def result5 = sql """SELECT id, string_col FROM all_types ORDER BY id LIMIT 10"""
+    logger.info("ORDER BY result: ${result5}")
+
+    // Verify ordering
+    if (result5.size() > 1) {
+        for (int i = 1; i < result5.size(); i++) {
+            assertTrue(result5[i][0] >= result5[i-1][0], "Results should be ordered by id")
+        }
+    }
+
+    // ============================================
+    // Test: SELECT COUNT(*)
+    // ============================================
+    def result6 = sql """SELECT COUNT(*) FROM all_types"""
+    logger.info("COUNT(*) result: ${result6}")
+    assertTrue(result6[0][0] >= 0, "Count should be non-negative")
+
+    // ============================================
+    // Test: SELECT with GROUP BY
+    // ============================================
+    def result7 = sql """SELECT bool_col, COUNT(*) as cnt FROM all_types GROUP BY bool_col"""
+    logger.info("GROUP BY result: ${result7}")
+
+    // ============================================
+    // Test: SELECT from Partitioned Table
+    // ============================================
+    def result8 = sql """SELECT * FROM partitioned_table LIMIT 10"""
+    logger.info("Partitioned table result: ${result8}")
+
+    // ============================================
+    // Test: SELECT from Log Table
+    // ============================================
+    def result9 = sql """SELECT * FROM log_table LIMIT 10"""
+    logger.info("Log table result: ${result9}")
+
+    // ============================================
+    // Cleanup
+    // ============================================
+    sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+}
+ +suite("test_fluss_catalog", "p0,external,fluss,external_docker") { + + String enabled = context.config.otherConfigs.get("enableFlussTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("Fluss test is not enabled, skipping") + return + } + + String catalog_name = "fluss_test_catalog" + String bootstrap_servers = context.config.otherConfigs.get("flussBootstrapServers") + + if (bootstrap_servers == null || bootstrap_servers.isEmpty()) { + bootstrap_servers = "localhost:9123" + } + + // ============================================ + // Test: Create Fluss Catalog + // ============================================ + sql """DROP CATALOG IF EXISTS ${catalog_name}""" + + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + "type" = "fluss", + "bootstrap.servers" = "${bootstrap_servers}" + ); + """ + + // Verify catalog was created + def catalogs = sql """SHOW CATALOGS""" + assertTrue(catalogs.toString().contains(catalog_name), "Catalog should be created") + + // ============================================ + // Test: List Databases + // ============================================ + def databases = sql """SHOW DATABASES FROM ${catalog_name}""" + logger.info("Databases in Fluss catalog: ${databases}") + assertTrue(databases.size() > 0, "Should have at least one database") + + // ============================================ + // Test: Switch to Fluss Catalog + // ============================================ + sql """USE ${catalog_name}.test_db""" + + // ============================================ + // Test: List Tables + // ============================================ + def tables = sql """SHOW TABLES""" + logger.info("Tables in test_db: ${tables}") + assertTrue(tables.size() > 0, "Should have at least one table") + + // ============================================ + // Test: Describe Table + // ============================================ + def schema = sql """DESC all_types""" + logger.info("Schema of all_types: ${schema}") + + // Verify expected columns exist + def columnNames = schema.collect { it[0] } + assertTrue(columnNames.contains("id"), "Should have 'id' column") + assertTrue(columnNames.contains("string_col"), "Should have 'string_col' column") + + // ============================================ + // Test: Create Catalog with Invalid Properties + // ============================================ + test { + sql """ + CREATE CATALOG invalid_fluss_catalog PROPERTIES ( + "type" = "fluss" + ); + """ + exception "Missing required property" + } + + // ============================================ + // Test: Catalog Properties with Security + // ============================================ + sql """DROP CATALOG IF EXISTS secure_fluss_catalog""" + sql """ + CREATE CATALOG secure_fluss_catalog PROPERTIES ( + "type" = "fluss", + "bootstrap.servers" = "${bootstrap_servers}", + "fluss.security.protocol" = "PLAINTEXT" + ); + """ + sql """DROP CATALOG secure_fluss_catalog""" + + // ============================================ + // Cleanup + // ============================================ + sql """DROP CATALOG IF EXISTS ${catalog_name}""" +} diff --git a/regression-test/suites/external_table_p0/fluss/test_fluss_predicate_pushdown.groovy b/regression-test/suites/external_table_p0/fluss/test_fluss_predicate_pushdown.groovy new file mode 100644 index 00000000000000..0d2c2e0f5cb722 --- /dev/null +++ b/regression-test/suites/external_table_p0/fluss/test_fluss_predicate_pushdown.groovy @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
diff --git a/regression-test/suites/external_table_p0/fluss/test_fluss_predicate_pushdown.groovy b/regression-test/suites/external_table_p0/fluss/test_fluss_predicate_pushdown.groovy
new file mode 100644
index 00000000000000..0d2c2e0f5cb722
--- /dev/null
+++ b/regression-test/suites/external_table_p0/fluss/test_fluss_predicate_pushdown.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_fluss_predicate_pushdown", "p0,external,fluss,external_docker") {
+
+    String enabled = context.config.otherConfigs.get("enableFlussTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Fluss test is not enabled, skipping")
+        return
+    }
+
+    String catalog_name = "fluss_pushdown_catalog"
+    String bootstrap_servers = context.config.otherConfigs.get("flussBootstrapServers")
+
+    if (bootstrap_servers == null || bootstrap_servers.isEmpty()) {
+        bootstrap_servers = "localhost:9123"
+    }
+
+    // Setup catalog
+    sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+    sql """
+        CREATE CATALOG ${catalog_name} PROPERTIES (
+            "type" = "fluss",
+            "bootstrap.servers" = "${bootstrap_servers}"
+        );
+    """
+    sql """USE ${catalog_name}.test_db"""
+
+    // ============================================
+    // Test: Equality Predicate Pushdown
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE id = 1"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: Range Predicate Pushdown
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE id > 10 AND id < 100"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: String Predicate Pushdown
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE string_col = 'test'"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: IN Predicate Pushdown
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE id IN (1, 2, 3, 4, 5)"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: IS NULL Predicate
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE string_col IS NULL"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: IS NOT NULL Predicate
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE string_col IS NOT NULL"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: Compound Predicates (AND)
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE id > 0 AND string_col IS NOT NULL"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: Compound Predicates (OR)
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE id = 1 OR id = 2"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: Date/Time Predicates
+    // ============================================
+    explain {
+        sql """SELECT * FROM all_types WHERE date_col > '2024-01-01'"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: Partition Pruning (Partitioned Table)
+    // ============================================
+    explain {
+        sql """SELECT * FROM partitioned_table WHERE dt = '2024-01-01'"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Test: Column Projection
+    // ============================================
+    explain {
+        sql """SELECT id, string_col FROM all_types"""
+        contains "FLUSS_SCAN_NODE"
+    }
+
+    // ============================================
+    // Cleanup
+    // ============================================
+    sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+}
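These EXPLAIN checks only verify that the plan routes through a Fluss scan node; whether a given conjunct is actually pushed into the scan would be decided per expression in the FE. A hedged sketch of the shape such a rule usually takes; the rule set here is an illustration, not the patch's actual pushdown policy:

    // Hypothetical sketch: deciding whether a conjunct can be pushed into
    // the Fluss scan. The expression classes are Doris FE analysis types;
    // the accept/reject rules are assumptions for illustration.
    import org.apache.doris.analysis.BinaryPredicate;
    import org.apache.doris.analysis.Expr;
    import org.apache.doris.analysis.InPredicate;
    import org.apache.doris.analysis.IsNullPredicate;
    import org.apache.doris.analysis.SlotRef;

    public class FlussPredicateRules {
        static boolean canPushDown(Expr conjunct) {
            // Simple column-vs-constant comparisons: id = 1, id > 10, ...
            if (conjunct instanceof BinaryPredicate) {
                BinaryPredicate bp = (BinaryPredicate) conjunct;
                return bp.getChild(0) instanceof SlotRef && bp.getChild(1).isConstant();
            }
            // id IN (1, 2, 3) and string_col IS [NOT] NULL
            if (conjunct instanceof InPredicate || conjunct instanceof IsNullPredicate) {
                return conjunct.getChild(0) instanceof SlotRef;
            }
            // Anything else stays in Doris and is evaluated after the scan.
            return false;
        }
    }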
diff --git a/regression-test/suites/external_table_p0/fluss/test_fluss_types.groovy b/regression-test/suites/external_table_p0/fluss/test_fluss_types.groovy
new file mode 100644
index 00000000000000..95068ee74d6c46
--- /dev/null
+++ b/regression-test/suites/external_table_p0/fluss/test_fluss_types.groovy
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_fluss_types", "p0,external,fluss,external_docker") {
+
+    String enabled = context.config.otherConfigs.get("enableFlussTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Fluss test is not enabled, skipping")
+        return
+    }
+
+    String catalog_name = "fluss_types_catalog"
+    String bootstrap_servers = context.config.otherConfigs.get("flussBootstrapServers")
+
+    if (bootstrap_servers == null || bootstrap_servers.isEmpty()) {
+        bootstrap_servers = "localhost:9123"
+    }
+
+    // Setup catalog
+    sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+    sql """
+        CREATE CATALOG ${catalog_name} PROPERTIES (
+            "type" = "fluss",
+            "bootstrap.servers" = "${bootstrap_servers}"
+        );
+    """
+    sql """USE ${catalog_name}.test_db"""
+
+    // ============================================
+    // Test: Boolean Type
+    // ============================================
+    def boolResult = sql """SELECT id, bool_col FROM all_types WHERE bool_col = true LIMIT 5"""
+    logger.info("Boolean type result: ${boolResult}")
+
+    // ============================================
+    // Test: Integer Types (TINYINT, SMALLINT, INT, BIGINT)
+    // ============================================
+    def intResult = sql """
+        SELECT id, tinyint_col, smallint_col, int_col, bigint_col
+        FROM all_types
+        WHERE tinyint_col IS NOT NULL
+        LIMIT 5
+    """
+    logger.info("Integer types result: ${intResult}")
+
+    // ============================================
+    // Test: Floating Point Types (FLOAT, DOUBLE)
+    // ============================================
+    def floatResult = sql """
+        SELECT id, float_col, double_col
+        FROM all_types
+        WHERE float_col IS NOT NULL
+        LIMIT 5
+    """
+    logger.info("Float types result: ${floatResult}")
+
+    // ============================================
+    // Test: Decimal Type
+    // ============================================
+    def decimalResult = sql """
+        SELECT id, decimal_col
+        FROM all_types
+        WHERE decimal_col IS NOT NULL
+        LIMIT 5
+    """
+    logger.info("Decimal type result: ${decimalResult}")
+
+    // ============================================
+    // Test: String Type
+    // ============================================
+    def stringResult = sql """
+        SELECT id, string_col
+        FROM all_types
+        WHERE string_col IS NOT NULL
+        LIMIT 5
+    """
+    logger.info("String type result: ${stringResult}")
+
+    // ============================================
+    // Test: Date Type
+    // ============================================
+    def dateResult = sql """
+        SELECT id, date_col
+        FROM all_types
+        WHERE date_col IS NOT NULL
+        LIMIT 5
+    """
+    logger.info("Date type result: ${dateResult}")
+
+    // ============================================
+    // Test: Timestamp Type
+    // ============================================
+    def timestampResult = sql """
+        SELECT id, timestamp_col
+        FROM all_types
+        WHERE timestamp_col IS NOT NULL
+        LIMIT 5
+    """
+    logger.info("Timestamp type result: ${timestampResult}")
+
+    // ============================================
+    // Test: Type Casting
+    // ============================================
+    def castResult = sql """
+        SELECT
+            CAST(int_col AS BIGINT) as int_to_bigint,
+            CAST(float_col AS DOUBLE) as float_to_double,
+            CAST(date_col AS STRING) as date_to_string
+        FROM all_types
+        WHERE int_col IS NOT NULL
+        LIMIT 5
+    """
+    logger.info("Type casting result: ${castResult}")
+
+    // ============================================
+    // Test: Aggregation on Numeric Types
+    // ============================================
+    def aggResult = sql """
+        SELECT
+            SUM(int_col) as sum_int,
+            AVG(double_col) as avg_double,
+            MIN(bigint_col) as min_bigint,
+            MAX(float_col) as max_float
+        FROM all_types
+    """
+    logger.info("Aggregation result: ${aggResult}")
+
+    // ============================================
+    // Test: Date/Time Functions
+    // ============================================
+    def dateFunc = sql """
+        SELECT
+            id,
+            date_col,
+            YEAR(date_col) as year_val,
+            MONTH(date_col) as month_val,
+            DAY(date_col) as day_val
+        FROM all_types
+        WHERE date_col IS NOT NULL
+        LIMIT 5
+    """
+    logger.info("Date functions result: ${dateFunc}")
+
+    // ============================================
+    // Test: Schema Type Verification
+    // ============================================
+    def schema = sql """DESC all_types"""
+
+    def expectedTypes = [
+        "id": "INT",
+        "bool_col": "BOOLEAN",
+        "tinyint_col": "TINYINT",
+        "smallint_col": "SMALLINT",
+        "int_col": "INT",
+        "bigint_col": "BIGINT",
+        "float_col": "FLOAT",
+        "double_col": "DOUBLE",
+        "string_col": "TEXT"
+    ]
+
+    for (row in schema) {
+        String colName = row[0]
+        String colType = row[1]
+        if (expectedTypes.containsKey(colName)) {
+            logger.info("Column ${colName}: expected ${expectedTypes[colName]}, got ${colType}")
+        }
+    }
+
+    // ============================================
+    // Cleanup
+    // ============================================
+    sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+}
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index 948415feaf393e..9289427e4fbdcd 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -153,7 +153,7 @@ if [[ "${CLEAN}" -eq 1 ]] && [[ -d "${TP_SOURCE_DIR}" ]]; then
 fi
 
 # Download thirdparties.
-eval "${TP_DIR}/download-thirdparty.sh ${packages[*]}"
+bash "${TP_DIR}/download-thirdparty.sh" "${packages[@]}"
 
 export LD_LIBRARY_PATH="${TP_DIR}/installed/lib:${LD_LIBRARY_PATH}"
 
diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh
index 2334b1b9b23011..c80360c26806b1 100755
--- a/thirdparty/download-thirdparty.sh
+++ b/thirdparty/download-thirdparty.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
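The schema verification in test_fluss_types (below) expects a fixed Fluss-to-Doris type mapping, with STRING surfacing as TEXT in DESC output. A hedged sketch of the mapping that test implies; the Fluss-side type names and the converter's location are assumptions:

    // Hypothetical sketch of the Fluss -> Doris type mapping checked via
    // DESC in test_fluss_types. ScalarType/Type are Doris FE catalog types;
    // the Fluss type-name strings are assumptions.
    import org.apache.doris.catalog.ScalarType;
    import org.apache.doris.catalog.Type;

    public class FlussTypeMapping {
        static Type flussTypeToDorisType(String flussType) {
            switch (flussType) {
                case "BOOLEAN":  return Type.BOOLEAN;
                case "TINYINT":  return Type.TINYINT;
                case "SMALLINT": return Type.SMALLINT;
                case "INT":      return Type.INT;
                case "BIGINT":   return Type.BIGINT;
                case "FLOAT":    return Type.FLOAT;
                case "DOUBLE":   return Type.DOUBLE;
                case "DATE":     return ScalarType.createDateV2Type();
                case "STRING":   return ScalarType.createStringType(); // shown as TEXT by DESC
                default:
                    throw new IllegalArgumentException("Unsupported Fluss type: " + flussType);
            }
        }
    }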