Skip to content

Commit 5fe44d8

Browse files
committed
box: implement Lua call/eval over iproto for application threads
This commit lets a network client specify a thread id to forward a call/eval request to. To achieve that, we have to initialize a minimal Lua state in each application threads. We do that by making the tarantool_L global variable thread-local and factoring out tarantool_lua_init_minimal() from tarantool_lua_init() for use in application threads. The minimal initialization includes setting ctype ids for all our custom Lua types, such as decimal and datetime, because we need them for MsgPack serialization. These ctype ids are made thread-local as well. The only exception is tuple because tuple formats can't be used in any thread except tx. We explicitly override the tuple decoder to return a plain table instead of a tuple object if used in a non-tx thread, see luamp_decode_extension_box(). To safely use the box Lua call/eval infrastructure in any thread, we have to fix a few things: 1. execute_lua_refs and call_serializer_no_ext have to be made thread-local because they depend on the Lua state. 2. Apart from setting execute_lua_refs and call_serializer_no_ext, box_lua_call_init() registers an internal box library and sets a trigger on alter of the _func system space. Apparently, we can't do that in application threads. 3. get_call_serializer() uses current_session(), which creates a new session object if the current fiber doesn't have a session. Sessions aren't thread-safe and we don't set them for fibers running in application threads so we switch to fiber_get_session(). We also add an explicit check that this is an IPROTO session because for other sessions IPROTO feature bits are undefined. The latter is merely for code clarity because Lua call/eval are executed only from IPROTO. This commit completes the core part of the application threads infrastructure. Closes #12259 NO_DOC=internal NO_CHANGELOG=internal
1 parent 4cf2b86 commit 5fe44d8

23 files changed

+402
-91
lines changed

src/box/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ set(box_sources
297297
lua/key_def.c
298298
lua/merger.c
299299
lua/watcher.c
300+
lua/app_threads.c
300301
lua/iproto.c
301302
lua/func_adapter.c
302303
lua/tuple_format.c

src/box/app_threads.c

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,21 @@
88
#include <assert.h>
99
#include <limits.h>
1010
#include <stddef.h>
11+
#include <stdint.h>
1112
#include <stdlib.h>
1213
#include <stdio.h>
1314

1415
#include "diag.h"
1516
#include "fiber.h"
1617
#include "fiber_pool.h"
18+
#include "msgpuck.h"
19+
#include "port.h"
1720
#include "say.h"
1821
#include "tarantool_ev.h"
1922
#include "trivia/util.h"
23+
#include "xrow.h"
24+
25+
#include "lua/app_threads.h"
2026

2127
int app_thread_count;
2228

@@ -27,11 +33,13 @@ static void *
2733
app_thread_f(void *unused)
2834
{
2935
(void)unused;
36+
app_thread_lua_init();
3037
struct fiber_pool fiber_pool;
3138
fiber_pool_create(&fiber_pool, cord_name(cord()), INT_MAX,
3239
FIBER_POOL_IDLE_TIMEOUT);
3340
ev_run(loop(), 0);
3441
fiber_pool_destroy(&fiber_pool);
42+
app_thread_lua_free();
3543
return NULL;
3644
}
3745

@@ -68,3 +76,29 @@ app_threads_stop(void)
6876
app_thread_cords = NULL;
6977
app_thread_count = 0;
7078
}
79+
80+
int
81+
app_thread_process_call(struct call_request *request, struct port *port)
82+
{
83+
const char *name = request->name;
84+
uint32_t name_len = mp_decode_strl(&name);
85+
struct port args;
86+
port_msgpack_create(&args, request->args,
87+
request->args_end - request->args);
88+
int rc = app_thread_lua_call(name, name_len, &args, port);
89+
port_msgpack_destroy(&args);
90+
return rc;
91+
}
92+
93+
int
94+
app_thread_process_eval(struct call_request *request, struct port *port)
95+
{
96+
const char *expr = request->expr;
97+
uint32_t expr_len = mp_decode_strl(&expr);
98+
struct port args;
99+
port_msgpack_create(&args, request->args,
100+
request->args_end - request->args);
101+
int rc = app_thread_lua_eval(expr, expr_len, &args, port);
102+
port_msgpack_destroy(&args);
103+
return rc;
104+
}

