Skip to content

Commit 791b792

Browse files
enjoy-binbinrjd15372
authored and committed
FUNCTION FLUSH re-create lua VM, fix flush not gc, fix flush async + load crash (#1826)
This test exposes two issues:

```
test {FUNCTION - test function flush} {
    for {set i 0} {$i < 10000} {incr i} {
        r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
    }
    set before_flush_memory [s used_memory_vm_functions]
    r function flush sync
    set after_flush_memory [s used_memory_vm_functions]
    puts "flush sync, before_flush_memory: $before_flush_memory, after_flush_memory: $after_flush_memory"

    for {set i 0} {$i < 10000} {incr i} {
        r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
    }
    set before_flush_memory [s used_memory_vm_functions]
    r function flush async
    set after_flush_memory [s used_memory_vm_functions]
    puts "flush async, before_flush_memory: $before_flush_memory, after_flush_memory: $after_flush_memory"

    for {set i 0} {$i < 10000} {incr i} {
        r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
    }
    puts "Test done"
}
```

The first issue is visible in the test output: after executing FUNCTION FLUSH, used_memory_vm_functions has not changed at all:

```
flush sync, before_flush_memory: 2962432, after_flush_memory: 2962432
flush async, before_flush_memory: 4504576, after_flush_memory: 4504576
```

The second issue is a crash when loading functions during the async flush:

```
=== VALKEY BUG REPORT START: Cut & paste starting from here ===
# valkey 255.255.255 crashed by signal: 11, si_code: 2
# Accessing address: 0xe0429b7100000a3c
# Crashed running the instruction at: 0x102e0b09c

------ STACK TRACE ------
EIP:
0 valkey-server 0x0000000102e0b09c luaH_getstr + 52

Backtrace:
0 libsystem_platform.dylib 0x000000018b066584 _sigtramp + 56
1 valkey-server 0x0000000102e01054 luaD_precall + 96
2 valkey-server 0x0000000102e01b10 luaD_call + 104
3 valkey-server 0x0000000102e00d1c luaD_rawrunprotected + 76
4 valkey-server 0x0000000102e01e3c luaD_pcall + 60
5 valkey-server 0x0000000102dfc630 lua_pcall + 300
6 valkey-server 0x0000000102f77770 luaEngineCompileCode + 708
7 valkey-server 0x0000000102f71f50 scriptingEngineCallCompileCode + 104
8 valkey-server 0x0000000102f700b0 functionsCreateWithLibraryCtx + 2088
9 valkey-server 0x0000000102f70898 functionLoadCommand + 312
10 valkey-server 0x0000000102e3978c call + 416
11 valkey-server 0x0000000102e3b5b8 processCommand + 3340
12 valkey-server 0x0000000102e563cc processInputBuffer + 520
13 valkey-server 0x0000000102e55808 readQueryFromClient + 92
14 valkey-server 0x0000000102f696e0 connSocketEventHandler + 180
15 valkey-server 0x0000000102e20480 aeProcessEvents + 372
16 valkey-server 0x0000000102e4aad0 main + 26412
17 dyld 0x000000018acab154 start + 2476

------ STACK TRACE DONE ------
```

The reason is that, in the old implementation (introduced in 7.0), FUNCTION FLUSH uses lua_unref to remove the scripts from the Lua VM. lua_unref does not trigger the GC, so we are not able to effectively reclaim memory after the FUNCTION FLUSH.

The other issue is that, since we don't re-create the Lua VM in FUNCTION FLUSH, loading functions during a FUNCTION FLUSH ASYNC will result in a crash, because the Lua engine state is not thread-safe.

The correct solution is to re-create a new Lua VM to use, just like SCRIPT FLUSH does.

---------

Signed-off-by: Binbin <[email protected]>
Signed-off-by: Ricardo Dias <[email protected]>
Co-authored-by: Ricardo Dias <[email protected]>
(cherry picked from commit b4c93cc)
Signed-off-by: cherukum-amazon <[email protected]>
1 parent 11f47a1 commit 791b792

File tree

17 files changed

+278
-127
lines changed

17 files changed

+278
-127
lines changed

src/db.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
687687
if (with_functions) {
688688
serverAssert(dbnum == -1);
689689
/* TODO: fix this callback incompatibility. The arg is not used. */
690-
functionsLibCtxClearCurrent(async, (void (*)(dict *))callback);
690+
functionReset(async, (void (*)(dict *))callback);
691691
}
692692

693693
/* Also fire the end event. Note that this event will fire almost

src/eval.c

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,9 @@ void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbac
146146
listIter *iter = listGetIterator(engine_callbacks, 0);
147147
listNode *node = NULL;
148148
while ((node = listNext(iter)) != NULL) {
149-
callableLazyEvalReset *callback = listNodeValue(node);
149+
callableLazyEnvReset *callback = listNodeValue(node);
150150
if (callback != NULL) {
151-
callback->engineLazyEvalResetCallback(callback->context);
151+
callback->engineLazyEnvResetCallback(callback->context);
152152
zfree(callback);
153153
}
154154
}
@@ -159,7 +159,7 @@ void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbac
159159

160160
static void resetEngineEvalEnvCallback(scriptingEngine *engine, void *context) {
161161
int async = context != NULL;
162-
callableLazyEvalReset *callback = scriptingEngineCallResetEvalEnvFunc(engine, async);
162+
callableLazyEnvReset *callback = scriptingEngineCallResetEnvFunc(engine, VMSE_EVAL, async);
163163

164164
if (async) {
165165
list *callbacks = context;
@@ -174,7 +174,6 @@ void evalRelease(int async) {
174174
list *engine_callbacks = listCreate();
175175
scriptingEngineManagerForEachEngine(resetEngineEvalEnvCallback, engine_callbacks);
176176
freeEvalScriptsAsync(evalCtx.scripts, evalCtx.scripts_lru_list, engine_callbacks);
177-
178177
} else {
179178
freeEvalScripts(evalCtx.scripts, evalCtx.scripts_lru_list, NULL);
180179
scriptingEngineManagerForEachEngine(resetEngineEvalEnvCallback, NULL);

src/functions.c

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,32 +165,62 @@ void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)) {
165165
lib_ctx->cache_memory = 0;
166166
}
167167

168-
void functionsLibCtxClearCurrent(int async, void(callback)(dict *)) {
168+
static void resetEngineOrCollectResetCallbacks(scriptingEngine *engine, void *context) {
169+
int async = context != NULL;
170+
callableLazyEnvReset *callback = scriptingEngineCallResetEnvFunc(engine, VMSE_FUNCTION, async);
171+
172+
if (async) {
173+
list *callbacks = context;
174+
listAddNodeTail(callbacks, callback);
175+
}
176+
}
177+
178+
void functionsLibCtxReleaseCurrent(int async, void(callback)(dict *)) {
169179
if (async) {
170-
functionsLibCtx *old_l_ctx = curr_functions_lib_ctx;
171-
curr_functions_lib_ctx = functionsLibCtxCreate();
172-
freeFunctionsAsync(old_l_ctx);
180+
list *engine_callbacks = listCreate();
181+
scriptingEngineManagerForEachEngine(resetEngineOrCollectResetCallbacks, engine_callbacks);
182+
freeFunctionsAsync(curr_functions_lib_ctx, engine_callbacks);
173183
} else {
174-
functionsLibCtxClear(curr_functions_lib_ctx, callback);
184+
functionsLibCtxFree(curr_functions_lib_ctx, callback, NULL);
185+
scriptingEngineManagerForEachEngine(resetEngineOrCollectResetCallbacks, NULL);
175186
}
176187
}
177188

178189
/* Free the given functions ctx */
179190
static void functionsLibCtxFreeGeneric(functionsLibCtx *functions_lib_ctx, int async) {
180191
if (async) {
181-
freeFunctionsAsync(functions_lib_ctx);
192+
freeFunctionsAsync(functions_lib_ctx, NULL);
182193
} else {
183-
functionsLibCtxFree(functions_lib_ctx);
194+
functionsLibCtxFree(functions_lib_ctx, NULL, NULL);
184195
}
185196
}
186197

198+
void functionReset(int async, void(callback)(dict *)) {
199+
functionsLibCtxReleaseCurrent(async, callback);
200+
functionsInit();
201+
}
202+
187203
/* Free the given functions ctx */
188-
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {
189-
functionsLibCtxClear(functions_lib_ctx, NULL);
204+
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx, void(callback)(dict *), list *engine_callbacks) {
205+
functionsLibCtxClear(functions_lib_ctx, callback);
190206
dictRelease(functions_lib_ctx->functions);
191207
dictRelease(functions_lib_ctx->libraries);
192208
dictRelease(functions_lib_ctx->engines_stats);
193209
zfree(functions_lib_ctx);
210+
211+
if (engine_callbacks) {
212+
listIter *iter = listGetIterator(engine_callbacks, 0);
213+
listNode *node = NULL;
214+
while ((node = listNext(iter)) != NULL) {
215+
callableLazyEnvReset *engine_callback = listNodeValue(node);
216+
if (engine_callback != NULL) {
217+
engine_callback->engineLazyEnvResetCallback(engine_callback->context);
218+
zfree(engine_callback);
219+
}
220+
}
221+
listReleaseIterator(iter);
222+
listRelease(engine_callbacks);
223+
}
194224
}
195225

196226
/* Swap the current functions ctx with the given one.
@@ -824,7 +854,7 @@ void functionFlushCommand(client *c) {
824854
return;
825855
}
826856

827-
functionsLibCtxClearCurrent(async, NULL);
857+
functionReset(async, NULL);
828858

829859
/* Indicate that the command changed the data so it will be replicated and
830860
* counted as a data change (for persistence configuration) */

src/functions.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ dict *functionsLibGet(void);
9090
size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx);
9191
functionsLibCtx *functionsLibCtxGetCurrent(void);
9292
functionsLibCtx *functionsLibCtxCreate(void);
93-
void functionsLibCtxClearCurrent(int async, void(callback)(dict *));
94-
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx);
93+
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx, void(callback)(dict *), list *engine_callbacks);
9594
void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *));
9695
void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async);
96+
void functionReset(int async, void(callback)(dict *));
9797

