Skip to content

Commit 3bf910c

Browse files
authored
[Enhancement](pyudf) Support MemTracker in PythonUdf (#60655)
1 parent 5310494 commit 3bf910c

File tree

5 files changed

+215
-57
lines changed

5 files changed

+215
-57
lines changed

be/src/common/config.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,6 +1129,9 @@ DEFINE_String(python_venv_interpreter_paths, "");
11291129
// max python processes in global shared pool, each version can have up to this many processes
11301130
// 0 means use CPU core count as default, otherwise use the specified value
11311131
DEFINE_mInt32(max_python_process_num, "0");
1132+
// Soft memory limit in bytes for the total RSS of all Python UDF processes; exceeding it only logs a warning (nothing is killed or throttled). A value <= 0 disables the check.
1133+
// Default is 10 GiB (10737418240 bytes)
1134+
DEFINE_mInt64(python_udf_processes_memory_limit_bytes, "10737418240");
11321135

11331136
// Set config randomly to check more issues in github workflow
11341137
DEFINE_Bool(enable_fuzzy_mode, "false");

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,6 +1190,8 @@ DECLARE_String(python_venv_root_path);
11901190
DECLARE_String(python_venv_interpreter_paths);
11911191
// max python processes in global shared pool, each version can have up to this many processes
11921192
DECLARE_mInt32(max_python_process_num);
1193+
// Soft memory limit in bytes for the total RSS of all Python UDF processes; exceeding it only logs a warning (nothing is killed or throttled). A value <= 0 disables the check.
1194+
DECLARE_mInt64(python_udf_processes_memory_limit_bytes);
11931195

11941196
// Set config randomly to check more issues in github workflow
11951197
DECLARE_Bool(enable_fuzzy_mode);

be/src/udf/python/python_server.cpp

Lines changed: 101 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#include <boost/asio.hpp>
2828
#include <boost/process.hpp>
29+
#include <fstream>
2930

3031
#include "common/config.h"
3132
#include "udf/python/python_udaf_client.h"
@@ -191,66 +192,70 @@ Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* proce
191192
// Starts the background health check thread; returns immediately when
// _health_check_thread is already set.
// The thread loops until _shutdown_flag is observed. Each iteration waits on
// _health_check_cv (until the interval elapses or shutdown is flagged), then
// calls _check_and_recreate_processes() followed by _refresh_memory_stats().
// NOTE(review): this span is a rendered diff. Lines following an "NNN-" marker
// appear to be the removed pre-commit code (60-second interval, inline
// dead-process scan) and lines following an "NNN+" marker the added code
// (30-second interval, helper calls) -- confirm against the repository.
void PythonServerManager::_start_health_check_thread() {
192193
if (_health_check_thread) return;
193194

194-
LOG(INFO) << "Starting Python process health check thread (interval: 60 seconds)";
195+
LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
195196

196197
_health_check_thread = std::make_unique<std::thread>([this]() {
197198
// Health check loop
198199
while (!_shutdown_flag.load(std::memory_order_acquire)) {
199200
// Wait for interval or shutdown signal
200201
{
201202
std::unique_lock<std::mutex> lock(_health_check_mutex);
202-
_health_check_cv.wait_for(lock, std::chrono::seconds(60), [this]() {
203+
_health_check_cv.wait_for(lock, std::chrono::seconds(30), [this]() {
203204
return _shutdown_flag.load(std::memory_order_acquire);
204205
});
205206
}
206207

207208
if (_shutdown_flag.load(std::memory_order_acquire)) break;
208209

209-
std::lock_guard<std::mutex> lock(_pools_mutex);
210-
211-
int total_checked = 0;
212-
int total_dead = 0;
213-
int total_recreated = 0;
214-
215-
for (auto& [version, pool] : _process_pools) {
216-
for (size_t i = 0; i < pool.size(); ++i) {
217-
auto& process = pool[i];
218-
if (!process) continue;
219-
220-
total_checked++;
221-
if (!process->is_alive()) {
222-
total_dead++;
223-
LOG(WARNING)
224-
<< "Detected dead Python process (pid=" << process->get_child_pid()
225-
<< ", version=" << version.to_string() << "), recreating...";
226-
227-
ProcessPtr new_process;
228-
Status s = fork(version, &new_process);
229-
if (s.ok()) {
230-
pool[i] = std::move(new_process);
231-
total_recreated++;
232-
LOG(INFO) << "Successfully recreated Python process for version "
233-
<< version.to_string();
234-
} else {
235-
LOG(ERROR) << "Failed to recreate Python process for version "
236-
<< version.to_string() << ": " << s.to_string();
237-
pool.erase(pool.begin() + i);
238-
--i;
239-
}
240-
}
241-
}
242-
}
243-
244-
if (total_dead > 0) {
245-
LOG(INFO) << "Health check completed: checked=" << total_checked
246-
<< ", dead=" << total_dead << ", recreated=" << total_recreated;
247-
}
210+
_check_and_recreate_processes();
211+
_refresh_memory_stats();
248212
}
249213

250214
LOG(INFO) << "Python process health check thread exiting";
251215
});
252216
}
253217

218+
void PythonServerManager::_check_and_recreate_processes() {
219+
std::lock_guard<std::mutex> lock(_pools_mutex);
220+
221+
int total_checked = 0;
222+
int total_dead = 0;
223+
int total_recreated = 0;
224+
225+
for (auto& [version, pool] : _process_pools) {
226+
for (size_t i = 0; i < pool.size(); ++i) {
227+
auto& process = pool[i];
228+
if (!process) continue;
229+
230+
total_checked++;
231+
if (!process->is_alive()) {
232+
total_dead++;
233+
LOG(WARNING) << "Detected dead Python process (pid=" << process->get_child_pid()
234+
<< ", version=" << version.to_string() << "), recreating...";
235+
236+
ProcessPtr new_process;
237+
Status s = fork(version, &new_process);
238+
if (s.ok()) {
239+
pool[i] = std::move(new_process);
240+
total_recreated++;
241+
LOG(INFO) << "Successfully recreated Python process for version "
242+
<< version.to_string();
243+
} else {
244+
LOG(ERROR) << "Failed to recreate Python process for version "
245+
<< version.to_string() << ": " << s.to_string();
246+
pool.erase(pool.begin() + i);
247+
--i;
248+
}
249+
}
250+
}
251+
}
252+
253+
if (total_dead > 0) {
254+
LOG(INFO) << "Health check completed: checked=" << total_checked << ", dead=" << total_dead
255+
<< ", recreated=" << total_recreated;
256+
}
257+
}
258+
254259
void PythonServerManager::shutdown() {
255260
// Signal health check thread to stop
256261
_shutdown_flag.store(true, std::memory_order_release);
@@ -273,6 +278,61 @@ void PythonServerManager::shutdown() {
273278
_process_pools.clear();
274279
}
275280

281+
/**
 * Read the resident set size (RSS) of one process from /proc/{pid}/statm.
 *
 * statm fields are expressed in pages: "size resident shared text lib data dt".
 * Only the second field (resident) is used; it is converted to bytes with the
 * system page size.
 *
 * @param pid       process whose memory usage is queried
 * @param rss_bytes output; written only when Status::OK() is returned
 * @return Status::OK() on success; InternalError when the statm file cannot be
 *         opened or parsed, or when the page size cannot be determined
 */
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
    const std::string statm_path = fmt::format("/proc/{}/statm", pid);
    std::ifstream statm_file(statm_path);
    if (!statm_file.is_open()) {
        return Status::InternalError("Cannot open {}", statm_path);
    }

    // The total-size field is read only to advance the stream to the RSS field.
    size_t size_pages = 0;
    size_t rss_pages = 0;
    statm_file >> size_pages >> rss_pages;
    if (statm_file.fail()) {
        return Status::InternalError("Failed to read {}", statm_path);
    }

    // sysconf() returns -1 on failure; multiplying by that value after the
    // implicit conversion to size_t would yield a bogus, enormous RSS.
    const long page_size = sysconf(_SC_PAGESIZE);
    if (page_size <= 0) {
        return Status::InternalError("Failed to get system page size for {}", statm_path);
    }

    *rss_bytes = rss_pages * static_cast<size_t>(page_size);
    return Status::OK();
}
305+
306+
void PythonServerManager::_refresh_memory_stats() {
307+
std::lock_guard<std::mutex> lock(_pools_mutex);
308+
309+
int64_t total_rss = 0;
310+
311+
for (const auto& [version, pool] : _process_pools) {
312+
for (const auto& process : pool) {
313+
if (!process || !process->is_alive()) continue;
314+
315+
size_t rss_bytes = 0;
316+
Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
317+
318+
if (s.ok()) {
319+
total_rss += rss_bytes;
320+
} else [[unlikely]] {
321+
LOG(WARNING) << "Failed to read memory info for Python process (pid="
322+
<< process->get_child_pid() << "): " << s.to_string();
323+
}
324+
}
325+
}
326+
_mem_tracker.set_consumption(total_rss);
327+
LOG(INFO) << _mem_tracker.log_usage();
328+
329+
if (config::python_udf_processes_memory_limit_bytes > 0 &&
330+
total_rss > config::python_udf_processes_memory_limit_bytes) {
331+
LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
332+
<< ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
333+
}
334+
}
335+
276336
// Explicit template instantiation for UDF, UDAF and UDTF clients
277337
template Status PythonServerManager::get_client<PythonUDFClient>(
278338
const PythonUDFMeta& func_meta, const PythonVersion& version,

be/src/udf/python/python_server.h

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
#include <thread>
2424

2525
#include "common/status.h"
26+
#include "runtime/memory/mem_tracker.h"
2627
#include "udf/python/python_udf_meta.h"
2728
#include "udf/python/python_udf_runtime.h"
2829

2930
namespace doris {
30-
3131
class PythonServerManager {
3232
public:
3333
PythonServerManager() = default;
@@ -52,13 +52,37 @@ class PythonServerManager {
5252

5353
void shutdown();
5454

55+
#ifdef BE_TEST
56+
// For unit testing only.
57+
void check_and_recreate_processes_for_test() { _check_and_recreate_processes(); }
58+
59+
std::unordered_map<PythonVersion, std::vector<ProcessPtr>>& process_pools_for_test() {
60+
return _process_pools;
61+
}
62+
#endif
63+
5564
private:
5665
/**
5766
* Start health check background thread (called once by ensure_pool_initialized)
58-
* Thread periodically checks process health and recreates dead processes
67+
* Thread periodically checks process health, recreates dead processes, and refreshes memory stats
5968
*/
6069
void _start_health_check_thread();
6170

71+
/**
72+
* Check process health and recreate dead processes
73+
*/
74+
void _check_and_recreate_processes();
75+
76+
/**
77+
* Read resident set size (RSS) for a single process from /proc/{pid}/statm
78+
*/
79+
Status _read_process_memory(pid_t pid, size_t* rss_bytes);
80+
81+
/**
82+
* Refresh memory statistics for all Python processes
83+
*/
84+
void _refresh_memory_stats();
85+
6286
std::unordered_map<PythonVersion, std::vector<ProcessPtr>> _process_pools;
6387
// Protects _process_pools access
6488
std::mutex _pools_mutex;
@@ -69,6 +93,7 @@ class PythonServerManager {
6993
std::atomic<bool> _shutdown_flag {false};
7094
std::condition_variable _health_check_cv;
7195
std::mutex _health_check_mutex;
96+
MemTracker _mem_tracker {"PythonUDFProcesses"};
7297
};
7398

7499
} // namespace doris

0 commit comments

Comments
 (0)