src/box/app_threads.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
extern "C" {
1010
#endif /* defined(__cplusplus) */
1111

12+
struct call_request;
13+
struct port;
14+
1215
enum {
1316
APP_THREADS_MAX = 1000,
1417
};
@@ -32,6 +35,18 @@ app_threads_start(int thread_count);
3235
void
3336
app_threads_stop(void);
3437

38+
/**
39+
* Processes CALL request in this application thread.
40+
*/
41+
int
42+
app_thread_process_call(struct call_request *request, struct port *port);
43+
44+
/**
45+
* Processes EVAL request in this application thread.
46+
*/
47+
int
48+
app_thread_process_eval(struct call_request *request, struct port *port);
49+
3550
#if defined(__cplusplus)
3651
} /* extern "C" */
3752
#endif /* defined(__cplusplus) */

src/box/iproto.cc

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ struct iproto_thread {
163163
* Used for returning request results.
164164
*/
165165
struct cpipe ret_pipe;
166+
/** CALL/EVAL route. */
167+
struct cmsg_hop call_route[2];
166168
/** Route used to destroy an IPROTO connection. */
167169
struct cmsg_hop destroy_route[2];
168170
} *srv;
@@ -177,7 +179,6 @@ struct iproto_thread {
177179
struct cmsg_hop rollback_on_disconnect_route[2];
178180
struct cmsg_hop disconnect_route[2];
179181
struct cmsg_hop misc_route[2];
180-
struct cmsg_hop call_route[2];
181182
struct cmsg_hop select_route[2];
182183
struct cmsg_hop process1_route[2];
183184
struct cmsg_hop sql_route[2];
@@ -1940,7 +1941,8 @@ iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend)
19401941
diag_set(ClientError, ER_NO_SUCH_THREAD, thread_id);
19411942
goto error;
19421943
}
1943-
if (thread_id != XROW_THREAD_UNSPEC && thread_id != 0) {
1944+
if (thread_id != XROW_THREAD_UNSPEC && thread_id != 0 &&
1945+
type != IPROTO_CALL && type != IPROTO_EVAL) {
19441946
diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_THREAD,
19451947
iproto_type_name(type), thread_id);
19461948
goto error;
@@ -1990,6 +1992,9 @@ iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend)
19901992
return;
19911993
}
19921994

1995+
if (thread_id != XROW_THREAD_UNSPEC)
1996+
msg->srv_id = thread_id;
1997+
19931998
rc = iproto_msg_decode(msg, &route);
19941999
if (rc == 0) {
19952000
assert(route != NULL);
@@ -2054,7 +2059,7 @@ iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route)
20542059
case IPROTO_CALL_16:
20552060
case IPROTO_CALL:
20562061
case IPROTO_EVAL:
2057-
*route = iproto_thread->call_route;
2062+
*route = iproto_thread->srv[msg->srv_id].call_route;
20582063
if (xrow_decode_call(&msg->header, &msg->call))
20592064
return -1;
20602065
return 0;
@@ -2772,7 +2777,7 @@ tx_process_select(struct cmsg *m)
27722777
}
27732778

27742779
static int
2775-
tx_process_call_on_yield(struct trigger *trigger, void *event)
2780+
srv_process_call_on_yield(struct trigger *trigger, void *event)
27762781
{
27772782
(void)event;
27782783
struct iproto_msg *msg = (struct iproto_msg *)trigger->data;
@@ -2810,7 +2815,7 @@ tx_process_call(struct cmsg *m)
28102815
* a long polling request.
28112816
*/
28122817
struct trigger fiber_on_yield;
2813-
trigger_create(&fiber_on_yield, tx_process_call_on_yield, msg, NULL);
2818+
trigger_create(&fiber_on_yield, srv_process_call_on_yield, msg, NULL);
28142819
trigger_add(&fiber()->on_yield, &fiber_on_yield);
28152820

28162821
int rc;
@@ -2885,6 +2890,60 @@ tx_process_call(struct cmsg *m)
28852890
tx_end_msg(msg, &svp);
28862891
}
28872892

