Skip to content

Commit e9a8870

Browse files
authored
feat(metactl): add lua grpc client functionality (#18438)
Extend lua subcommand with gRPC client capabilities for meta operations: - Add new_grpc_client() function to create gRPC client instances - Implement async get() and upsert() methods with tuple error handling - Export NULL constant to Lua environment for proper null handling - Add comprehensive test suite for gRPC client functionality - Support both stdin and file input modes for Lua scripts The gRPC client returns (result, error) tuples following Lua conventions and converts Rust structures to native Lua tables for better usability. Example usage: ```lua local client = new_grpc_client("127.0.0.1:9191") -- Store a key-value pair local result, err = client:upsert("my_key", "my_value") if err then print("Error:", err) else print("Stored:", to_string(result)) end -- Retrieve the value local value, err = client:get("my_key") if err then print("Error:", err) else print("Retrieved:", to_string(value)) end ``` Run with: `databend-metactl lua --file script.lua` Or pipe: `echo 'script content' | databend-metactl lua`
1 parent 8bd17f7 commit e9a8870

File tree

6 files changed

+394
-2
lines changed

6 files changed

+394
-2
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ match-template = "0.0.1"
387387
md-5 = "0.10.5"
388388
memchr = { version = "2", default-features = false }
389389
micromarshal = "0.7.0"
390-
mlua = { version = "0.11", features = ["lua54", "vendored"] }
390+
mlua = { version = "0.11", features = ["lua54", "vendored", "async", "serialize"] }
391391
mockall = "0.11.2"
392392
mysql_async = { version = "0.34", default-features = false, features = ["native-tls-tls"] }
393393
naive-cityhash = "0.2.0"

src/meta/binaries/metactl/main.rs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ use databend_meta::version::METASRV_COMMIT_VERSION;
5454
use display_more::DisplayOptionExt;
5555
use futures::stream::TryStreamExt;
5656
use mlua::Lua;
57+
use mlua::LuaSerdeExt;
58+
use mlua::UserData;
59+
use mlua::UserDataMethods;
60+
use mlua::Value;
5761
use serde::Deserialize;
5862

5963
#[derive(Debug, Deserialize, Parser)]
@@ -233,6 +237,34 @@ impl App {
233237
async fn run_lua(&self, args: &LuaArgs) -> anyhow::Result<()> {
234238
let lua = Lua::new();
235239

240+
// Register new_grpc_client function
241+
let new_grpc_client = lua
242+
.create_function(|_lua, address: String| {
243+
let client = MetaGrpcClient::try_create(
244+
vec![address],
245+
"root",
246+
"xxx",
247+
Some(Duration::from_secs(2)),
248+
Some(Duration::from_secs(1)),
249+
None,
250+
)
251+
.map_err(|e| {
252+
mlua::Error::external(format!("Failed to create gRPC client: {}", e))
253+
})?;
254+
255+
Ok(LuaGrpcClient::new(client))
256+
})
257+
.map_err(|e| anyhow::anyhow!("Failed to create new_grpc_client function: {}", e))?;
258+
259+
lua.globals()
260+
.set("new_grpc_client", new_grpc_client)
261+
.map_err(|e| anyhow::anyhow!("Failed to register new_grpc_client: {}", e))?;
262+
263+
// Export NULL constant to Lua environment
264+
lua.globals()
265+
.set("NULL", Value::NULL)
266+
.map_err(|e| anyhow::anyhow!("Failed to register NULL constant: {}", e))?;
267+
236268
let script = match &args.file {
237269
Some(path) => std::fs::read_to_string(path)?,
238270
None => {
@@ -242,7 +274,7 @@ impl App {
242274
}
243275
};
244276

245-
if let Err(e) = lua.load(&script).exec() {
277+
if let Err(e) = lua.load(&script).exec_async().await {
246278
return Err(anyhow::anyhow!("Lua execution error: {}", e));
247279
}
248280
Ok(())
@@ -264,6 +296,44 @@ impl App {
264296
}
265297
}
266298

299+
struct LuaGrpcClient {
300+
client: Arc<ClientHandle>,
301+
}
302+
303+
impl LuaGrpcClient {
304+
fn new(client: Arc<ClientHandle>) -> Self {
305+
Self { client }
306+
}
307+
}
308+
309+
impl UserData for LuaGrpcClient {
310+
fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
311+
methods.add_async_method("get", |lua, this, key: String| async move {
312+
match this.client.get_kv(&key).await {
313+
Ok(result) => match lua.to_value(&result) {
314+
Ok(lua_value) => Ok((Some(lua_value), None::<String>)),
315+
Err(e) => Ok((None::<Value>, Some(format!("Lua conversion error: {}", e)))),
316+
},
317+
Err(e) => Ok((None::<Value>, Some(format!("gRPC error: {}", e)))),
318+
}
319+
});
320+
321+
methods.add_async_method(
322+
"upsert",
323+
|lua, this, (key, value): (String, String)| async move {
324+
let upsert = UpsertKV::update(key, value.as_bytes());
325+
match this.client.request(upsert).await {
326+
Ok(result) => match lua.to_value(&result) {
327+
Ok(lua_value) => Ok((Some(lua_value), None::<String>)),
328+
Err(e) => Ok((None::<Value>, Some(format!("Lua conversion error: {}", e)))),
329+
},
330+
Err(e) => Ok((None::<Value>, Some(format!("gRPC error: {}", e)))),
331+
}
332+
},
333+
);
334+
}
335+
}
336+
267337
#[derive(Debug, Clone, Deserialize, Subcommand)]
268338
enum CtlCommand {
269339
Status(StatusArgs),

tests/metactl/lua_util.lua

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
-- Function to check if a value is NULL (mlua NULL constant)
2+
function is_null(value)
3+
return value == NULL
4+
end
5+
6+
-- Function to normalize NULL values to nil for easier handling
7+
function normalize_value(value)
8+
if is_null(value) then
9+
return nil
10+
end
11+
return value
12+
end
13+
14+
-- Function to check if a table represents a bytes vector and convert it to string
15+
function bytes_vector_to_string(value)
16+
-- Check if table represents a bytes vector (integer indices starting from 1, u8 values)
17+
local max_index = 0
18+
local min_index = math.huge
19+
20+
-- First pass: check if all keys are integers and find range
21+
for k, v in pairs(value) do
22+
if type(k) ~= "number" or k ~= math.floor(k) or k < 1 then
23+
return nil -- Not a bytes vector
24+
end
25+
if type(v) ~= "number" or v ~= math.floor(v) or v < 0 or v > 255 then
26+
return nil -- Not u8 values
27+
end
28+
max_index = math.max(max_index, k)
29+
min_index = math.min(min_index, k)
30+
end
31+
32+
-- Check if indices are consecutive starting from 1
33+
if min_index == 1 then
34+
local expected_length = max_index
35+
local actual_length = 0
36+
for _ in pairs(value) do
37+
actual_length = actual_length + 1
38+
end
39+
if actual_length == expected_length then
40+
-- Check size limit to prevent memory exhaustion (max 1MB)
41+
if max_index > 1048576 then
42+
return '"<bytes vector too large: ' .. max_index .. ' bytes>"'
43+
end
44+
45+
-- Convert bytes vector to string
46+
local chars = {}
47+
for i = 1, max_index do
48+
chars[i] = string.char(value[i])
49+
end
50+
return '"' .. table.concat(chars) .. '"'
51+
end
52+
end
53+
54+
return nil -- Not a valid bytes vector
55+
end
56+
57+
-- Function to convert a value or table into a single-line string recursively
58+
function to_string(value)
59+
if value == nil then
60+
return "nil"
61+
end
62+
63+
if is_null(value) then
64+
return "NULL"
65+
end
66+
67+
if type(value) == "boolean" then
68+
return tostring(value)
69+
end
70+
71+
if type(value) == "number" then
72+
return tostring(value)
73+
end
74+
75+
if type(value) == "string" then
76+
-- Escape quotes in the string
77+
return '"' .. string.gsub(value, '"', '\\"') .. '"'
78+
end
79+
80+
if type(value) == "table" then
81+
-- Check if it's a bytes vector first
82+
local bytes_string = bytes_vector_to_string(value)
83+
if bytes_string then
84+
return bytes_string
85+
end
86+
87+
-- Regular table processing
88+
local result = "{"
89+
local keys = {}
90+
91+
-- Collect all keys
92+
for k in pairs(value) do
93+
table.insert(keys, k)
94+
end
95+
96+
-- Sort keys if they're all strings or numbers
97+
local can_sort = true
98+
for _, k in ipairs(keys) do
99+
if type(k) ~= "string" and type(k) ~= "number" then
100+
can_sort = false
101+
break
102+
end
103+
end
104+
105+
if can_sort then
106+
table.sort(keys, function(a, b)
107+
if type(a) == type(b) then
108+
return a < b
109+
else
110+
-- Put numbers before strings
111+
return type(a) == "number"
112+
end
113+
end)
114+
end
115+
116+
-- Process each key-value pair
117+
local first = true
118+
for _, k in ipairs(keys) do
119+
local v = value[k]
120+
121+
-- Skip nil and NULL values
122+
if v ~= nil and not is_null(v) then
123+
if not first then
124+
result = result .. ","
125+
end
126+
first = false
127+
128+
local key_str
129+
if type(k) == "string" then
130+
key_str = '"' .. string.gsub(k, '"', '\\"') .. '"'
131+
else
132+
key_str = "[" .. tostring(k) .. "]"
133+
end
134+
135+
result = result .. key_str .. "=" .. to_string(v)
136+
end
137+
end
138+
139+
result = result .. "}"
140+
return result
141+
end
142+
143+
-- Handle function, userdata, thread
144+
return "<" .. type(value) .. ">"
145+
end

0 commit comments

Comments
 (0)