Skip to content

Commit af92f72

Browse files
tchronoedsiper
authored andcommitted
filter_lua: Fix record split in mpack implementation
Fix the mpack implementation of lua, which didn't handle record splitting. Fix fluent#5496 Signed-off-by: Thiago Padilha <[email protected]>
1 parent 1e6fe03 commit af92f72

File tree

3 files changed

+45
-10
lines changed

3 files changed

+45
-10
lines changed

include/fluent-bit/flb_lua.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ struct flb_lua_l2c_config {
4747
struct mk_list l2c_types; /* data types (lua -> C) */
4848
};
4949

50+
int flb_lua_arraylength(lua_State *l);
5051
void flb_lua_pushtimetable(lua_State *l, struct flb_time *tm);
5152
int flb_lua_is_valid_func(lua_State *l, flb_sds_t func);
5253
int flb_lua_pushmpack(lua_State *l, mpack_reader_t *reader);

plugins/filter_lua/lua.c

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,45 @@ static void mpack_buffer_flush(mpack_writer_t* writer, const char* buffer, size_
112112
flb_sds_cat_safe(&ctx->packbuf, buffer, count);
113113
}
114114

115+
static void pack_result_mpack(lua_State *l,
116+
mpack_writer_t *writer,
117+
struct flb_lua_l2c_config *l2cc,
118+
struct flb_time *t)
119+
{
120+
int i;
121+
int len;
122+
123+
if (lua_type(l, -1) != LUA_TTABLE) {
124+
return;
125+
}
126+
127+
len = flb_lua_arraylength(l);
128+
if (len > 0) {
129+
/* record split */
130+
for (i = 1; i <= len; i++) {
131+
/* write array tag */
132+
mpack_write_tag(writer, mpack_tag_array(2));
133+
/* write timestamp */
134+
flb_time_append_to_mpack(writer, t, 0);
135+
/* get the subrecord */
136+
lua_rawgeti(l, -1, i);
137+
/* convert */
138+
flb_lua_tompack(l, writer, 0, l2cc);
139+
lua_pop(l, 1);
140+
}
141+
}
142+
else {
143+
/* write array tag */
144+
mpack_write_tag(writer, mpack_tag_array(2));
145+
/* write timestamp */
146+
flb_time_append_to_mpack(writer, t, 0);
147+
/* convert */
148+
flb_lua_tompack(l, writer, 0, l2cc);
149+
}
150+
/* pop */
151+
lua_pop(l, 1);
152+
}
153+
115154
static int cb_lua_filter_mpack(const void *data, size_t bytes,
116155
const char *tag, int tag_len,
117156
void **out_buf, size_t *out_bytes,
@@ -251,13 +290,8 @@ static int cb_lua_filter_mpack(const void *data, size_t bytes,
251290
mpack_writer_init(&writer, writebuf, sizeof(writebuf));
252291
mpack_writer_set_context(&writer, ctx);
253292
mpack_writer_set_flush(&writer, mpack_buffer_flush);
254-
/* write array tag */
255-
mpack_write_tag(&writer, mpack_tag_array(2));
256-
/* write timestamp: convert from double to Fluent Bit format */
257-
flb_time_append_to_mpack(&writer, &t, 0);
258-
/* write the lua table */
259-
flb_lua_tompack(ctx->lua->state, &writer, 0, &ctx->l2cc);
260-
lua_pop(ctx->lua->state, 1);
293+
/* write the result */
294+
pack_result_mpack(ctx->lua->state, &writer, &ctx->l2cc, &t);
261295
/* flush the writer */
262296
mpack_writer_flush_message(&writer);
263297
mpack_writer_destroy(&writer);

src/flb_lua.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ static int lua_table_maxn(lua_State *l)
243243
#endif
244244
}
245245

246-
static int lua_arraylength(lua_State *l)
246+
int flb_lua_arraylength(lua_State *l)
247247
{
248248
lua_Integer n;
249249
int count = 0;
@@ -442,7 +442,7 @@ void flb_lua_tompack(lua_State *l,
442442
mpack_write_false(writer);
443443
break;
444444
case LUA_TTABLE:
445-
len = lua_arraylength(l);
445+
len = flb_lua_arraylength(l);
446446
if (len > 0) {
447447
mpack_write_tag(writer, mpack_tag_array(len));
448448
for (i = 1; i <= len; i++) {
@@ -533,7 +533,7 @@ void flb_lua_tomsgpack(lua_State *l,
533533
msgpack_pack_false(pck);
534534
break;
535535
case LUA_TTABLE:
536-
len = lua_arraylength(l);
536+
len = flb_lua_arraylength(l);
537537
if (len > 0) {
538538
msgpack_pack_array(pck, len);
539539
for (i = 1; i <= len; i++) {

0 commit comments

Comments
 (0)