Skip to content

Commit a2c6d0a

Browse files
authored
feat(meta): add async Lua spawn and sleep support with tests (#18445)
Add comprehensive async functionality to Lua environment: - Implement async spawn() function using tokio::task::spawn_local - Add async sleep() function with tokio::time::sleep - Create LuaTask wrapper with proper join() method - Use LocalSet for async Lua script execution in metactl Test coverage includes: - Basic spawn functionality with concurrent tasks - Cross-task gRPC client operations - Sleep timing verification - Dynamic test discovery system This enables concurrent Lua script execution within the async runtime for improved performance and capability.
1 parent cde8b4d commit a2c6d0a

File tree

9 files changed

+329
-104
lines changed

9 files changed

+329
-104
lines changed

src/meta/binaries/metactl/main.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,11 @@ impl App {
245245
}
246246
};
247247

248-
if let Err(e) = lua.load(&script).exec_async().await {
248+
#[allow(clippy::disallowed_types)]
249+
let local = tokio::task::LocalSet::new();
250+
let res = local.run_until(lua.load(&script).exec_async()).await;
251+
252+
if let Err(e) = res {
249253
return Err(anyhow::anyhow!("Lua execution error: {}", e));
250254
}
251255
Ok(())

src/meta/control/src/lua_support.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::cell::RefCell;
16+
use std::rc::Rc;
1517
use std::sync::Arc;
1618
use std::time::Duration;
1719

@@ -25,6 +27,7 @@ use mlua::LuaSerdeExt;
2527
use mlua::UserData;
2628
use mlua::UserDataMethods;
2729
use mlua::Value;
30+
use tokio::time;
2831

2932
pub struct LuaGrpcClient {
3033
client: Arc<ClientHandle>,
@@ -64,6 +67,31 @@ impl UserData for LuaGrpcClient {
6467
}
6568
}
6669

70+
pub struct LuaTask {
71+
handle: Rc<RefCell<Option<tokio::task::JoinHandle<mlua::Value>>>>,
72+
}
73+
74+
impl UserData for LuaTask {
75+
fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
76+
methods.add_async_method("join", |_lua, this, ()| async move {
77+
let handle_opt = this.handle.borrow_mut().take();
78+
match handle_opt {
79+
Some(handle) => match handle.await {
80+
Ok(result) => Ok(result),
81+
Err(e) => {
82+
eprintln!("Join error: {}", e);
83+
Ok(mlua::Value::Nil)
84+
}
85+
},
86+
None => {
87+
eprintln!("Handle already consumed - task was already awaited");
88+
Ok(mlua::Value::Nil)
89+
}
90+
}
91+
});
92+
}
93+
}
94+
6795
pub fn setup_lua_environment(lua: &Lua) -> anyhow::Result<()> {
6896
// Register new_grpc_client function
6997
let new_grpc_client = lua
@@ -91,6 +119,43 @@ pub fn setup_lua_environment(lua: &Lua) -> anyhow::Result<()> {
91119
.set("NULL", Value::NULL)
92120
.map_err(|e| anyhow::anyhow!("Failed to register NULL constant: {}", e))?;
93121

122+
// Register spawn function that delegates to tokio::task::spawn_local
123+
let spawn_fn = lua
124+
.create_function(|_lua, func: mlua::Function| {
125+
#[allow(clippy::disallowed_methods)]
126+
let handle = tokio::task::spawn_local(async move {
127+
match func.call_async::<mlua::Value>(()).await {
128+
Ok(result) => result,
129+
Err(e) => {
130+
eprintln!("Spawned task error: {}", e);
131+
mlua::Value::Nil
132+
}
133+
}
134+
});
135+
136+
Ok(LuaTask {
137+
handle: Rc::new(RefCell::new(Some(handle))),
138+
})
139+
})
140+
.map_err(|e| anyhow::anyhow!("Failed to create spawn function: {}", e))?;
141+
142+
lua.globals()
143+
.set("spawn", spawn_fn)
144+
.map_err(|e| anyhow::anyhow!("Failed to register spawn function: {}", e))?;
145+
146+
// Register async sleep function
147+
let sleep_fn = lua
148+
.create_async_function(|_lua, seconds: f64| async move {
149+
let duration = Duration::from_secs_f64(seconds);
150+
time::sleep(duration).await;
151+
Ok(())
152+
})
153+
.map_err(|e| anyhow::anyhow!("Failed to create sleep function: {}", e))?;
154+
155+
lua.globals()
156+
.set("sleep", sleep_fn)
157+
.map_err(|e| anyhow::anyhow!("Failed to register sleep function: {}", e))?;
158+
94159
Ok(())
95160
}
96161

tests/metactl/metactl_utils.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#!/usr/bin/env python3
22

3+
import sys
34
import json
45
import subprocess
56
import time
@@ -11,6 +12,34 @@
1112
metactl_bin = f"./target/{BUILD_PROFILE}/databend-metactl"
1213

1314

15+
def load_lua_util():
16+
"""Load lua utility functions from lua_util.lua file."""
17+
with open("tests/metactl/lua_util.lua", 'r') as f:
18+
return f.read()
19+
20+
21+
def metactl_run_lua(lua_script=None, lua_filename=None):
22+
cmds = [
23+
metactl_bin, "lua"
24+
]
25+
26+
if lua_filename:
27+
cmds += [lua_filename]
28+
print("metactl_run_lua from_file:", lua_filename, file=sys.stderr)
29+
else:
30+
print("metactl_run_lua from_stdin:", lua_script, file=sys.stderr)
31+
32+
result = subprocess.run(cmds, input=lua_script, capture_output=True, text=True)
33+
34+
print("metactl_run_lua result:", result, file=sys.stderr)
35+
36+
if result.returncode != 0:
37+
raise Exception(result)
38+
39+
output = result.stdout.strip()
40+
return output
41+
42+
1443
def metactl_upsert(grpc_addr, key, value):
1544
"""Upsert a key-value pair using the upsert subcommand."""
1645
result = run_command([

tests/metactl/subcommands/cmd_lua_grpc.py

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -84,77 +84,9 @@ def test_lua_grpc_client():
8484
shutil.rmtree(".databend", ignore_errors=True)
8585

8686

87-
def test_lua_grpc_from_file():
88-
"""Test lua subcommand with gRPC client using file input."""
89-
print_title("Test lua subcommand with gRPC client from file")
90-
91-
# Setup meta service
92-
grpc_addr = setup_test_environment()
93-
94-
lua_util_str = load_lua_util()
95-
96-
# Create a temporary Lua script file
97-
with tempfile.NamedTemporaryFile(mode='w', suffix='.lua', delete=False) as f:
98-
f.write(f'''
99-
{lua_util_str}
100-
101-
local client = new_grpc_client("{grpc_addr}")
102-
103-
-- Upsert multiple key-value pairs
104-
local upsert1_result, upsert1_err = client:upsert("key1", "value1")
105-
if upsert1_err then
106-
print("Key1 upsert error:", upsert1_err)
107-
end
108-
109-
local upsert2_result, upsert2_err = client:upsert("key2", "value2")
110-
if upsert2_err then
111-
print("Key2 upsert error:", upsert2_err)
112-
end
113-
114-
-- Get and print the values
115-
local result1, err1 = client:get("key1")
116-
if err1 then
117-
print("Key1 error:", err1)
118-
else
119-
print("Key1 result:", to_string(result1))
120-
end
121-
122-
local result2, err2 = client:get("key2")
123-
if err2 then
124-
print("Key2 error:", err2)
125-
else
126-
print("Key2 result:", to_string(result2))
127-
end
128-
''')
129-
lua_file = f.name
130-
131-
# Run metactl lua with file
132-
result = subprocess.run([
133-
metactl_bin, "lua",
134-
"--file", lua_file
135-
], capture_output=True, text=True, check=True)
136-
137-
output = result.stdout.strip()
138-
print("output:", output)
139-
140-
expected_output = '''Key1 result:\t{"data"="value1","seq"=1}
141-
Key2 result:\t{"data"="value2","seq"=2}'''
142-
143-
# Check if entire output matches expected value
144-
assert output == expected_output, f"Expected:\n{expected_output}\n\nActual:\n{output}"
145-
146-
print("✓ Lua gRPC client file test passed")
147-
148-
# Only cleanup on success
149-
os.unlink(lua_file)
150-
kill_databend_meta()
151-
shutil.rmtree(".databend", ignore_errors=True)
152-
153-
15487
def main():
15588
"""Main function to run all lua gRPC tests."""
15689
test_lua_grpc_client()
157-
test_lua_grpc_from_file()
15890

15991

16092
if __name__ == "__main__":
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#!/usr/bin/env python3
2+
3+
import subprocess
4+
import time
5+
from metactl_utils import metactl_bin
6+
from utils import print_title
7+
8+
def test_lua_sleep():
9+
"""Test sleep functionality."""
10+
print_title("Test sleep functionality")
11+
12+
lua_script = '''
13+
print("Before sleep")
14+
sleep(0.5)
15+
print("After sleep")
16+
'''
17+
18+
start_time = time.time()
19+
20+
result = subprocess.run([
21+
metactl_bin, "lua"
22+
], input=lua_script, capture_output=True, text=True, check=True)
23+
24+
execution_time = time.time() - start_time
25+
26+
expected_output = "Before sleep\nAfter sleep"
27+
assert result.stdout.strip() == expected_output
28+
29+
# Verify timing
30+
assert execution_time >= 0.5, f"Expected >= 0.5s, got {execution_time:.2f}s"
31+
assert execution_time < 2.0, f"Expected < 2.0s, got {execution_time:.2f}s"
32+
33+
print(f"✓ Sleep test passed ({execution_time:.2f}s)")
34+
35+
36+
def main():
37+
"""Main function to run lua sleep test."""
38+
test_lua_sleep()
39+
40+
41+
if __name__ == "__main__":
42+
main()
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#!/usr/bin/env python3
2+
3+
import subprocess
4+
import tempfile
5+
import os
6+
import shutil
7+
from metactl_utils import metactl_bin, load_lua_util, metactl_run_lua
8+
from utils import print_title, kill_databend_meta, start_meta_node
9+
10+
11+
def test_spawn_basic():
12+
"""Test basic spawn functionality without gRPC."""
13+
print_title("Test basic spawn functionality")
14+
15+
lua_util_str = load_lua_util()
16+
17+
lua_script = f'''
18+
{lua_util_str}
19+
20+
print("Testing basic spawn functionality...")
21+
22+
local task1 = spawn(function()
23+
print("Task 1: Starting")
24+
sleep(0.1)
25+
print("Task 1: Finished")
26+
end)
27+
28+
local task2 = spawn(function()
29+
print("Task 2: Starting")
30+
sleep(0.2)
31+
print("Task 2: Finished")
32+
end)
33+
34+
print("Created both tasks, now joining...")
35+
36+
task1:join()
37+
print("Task 1 joined")
38+
39+
task2:join()
40+
print("Task 2 joined")
41+
42+
print("All tasks completed!")
43+
'''
44+
45+
output = metactl_run_lua(lua_script=lua_script)
46+
47+
expected_phrases = [
48+
"Testing basic spawn functionality...",
49+
"Task 1: Starting",
50+
"Task 2: Starting",
51+
"Created both tasks, now joining...",
52+
"Task 1: Finished",
53+
"Task 1 joined",
54+
"Task 2: Finished",
55+
"Task 2 joined",
56+
"All tasks completed!"
57+
]
58+
59+
for phrase in expected_phrases:
60+
assert phrase in output, f"Expected phrase '{phrase}' not found in output:\n{output}"
61+
print("✓ Basic spawn functionality test passed")
62+
63+
64+
def main():
65+
"""Test spawn functionality with multiple concurrent tasks."""
66+
test_spawn_basic()
67+
68+
69+
if __name__ == "__main__":
70+
main()

0 commit comments

Comments
 (0)