Commit 0c5a597

refactor(be): Phase 1 - Extract JniDataBridge utility class from JniConnector
Extract all static data exchange methods from JniConnector into a new JniDataBridge utility class. JniDataBridge is a pure stateless utility for C++ Block <-> JNI shared memory data exchange, data-source agnostic.

Moved to JniDataBridge:
- TableMetaAddress helper class
- fill_block / fill_column (Java -> C++ Block)
- to_java_table / parse_table_schema (C++ Block -> Java)
- get_jni_type / get_jni_type_with_different_string (type mapping)
- All private _fill_*_column helpers and templates

JniConnector retains backward-compatible inline wrappers that delegate to JniDataBridge, so all existing callers (vjdbc_connector, vjni_format_transformer, UDF functions, 6 JNI readers) continue to work without changes.

This is Phase 1 of the JNI read/write architecture refactoring plan.

refactor(be/fe): Phase 2 - Unify JDBC writer with VJniFormatTransformer

Refactor VJdbcTableWriter to use VJniFormatTransformer instead of inheriting from JdbcConnector. This follows the same pattern as VMCPartitionWriter for MaxCompute writes.

C++ changes:
- VJdbcTableWriter no longer inherits JdbcConnector; holds a VJniFormatTransformer instance instead
- open() creates VJniFormatTransformer with JdbcJniWriter class name
- write() delegates to VJniFormatTransformer::write()
- Build writer params from TDataSink instead of JdbcConnectorParam
- VJniFormatTransformer now uses JniDataBridge directly

Java changes:
- New JdbcJniWriter class extending JniWriter
- Handles JDBC connection pool, PreparedStatement batch inserts, and transaction control (openTrans/commitTrans/rollbackTrans)
- insertColumn/insertNullColumn logic adapted from BaseJdbcExecutor
- Independent class, does not modify existing BaseJdbcExecutor

This is Phase 2 of the JNI read/write architecture refactoring plan.

refactor(be/fe): Phase 3 - JDBC reader integration via JniReader (transitional)

Integrate JDBC reading into the unified JniReader/GenericReader framework while keeping the existing JdbcScanner as a transitional pipeline entry point (no FE changes required).

C++ changes:
- New JdbcJniReader (extends JniReader) delegates to JdbcJniScanner on the Java side via JniConnector, following the PaimonJniReader pattern
- JdbcScanner rewritten to delegate to JdbcJniReader instead of JdbcConnector. Builds JDBC params from TupleDescriptor and passes them to JdbcJniReader. Old JdbcConnector/JdbcConnectorParam removed.
- No FE changes needed: JDBCScanOperatorX still creates JdbcScanner using TJdbcScanNode params

Java changes:
- New JdbcJniScanner (extends JniScanner) manages its own connection pool, PreparedStatement, ResultSet lifecycle
- Reads column values from ResultSet into VectorTable using the standard JniScanner getNext/getNextBatchMeta protocol
- Independent of BaseJdbcExecutor, uses JdbcDataSource for pool mgmt

This is Phase 3 of the JNI read/write architecture refactoring plan.

refactor(be): Phase 4 - Strip JdbcConnector to utility-only class

Remove all read/write functionality from JdbcConnector, reducing it from ~690 lines to ~200 lines. It now only serves as a utility for:
- test_connection() (used by PInternalService::test_jdbc_connection)
- clean_datasource() (used after test_connection)

Removed:
- TableConnector inheritance (no longer needed)
- All read methods: query(), get_next(), _get_reader_params(), _cast_string_to_hll/bitmap/json()
- All write methods: append(), exec_stmt_write(), begin_trans(), abort_trans(), finish_trans()
- JdbcStatistic struct (was only used by old JdbcScanner)
- All JNI method IDs for read/write operations
- Data type casting maps and related member variables
- TupleDescriptor dependency (not needed for test_connection)

Updated:
- internal_service.cpp: removed use_transaction param (no longer exists)

JDBC read now flows: JdbcScanner -> JdbcJniReader -> JniConnector -> JdbcJniScanner
JDBC write now flows: VJdbcTableWriter -> VJniFormatTransformer -> JdbcJniWriter

This completes Phase 4 of the JNI read/write architecture refactoring.

refactor(be/fe): Remove JdbcConnector entirely

Replace JdbcConnector with JniConnector + JdbcConnectionTester for the last remaining use case (test_jdbc_connection in internal_service).

Changes:
- New JdbcConnectionTester.java (extends JniScanner): lightweight connection tester that verifies connectivity in open()
- internal_service.cpp: test_jdbc_connection now uses JniConnector with JdbcConnectionTester instead of JdbcConnector
- Extracted _resolve_jdbc_driver_url() as standalone function for driver URL resolution (was inside JdbcConnector)
- Deleted vjdbc_connector.h and vjdbc_connector.cpp entirely

JdbcConnector, JdbcConnectorParam, and all JNI method registrations for the old BaseJdbcExecutor path are now gone from the codebase.

refactor(fe): Mark legacy BaseJdbcExecutor hierarchy as @deprecated

After JdbcConnector removal, BaseJdbcExecutor and JdbcExecutorFactory are no longer called from any C++ code. They are retained temporarily because JdbcJniScanner does not yet implement all database-specific type conversions from the executor subclasses (MySQL, Oracle, PG, etc.).

Added @deprecated annotations pointing to the new replacements:
- Read: JdbcJniScanner (extends JniScanner)
- Write: JdbcJniWriter (extends JniWriter)
- Connection test: JdbcConnectionTester (extends JniScanner)

refactor(fe/be): Add JdbcTypeHandler strategy for database-specific type handling

Introduce the JdbcTypeHandler strategy pattern to replace the monolithic BaseJdbcExecutor class hierarchy with pluggable, per-database type handlers.

New files (Java):
- JdbcTypeHandler: strategy interface with 6 extension points (getColumnValue, getOutputConverter, setValidationQuery, initializeStatement, abortReadConnection, setSystemProperties)
- DefaultTypeHandler: generic JDBC type reading + shared utilities
- JdbcTypeHandlerFactory: maps table_type string to handler
- 9 database-specific handlers: MySQLTypeHandler, OracleTypeHandler, PostgreSQLTypeHandler, ClickHouseTypeHandler, SQLServerTypeHandler, DB2TypeHandler, SapHanaTypeHandler, TrinoTypeHandler, GbaseTypeHandler

Modified files:
- JdbcJniScanner: refactored to delegate to JdbcTypeHandler instead of hardcoded getColumnValue(). Also initializes per-column output converters once in open().
- jdbc_scanner.h: added _odbc_table_type_to_string() helper
- jdbc_scanner.cpp: passes table_type param to JdbcJniScanner

Key design decisions:
- Type handling logic ported 1:1 from BaseJdbcExecutor subclasses
- MySQLTypeHandler uses Integer.MIN_VALUE fetchSize for streaming
- MySQL/SQLServer handlers abort connection on incomplete reads
- Oracle/DB2/SapHana have database-specific validation queries
- Output converters cached per-column for performance

refactor(fe/be): Integrate JDBC into FileScanner path (FileQueryScanNode)

Migrate JdbcScanNode from ExternalScanNode to FileQueryScanNode, unifying JDBC reads with the same framework used by Paimon/Hudi/Iceberg/MaxCompute.

Architecture change:
Before: JdbcScanNode(ExternalScanNode) → JDBC_SCAN_NODE → JDBCScanOperatorX
After: JdbcScanNode(FileQueryScanNode) → FILE_SCAN_NODE → FileScanOperatorX

FE changes:
- JdbcScanNode: extends FileQueryScanNode instead of ExternalScanNode
- Implements getSplits() returning single JdbcSplit
- Implements setScanParams() → TTableFormatFileDesc with jdbc_params
- getFileFormatType() returns FORMAT_JNI
- getPathPartitionKeys()/getLocationProperties() return empty
- Preserves all predicate push-down logic unchanged
- New JdbcSplit extends FileSplit with JDBC connection parameters
- PhysicalPlanTranslator: updated visitPhysicalJdbcScan with proper TableRefInfo setup for FileQueryScanNode compatibility

Thrift changes:
- TTableFormatFileDesc: added field 11 'jdbc_params' (map<string,string>)

BE changes:
- file_scanner.cpp: added 'jdbc' branch in FORMAT_JNI reader creation
- Extracts jdbc_params from TTableFormatFileDesc
- Creates JdbcJniReader with params map
- Added jdbc_jni_reader.h include

format

[refactor](be) Merge JniConnector into JniReader and remove Avro/LakeSoul

- Merge all JniConnector instance methods into JniReader base class (open/get_next_block/close/fill_block/get_table_schema/get_statistics)
- Update all 6 JniReader subclasses to pass connector_class/params/column_names to JniReader base constructor instead of creating JniConnector
- Move JniConnector static method callers to JniDataBridge (UDF/UDAF/UDTF: to_java_table, parse_table_schema, fill_block)
- Adapt internal_service.cpp JDBC connection test to use JniReader
- Remove Avro JNI reader (avro_jni_reader.h/cpp)
- Remove LakeSoul JNI reader (lakesoul_jni_reader.h/cpp)
- Delete jni_connector.h/cpp entirely

Files deleted:
be/src/vec/exec/jni_connector.h
be/src/vec/exec/jni_connector.cpp
be/src/vec/exec/format/avro/avro_jni_reader.h
be/src/vec/exec/format/avro/avro_jni_reader.cpp
be/src/vec/exec/format/table/lakesoul_jni_reader.h
be/src/vec/exec/format/table/lakesoul_jni_reader.cpp

format 2 fix compile compile bug fix compile fix compile fix compile fix compile format fix review fix 1 fix 2 fix 3 format fix mysql array fix jdbc 2 oracle test sql format format 2 deprecated fix ut 2 support doris special type format pg date fix jdbc table fix oracle timestamp tz fix oracle sqlserver test fix pg binary
1 parent 10b052f commit 0c5a597
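
The Phase 1 note about backward-compatible inline wrappers can be illustrated with a minimal sketch. Everything here is simplified for illustration: `Block` is a hypothetical stand-in, and the `fill_block` signature and its `bool` return are assumptions, not the real Doris declarations.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for the real Block type; it exists only so the sketch compiles.
struct Block {
    std::vector<std::string> columns;
};

// Stateless utility: every member is static, so callers never need an instance.
class JniDataBridge {
public:
    // Simplified signature (an assumption): records one column name per requested index.
    static bool fill_block(Block* block, const std::vector<int>& indexes) {
        if (block == nullptr) {
            return false;
        }
        for (int idx : indexes) {
            block->columns.push_back("col_" + std::to_string(idx));
        }
        return true;
    }
};

// Legacy entry point kept as a thin inline wrapper, so old call sites still compile
// while the logic lives in JniDataBridge.
class JniConnector {
public:
    static bool fill_block(Block* block, const std::vector<int>& indexes) {
        return JniDataBridge::fill_block(block, indexes);
    }
};
```

With this shape, call sites can move from `JniConnector::fill_block` to `JniDataBridge::fill_block` one at a time, which matches the rename visible in the UDAF diff further down.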

67 files changed: +4591 -2679 lines changed


be/src/service/internal_service.cpp

Lines changed: 90 additions & 34 deletions
@@ -41,7 +41,7 @@
 #include <stdint.h>
 #include <sys/stat.h>
 #include <vec/data_types/data_type.h>
-#include <vec/exec/vjdbc_connector.h>
+#include <vec/exec/format/jni_reader.h>
 #include <vec/sink/varrow_flight_result_writer.h>

 #include <algorithm>
@@ -107,6 +107,7 @@
 #include "util/brpc_client_cache.h"
 #include "util/brpc_closure.h"
 #include "util/doris_metrics.h"
+#include "util/jdbc_utils.h"
 #include "util/md5.h"
 #include "util/metrics.h"
 #include "util/network_util.h"
@@ -119,7 +120,6 @@
 #include "util/uid_util.h"
 #include "vec/common/variant_util.h"
 #include "vec/core/block.h"
-#include "vec/exec/format/avro//avro_jni_reader.h"
 #include "vec/exec/format/csv/csv_reader.h"
 #include "vec/exec/format/generic_reader.h"
 #include "vec/exec/format/json/new_json_reader.h"
@@ -867,11 +867,7 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
                                                            file_slots, io_ctx.get(), io_ctx);
             break;
         }
-        case TFileFormatType::FORMAT_AVRO: {
-            reader = vectorized::AvroJNIReader::create_unique(profile.get(), params, range,
-                                                              file_slots);
-            break;
-        }
+
         default:
             st = Status::InternalError("Not supported file format in fetch table schema: {}",
                                        params.format_type);
@@ -991,7 +987,6 @@ void PInternalService::test_jdbc_connection(google::protobuf::RpcController* con
                 fmt::format("InternalService::test_jdbc_connection"));
         SCOPED_ATTACH_TASK(mem_tracker);
         TTableDescriptor table_desc;
-        vectorized::JdbcConnectorParam jdbc_param;
         Status st = Status::OK();
         {
             const uint8_t* buf = (const uint8_t*)request->jdbc_table().data();
@@ -1004,35 +999,96 @@ void PInternalService::test_jdbc_connection(google::protobuf::RpcController* con
             }
         }
         TJdbcTable jdbc_table = (table_desc.jdbcTable);
-        jdbc_param.catalog_id = jdbc_table.catalog_id;
-        jdbc_param.driver_class = jdbc_table.jdbc_driver_class;
-        jdbc_param.driver_path = jdbc_table.jdbc_driver_url;
-        jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum;
-        jdbc_param.jdbc_url = jdbc_table.jdbc_url;
-        jdbc_param.user = jdbc_table.jdbc_user;
-        jdbc_param.passwd = jdbc_table.jdbc_password;
-        jdbc_param.query_string = request->query_str();
-        jdbc_param.table_type = static_cast<TOdbcTableType::type>(request->jdbc_table_type());
-        jdbc_param.use_transaction = false;
-        jdbc_param.connection_pool_min_size = jdbc_table.connection_pool_min_size;
-        jdbc_param.connection_pool_max_size = jdbc_table.connection_pool_max_size;
-        jdbc_param.connection_pool_max_life_time = jdbc_table.connection_pool_max_life_time;
-        jdbc_param.connection_pool_max_wait_time = jdbc_table.connection_pool_max_wait_time;
-        jdbc_param.connection_pool_keep_alive = jdbc_table.connection_pool_keep_alive;
-
-        std::unique_ptr<vectorized::JdbcConnector> jdbc_connector;
-        jdbc_connector.reset(new (std::nothrow) vectorized::JdbcConnector(jdbc_param));
-
-        st = jdbc_connector->test_connection();
-        st.to_protobuf(result->mutable_status());

-        Status clean_st = jdbc_connector->clean_datasource();
-        if (!clean_st.ok()) {
-            LOG(WARNING) << "Failed to clean JDBC datasource: " << clean_st.msg();
+        // Resolve driver URL to absolute file:// path
+        std::string driver_url;
+        st = JdbcUtils::resolve_driver_url(jdbc_table.jdbc_driver_url, &driver_url);
+        if (!st.ok()) {
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+
+        // Build params for JdbcConnectionTester
+        std::map<std::string, std::string> params;
+        params["jdbc_url"] = jdbc_table.jdbc_url;
+        params["jdbc_user"] = jdbc_table.jdbc_user;
+        params["jdbc_password"] = jdbc_table.jdbc_password;
+        params["jdbc_driver_class"] = jdbc_table.jdbc_driver_class;
+        params["jdbc_driver_url"] = driver_url;
+        params["query_sql"] = request->query_str();
+        params["catalog_id"] = std::to_string(jdbc_table.catalog_id);
+        params["connection_pool_min_size"] = std::to_string(jdbc_table.connection_pool_min_size);
+        params["connection_pool_max_size"] = std::to_string(jdbc_table.connection_pool_max_size);
+        params["connection_pool_max_wait_time"] =
+                std::to_string(jdbc_table.connection_pool_max_wait_time);
+        params["connection_pool_max_life_time"] =
+                std::to_string(jdbc_table.connection_pool_max_life_time);
+        params["connection_pool_keep_alive"] =
+                jdbc_table.connection_pool_keep_alive ? "true" : "false";
+        params["clean_datasource"] = "true";
+        // Map jdbc_table_type (TOdbcTableType enum value) to string name
+        // for JdbcTypeHandlerFactory to select the correct type handler.
+        // This ensures the right validation query is used (e.g. Oracle: "SELECT 1 FROM dual").
+        if (request->has_jdbc_table_type()) {
+            std::string type_name;
+            switch (request->jdbc_table_type()) {
+            case 0:
+                type_name = "MYSQL";
+                break;
+            case 1:
+                type_name = "ORACLE";
+                break;
+            case 2:
+                type_name = "POSTGRESQL";
+                break;
+            case 3:
+                type_name = "SQLSERVER";
+                break;
+            case 6:
+                type_name = "CLICKHOUSE";
+                break;
+            case 7:
+                type_name = "SAP_HANA";
+                break;
+            case 8:
+                type_name = "TRINO";
+                break;
+            case 9:
+                type_name = "PRESTO";
+                break;
+            case 10:
+                type_name = "OCEANBASE";
+                break;
+            case 11:
+                type_name = "OCEANBASE_ORACLE";
+                break;
+            case 13:
+                type_name = "DB2";
+                break;
+            case 14:
+                type_name = "GBASE";
+                break;
+            default:
+                break;
+            }
+            if (!type_name.empty()) {
+                params["table_type"] = type_name;
+            }
         }
-        Status close_st = jdbc_connector->close();
+        // required_fields and columns_types are required by JniReader
+        params["required_fields"] = "result";
+        params["columns_types"] = "int";
+
+        // Use JniReader to create JdbcConnectionTester, which tests
+        // the connection in its open() method.
+        auto jni_reader = std::make_unique<vectorized::JniReader>(
+                "org/apache/doris/jdbc/JdbcConnectionTester", params);
+        st = jni_reader->open(nullptr, nullptr);
+        st.to_protobuf(result->mutable_status());
+
+        Status close_st = jni_reader->close();
         if (!close_st.ok()) {
-            LOG(WARNING) << "Failed to close JDBC connector: " << close_st.msg();
+            LOG(WARNING) << "Failed to close JDBC connection tester: " << close_st.msg();
         }
     });
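
The integer-to-name switch in `test_jdbc_connection` could also be written as a lookup table, which keeps the pairs in one place. The sketch below is only an illustration: the helper name `jdbc_table_type_name` and the table `kJdbcTableTypeNames` are hypothetical, and the numeric values are copied from the switch in this diff rather than taken from the generated `TOdbcTableType` enum.

```cpp
#include <array>
#include <optional>
#include <string_view>
#include <utility>

// Value/name pairs copied from the switch in test_jdbc_connection.
// The integers are the raw jdbc_table_type values used there.
inline constexpr std::array<std::pair<int, std::string_view>, 12> kJdbcTableTypeNames = {{
        {0, "MYSQL"},
        {1, "ORACLE"},
        {2, "POSTGRESQL"},
        {3, "SQLSERVER"},
        {6, "CLICKHOUSE"},
        {7, "SAP_HANA"},
        {8, "TRINO"},
        {9, "PRESTO"},
        {10, "OCEANBASE"},
        {11, "OCEANBASE_ORACLE"},
        {13, "DB2"},
        {14, "GBASE"},
}};

// Hypothetical helper: returns the handler name for a table type value,
// or std::nullopt for values the switch leaves unmapped (its default branch).
inline std::optional<std::string_view> jdbc_table_type_name(int type) {
    for (const auto& [value, name] : kJdbcTableTypeNames) {
        if (value == type) {
            return name;
        }
    }
    return std::nullopt;
}
```

A caller would set `params["table_type"]` only when the helper returns a value, mirroring the `!type_name.empty()` check in the diff.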

be/src/util/jdbc_utils.cpp

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/jdbc_utils.h"
+
+#include <filesystem>
+
+#include "common/config.h"
+
+namespace doris {
+
+Status JdbcUtils::resolve_driver_url(const std::string& url, std::string* result_url) {
+    // Already a full URL (e.g. "file:///path/to/driver.jar" or "hdfs://...")
+    if (url.find(":/") != std::string::npos) {
+        *result_url = url;
+        return Status::OK();
+    }
+
+    const char* doris_home = std::getenv("DORIS_HOME");
+    if (doris_home == nullptr) {
+        return Status::InternalError("DORIS_HOME environment variable is not set");
+    }
+
+    std::string default_url = std::string(doris_home) + "/plugins/jdbc_drivers";
+    std::string default_old_url = std::string(doris_home) + "/jdbc_drivers";
+
+    if (config::jdbc_drivers_dir == default_url) {
+        std::string target_path = default_url + "/" + url;
+        std::string old_target_path = default_old_url + "/" + url;
+        if (std::filesystem::exists(target_path)) {
+            *result_url = "file://" + target_path;
+        } else if (std::filesystem::exists(old_target_path)) {
+            *result_url = "file://" + old_target_path;
+        } else {
+            return Status::InternalError("JDBC driver file does not exist: " + url);
+        }
+    } else {
+        *result_url = "file://" + config::jdbc_drivers_dir + "/" + url;
+    }
+    return Status::OK();
+}
+
+} // namespace doris
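
To try the lookup order outside the Doris tree, the default-directory branch of `resolve_driver_url` can be approximated with a dependency-free sketch. The function name `resolve_driver_url_sketch` and its directory parameters are hypothetical: the real method reads `DORIS_HOME` and `config::jdbc_drivers_dir` and returns a `Status`, while this version takes both directories as arguments and returns `std::optional`. It does not model the custom-directory branch, which builds the URL without checking that the file exists.

```cpp
#include <filesystem>
#include <optional>
#include <string>

// Hypothetical variant of the lookup: directories are passed in instead of being
// read from DORIS_HOME and config::jdbc_drivers_dir.
std::optional<std::string> resolve_driver_url_sketch(const std::string& url,
                                                     const std::string& primary_dir,
                                                     const std::string& legacy_dir) {
    // Anything containing ":/" is treated as an already complete URL.
    if (url.find(":/") != std::string::npos) {
        return url;
    }
    namespace fs = std::filesystem;
    // Try the current plugin directory first, then the legacy location.
    for (const std::string& dir : {primary_dir, legacy_dir}) {
        fs::path candidate = fs::path(dir) / url;
        if (fs::exists(candidate)) {
            return "file://" + candidate.string();
        }
    }
    // Neither directory contains the driver JAR.
    return std::nullopt;
}
```

Note that the `":/"` test is a substring check, not a scheme parser, so any input containing that sequence is passed through unchanged.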

be/src/util/jdbc_utils.h

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+/**
+ * Utility functions for JDBC driver management.
+ */
+class JdbcUtils {
+public:
+    /**
+     * Resolve a JDBC driver URL to an absolute file:// URL.
+     *
+     * FE sends just the JAR filename (e.g. "mysql-connector-java-8.0.25.jar").
+     * This method resolves it to a full file:// URL by searching in the
+     * configured jdbc_drivers_dir (or the default DORIS_HOME/plugins/jdbc_drivers).
+     *
+     * If the URL already contains ":/", it is assumed to be a full URL and
+     * returned as-is.
+     *
+     * @param url The driver URL from FE (may be just a filename)
+     * @param result_url Output: the resolved file:// URL
+     * @return Status::OK on success, or InternalError if the file is not found
+     */
+    static Status resolve_driver_url(const std::string& url, std::string* result_url);
+};
+
+} // namespace doris

be/src/vec/aggregate_functions/aggregate_function_java_udaf.h

Lines changed: 5 additions & 5 deletions
@@ -38,7 +38,7 @@
 #include "vec/common/string_ref.h"
 #include "vec/core/field.h"
 #include "vec/core/types.h"
-#include "vec/exec/jni_connector.h"
+#include "vec/exec/jni_data_bridge.h"
 #include "vec/io/io_helper.h"

 namespace doris::vectorized {
@@ -110,8 +110,8 @@ struct AggregateJavaUdafData {
                                               std::to_string(i)));
         }
         std::unique_ptr<long[]> input_table;
-        RETURN_IF_ERROR(JniConnector::to_java_table(&input_block, input_table));
-        auto input_table_schema = JniConnector::parse_table_schema(&input_block);
+        RETURN_IF_ERROR(JniDataBridge::to_java_table(&input_block, input_table));
+        auto input_table_schema = JniDataBridge::parse_table_schema(&input_block);
         std::map<String, String> input_params = {
                 {"meta_address", std::to_string((long)input_table.get())},
                 {"required_fields", input_table_schema.first},
@@ -190,7 +190,7 @@ struct AggregateJavaUdafData {

         Block output_block;
         output_block.insert(ColumnWithTypeAndName(to.get_ptr(), result_type, "_result_"));
-        auto output_table_schema = JniConnector::parse_table_schema(&output_block);
+        auto output_table_schema = JniDataBridge::parse_table_schema(&output_block);
         std::string output_nullable = result_type->is_nullable() ? "true" : "false";
         std::map<String, String> output_params = {{"is_nullable", output_nullable},
                                                   {"required_fields", output_table_schema.first},
@@ -205,7 +205,7 @@ struct AggregateJavaUdafData {
                                 .with_arg(output_map)
                                 .call(&output_address));

-        return JniConnector::fill_block(&output_block, {0}, output_address);
+        return JniDataBridge::fill_block(&output_block, {0}, output_address);
     }

 private:
