Skip to content

Commit c4f7c86

Browse files
committed
[Enhancement](udf) clear cache when droping function
1 parent 78a7b22 commit c4f7c86

File tree

27 files changed

+834
-30
lines changed

27 files changed

+834
-30
lines changed

be/src/agent/task_worker_pool.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
#include "runtime/index_policy/index_policy_mgr.h"
8989
#include "runtime/memory/global_memory_arbitrator.h"
9090
#include "runtime/snapshot_loader.h"
91+
#include "runtime/user_function_cache.h"
9192
#include "service/backend_options.h"
9293
#include "util/brpc_client_cache.h"
9394
#include "util/debug_points.h"
@@ -2396,12 +2397,17 @@ void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
23962397
}
23972398

23982399
void clean_udf_cache_callback(const TAgentTaskRequest& req) {
2400+
const auto& clean_req = req.clean_udf_cache_req;
2401+
23992402
if (doris::config::enable_java_support) {
2400-
LOG(INFO) << "clean udf cache start: " << req.clean_udf_cache_req.function_signature;
2401-
static_cast<void>(
2402-
Jni::Util::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature));
2403-
LOG(INFO) << "clean udf cache finish: " << req.clean_udf_cache_req.function_signature;
2403+
static_cast<void>(Jni::Util::clean_udf_class_load_cache(clean_req.function_signature));
2404+
}
2405+
2406+
if (clean_req.__isset.function_id && clean_req.function_id > 0) {
2407+
UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
24042408
}
2409+
2410+
LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
24052411
}
24062412

24072413
void report_index_policy_callback(const ClusterInfo* cluster_info) {

be/src/runtime/user_function_cache.cpp

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "io/fs/local_file_system.h"
4343
#include "runtime/exec_env.h"
4444
#include "runtime/plugin/cloud_plugin_downloader.h"
45+
#include "udf/python/python_server.h"
4546
#include "util/defer_op.h"
4647
#include "util/dynamic_util.h"
4748
#include "util/md5.h"
@@ -111,7 +112,20 @@ UserFunctionCacheEntry::~UserFunctionCacheEntry() {
111112

112113
// delete library file if should_delete_library is set
113114
if (should_delete_library.load()) {
114-
unlink(lib_file.c_str());
115+
if (type == LibType::PY_ZIP) {
116+
// For Python UDF, we need to delete both the unzipped directory and the original zip file.
117+
auto st = io::global_local_filesystem()->delete_directory_or_file(lib_file);
118+
119+
st = io::global_local_filesystem()->delete_file(lib_file + ".zip");
120+
121+
if (!st.ok()) [[unlikely]] {
122+
LOG(WARNING) << "failed to delete python udf files, lib_file=" << lib_file << ": "
123+
<< st.to_string();
124+
}
125+
126+
} else {
127+
unlink(lib_file.c_str());
128+
}
115129
}
116130
}
117131

@@ -174,10 +188,20 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
174188
<< ", other_checksum info: = " << it->second->debug_string();
175189
return Status::InternalError("duplicate function id");
176190
}
191+
192+
std::string full_path = dir + "/" + file;
177193
// create a cache entry and put it into entry map
178-
std::shared_ptr<UserFunctionCacheEntry> entry = UserFunctionCacheEntry::create_shared(
179-
function_id, checksum, dir + "/" + file, lib_type);
194+
std::shared_ptr<UserFunctionCacheEntry> entry =
195+
UserFunctionCacheEntry::create_shared(function_id, checksum, full_path, lib_type);
180196
entry->is_downloaded = true;
197+
198+
// For Python UDF, _check_cache_is_python_udf has already unzipped the file.
199+
// Set lib_file to the unzipped directory.
200+
if (lib_type == LibType::PY_ZIP) {
201+
entry->lib_file = full_path.substr(0, full_path.size() - 4);
202+
entry->is_unziped = true;
203+
}
204+
181205
_entry_map[function_id] = entry;
182206

183207
return Status::OK();
@@ -547,4 +571,29 @@ Status UserFunctionCache::_check_and_return_default_java_udf_url(const std::stri
547571
return Status::OK();
548572
}
549573

