Skip to content

Commit 5ca8e8d

Browse files
author
yangshijie
committed
[Feat](opt) Optimize Python UDF/UDAF/UDTF for Doris
1 parent e2c4976 commit 5ca8e8d

File tree

92 files changed

+1707
-2863
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

92 files changed

+1707
-2863
lines changed

be/src/common/config.cpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,14 +1094,9 @@ DEFINE_String(python_conda_root_path, "");
10941094
DEFINE_String(python_venv_root_path, "${DORIS_HOME}/lib/udf/python");
10951095
// python interpreter paths used by venv, e.g. /usr/bin/python3.7:/usr/bin/python3.6
10961096
DEFINE_String(python_venv_interpreter_paths, "");
1097-
// python deps index url
1098-
DEFINE_String(python_deps_index_url, "https://pypi.org/simple/");
1099-
// min number of python process
1100-
DEFINE_Int32(min_python_process_nums, "16");
1101-
// max number of python process
1102-
DEFINE_Int32(max_python_process_nums, "256");
1103-
// timeout in milliseconds when waiting for available python process
1104-
DEFINE_Int32(python_process_pool_wait_timeout_ms, "30000");
1097+
// max python processes in global shared pool, each version can have up to this many processes
1098+
// 0 means use CPU core count as default, otherwise use the specified value
1099+
DEFINE_mInt32(max_python_process_num, "0");
11051100

11061101
// Set config randomly to check more issues in github workflow
11071102
DEFINE_Bool(enable_fuzzy_mode, "false");

be/src/common/config.h

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,14 +1141,8 @@ DECLARE_String(python_conda_root_path);
11411141
DECLARE_String(python_venv_root_path);
11421142
// python interpreter paths used by venv, e.g. /usr/bin/python3.7:/usr/bin/python3.6
11431143
DECLARE_String(python_venv_interpreter_paths);
1144-
// python deps index url
1145-
DECLARE_String(python_deps_index_url);
1146-
// min number of python process
1147-
DECLARE_Int32(min_python_process_nums);
1148-
// max number of python process
1149-
DECLARE_Int32(max_python_process_nums);
1150-
// timeout in milliseconds when waiting for available python process
1151-
DECLARE_Int32(python_process_pool_wait_timeout_ms);
1144+
// max python processes in global shared pool, each version can have up to this many processes
1145+
DECLARE_mInt32(max_python_process_num);
11521146

11531147
// Set config randomly to check more issues in github workflow
11541148
DECLARE_Bool(enable_fuzzy_mode);

be/src/udf/python/python_client.cpp

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "arrow/flight/client.h"
2121
#include "arrow/flight/server.h"
2222
#include "common/compiler_util.h"
23+
#include "common/config.h"
2324
#include "common/status.h"
2425
#include "udf/python/python_udf_meta.h"
2526
#include "udf/python/python_udf_runtime.h"
@@ -86,11 +87,7 @@ Status PythonClient::close() {
8687
_arrow_client.reset();
8788
_writer.reset();
8889
_reader.reset();
89-
90-
// Return process to pool if available
91-
if (auto* pool = _process->pool(); pool) {
92-
pool->return_process(std::move(_process));
93-
}
90+
_process.reset();
9491

9592
return Status::OK();
9693
}
@@ -101,7 +98,6 @@ Status PythonClient::handle_error(arrow::Status status) {
10198
// Clean up resources
10299
_writer.reset();
103100
_reader.reset();
104-
_process->shutdown();
105101

106102
// Extract and clean error message
107103
std::string msg = status.message();
@@ -116,13 +112,6 @@ Status PythonClient::handle_error(arrow::Status status) {
116112
return Status::RuntimeError(trim(msg));
117113
}
118114

119-
Status PythonClient::check_process_alive() const {
120-
if (UNLIKELY(!_process->is_alive())) {
121-
return Status::RuntimeError("{} process is not alive", _operation_name);
122-
}
123-
return Status::OK();
124-
}
125-
126115
Status PythonClient::begin_stream(const std::shared_ptr<arrow::Schema>& schema) {
127116
if (UNLIKELY(!_begin)) {
128117
auto begin_res = _writer->Begin(schema);
@@ -150,7 +139,6 @@ Status PythonClient::read_batch(std::shared_ptr<arrow::RecordBatch>* output) {
150139

151140
arrow::flight::FlightStreamChunk chunk = std::move(*read_res);
152141
if (!chunk.data) {
153-
_process->shutdown();
154142
return Status::InternalError("Received null RecordBatch from {} server", _operation_name);
155143
}
156144

be/src/udf/python/python_client.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ class PythonClient {
7171
* Get process information for debugging
7272
* @return Process string representation
7373
*/
74-
std::string print_process() const { return _process->to_string(); }
74+
std::string print_process() const { return _process ? _process->to_string() : "null"; }
7575

76-
protected:
7776
/**
78-
* Check if process is alive
79-
* @return Status
77+
* Get the underlying Python process
78+
* @return Process pointer
8079
*/
81-
Status check_process_alive() const;
80+
ProcessPtr get_process() const { return _process; }
8281

82+
protected:
8383
/**
8484
* Begin Flight stream with schema (called only once per stream)
8585
* @param schema Input schema

be/src/udf/python/python_env.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ Status PythonVersionManager::init(PythonEnvType env_type, const fs::path& python
283283
std::vector<PythonVersion> versions;
284284
RETURN_IF_ERROR(_env_scanner->scan());
285285
RETURN_IF_ERROR(_env_scanner->get_versions(&versions));
286-
RETURN_IF_ERROR(PythonServerManager::instance().init(versions));
287286
return Status::OK();
288287
}
289288

0 commit comments

Comments
 (0)