9898
void functionsRemoveLibFromEngine(scriptingEngine *engine);
9999

src/lazyfree.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ void lazyFreeEvalScripts(void *args[]) {
6666
/* Release the functions ctx. */
6767
void lazyFreeFunctionsCtx(void *args[]) {
6868
functionsLibCtx *functions_lib_ctx = args[0];
69+
list *engine_callbacks = args[1];
6970
size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx);
70-
functionsLibCtxFree(functions_lib_ctx);
71+
functionsLibCtxFree(functions_lib_ctx, NULL, engine_callbacks);
7172
atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed);
7273
atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed);
7374
}
@@ -239,13 +240,13 @@ void freeEvalScriptsAsync(dict *scripts, list *scripts_lru_list, list *engine_ca
239240
}
240241

241242
/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
242-
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) {
243+
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, list *engine_callbacks) {
243244
if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
244245
atomic_fetch_add_explicit(&lazyfree_objects, functionsLibCtxFunctionsLen(functions_lib_ctx),
245246
memory_order_relaxed);
246-
bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 1, functions_lib_ctx);
247+
bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 2, functions_lib_ctx, engine_callbacks);
247248
} else {
248-
functionsLibCtxFree(functions_lib_ctx);
249+
functionsLibCtxFree(functions_lib_ctx, NULL, engine_callbacks);
249250
}
250251
}
251252

