Skip to content

Commit b8cc6b4

Browse files
Gumixlocker
authored andcommitted
box/lua: introduce space:insert_arrow(arrow)
The new method takes a string with the data in Arrow IPC format as an argument, see commit be34a84 ("box: introduce box_insert_arrow()"). The data is deserialized and inserted into a space. Follow-up tarantool#10508 Needed for tarantool/tarantool-ee#865 NO_DOC=No sense in mentioning in CE NO_CHANGELOG=No sense in mentioning in CE
1 parent 740daed commit b8cc6b4

File tree

4 files changed

+124
-1
lines changed

4 files changed

+124
-1
lines changed

src/box/lua/index.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "box/lua/tuple.h"
3838
#include "box/lua/misc.h"
3939
#include "small/region.h"
40+
#include "arrow_ipc.h"
4041
#include "fiber.h"
4142

4243
/** {{{ box.index Lua library: access to spaces and indexes
@@ -419,6 +420,33 @@ lbox_index_compact(lua_State *L)
419420
return 0;
420421
}
421422

423+
static int
424+
lbox_insert_arrow(lua_State *L)
425+
{
426+
if (lua_gettop(L) != 2 || !lua_isnumber(L, 1) ||
427+
lua_type(L, 2) != LUA_TSTRING) {
428+
diag_set(IllegalParams, "Usage: space:insert_arrow(arrow)");
429+
return luaT_error(L);
430+
}
431+
uint32_t space_id = lua_tonumber(L, 1);
432+
433+
size_t len;
434+
const char *data = lua_tolstring(L, 2, &len);
435+
assert(data != NULL);
436+
struct ArrowArray array;
437+
struct ArrowSchema schema;
438+
if (arrow_ipc_decode(&array, &schema, data, data + len) != 0)
439+
return luaT_error(L);
440+
int rc = box_insert_arrow(space_id, &array, &schema);
441+
if (schema.release != NULL)
442+
schema.release(&schema);
443+
if (array.release != NULL)
444+
array.release(&array);
445+
if (rc != 0)
446+
return luaT_error(L);
447+
return 0;
448+
}
449+
422450
/* }}} */
423451

424452
void
@@ -452,6 +480,7 @@ box_lua_index_init(struct lua_State *L)
452480
{"truncate", lbox_truncate},
453481
{"stat", lbox_index_stat},
454482
{"compact", lbox_index_compact},
483+
{"insert_arrow", lbox_insert_arrow},
455484
{NULL, NULL}
456485
};
457486

src/box/lua/schema.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2876,6 +2876,11 @@ space_mt.run_triggers = function(space, yesno)
28762876
end
28772877
builtin.space_run_triggers(s, yesno)
28782878
end
2879+
space_mt.insert_arrow = function(space, arrow)
2880+
check_space_arg(space, 'insert_arrow', 2)
2881+
check_space_exists(space, 2)
2882+
return internal.insert_arrow(space.id, arrow);
2883+
end
28792884
space_mt.frommap = box.internal.space.frommap
28802885
space_mt.stat = box.internal.space.stat
28812886
space_mt.__index = space_mt

src/lua/msgpack.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,12 @@ luamp_decode_with_ctx(struct lua_State *L, struct luaL_serializer *cfg,
519519
VERIFY(interval_unpack(data, len, itv) != NULL);
520520
return;
521521
}
522+
case MP_ARROW:
523+
{
524+
lua_pushlstring(L, *data, len);
525+
*data += len;
526+
return;
527+
}
522528
default:
523529
/* reset data to the extension header */
524530
*data = svp;
@@ -531,7 +537,6 @@ luamp_decode_with_ctx(struct lua_State *L, struct luaL_serializer *cfg,
531537
return;
532538
}
533539

534-
535540
static int
536541
lua_msgpack_encode(lua_State *L)
537542
{
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
local server = require('luatest.server')
2+
local t = require('luatest')
3+
local g = t.group()
4+
5+
g.before_all(function(cg)
6+
cg.server = server:new()
7+
cg.server:start()
8+
end)
9+
10+
g.after_all(function(cg)
11+
cg.server:drop()
12+
end)
13+
14+
g.after_each(function(cg)
15+
cg.server:exec(function()
16+
if box.space.test ~= nil then
17+
box.space.test:drop()
18+
end
19+
end)
20+
end)
21+
22+
-- MP_EXT of type MP_ARROW, column 'a', value 0.
23+
local mp_arrow_hex = [[
24+
c8011008ffffffff70000000040000009effffff0400010004000000b6ffffff0c0000000400
25+
0000000000000100000004000000daffffff140000000202000004000000f0ffffff40000000
26+
01000000610000000600080004000c0010000400080009000c000c000c000000040000000800
27+
0a000c00040006000800ffffffff88000000040000008affffff040003001000000008000000
28+
0000000000000000acffffff0100000000000000340000000800000000000000020000000000
29+
0000000000000000000000000000000000000000000008000000000000000000000001000000
30+
010000000000000000000000000000000a00140004000c0010000c0014000400060008000c00
31+
00000000000000000000
32+
]]
33+
34+
g.test_decode_mp_arrow = function(cg)
35+
cg.server:exec(function(mp_arrow_hex)
36+
local msgpack = require('msgpack')
37+
-- Test invalid extension.
38+
t.assert_error_msg_equals(
39+
"Invalid MsgPack - invalid extension",
40+
msgpack.decode, string.fromhex('c70408deadbeef'))
41+
-- Test a valid extension.
42+
local mp_arrow = string.fromhex(mp_arrow_hex:gsub("%s+", ""))
43+
local decoded, next_pos = msgpack.decode(mp_arrow)
44+
t.assert_equals(decoded, string.sub(mp_arrow, 5))
45+
t.assert_equals(next_pos, 277)
46+
end, {mp_arrow_hex})
47+
end
48+
49+
g.test_space_insert_arrow = function(cg)
50+
cg.server:exec(function(mp_arrow_hex)
51+
local msgpack = require('msgpack')
52+
local s = box.schema.create_space('test')
53+
54+
t.assert_error_msg_equals(
55+
"Usage: space:insert_arrow(arrow)",
56+
box.internal.insert_arrow)
57+
t.assert_error_msg_equals(
58+
"Usage: space:insert_arrow(arrow)",
59+
box.internal.insert_arrow, 'not a number', 'string')
60+
t.assert_error_msg_equals(
61+
"Usage: space:insert_arrow(arrow)",
62+
box.internal.insert_arrow, 11, {22})
63+
t.assert_error_msg_equals(
64+
"Use space:insert_arrow(...) instead of " ..
65+
"space.insert_arrow(...)",
66+
s.insert_arrow)
67+
t.assert_error_msg_equals(
68+
"Space 'xyz' does not exist",
69+
s.insert_arrow, {id = 1000, name = 'xyz'})
70+
t.assert_error_msg_equals(
71+
"Usage: space:insert_arrow(arrow)",
72+
s.insert_arrow, s, 22)
73+
t.assert_error_msg_equals(
74+
"Arrow decode error: Expected >= 544501618 bytes of remaining " ..
75+
"data but found 29 bytes in buffer",
76+
s.insert_arrow, s, 'not a valid arrow data string')
77+
78+
local mp_arrow = string.fromhex(mp_arrow_hex:gsub("%s+", ""))
79+
local arrow = msgpack.decode(mp_arrow)
80+
t.assert_error_msg_equals(
81+
"memtx does not support arrow format",
82+
s.insert_arrow, s, arrow)
83+
end, {mp_arrow_hex})
84+
end

0 commit comments

Comments
 (0)