Skip to content

Commit 06484c7

Browse files
committed
[Enhancement](udf) clear cache when dropping function
1 parent 78a7b22 commit 06484c7

File tree

9 files changed

+265
-10
lines changed

9 files changed

+265
-10
lines changed

be/src/agent/task_worker_pool.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
#include "runtime/index_policy/index_policy_mgr.h"
8989
#include "runtime/memory/global_memory_arbitrator.h"
9090
#include "runtime/snapshot_loader.h"
91+
#include "runtime/user_function_cache.h"
9192
#include "service/backend_options.h"
9293
#include "util/brpc_client_cache.h"
9394
#include "util/debug_points.h"
@@ -2396,12 +2397,17 @@ void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
23962397
}
23972398

23982399
void clean_udf_cache_callback(const TAgentTaskRequest& req) {
2400+
const auto& clean_req = req.clean_udf_cache_req;
2401+
23992402
if (doris::config::enable_java_support) {
2400-
LOG(INFO) << "clean udf cache start: " << req.clean_udf_cache_req.function_signature;
2401-
static_cast<void>(
2402-
Jni::Util::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature));
2403-
LOG(INFO) << "clean udf cache finish: " << req.clean_udf_cache_req.function_signature;
2403+
static_cast<void>(Jni::Util::clean_udf_class_load_cache(clean_req.function_signature));
2404+
}
2405+
2406+
if (clean_req.__isset.function_id && clean_req.function_id > 0) {
2407+
UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
24042408
}
2409+
2410+
LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
24052411
}
24062412

24072413
void report_index_policy_callback(const ClusterInfo* cluster_info) {

be/src/runtime/user_function_cache.cpp

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "util/dynamic_util.h"
4747
#include "util/md5.h"
4848
#include "util/string_util.h"
49+
#include "udf/python/python_server.h"
4950

5051
namespace doris {
5152

@@ -111,7 +112,15 @@ UserFunctionCacheEntry::~UserFunctionCacheEntry() {
111112

112113
// delete library file if should_delete_library is set
113114
if (should_delete_library.load()) {
114-
unlink(lib_file.c_str());
115+
if (type == LibType::PY_ZIP) {
116+
// For Python UDF, we need to delete both the unzipped directory and the original zip file.
117+
auto st = io::global_local_filesystem()->delete_directory_or_file(lib_file);
118+
119+
st = io::global_local_filesystem()->delete_file(lib_file + ".zip");
120+
121+
} else {
122+
unlink(lib_file.c_str());
123+
}
115124
}
116125
}
117126

@@ -174,10 +183,20 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
174183
<< ", other_checksum info: = " << it->second->debug_string();
175184
return Status::InternalError("duplicate function id");
176185
}
186+
187+
std::string full_path = dir + "/" + file;
177188
// create a cache entry and put it into entry map
178-
std::shared_ptr<UserFunctionCacheEntry> entry = UserFunctionCacheEntry::create_shared(
179-
function_id, checksum, dir + "/" + file, lib_type);
189+
std::shared_ptr<UserFunctionCacheEntry> entry =
190+
UserFunctionCacheEntry::create_shared(function_id, checksum, full_path, lib_type);
180191
entry->is_downloaded = true;
192+
193+
// For Python UDF, _check_cache_is_python_udf has already unzipped the file.
194+
// Set lib_file to the unzipped directory.
195+
if (lib_type == LibType::PY_ZIP) {
196+
entry->lib_file = full_path.substr(0, full_path.size() - 4);
197+
entry->is_unziped = true;
198+
}
199+
181200
_entry_map[function_id] = entry;
182201

183202
return Status::OK();
@@ -547,4 +566,29 @@ Status UserFunctionCache::_check_and_return_default_java_udf_url(const std::stri
547566
return Status::OK();
548567
}
549568

569+
void UserFunctionCache::drop_function_cache(int64_t fid) {
570+
std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
571+
{
572+
std::lock_guard<std::mutex> l(_cache_lock);
573+
auto it = _entry_map.find(fid);
574+
if (it == _entry_map.end()) {
575+
return;
576+
}
577+
entry = it->second;
578+
_entry_map.erase(it);
579+
}
580+
581+
// For Python UDF, clear module cache in Python server before deleting files
582+
if (entry->type == LibType::PY_ZIP && !entry->lib_file.empty()) {
583+
auto status = PythonServerManager::instance().clear_module_cache(entry->lib_file);
584+
if (!status.ok()) [[unlikely]] {
585+
LOG(WARNING) << "drop_function_cache: failed to clear Python module cache for "
586+
<< entry->lib_file << ": " << status.to_string();
587+
}
588+
}
589+
590+
// Mark for deletion, destructor will delete the files
591+
entry->should_delete_library.store(true);
592+
}
593+
550594
} // namespace doris

