Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion be/src/udf/python/python_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <fstream>

#include "common/config.h"
#include "udf/python/python_udaf_client.h"
Expand Down Expand Up @@ -110,6 +111,7 @@ Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version

_initialized_versions.insert(version);
_start_health_check_thread();
_start_memory_monitor_thread();

return Status::OK();
}
Expand Down Expand Up @@ -252,15 +254,21 @@ void PythonServerManager::_start_health_check_thread() {
}

void PythonServerManager::shutdown() {
// Signal health check thread to stop
// Signal health check thread and memory monitor thread to stop
_shutdown_flag.store(true, std::memory_order_release);
_health_check_cv.notify_one();
_memory_monitor_cv.notify_one();

if (_health_check_thread && _health_check_thread->joinable()) {
_health_check_thread->join();
_health_check_thread.reset();
}

if (_memory_monitor_thread && _memory_monitor_thread->joinable()) {
_memory_monitor_thread->join();
_memory_monitor_thread.reset();
}

// Shutdown all processes
std::lock_guard<std::mutex> lock(_pools_mutex);
for (auto& [version, pool] : _process_pools) {
Expand All @@ -273,6 +281,77 @@ void PythonServerManager::shutdown() {
_process_pools.clear();
}

void PythonServerManager::_start_memory_monitor_thread() {
if (_memory_monitor_thread) return;

_memory_monitor_thread = std::make_unique<std::thread>([this]() {
while (!_shutdown_flag.load(std::memory_order_acquire)) {
// Wait for interval or shutdown signal
{
std::unique_lock<std::mutex> lock(_memory_monitor_mutex);
_memory_monitor_cv.wait_for(lock, std::chrono::seconds(15), [this]() {
return _shutdown_flag.load(std::memory_order_acquire);
});
}

if (_shutdown_flag.load(std::memory_order_acquire)) break;

_refresh_memory_stats();
}

LOG(INFO) << "Python process memory monitor thread exiting";
});
}

Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes, size_t* vms_bytes) {
// Read from /proc/{pid}/statm
// Format: size resident shared text lib data dt
std::string statm_path = fmt::format("/proc/{}/statm", pid);
std::ifstream statm_file(statm_path);

if (!statm_file.is_open()) {
return Status::InternalError("Cannot open {}", statm_path);
}

size_t vms_pages = 0, rss_pages = 0;
statm_file >> vms_pages >> rss_pages;

if (statm_file.fail()) {
return Status::InternalError("Failed to read {}", statm_path);
}

// Convert pages to bytes
long page_size = sysconf(_SC_PAGESIZE);
*rss_bytes = rss_pages * page_size;
*vms_bytes = vms_pages * page_size;

return Status::OK();
}

void PythonServerManager::_refresh_memory_stats() {
std::lock_guard<std::mutex> lock(_pools_mutex);

int64_t total_rss = 0;

for (const auto& [version, pool] : _process_pools) {
for (const auto& process : pool) {
if (!process || !process->is_alive()) continue;

size_t rss_bytes = 0, vms_bytes = 0;
Status s = _read_process_memory(process->get_child_pid(), &rss_bytes, &vms_bytes);

if (s.ok()) {
total_rss += rss_bytes;
} else [[unlikely]] {
LOG(WARNING) << "Failed to read memory info for Python process (pid="
<< process->get_child_pid() << "): " << s.to_string();
}
}
}
_mem_tracker.set_consumption(total_rss);
LOG(INFO) << _mem_tracker.log_usage();
}

// Explicit template instantiation for UDF, UDAF and UDTF clients
template Status PythonServerManager::get_client<PythonUDFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
Expand Down
24 changes: 23 additions & 1 deletion be/src/udf/python/python_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
#include <thread>

#include "common/status.h"
#include "runtime/memory/mem_tracker.h"
#include "udf/python/python_udf_meta.h"
#include "udf/python/python_udf_runtime.h"

namespace doris {

class PythonServerManager {
public:
PythonServerManager() = default;
Expand Down Expand Up @@ -59,6 +59,22 @@ class PythonServerManager {
*/
void _start_health_check_thread();

/**
* Start memory monitor background thread
* Thread periodically reads /proc/{pid}/statm for each Python process
*/
void _start_memory_monitor_thread();

/**
* Read memory information for a single process from /proc/{pid}/statm
*/
Status _read_process_memory(pid_t pid, size_t* rss_bytes, size_t* vms_bytes);

/**
* Refresh memory statistics for all Python processes
*/
void _refresh_memory_stats();

std::unordered_map<PythonVersion, std::vector<ProcessPtr>> _process_pools;
// Protects _process_pools access
std::mutex _pools_mutex;
Expand All @@ -69,6 +85,12 @@ class PythonServerManager {
std::atomic<bool> _shutdown_flag {false};
std::condition_variable _health_check_cv;
std::mutex _health_check_mutex;

// Memory monitoring
std::unique_ptr<std::thread> _memory_monitor_thread;
std::condition_variable _memory_monitor_cv;
std::mutex _memory_monitor_mutex;
MemTracker _mem_tracker {"PythonUDFProcesses"};
};

} // namespace doris
Loading