Skip to content

Commit 551e1ff

Browse files
committed
1
1 parent 78a7b22 commit 551e1ff

File tree

2 files changed

+112
-1
lines changed

2 files changed

+112
-1
lines changed

be/src/udf/python/python_server.cpp

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
#include <fmt/core.h>
2424
#include <sys/poll.h>
2525
#include <sys/stat.h>
26+
#include <unistd.h>
2627

2728
#include <boost/asio.hpp>
2829
#include <boost/process.hpp>
30+
#include <fstream>
2931

3032
#include "common/config.h"
3133
#include "udf/python/python_udaf_client.h"
@@ -110,6 +112,7 @@ Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version
110112

111113
_initialized_versions.insert(version);
112114
_start_health_check_thread();
115+
_start_memory_monitor_thread();
113116

114117
return Status::OK();
115118
}
@@ -252,15 +255,21 @@ void PythonServerManager::_start_health_check_thread() {
252255
}
253256

254257
void PythonServerManager::shutdown() {
255-
// Signal health check thread to stop
258+
// Signal health check thread and memory monitor thread to stop
256259
_shutdown_flag.store(true, std::memory_order_release);
257260
_health_check_cv.notify_one();
261+
_memory_monitor_cv.notify_one();
258262

259263
if (_health_check_thread && _health_check_thread->joinable()) {
260264
_health_check_thread->join();
261265
_health_check_thread.reset();
262266
}
263267

268+
if (_memory_monitor_thread && _memory_monitor_thread->joinable()) {
269+
_memory_monitor_thread->join();
270+
_memory_monitor_thread.reset();
271+
}
272+
264273
// Shutdown all processes
265274
std::lock_guard<std::mutex> lock(_pools_mutex);
266275
for (auto& [version, pool] : _process_pools) {
@@ -273,6 +282,77 @@ void PythonServerManager::shutdown() {
273282
_process_pools.clear();
274283
}
275284

285+
void PythonServerManager::_start_memory_monitor_thread() {
286+
if (_memory_monitor_thread) return;
287+
288+
_memory_monitor_thread = std::make_unique<std::thread>([this]() {
289+
while (!_shutdown_flag.load(std::memory_order_acquire)) {
290+
// Wait for interval or shutdown signal
291+
{
292+
std::unique_lock<std::mutex> lock(_memory_monitor_mutex);
293+
_memory_monitor_cv.wait_for(lock, std::chrono::seconds(15), [this]() {
294+
return _shutdown_flag.load(std::memory_order_acquire);
295+
});
296+
}
297+
298+
if (_shutdown_flag.load(std::memory_order_acquire)) break;
299+
300+
_refresh_memory_stats();
301+
}
302+
303+
LOG(INFO) << "Python process memory monitor thread exiting";
304+
});
305+
}
306+
307+
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes, size_t* vms_bytes) {
308+
// Read from /proc/{pid}/statm
309+
// Format: size resident shared text lib data dt
310+
std::string statm_path = fmt::format("/proc/{}/statm", pid);
311+
std::ifstream statm_file(statm_path);
312+
313+
if (!statm_file.is_open()) {
314+
return Status::InternalError("Cannot open {}", statm_path);
315+
}
316+
317+
size_t vms_pages = 0, rss_pages = 0;
318+
statm_file >> vms_pages >> rss_pages;
319+
320+
if (statm_file.fail()) {
321+
return Status::InternalError("Failed to read {}", statm_path);
322+
}
323+
324+
// Convert pages to bytes
325+
long page_size = sysconf(_SC_PAGESIZE);
326+
*rss_bytes = rss_pages * page_size;
327+
*vms_bytes = vms_pages * page_size;
328+
329+
return Status::OK();
330+
}
331+
332+
void PythonServerManager::_refresh_memory_stats() {
333+
std::lock_guard<std::mutex> lock(_pools_mutex);
334+
335+
int64_t total_rss = 0;
336+
337+
for (const auto& [version, pool] : _process_pools) {
338+
for (const auto& process : pool) {
339+
if (!process || !process->is_alive()) continue;
340+
341+
size_t rss_bytes = 0, vms_bytes = 0;
342+
Status s = _read_process_memory(process->get_child_pid(), &rss_bytes, &vms_bytes);
343+
344+
if (s.ok()) {
345+
total_rss += rss_bytes;
346+
} else [[unlikely]] {
347+
LOG(WARNING) << "Failed to read memory info for Python process (pid="
348+
<< process->get_child_pid() << "): " << s.to_string();
349+
}
350+
}
351+
}
352+
_mem_tracker.set_consumption(total_rss);
353+
LOG(INFO) << _mem_tracker.log_usage();
354+
}
355+
276356
// Explicit template instantiation for UDF, UDAF and UDTF clients
277357
template Status PythonServerManager::get_client<PythonUDFClient>(
278358
const PythonUDFMeta& func_meta, const PythonVersion& version,

be/src/udf/python/python_server.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,19 @@
2323
#include <thread>
2424

2525
#include "common/status.h"
26+
#include "runtime/memory/mem_tracker.h"
2627
#include "udf/python/python_udf_meta.h"
2728
#include "udf/python/python_udf_runtime.h"
2829

2930
namespace doris {
3031

32+
// Memory information for a single Python UDF process
33+
struct PythonProcessMemoryInfo {
34+
pid_t pid;
35+
size_t rss_bytes; // Resident Set Size (physical memory)
36+
size_t vms_bytes; // Virtual Memory Size
37+
};
38+
3139
class PythonServerManager {
3240
public:
3341
PythonServerManager() = default;
@@ -59,6 +67,22 @@ class PythonServerManager {
5967
*/
6068
void _start_health_check_thread();
6169

70+
/**
71+
* Start memory monitor background thread
72+
* Thread periodically reads /proc/{pid}/statm for each Python process
73+
*/
74+
void _start_memory_monitor_thread();
75+
76+
/**
77+
* Read memory information for a single process from /proc/{pid}/statm
78+
*/
79+
Status _read_process_memory(pid_t pid, size_t* rss_bytes, size_t* vms_bytes);
80+
81+
/**
82+
* Refresh memory statistics for all Python processes
83+
*/
84+
void _refresh_memory_stats();
85+
6286
std::unordered_map<PythonVersion, std::vector<ProcessPtr>> _process_pools;
6387
// Protects _process_pools access
6488
std::mutex _pools_mutex;
@@ -69,6 +93,13 @@ class PythonServerManager {
6993
std::atomic<bool> _shutdown_flag {false};
7094
std::condition_variable _health_check_cv;
7195
std::mutex _health_check_mutex;
96+
97+
// Memory monitoring
98+
std::unique_ptr<std::thread> _memory_monitor_thread;
99+
std::condition_variable _memory_monitor_cv;
100+
std::mutex _memory_monitor_mutex;
101+
std::atomic<int64_t> _total_memory_bytes {0};
102+
MemTracker _mem_tracker {"PythonUDFProcesses"};
72103
};
73104

74105
} // namespace doris

0 commit comments

Comments
 (0)