src/lua/engine_lua.c

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@
1414
#define LUA_ENGINE_NAME "LUA"
1515
#define REGISTRY_ERROR_HANDLER_NAME "__ERROR_HANDLER__"
1616

17-
typedef struct luaFunction {
18-
lua_State *lua; /* Pointer to the lua context where this function was created. Only used in EVAL context. */
19-
int function_ref; /* Special ID that allows getting the Lua function object from the Lua registry */
20-
} luaFunction;
21-
2217
typedef struct luaEngineCtx {
2318
lua_State *eval_lua; /* The Lua interpreter for EVAL commands. We use just one for all EVAL calls */
2419
lua_State *function_lua; /* The Lua interpreter for FCALL commands. We use just one for all FCALL calls */
@@ -260,17 +255,9 @@ static void luaEngineFunctionCall(ValkeyModuleCtx *module_ctx,
260255
serverAssert(module_ctx == NULL);
261256

262257
luaEngineCtx *lua_engine_ctx = (luaEngineCtx *)engine_ctx;
263-
lua_State *lua = NULL;
264-
int lua_function_ref = -1;
265-
266-
if (type == VMSE_EVAL) {
267-
lua = lua_engine_ctx->eval_lua;
268-
luaFunction *script = compiled_function->function;
269-
lua_function_ref = script->function_ref;
270-
} else {
271-
lua = lua_engine_ctx->function_lua;
272-
lua_function_ref = luaFunctionGetLuaFunctionRef(compiled_function);
273-
}
258+
lua_State *lua = type == VMSE_EVAL ? lua_engine_ctx->eval_lua : lua_engine_ctx->function_lua;
259+
luaFunction *script = compiled_function->function;
260+
int lua_function_ref = script->function_ref;
274261

275262
/* Push the pcall error handler function on the stack. */
276263
lua_pushstring(lua, REGISTRY_ERROR_HANDLER_NAME);
@@ -290,10 +277,10 @@ static void luaEngineFunctionCall(ValkeyModuleCtx *module_ctx,
290277
lua_pop(lua, 1); /* Remove the error handler. */
291278
}
292279

293-
static void resetEvalContext(void *context) {
294-
lua_State *eval_lua = context;
295-
lua_gc(eval_lua, LUA_GCCOLLECT, 0);
296-
lua_close(eval_lua);
280+
static void resetLuaContext(void *context) {
281+
lua_State *lua = context;
282+
lua_gc(lua, LUA_GCCOLLECT, 0);
283+
lua_close(lua);
297284

298285
#if !defined(USE_LIBC)
299286
/* The lua interpreter may hold a lot of memory internally, and lua is
@@ -308,27 +295,30 @@ static void resetEvalContext(void *context) {
308295
#endif
309296
}
310297

311-
static callableLazyEvalReset *luaEngineResetEvalEnv(ValkeyModuleCtx *module_ctx,
312-
engineCtx *engine_ctx,
313-
int async) {
298+
static callableLazyEnvReset *luaEngineResetEvalEnv(ValkeyModuleCtx *module_ctx,
299+
engineCtx *engine_ctx,
300+
subsystemType type,
301+
int async) {
314302
/* The lua engine is implemented in the core, and not in a Valkey Module */
315303
serverAssert(module_ctx == NULL);
316304

317305
luaEngineCtx *lua_engine_ctx = (luaEngineCtx *)engine_ctx;
318-
serverAssert(lua_engine_ctx->eval_lua);
319-
callableLazyEvalReset *callback = NULL;
306+
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);
307+
lua_State *lua = type == VMSE_EVAL ? lua_engine_ctx->eval_lua : lua_engine_ctx->function_lua;
308+
serverAssert(lua);
309+
callableLazyEnvReset *callback = NULL;
320310

321311
if (async) {
322312
callback = zcalloc(sizeof(*callback));
323-
*callback = (callableLazyEvalReset){
324-
.context = lua_engine_ctx->eval_lua,
325-
.engineLazyEvalResetCallback = resetEvalContext,
313+
*callback = (callableLazyEnvReset){
314+
.context = lua,
315+
.engineLazyEnvResetCallback = resetLuaContext,
326316
};
327317
} else {
328-
resetEvalContext(lua_engine_ctx->eval_lua);
318+
resetLuaContext(lua);
329319
}
330320

331-
initializeLuaState(lua_engine_ctx, VMSE_EVAL);
321+
initializeLuaState(lua_engine_ctx, type);
332322

333323
return callback;
334324
}
@@ -350,21 +340,21 @@ static void luaEngineFreeFunction(ValkeyModuleCtx *module_ctx,
350340
compiledFunction *compiled_function) {
351341
/* The lua engine is implemented in the core, and not in a Valkey Module */
352342
serverAssert(module_ctx == NULL);
343+
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);
353344

354345
luaEngineCtx *lua_engine_ctx = engine_ctx;
355-
if (type == VMSE_EVAL) {
356-
luaFunction *script = (luaFunction *)compiled_function->function;
357-
if (lua_engine_ctx->eval_lua == script->lua) {
358-
/* The lua context is still the same, which means that we're not
359-
* resetting the whole eval context, and therefore, we need to
360-
* delete the function from the lua context.
361-
*/
362-
lua_unref(lua_engine_ctx->eval_lua, script->function_ref);
363-
}
364-
zfree(script);
365-
} else {
366-
luaFunctionFreeFunction(lua_engine_ctx->function_lua, compiled_function->function);
346+
lua_State *lua = type == VMSE_EVAL ? lua_engine_ctx->eval_lua : lua_engine_ctx->function_lua;
347+
serverAssert(lua);
348+
349+
luaFunction *script = (luaFunction *)compiled_function->function;
350+
if (lua == script->lua) {
351+
/* The lua context is still the same, which means that we're not
352+
* resetting the whole eval context, and therefore, we need to
353+
* delete the function from the lua context.
354+
*/
355+
lua_unref(lua, script->function_ref);
367356
}
357+
zfree(script);
368358

369359
if (compiled_function->name) {
370360
decrRefCount(compiled_function->name);
@@ -379,11 +369,12 @@ int luaEngineInitEngine(void) {
379369
ldbInit();
380370

381371
engineMethods methods = {
372+
.version = VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION,
382373
.compile_code = luaEngineCompileCode,
383374
.free_function = luaEngineFreeFunction,
384375
.call_function = luaEngineFunctionCall,
385376
.get_function_memory_overhead = luaEngineFunctionMemoryOverhead,
386-
.reset_eval_env = luaEngineResetEvalEnv,
377+
.reset_env = luaEngineResetEvalEnv,
387378
.get_memory_info = luaEngineGetMemoryInfo,
388379
};
389380

src/lua/engine_lua.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44
#include "../scripting_engine.h"
55
#include <lua.h>
66

7+
typedef struct luaFunction {
8+
lua_State *lua; /* Pointer to the lua context where this function was created. Compared against the engine's current lua context (EVAL or FUNCTION) when the function is freed. */
9+
int function_ref; /* Special ID that allows getting the Lua function object from the Lua registry */
10+
} luaFunction;
11+
712
int luaEngineInitEngine(void);
813

914
#endif /* _ENGINE_LUA_ */

0 commit comments

Comments
 (0)