Skip to content

Commit e2c4976

Browse files
sjyangoyangshijie
authored andcommitted
[Feat](udtf) Support Python UDTF for Doris (#58348)
1 parent 1886274 commit e2c4976

File tree

66 files changed

+12892
-422
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+12892
-422
lines changed

be/src/runtime/exec_env_init.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
#include "service/backend_options.h"
103103
#include "service/backend_service.h"
104104
#include "service/point_query_executor.h"
105-
#include "udf/python/python_udf_server.h"
105+
#include "udf/python/python_server.h"
106106
#include "util/bfd_parser.h"
107107
#include "util/bit_util.h"
108108
#include "util/brpc_client_cache.h"
@@ -927,7 +927,7 @@ void ExecEnv::destroy() {
927927
_s_tracking_memory = false;
928928

929929
clear_storage_resource();
930-
PythonUDFServerManager::instance().shutdown();
930+
PythonServerManager::instance().shutdown();
931931
LOG(INFO) << "Doris exec envorinment is destoried.";
932932
}
933933

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "udf/python/python_client.h"
19+
20+
#include "arrow/flight/client.h"
21+
#include "arrow/flight/server.h"
22+
#include "common/compiler_util.h"
23+
#include "common/status.h"
24+
#include "udf/python/python_udf_meta.h"
25+
#include "udf/python/python_udf_runtime.h"
26+
#include "util/arrow/utils.h"
27+
28+
namespace doris {
29+
30+
Status PythonClient::init(const PythonUDFMeta& func_meta, ProcessPtr process) {
31+
if (_inited) {
32+
return Status::InternalError("PythonClient has already been initialized");
33+
}
34+
35+
// Set operation name based on client type
36+
switch (func_meta.client_type) {
37+
case PythonClientType::UDF:
38+
_operation_name = "Python UDF";
39+
break;
40+
case PythonClientType::UDAF:
41+
_operation_name = "Python UDAF";
42+
break;
43+
case PythonClientType::UDTF:
44+
_operation_name = "Python UDTF";
45+
break;
46+
default:
47+
return Status::InternalError("Invalid Python client type");
48+
}
49+
50+
// Parse and connect to Python server location
51+
arrow::flight::Location location;
52+
RETURN_DORIS_STATUS_IF_RESULT_ERROR(location,
53+
arrow::flight::Location::Parse(process->get_uri()));
54+
RETURN_DORIS_STATUS_IF_RESULT_ERROR(_arrow_client, FlightClient::Connect(location));
55+
56+
// Serialize function metadata to JSON command
57+
std::string command;
58+
RETURN_IF_ERROR(func_meta.serialize_to_json(&command));
59+
60+
// Create Flight descriptor and establish bidirectional streaming
61+
FlightDescriptor descriptor = FlightDescriptor::Command(command);
62+
arrow::flight::FlightClient::DoExchangeResult exchange_res;
63+
RETURN_DORIS_STATUS_IF_RESULT_ERROR(exchange_res, _arrow_client->DoExchange(descriptor));
64+
65+
_reader = std::move(exchange_res.reader);
66+
_writer = std::move(exchange_res.writer);
67+
_process = std::move(process);
68+
_inited = true;
69+
70+
return Status::OK();
71+
}
72+
73+
Status PythonClient::close() {
74+
if (!_inited || !_writer) {
75+
return Status::OK();
76+
}
77+
78+
auto writer_res = _writer->Close();
79+
if (!writer_res.ok()) {
80+
// Don't propagate error from close, just log it
81+
LOG(WARNING) << "Error closing Python client writer: " << writer_res.message();
82+
}
83+
84+
_inited = false;
85+
_begin = false;
86+
_arrow_client.reset();
87+
_writer.reset();
88+
_reader.reset();
89+
90+
// Return process to pool if available
91+
if (auto* pool = _process->pool(); pool) {
92+
pool->return_process(std::move(_process));
93+
}
94+
95+
return Status::OK();
96+
}
97+
98+
Status PythonClient::handle_error(arrow::Status status) {
99+
DCHECK(!status.ok());
100+
101+
// Clean up resources
102+
_writer.reset();
103+
_reader.reset();
104+
_process->shutdown();
105+
106+
// Extract and clean error message
107+
std::string msg = status.message();
108+
LOG(ERROR) << _operation_name << " error: " << msg;
109+
110+
// Remove Python traceback noise for cleaner error messages
111+
size_t pos = msg.find("The above exception was the direct cause");
112+
if (pos != std::string::npos) {
113+
msg = msg.substr(0, pos);
114+
}
115+
116+
return Status::RuntimeError(trim(msg));
117+
}
118+
119+
Status PythonClient::check_process_alive() const {
120+
if (UNLIKELY(!_process->is_alive())) {
121+
return Status::RuntimeError("{} process is not alive", _operation_name);
122+
}
123+
return Status::OK();
124+
}
125+
126+
Status PythonClient::begin_stream(const std::shared_ptr<arrow::Schema>& schema) {
127+
if (UNLIKELY(!_begin)) {
128+
auto begin_res = _writer->Begin(schema);
129+
if (!begin_res.ok()) {
130+
return handle_error(begin_res);
131+
}
132+
_begin = true;
133+
}
134+
return Status::OK();
135+
}
136+
137+
Status PythonClient::write_batch(const arrow::RecordBatch& input) {
138+
auto write_res = _writer->WriteRecordBatch(input);
139+
if (!write_res.ok()) {
140+
return handle_error(write_res);
141+
}
142+
return Status::OK();
143+
}
144+
145+
Status PythonClient::read_batch(std::shared_ptr<arrow::RecordBatch>* output) {
146+
auto read_res = _reader->Next();
147+
if (!read_res.ok()) {
148+
return handle_error(read_res.status());
149+
}
150+
151+
arrow::flight::FlightStreamChunk chunk = std::move(*read_res);
152+
if (!chunk.data) {
153+
_process->shutdown();
154+
return Status::InternalError("Received null RecordBatch from {} server", _operation_name);
155+
}
156+
157+
*output = std::move(chunk.data);
158+
return Status::OK();
159+
}
160+
161+
} // namespace doris

be/src/udf/python/python_client.h

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <arrow/status.h>
21+
22+
#include "arrow/flight/client.h"
23+
#include "common/status.h"
24+
#include "udf/python/python_udf_meta.h"
25+
#include "udf/python/python_udf_runtime.h"
26+
#include "util/arrow/utils.h"
27+
28+
namespace doris {
29+
30+
/**
31+
* Base class for Python UDF/UDAF/UDTF clients
32+
*
33+
* Provides common functionality for communicating with Python server via Arrow Flight:
34+
* - Connection management
35+
* - Stream initialization
36+
* - Error handling
37+
* - Process lifecycle management
38+
*/
39+
class PythonClient {
40+
public:
41+
using FlightDescriptor = arrow::flight::FlightDescriptor;
42+
using FlightClient = arrow::flight::FlightClient;
43+
using FlightStreamWriter = arrow::flight::FlightStreamWriter;
44+
using FlightStreamReader = arrow::flight::FlightStreamReader;
45+
46+
PythonClient() = default;
47+
virtual ~PythonClient() = default;
48+
49+
/**
50+
* Initialize connection to Python server
51+
* @param func_meta Function metadata (contains client_type for operation name)
52+
* @param process Python process handle
53+
* @return Status
54+
*/
55+
Status init(const PythonUDFMeta& func_meta, ProcessPtr process);
56+
57+
/**
58+
* Close connection and cleanup resources
59+
* @return Status
60+
*/
61+
Status close();
62+
63+
/**
64+
* Handle Arrow Flight error
65+
* @param status Arrow status
66+
* @return Doris Status with formatted error message
67+
*/
68+
Status handle_error(arrow::Status status);
69+
70+
/**
71+
* Get process information for debugging
72+
* @return Process string representation
73+
*/
74+
std::string print_process() const { return _process->to_string(); }
75+
76+
protected:
77+
/**
78+
* Check if process is alive
79+
* @return Status
80+
*/
81+
Status check_process_alive() const;
82+
83+
/**
84+
* Begin Flight stream with schema (called only once per stream)
85+
* @param schema Input schema
86+
* @return Status
87+
*/
88+
Status begin_stream(const std::shared_ptr<arrow::Schema>& schema);
89+
90+
/**
91+
* Write RecordBatch to server
92+
* @param input Input RecordBatch
93+
* @return Status
94+
*/
95+
Status write_batch(const arrow::RecordBatch& input);
96+
97+
/**
98+
* Read RecordBatch from server
99+
* @param output Output RecordBatch
100+
* @return Status
101+
*/
102+
Status read_batch(std::shared_ptr<arrow::RecordBatch>* output);
103+
104+
// Common state
105+
bool _inited = false;
106+
bool _begin = false; // Track if Begin() has been called
107+
std::string _operation_name; // Operation name for error messages
108+
std::unique_ptr<FlightClient> _arrow_client;
109+
std::unique_ptr<FlightStreamWriter> _writer;
110+
std::unique_ptr<FlightStreamReader> _reader;
111+
ProcessPtr _process;
112+
113+
private:
114+
DISALLOW_COPY_AND_ASSIGN(PythonClient);
115+
};
116+
117+
} // namespace doris

be/src/udf/python/python_env.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#include <vector>
2626

2727
#include "common/status.h"
28-
#include "udf/python/python_udf_server.h"
28+
#include "udf/python/python_server.h"
2929
#include "util/string_util.h"
3030

3131
namespace doris {
@@ -283,7 +283,7 @@ Status PythonVersionManager::init(PythonEnvType env_type, const fs::path& python
283283
std::vector<PythonVersion> versions;
284284
RETURN_IF_ERROR(_env_scanner->scan());
285285
RETURN_IF_ERROR(_env_scanner->get_versions(&versions));
286-
RETURN_IF_ERROR(PythonUDFServerManager::instance().init(versions));
286+
RETURN_IF_ERROR(PythonServerManager::instance().init(versions));
287287
return Status::OK();
288288
}
289289

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#include "udf/python/python_udf_server.h"
18+
#include "udf/python/python_server.h"
1919

2020
#include <butil/fd_utility.h>
2121
#include <dirent.h>
@@ -29,10 +29,11 @@
2929
#include "common/config.h"
3030
#include "udf/python/python_udaf_client.h"
3131
#include "udf/python/python_udf_client.h"
32+
#include "udf/python/python_udtf_client.h"
3233

3334
namespace doris {
3435

35-
Status PythonUDFServerManager::init(const std::vector<PythonVersion>& versions) {
36+
Status PythonServerManager::init(const std::vector<PythonVersion>& versions) {
3637
std::lock_guard<std::mutex> lock(_pools_mutex);
3738
for (const auto& version : versions) {
3839
if (_pools.find(version) != _pools.end()) continue;
@@ -45,9 +46,8 @@ Status PythonUDFServerManager::init(const std::vector<PythonVersion>& versions)
4546
}
4647

4748
template <typename T>
48-
Status PythonUDFServerManager::get_client(const PythonUDFMeta& func_meta,
49-
const PythonVersion& version,
50-
std::shared_ptr<T>* client) {
49+
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
50+
std::shared_ptr<T>* client) {
5151
PythonUDFProcessPoolPtr* pool = nullptr;
5252
{
5353
std::lock_guard<std::mutex> lock(_pools_mutex);
@@ -65,12 +65,12 @@ Status PythonUDFServerManager::get_client(const PythonUDFMeta& func_meta,
6565
return Status::OK();
6666
}
6767

68-
Status PythonUDFServerManager::fork(PythonUDFProcessPool* pool, ProcessPtr* process) {
68+
Status PythonServerManager::fork(PythonUDFProcessPool* pool, ProcessPtr* process) {
6969
DCHECK(pool != nullptr);
7070
const PythonVersion& version = pool->get_python_version();
7171
// e.g. /usr/local/python3.7/bin/python3
7272
std::string python_executable_path = version.get_executable_path();
73-
// e.g. /{DORIS_HOME}/plugins/python_udf/python_udf_server.py
73+
// e.g. /{DORIS_HOME}/plugins/python_udf/python_server.py
7474
std::string fight_server_path = get_fight_server_path();
7575
// e.g. grpc+unix:///home/doris/output/be/lib/udf/python/python_udf
7676
std::string base_unix_socket_path = get_base_unix_socket_path();
@@ -131,7 +131,7 @@ Status PythonUDFServerManager::fork(PythonUDFProcessPool* pool, ProcessPtr* proc
131131
return Status::OK();
132132
}
133133

134-
void PythonUDFServerManager::shutdown() {
134+
void PythonServerManager::shutdown() {
135135
std::lock_guard lock(_pools_mutex);
136136
for (auto& pool : _pools) {
137137
pool.second->shutdown();
@@ -140,13 +140,17 @@ void PythonUDFServerManager::shutdown() {
140140
LOG(INFO) << "Python UDF server manager shutdown successfully";
141141
}
142142

143-
// Explicit template instantiation for UDF and UDAF clients
144-
template Status PythonUDFServerManager::get_client<PythonUDFClient>(
143+
// Explicit template instantiation for UDF, UDAF and UDTF clients
144+
template Status PythonServerManager::get_client<PythonUDFClient>(
145145
const PythonUDFMeta& func_meta, const PythonVersion& version,
146146
std::shared_ptr<PythonUDFClient>* client);
147147

148-
template Status PythonUDFServerManager::get_client<PythonUDAFClient>(
148+
template Status PythonServerManager::get_client<PythonUDAFClient>(
149149
const PythonUDFMeta& func_meta, const PythonVersion& version,
150150
std::shared_ptr<PythonUDAFClient>* client);
151151

152+
template Status PythonServerManager::get_client<PythonUDTFClient>(
153+
const PythonUDFMeta& func_meta, const PythonVersion& version,
154+
std::shared_ptr<PythonUDTFClient>* client);
155+
152156
} // namespace doris

0 commit comments

Comments
 (0)