Skip to content

Commit d13ee15

Browse files
committed
1
1 parent 78a7b22 commit d13ee15

File tree

2 files changed

+103
-2
lines changed

2 files changed

+103
-2
lines changed

be/src/udf/python/python_server.cpp

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#include <boost/asio.hpp>
2828
#include <boost/process.hpp>
29+
#include <fstream>
2930

3031
#include "common/config.h"
3132
#include "udf/python/python_udaf_client.h"
@@ -110,6 +111,7 @@ Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version
110111

111112
_initialized_versions.insert(version);
112113
_start_health_check_thread();
114+
_start_memory_monitor_thread();
113115

114116
return Status::OK();
115117
}
@@ -252,15 +254,21 @@ void PythonServerManager::_start_health_check_thread() {
252254
}
253255

254256
void PythonServerManager::shutdown() {
255-
// Signal health check thread to stop
257+
// Signal health check thread and memory monitor thread to stop
256258
_shutdown_flag.store(true, std::memory_order_release);
257259
_health_check_cv.notify_one();
260+
_memory_monitor_cv.notify_one();
258261

259262
if (_health_check_thread && _health_check_thread->joinable()) {
260263
_health_check_thread->join();
261264
_health_check_thread.reset();
262265
}
263266

267+
if (_memory_monitor_thread && _memory_monitor_thread->joinable()) {
268+
_memory_monitor_thread->join();
269+
_memory_monitor_thread.reset();
270+
}
271+
264272
// Shutdown all processes
265273
std::lock_guard<std::mutex> lock(_pools_mutex);
266274
for (auto& [version, pool] : _process_pools) {
@@ -273,6 +281,77 @@ void PythonServerManager::shutdown() {
273281
_process_pools.clear();
274282
}
275283

284+
void PythonServerManager::_start_memory_monitor_thread() {
285+
if (_memory_monitor_thread) return;
286+
287+
_memory_monitor_thread = std::make_unique<std::thread>([this]() {
288+
while (!_shutdown_flag.load(std::memory_order_acquire)) {
289+
// Wait for interval or shutdown signal
290+
{
291+
std::unique_lock<std::mutex> lock(_memory_monitor_mutex);
292+
_memory_monitor_cv.wait_for(lock, std::chrono::seconds(15), [this]() {
293+
return _shutdown_flag.load(std::memory_order_acquire);
294+
});
295+
}
296+
297+
if (_shutdown_flag.load(std::memory_order_acquire)) break;
298+
299+
_refresh_memory_stats();
300+
}
301+
302+
LOG(INFO) << "Python process memory monitor thread exiting";
303+
});
304+
}
305+
306+
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes, size_t* vms_bytes) {
307+
// Read from /proc/{pid}/statm
308+
// Format: size resident shared text lib data dt
309+
std::string statm_path = fmt::format("/proc/{}/statm", pid);
310+
std::ifstream statm_file(statm_path);
311+
312+
if (!statm_file.is_open()) {
313+
return Status::InternalError("Cannot open {}", statm_path);
314+
}
315+
316+
size_t vms_pages = 0, rss_pages = 0;
317+
statm_file >> vms_pages >> rss_pages;
318+
319+
if (statm_file.fail()) {
320+
return Status::InternalError("Failed to read {}", statm_path);
321+
}
322+
323+
// Convert pages to bytes
324+
long page_size = sysconf(_SC_PAGESIZE);
325+
*rss_bytes = rss_pages * page_size;
326+
*vms_bytes = vms_pages * page_size;
327+
328+
return Status::OK();
329+
}
330+
331+
void PythonServerManager::_refresh_memory_stats() {
332+
std::lock_guard<std::mutex> lock(_pools_mutex);
333+
334+
int64_t total_rss = 0;
335+
336+
for (const auto& [version, pool] : _process_pools) {
337+
for (const auto& process : pool) {
338+
if (!process || !process->is_alive()) continue;
339+
340+
size_t rss_bytes = 0, vms_bytes = 0;
341+
Status s = _read_process_memory(process->get_child_pid(), &rss_bytes, &vms_bytes);
342+
343+
if (s.ok()) {
344+
total_rss += rss_bytes;
345+
} else [[unlikely]] {
346+
LOG(WARNING) << "Failed to read memory info for Python process (pid="
347+
<< process->get_child_pid() << "): " << s.to_string();
348+
}
349+
}
350+
}
351+
_mem_tracker.set_consumption(total_rss);
352+
LOG(INFO) << _mem_tracker.log_usage();
353+
}
354+
276355
// Explicit template instantiation for UDF, UDAF and UDTF clients
277356
template Status PythonServerManager::get_client<PythonUDFClient>(
278357
const PythonUDFMeta& func_meta, const PythonVersion& version,

be/src/udf/python/python_server.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
#include <thread>
2424

2525
#include "common/status.h"
26+
#include "runtime/memory/mem_tracker.h"
2627
#include "udf/python/python_udf_meta.h"
2728
#include "udf/python/python_udf_runtime.h"
2829

2930
namespace doris {
30-
3131
class PythonServerManager {
3232
public:
3333
PythonServerManager() = default;
@@ -59,6 +59,22 @@ class PythonServerManager {
5959
*/
6060
void _start_health_check_thread();
6161

62+
/**
63+
* Start memory monitor background thread
64+
* Thread periodically reads /proc/{pid}/statm for each Python process
65+
*/
66+
void _start_memory_monitor_thread();
67+
68+
/**
69+
* Read memory information for a single process from /proc/{pid}/statm
70+
*/
71+
Status _read_process_memory(pid_t pid, size_t* rss_bytes, size_t* vms_bytes);
72+
73+
/**
74+
* Refresh memory statistics for all Python processes
75+
*/
76+
void _refresh_memory_stats();
77+
6278
std::unordered_map<PythonVersion, std::vector<ProcessPtr>> _process_pools;
6379
// Protects _process_pools access
6480
std::mutex _pools_mutex;
@@ -69,6 +85,12 @@ class PythonServerManager {
6985
std::atomic<bool> _shutdown_flag {false};
7086
std::condition_variable _health_check_cv;
7187
std::mutex _health_check_mutex;
88+
89+
// Memory monitoring
90+
std::unique_ptr<std::thread> _memory_monitor_thread;
91+
std::condition_variable _memory_monitor_cv;
92+
std::mutex _memory_monitor_mutex;
93+
MemTracker _mem_tracker {"PythonUDFProcesses"};
7294
};
7395

7496
} // namespace doris

0 commit comments

Comments
 (0)