574+
void UserFunctionCache::drop_function_cache(int64_t fid) {
575+
std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
576+
{
577+
std::lock_guard<std::mutex> l(_cache_lock);
578+
auto it = _entry_map.find(fid);
579+
if (it == _entry_map.end()) {
580+
return;
581+
}
582+
entry = it->second;
583+
_entry_map.erase(it);
584+
}
585+
586+
// For Python UDF, clear module cache in Python server before deleting files
587+
if (entry->type == LibType::PY_ZIP && !entry->lib_file.empty()) {
588+
auto status = PythonServerManager::instance().clear_module_cache(entry->lib_file);
589+
if (!status.ok()) [[unlikely]] {
590+
LOG(WARNING) << "drop_function_cache: failed to clear Python module cache for "
591+
<< entry->lib_file << ": " << status.to_string();
592+
}
593+
}
594+
595+
// Mark for deletion, destructor will delete the files
596+
entry->should_delete_library.store(true);
597+
}
598+
550599
} // namespace doris

be/src/runtime/user_function_cache.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class UserFunctionCache {
6262
Status get_pypath(int64_t fid, const std::string& url, const std::string& checksum,
6363
std::string* libpath);
6464

65+
// Drop the cached function library by function id.
66+
void drop_function_cache(int64_t fid);
67+
6568
#ifndef BE_TEST
6669
private:
6770
#endif

be/src/udf/python/python_server.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <boost/asio.hpp>
2828
#include <boost/process.hpp>
2929

30+
#include "arrow/flight/client.h"
3031
#include "common/config.h"
3132
#include "udf/python/python_udaf_client.h"
3233
#include "udf/python/python_udf_client.h"
@@ -273,6 +274,78 @@ void PythonServerManager::shutdown() {
273274
_process_pools.clear();
274275
}
275276

277+
Status PythonServerManager::clear_module_cache(const std::string& location) {
278+
if (location.empty()) {
279+
return Status::InvalidArgument("Empty location for clear_module_cache");
280+
}
281+
282+
std::lock_guard<std::mutex> lock(_pools_mutex);
283+
284+
std::string body = fmt::format(R"({{"location": "{}"}})", location);
285+
286+
int success_count = 0;
287+
int fail_count = 0;
288+
bool has_active_process = false;
289+
290+
for (auto& [version, pool] : _process_pools) {
291+
for (auto& process : pool) {
292+
if (!process || !process->is_alive()) {
293+
continue;
294+
}
295+
has_active_process = true;
296+
try {
297+
auto loc_result = arrow::flight::Location::Parse(process->get_uri());
298+
if (!loc_result.ok()) [[unlikely]] {
299+
fail_count++;
300+
continue;
301+
}
302+
303+
auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
304+
if (!client_result.ok()) [[unlikely]] {
305+
fail_count++;
306+
continue;
307+
}
308+
auto client = std::move(*client_result);
309+
310+
arrow::flight::Action action;
311+
action.type = "clear_module_cache";
312+
action.body = arrow::Buffer::FromString(body);
313+
314+
auto result_stream = client->DoAction(action);
315+
if (!result_stream.ok()) {
316+
fail_count++;
317+
continue;
318+
}
319+
320+
auto result = (*result_stream)->Next();
321+
if (result.ok() && *result) {
322+
success_count++;
323+
} else {
324+
fail_count++;
325+
}
326+
327+
} catch (...) {
328+
fail_count++;
329+
}
330+
}
331+
}
332+
333+
if (!has_active_process) {
334+
return Status::OK();
335+
}
336+
337+
LOG(INFO) << "clear_module_cache completed for location=" << location
338+
<< ", success=" << success_count << ", failed=" << fail_count;
339+
340+
if (fail_count > 0) {
341+
return Status::InternalError(
342+
"clear_module_cache failed for location={}, success={}, failed={}", location,
343+
success_count, fail_count);
344+
}
345+
346+
return Status::OK();
347+
}
348+
276349
// Explicit template instantiation for UDF, UDAF and UDTF clients
277350
template Status PythonServerManager::get_client<PythonUDFClient>(
278351
const PythonUDFMeta& func_meta, const PythonVersion& version,

be/src/udf/python/python_server.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ class PythonServerManager {
4848

4949
Status get_process(const PythonVersion& version, ProcessPtr* process);
5050

51+
// Clear Python module cache for a specific UDF location across all processes
52+
Status clear_module_cache(const std::string& location);
53+
5154
Status ensure_pool_initialized(const PythonVersion& version);
5255

5356
void shutdown();

0 commit comments

Comments
 (0)