2893+
/**
2894+
* Process a CALL/EVAL request in a serving thread.
2895+
*/
2896+
static void
2897+
srv_process_call(struct cmsg *m)
2898+
{
2899+
struct iproto_msg *msg = srv_accept_msg(m);
2900+
struct obuf *out;
2901+
struct obuf_svp svp;
2902+
struct port port;
2903+
int count;
2904+
int rc;
2905+
2906+
struct trigger fiber_on_yield;
2907+
trigger_create(&fiber_on_yield, srv_process_call_on_yield, msg, NULL);
2908+
trigger_add(&fiber()->on_yield, &fiber_on_yield);
2909+
2910+
switch (msg->header.type) {
2911+
case IPROTO_CALL:
2912+
rc = app_thread_process_call(&msg->call, &port);
2913+
break;
2914+
case IPROTO_EVAL:
2915+
rc = app_thread_process_eval(&msg->call, &port);
2916+
break;
2917+
default:
2918+
unreachable();
2919+
}
2920+
2921+
trigger_clear(&fiber_on_yield);
2922+
2923+
if (rc != 0)
2924+
goto error;
2925+
2926+
out = iproto_msg_obuf(msg);
2927+
iproto_prepare_select(out, &svp);
2928+
count = port_dump_msgpack(&port, out);
2929+
port_destroy(&port);
2930+
if (count < 0) {
2931+
obuf_rollback_to_svp(out, &svp);
2932+
goto error;
2933+
}
2934+
iproto_reply_select(out, &svp, msg->header.sync, /*schema_version=*/0,
2935+
count, /*box_tuple_as_ext=*/false);
2936+
iproto_wpos_create(&msg->wpos, out);
2937+
srv_end_msg(msg);
2938+
return;
2939+
error:
2940+
out = iproto_msg_obuf(msg);
2941+
iproto_reply_error(out, diag_last_error(&fiber()->diag),
2942+
msg->header.sync, /*schema_version=*/0);
2943+
iproto_wpos_create(&msg->wpos, out);
2944+
srv_end_msg(msg);
2945+
}
2946+
28882947
static void
28892948
tx_process_id(struct iproto_connection *con, const struct id_request *id)
28902949
{
@@ -3879,8 +3938,6 @@ iproto_thread_init_routes(struct iproto_thread *iproto_thread)
38793938
iproto_thread->disconnect_route[1] = {net_finish_disconnect, NULL};
38803939
iproto_thread->misc_route[0] = {tx_process_misc, net_pipe};
38813940
iproto_thread->misc_route[1] = {net_send_msg, NULL};
3882-
iproto_thread->call_route[0] = {tx_process_call, net_pipe};
3883-
iproto_thread->call_route[1] = {net_send_msg, NULL};
38843941
iproto_thread->select_route[0] = {tx_process_select, net_pipe};
38853942
iproto_thread->select_route[1] = {net_send_msg, NULL};
38863943
iproto_thread->process1_route[0] = {tx_process1, net_pipe};
@@ -3897,30 +3954,27 @@ iproto_thread_init_routes(struct iproto_thread *iproto_thread)
38973954
iproto_thread->push_route[1] = {tx_end_push, NULL};
38983955

38993956
struct cmsg_hop **dml_route = iproto_thread->dml_route;
3900-
assert(dml_route[IPROTO_OK] == NULL);
39013957
dml_route[IPROTO_SELECT] = iproto_thread->select_route;
39023958
dml_route[IPROTO_INSERT] = iproto_thread->process1_route;
39033959
dml_route[IPROTO_REPLACE] = iproto_thread->process1_route;
39043960
dml_route[IPROTO_UPDATE] = iproto_thread->process1_route;
39053961
dml_route[IPROTO_DELETE] = iproto_thread->process1_route;
3906-
dml_route[IPROTO_CALL_16] = iproto_thread->call_route;
3907-
dml_route[IPROTO_AUTH] = iproto_thread->misc_route;
3908-
dml_route[IPROTO_EVAL] = iproto_thread->call_route;
39093962
dml_route[IPROTO_UPSERT] = iproto_thread->process1_route;
3910-
dml_route[IPROTO_CALL] = iproto_thread->call_route;
3911-
dml_route[IPROTO_EXECUTE] = iproto_thread->sql_route;
3912-
assert(dml_route[IPROTO_NOP] == NULL);
3913-
dml_route[IPROTO_PREPARE] = iproto_thread->sql_route;
3914-
assert(dml_route[IPROTO_BEGIN] == NULL);
3915-
assert(dml_route[IPROTO_COMMIT] == NULL);
3916-
assert(dml_route[IPROTO_ROLLBACK] == NULL);
39173963
dml_route[IPROTO_INSERT_ARROW] = iproto_thread->process1_route;
39183964

39193965
iproto_thread->connect_route[0] = {tx_process_connect, net_pipe};
39203966
iproto_thread->connect_route[1] = {net_send_greeting, NULL};
39213967
iproto_thread->override_route[0] = {tx_process_override, net_pipe};
39223968
iproto_thread->override_route[1] = {net_send_msg, NULL};
39233969

3970+
iproto_thread->srv[0].call_route[0] = {tx_process_call, net_pipe};
3971+
iproto_thread->srv[0].call_route[1] = {net_send_msg, NULL};
3972+
for (int i = 1; i < iproto_thread->srv_count; i++) {
3973+
iproto_thread->srv[i].call_route[0] =
3974+
{srv_process_call, &iproto_thread->srv[i].ret_pipe};
3975+
iproto_thread->srv[i].call_route[1] =
3976+
{net_send_msg, NULL};
3977+
}
39243978
for (int i = 0; i < iproto_thread->srv_count; i++) {
39253979
iproto_thread->srv[i].destroy_route[0] =
39263980
{srv_process_destroy, &iproto_thread->srv[i].ret_pipe};

src/box/lua/app_threads.c

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* SPDX-License-Identifier: BSD-2-Clause
3+
*
4+
* Copyright 2010-2026, Tarantool AUTHORS, please see AUTHORS file.
5+
*/
6+
#include "box/lua/app_threads.h"
7+
8+
#include <lua.h>
9+
#include <stdint.h>
10+
11+
#include "lua/init.h"
12+
#include "box/lua/call.h"
13+
14+
void
15+
app_thread_lua_init(void)
16+
{
17+
tarantool_lua_init_minimal();
18+
box_lua_call_init(tarantool_L);
19+
}
20+
21+
void
22+
app_thread_lua_free(void)
23+
{
24+
lua_close(tarantool_L);
25+
tarantool_L = NULL;
26+
}
27+
28+
int
29+
app_thread_lua_call(const char *name, uint32_t name_len,
30+
struct port *args, struct port *ret)
31+
{
32+
return box_lua_call(name, name_len, args, ret);
33+
}
34+
35+
int
36+
app_thread_lua_eval(const char *expr, uint32_t expr_len,
37+
struct port *args, struct port *ret)
38+
{
39+
return box_lua_eval(expr, expr_len, args, ret);
40+
}

src/box/lua/app_threads.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: BSD-2-Clause
3+
*
4+
* Copyright 2010-2026, Tarantool AUTHORS, please see AUTHORS file.
5+
*/
6+
#pragma once
7+
8+
#include <stdint.h>
9+
10+
#if defined(__cplusplus)
11+
extern "C" {
12+
#endif /* defined(__cplusplus) */
13+
14+
struct port;
15+
16+
void
17+
app_thread_lua_init(void);
18+
19+
void
20+
app_thread_lua_free(void);
21+
22+
/**
23+
* Executes a Lua function in this application thread.
24+
*/
25+
int
26+
app_thread_lua_call(const char *name, uint32_t name_len,
27+
struct port *args, struct port *ret);
28+
29+
/**
30+
* Executes a Lua expression in this application thread.
31+
*/
32+
int
33+
app_thread_lua_eval(const char *expr, uint32_t expr_len,
34+
struct port *args, struct port *ret);
35+
36+
#if defined(__cplusplus)
37+
} /* extern "C" */
38+
#endif /* defined(__cplusplus) */

0 commit comments

Comments
 (0)