be/src/runtime/user_function_cache.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class UserFunctionCache {
6262
Status get_pypath(int64_t fid, const std::string& url, const std::string& checksum,
6363
std::string* libpath);
6464

65+
// Drop the cached function library by function id.
66+
void drop_function_cache(int64_t fid);
67+
6568
#ifndef BE_TEST
6669
private:
6770
#endif

be/src/udf/python/python_server.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <boost/asio.hpp>
2828
#include <boost/process.hpp>
2929

30+
#include "arrow/flight/client.h"
3031
#include "common/config.h"
3132
#include "udf/python/python_udaf_client.h"
3233
#include "udf/python/python_udf_client.h"
@@ -273,6 +274,78 @@ void PythonServerManager::shutdown() {
273274
_process_pools.clear();
274275
}
275276

277+
// Escapes `raw` so that it can be embedded inside a JSON string literal.
// Quotes and backslashes are backslash-escaped and control characters are written
// as \u00XX, so an arbitrary filesystem path still yields a well-formed payload.
static std::string escape_json_for_action(const std::string& raw) {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    std::string escaped;
    escaped.reserve(raw.size());
    for (const char ch : raw) {
        const auto code = static_cast<unsigned char>(ch);
        if (ch == '"' || ch == '\\') {
            escaped += '\\';
            escaped += ch;
        } else if (code < 0x20) {
            escaped += "\\u00";
            escaped += kHexDigits[code >> 4];
            escaped += kHexDigits[code & 0x0F];
        } else {
            escaped += ch;
        }
    }
    return escaped;
}

// Returns true when the JSON reply of the "clear_module_cache" action carries
// `"success": true`. The Python handler reports failures as a regular result with
// `"success": false`, so the transport status alone does not prove the cache was
// cleared. Whitespace is ignored so the check does not depend on JSON formatting.
static bool action_reply_reports_success(const std::string& reply) {
    std::string compact;
    compact.reserve(reply.size());
    for (const char ch : reply) {
        if (ch != ' ' && ch != '\t' && ch != '\r' && ch != '\n') {
            compact += ch;
        }
    }
    return compact.find("\"success\":true") != std::string::npos;
}

// Sends one "clear_module_cache" action with payload `body` to the Flight server
// at `uri` and returns OK only if the server explicitly reported success.
static Status send_clear_module_cache_action(const std::string& uri, const std::string& body,
                                             const arrow::flight::FlightCallOptions& options) {
    try {
        auto parsed_location = arrow::flight::Location::Parse(uri);
        if (!parsed_location.ok()) [[unlikely]] {
            return Status::InternalError("invalid Python server uri {}: {}", uri,
                                         parsed_location.status().ToString());
        }

        auto client_result = arrow::flight::FlightClient::Connect(*parsed_location);
        if (!client_result.ok()) [[unlikely]] {
            return Status::InternalError("failed to connect to {}: {}", uri,
                                         client_result.status().ToString());
        }
        auto client = std::move(*client_result);

        arrow::flight::Action action;
        action.type = "clear_module_cache";
        action.body = arrow::Buffer::FromString(body);

        auto result_stream = client->DoAction(options, action);
        if (!result_stream.ok()) {
            return Status::InternalError("DoAction failed on {}: {}", uri,
                                         result_stream.status().ToString());
        }

        auto reply = (*result_stream)->Next();
        if (!reply.ok()) {
            return Status::InternalError("reading reply from {} failed: {}", uri,
                                         reply.status().ToString());
        }
        if (*reply == nullptr || (*reply)->body == nullptr) {
            return Status::InternalError("empty reply from {}", uri);
        }

        const std::string reply_text = (*reply)->body->ToString();
        if (!action_reply_reports_success(reply_text)) {
            return Status::InternalError("{} did not clear the module cache: {}", uri,
                                         reply_text);
        }
        return Status::OK();
    } catch (...) {
        // Defensive: never let an exception escape into the caller.
        return Status::InternalError("unexpected exception while contacting {}", uri);
    }
}

// Asks every live Python server process to drop the modules it loaded from
// `location` (the UDF cache directory).
//
// Returns InvalidArgument for an empty `location`, OK when no process is alive or
// every live process confirmed the clear, and InternalError when at least one
// process failed; each individual failure is logged with its reason.
//
// `_pools_mutex` is held for the whole broadcast, so every RPC is given a deadline:
// an unresponsive Python process must not block the process pools indefinitely.
Status PythonServerManager::clear_module_cache(const std::string& location) {
    if (location.empty()) {
        return Status::InvalidArgument("Empty location for clear_module_cache");
    }

    // Upper bound for a single action call, in seconds.
    constexpr double kActionTimeoutSeconds = 10.0;

    std::lock_guard<std::mutex> lock(_pools_mutex);

    const std::string body =
            fmt::format(R"({{"location": "{}"}})", escape_json_for_action(location));

    arrow::flight::FlightCallOptions call_options;
    call_options.timeout = arrow::flight::TimeoutDuration {kActionTimeoutSeconds};

    int success_count = 0;
    int fail_count = 0;
    bool has_active_process = false;

    for (auto& [version, pool] : _process_pools) {
        for (auto& process : pool) {
            if (!process || !process->is_alive()) {
                continue;
            }
            has_active_process = true;

            const Status send_status =
                    send_clear_module_cache_action(process->get_uri(), body, call_options);
            if (send_status.ok()) {
                success_count++;
            } else {
                fail_count++;
                LOG(WARNING) << "clear_module_cache: request failed for location=" << location
                             << ": " << send_status.to_string();
            }
        }
    }

    if (!has_active_process) {
        // No interpreter is running, hence there is no module cache to clear.
        return Status::OK();
    }

    LOG(INFO) << "clear_module_cache completed for location=" << location
              << ", success=" << success_count << ", failed=" << fail_count;

    if (fail_count > 0) {
        return Status::InternalError(
                "clear_module_cache failed for location={}, success={}, failed={}", location,
                success_count, fail_count);
    }

    return Status::OK();
}
348+
276349
// Explicit template instantiation for UDF, UDAF and UDTF clients
277350
template Status PythonServerManager::get_client<PythonUDFClient>(
278351
const PythonUDFMeta& func_meta, const PythonVersion& version,

be/src/udf/python/python_server.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ class PythonServerManager {
4848

4949
Status get_process(const PythonVersion& version, ProcessPtr* process);
5050

51+
// Clear Python module cache for a specific UDF location across all processes
52+
Status clear_module_cache(const std::string& location);
53+
5154
Status ensure_pool_initialized(const PythonVersion& version);
5255

5356
void shutdown();

be/src/udf/python/python_server.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,6 +2442,97 @@ def do_exchange(
24422442
else:
24432443
raise ValueError(f"Unsupported client type: {python_udf_meta.client_type}")
24442444

2445+
def do_action(
    self,
    context: flight.ServerCallContext,
    action: flight.Action,
):
    """
    Dispatch a Flight action used for cache management.

    The only supported action type is "clear_module_cache". Its body is a
    JSON document with a "location" field (the UDF cache directory path),
    and the results produced by the handler are streamed back to the client.

    Any other action type raises flight.FlightUnavailableError.
    """
    requested = action.type

    # Guard clause: reject everything except the one known action.
    if requested != "clear_module_cache":
        raise flight.FlightUnavailableError(f"Unknown action: {requested}")

    payload = action.body.to_pybytes()
    yield from self._handle_clear_module_cache(payload)
2463+
2464+
def _handle_clear_module_cache(self, body: bytes):
    """
    Serve one "clear_module_cache" request.

    `body` is the UTF-8 encoded JSON request; its "location" field names the
    UDF directory whose modules must be dropped from sys.modules so that a
    later UDF reusing the same module name gets a fresh import.

    Exactly one flight.Result is yielded, carrying a JSON document:
      - success: {"success": true, "cleared_modules": [...], "location": ...}
      - failure: {"success": false, "error": ...}
    Exceptions are logged and reported through the failure form instead of
    being propagated.
    """
    try:
        request = json.loads(body.decode("utf-8"))
        location = request.get("location", "")

        if location:
            removed = self._clear_modules_from_location(location)
            reply = {
                "success": True,
                "cleared_modules": removed,
                "location": location,
            }
            yield flight.Result(json.dumps(reply).encode("utf-8"))
        else:
            # A missing or empty location is reported, not treated as "clear all".
            yield flight.Result(b'{"success": false, "error": "empty location"}')

    except Exception as exc:
        logging.error("clear_module_cache failed: %s", exc)
        failure = {"success": False, "error": str(exc)}
        yield flight.Result(json.dumps(failure).encode("utf-8"))
2495+
2496+
def _clear_modules_from_location(self, location: str) -> list:
    """
    Remove every module that was loaded from `location` from sys.modules.

    A module belongs to `location` when its `__file__`, or for packages any
    entry of its `__path__`, is that directory itself or lies below it.
    Paths are compared on a path-component boundary, so clearing
    ".../udf_1" never evicts modules loaded from a sibling such as
    ".../udf_12".

    The per-module import locks kept by ModuleUDFLoader are dropped for the
    removed modules as well.

    Returns the list of module names that were actually removed.
    """
    import os  # local import: only this helper needs path normalization

    root = os.path.normpath(location)
    # Keep exactly one trailing separator, also when root is the filesystem root.
    root_prefix = root if root.endswith(os.sep) else root + os.sep

    def _is_inside(path) -> bool:
        # Non-string or empty values (e.g. a missing __file__) never match.
        if not isinstance(path, str) or not path:
            return False
        normalized = os.path.normpath(path)
        return normalized == root or normalized.startswith(root_prefix)

    # Collect first, using a snapshot, so sys.modules is not mutated while iterated.
    modules_to_remove = []
    for name, module in list(sys.modules.items()):
        if module is None:
            continue

        module_file = getattr(module, "__file__", None)
        module_path = getattr(module, "__path__", None)

        if _is_inside(module_file):
            modules_to_remove.append(name)
        elif module_path:
            # For packages, check every __path__ entry.
            if any(_is_inside(entry) for entry in module_path):
                modules_to_remove.append(name)

    cleared = []
    for name in modules_to_remove:
        # pop() tolerates an entry that disappeared after the snapshot was taken.
        if sys.modules.pop(name, None) is not None:
            cleared.append(name)

    # Also drop the import locks that belong to the removed modules.
    if cleared:
        with ModuleUDFLoader._import_locks_lock:
            for name in cleared:
                if name in ModuleUDFLoader._import_locks:
                    del ModuleUDFLoader._import_locks[name]

    return cleared
2535+
24452536

24462537
class UDAFOperationType(Enum):
24472538
"""Enum representing UDAF operation types."""

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropFunctionCommand.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import org.apache.doris.analysis.StmtType;
2323
import org.apache.doris.catalog.Database;
2424
import org.apache.doris.catalog.Env;
25+
import org.apache.doris.catalog.Function;
2526
import org.apache.doris.catalog.FunctionSearchDesc;
27+
import org.apache.doris.common.AnalysisException;
2628
import org.apache.doris.common.ErrorCode;
2729
import org.apache.doris.common.ErrorReport;
2830
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -71,6 +73,34 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
7173
}
7274
argsDef.analyze();
7375
FunctionSearchDesc function = new FunctionSearchDesc(functionName, argsDef.getArgTypes(), argsDef.isVariadic());
76+
77+
// Get function id before dropping, for cleaning cached library files in BE
78+
long functionId = -1;
79+
try {
80+
Function fn = null;
81+
if (SetType.GLOBAL.equals(setType)) {
82+
fn = Env.getCurrentEnv().getGlobalFunctionMgr().getFunction(function);
83+
} else {
84+
String dbName = functionName.getDb();
85+
if (dbName == null) {
86+
dbName = ctx.getDatabase();
87+
functionName.setDb(dbName);
88+
}
89+
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
90+
if (db != null) {
91+
fn = db.getFunction(function);
92+
}
93+
}
94+
if (fn != null) {
95+
functionId = fn.getId();
96+
} else {
97+
LOG.warn("Function not found: {}, setType: {}", function.getName(), setType);
98+
}
99+
} catch (AnalysisException e) {
100+
LOG.warn("Function not found when getting function id: {}, error: {}",
101+
function.getName(), e.getMessage());
102+
}
103+
74104
if (SetType.GLOBAL.equals(setType)) {
75105
Env.getCurrentEnv().getGlobalFunctionMgr().dropFunction(function, ifExists);
76106
} else {
@@ -90,9 +120,10 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
90120
String functionSignature = getSignatureString();
91121
AgentBatchTask batchTask = new AgentBatchTask();
92122
for (Backend backend : backendsInfo.values()) {
93-
CleanUDFCacheTask cleanUDFCacheTask = new CleanUDFCacheTask(backend.getId(), functionSignature);
123+
CleanUDFCacheTask cleanUDFCacheTask = new CleanUDFCacheTask(backend.getId(), functionSignature, functionId);
94124
batchTask.addTask(cleanUDFCacheTask);
95-
LOG.info("clean udf cache in be {}, beId {}", backend.getHost(), backend.getId());
125+
LOG.info("clean udf cache in be {}, beId {}, functionId {}",
126+
backend.getHost(), backend.getId(), functionId);
96127
}
97128
AgentTaskExecutor.submit(batchTask);
98129
}

fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,18 @@
2727
public class CleanUDFCacheTask extends AgentTask {
2828
private static final Logger LOG = LogManager.getLogger(CleanUDFCacheTask.class);
2929
private String functionSignature;
30+
private long functionId;
3031

31-
public CleanUDFCacheTask(long backendId, String signature) {
32+
public CleanUDFCacheTask(long backendId, String signature, long functionId) {
3233
super(null, backendId, TTaskType.CLEAN_UDF_CACHE, -1, -1, -1, -1, -1, -1, -1);
3334
this.functionSignature = signature;
35+
this.functionId = functionId;
3436
}
3537

3638
public TCleanUDFCacheReq toThrift() {
3739
TCleanUDFCacheReq req = new TCleanUDFCacheReq();
3840
req.setFunctionSignature(this.functionSignature);
41+
req.setFunctionId(this.functionId);
3942
return req;
4043
}
4144
}

gensrc/thrift/AgentService.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ struct TCleanTrashReq {}
162162

163163
struct TCleanUDFCacheReq {
164164
1: optional string function_signature //function_name(arg_type)
165+
2: optional i64 function_id // function id for cleaning cached library files
165166
}
166167

167168
enum TCompressionType {

0 commit comments

Comments
 (0)