Skip to content

Commit 2787f6a

Browse files
committed
pack: add new JSON parser wrapper for yyjson
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 6eb8afd commit 2787f6a

File tree

2 files changed

+217
-2
lines changed

2 files changed

+217
-2
lines changed

include/fluent-bit/flb_pack.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
/* Fluent Bit
44
* ==========
5-
* Copyright (C) 2015-2024 The Fluent Bit Authors
5+
* Copyright (C) 2015-2025 The Fluent Bit Authors
66
*
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
#include <fluent-bit/flb_config.h>
2828

2929
#include <msgpack.h>
30+
#include <yyjson.h>
3031

3132
/* JSON types */
3233
#define FLB_PACK_JSON_UNDEFINED JSMN_UNDEFINED
@@ -80,6 +81,11 @@ int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size,
8081
int *root_type, size_t *consumed);
8182
int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size,
8283
int *root_type, int *out_records, size_t *consumed);
84+
int flb_pack_json_yyjson(const char *js, size_t len, char **buffer, size_t *size,
85+
int *root_type, size_t *consumed);
86+
int flb_pack_json_recs_yyjson(const char *js, size_t len, char **buffer,
87+
size_t *size, int *root_type, int *out_records,
88+
size_t *consumed);
8389

8490
int flb_pack_state_init(struct flb_pack_state *s);
8591
void flb_pack_state_reset(struct flb_pack_state *s);

src/flb_pack.c

Lines changed: 210 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <msgpack.h>
4242
#include <math.h>
4343
#include <jsmn/jsmn.h>
44+
#include <yyjson.h>
4445

4546
#define try_to_write_str flb_utils_write_str
4647

@@ -189,6 +190,200 @@ static inline int pack_string_token(struct flb_pack_state *state,
189190
return out_len;
190191
}
191192

193+
/* Convert a yyjson value to msgpack */
194+
static void yyjson_val_to_msgpack(yyjson_val *val, msgpack_packer *pck)
195+
{
196+
size_t idx, max;
197+
yyjson_val *key;
198+
yyjson_val *tmp;
199+
const char *k;
200+
size_t klen;
201+
202+
switch (yyjson_get_type(val)) {
203+
case YYJSON_TYPE_OBJ:
204+
msgpack_pack_map(pck, yyjson_obj_size(val));
205+
yyjson_obj_foreach(val, idx, max, key, tmp) {
206+
k = yyjson_get_str(key);
207+
klen = yyjson_get_len(key);
208+
msgpack_pack_str(pck, klen);
209+
msgpack_pack_str_body(pck, k, klen);
210+
yyjson_val_to_msgpack(tmp, pck);
211+
}
212+
break;
213+
case YYJSON_TYPE_ARR:
214+
msgpack_pack_array(pck, yyjson_arr_size(val));
215+
yyjson_arr_foreach(val, idx, max, tmp) {
216+
yyjson_val_to_msgpack(tmp, pck);
217+
}
218+
break;
219+
case YYJSON_TYPE_STR:
220+
msgpack_pack_str(pck, yyjson_get_len(val));
221+
msgpack_pack_str_body(pck, yyjson_get_str(val), yyjson_get_len(val));
222+
break;
223+
case YYJSON_TYPE_BOOL:
224+
if (yyjson_get_bool(val)) {
225+
msgpack_pack_true(pck);
226+
}
227+
else {
228+
msgpack_pack_false(pck);
229+
}
230+
break;
231+
case YYJSON_TYPE_NULL:
232+
msgpack_pack_nil(pck);
233+
break;
234+
case YYJSON_TYPE_NUM:
235+
if (yyjson_is_int(val)) {
236+
if (yyjson_is_sint(val)) {
237+
msgpack_pack_int64(pck, yyjson_get_sint(val));
238+
}
239+
else {
240+
msgpack_pack_uint64(pck, yyjson_get_uint(val));
241+
}
242+
}
243+
else {
244+
msgpack_pack_double(pck, yyjson_get_real(val));
245+
}
246+
break;
247+
default:
248+
msgpack_pack_nil(pck);
249+
}
250+
}
251+
252+
/*
 * Translate the yyjson type of 'val' into the matching JSMN_* token type.
 * Objects, arrays and strings map to their JSMN counterparts; every other
 * type is reported as JSMN_PRIMITIVE.
 */
static inline int yyjson_root_type(yyjson_val *val)
{
    int type = JSMN_PRIMITIVE;

    switch (yyjson_get_type(val)) {
    case YYJSON_TYPE_STR:
        type = JSMN_STRING;
        break;
    case YYJSON_TYPE_ARR:
        type = JSMN_ARRAY;
        break;
    case YYJSON_TYPE_OBJ:
        type = JSMN_OBJECT;
        break;
    default:
        break;
    }

    return type;
}
265+
266+
static int pack_json_to_msgpack_yyjson(const char *js, size_t len, char **buffer,
267+
size_t *size, int *root_type, int *records,
268+
size_t *consumed)
269+
{
270+
int count_records = 0;
271+
size_t read_bytes;
272+
yyjson_read_err err;
273+
yyjson_doc *doc = NULL;
274+
yyjson_val *root;
275+
msgpack_sbuffer sbuf;
276+
msgpack_packer pck;
277+
char *start, *end, *insitu_buf;
278+
279+
if (!js || !buffer || !size) {
280+
return -1;
281+
}
282+
283+
/*
284+
* This is the tricky part, if we want to take advantage of SIMD we need to add
285+
* padding to the buffer (INSITU), otherwise trusting the caller it's a bit risky.
286+
*
287+
* An extra optimization would be to provide a specific API for callers who are aware about
288+
* padding and buffer states, or use a pool allocator. While this is a good optimization,
289+
* it's not a priority for now, we are already gaining around 50% perf improvement with this
290+
* implementation compared to the previous one.
291+
*/
292+
insitu_buf = flb_malloc(len + YYJSON_PADDING_SIZE);
293+
if (!insitu_buf) {
294+
flb_errno();
295+
return -1;
296+
}
297+
memcpy(insitu_buf, js, len);
298+
memset(insitu_buf + len, 0, YYJSON_PADDING_SIZE);
299+
300+
start = insitu_buf;
301+
end = insitu_buf + len;
302+
303+
msgpack_sbuffer_init(&sbuf);
304+
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
305+
306+
while (start < end) {
307+
/* Skip leading whitespace/newlines between JSON values */
308+
while (start < end && (*start == ' ' || *start == '\t' ||
309+
*start == '\n' || *start == '\r')) {
310+
start++;
311+
}
312+
if (start >= end) {
313+
/* only whitespace remains */
314+
break;
315+
}
316+
317+
doc = yyjson_read_opts(start, (size_t)(end - start),
318+
YYJSON_READ_STOP_WHEN_DONE | YYJSON_READ_INSITU |
319+
YYJSON_READ_ALLOW_INVALID_UNICODE | YYJSON_READ_REPLACE_INVALID_UNICODE,
320+
NULL, &err);
321+
if (!doc) {
322+
/* If we already parsed something, treat trailing junk/whitespace as done */
323+
if (count_records > 0) {
324+
break;
325+
}
326+
flb_debug("[yyjson->msgpack] read error code=%d msg=%s pos=%zu",
327+
err.code, err.msg, err.pos);
328+
msgpack_sbuffer_clear(&sbuf);
329+
msgpack_sbuffer_destroy(&sbuf);
330+
flb_free(insitu_buf);
331+
return -1;
332+
}
333+
334+
read_bytes = yyjson_doc_get_read_size(doc);
335+
if (read_bytes == 0) {
336+
yyjson_doc_free(doc);
337+
doc = NULL;
338+
/* No progress; if nothing parsed yet, error; else stop */
339+
if (count_records == 0) {
340+
msgpack_sbuffer_clear(&sbuf);
341+
msgpack_sbuffer_destroy(&sbuf);
342+
flb_free(insitu_buf);
343+
return -1;
344+
}
345+
break;
346+
}
347+
348+
root = yyjson_doc_get_root(doc);
349+
if (!root) {
350+
yyjson_doc_free(doc);
351+
doc = NULL;
352+
msgpack_sbuffer_clear(&sbuf);
353+
msgpack_sbuffer_destroy(&sbuf);
354+
flb_free(insitu_buf);
355+
return -1;
356+
}
357+
358+
yyjson_val_to_msgpack(root, &pck);
359+
360+
if (root_type && count_records == 0) {
361+
*root_type = yyjson_root_type(root);
362+
}
363+
364+
yyjson_doc_free(doc);
365+
doc = NULL;
366+
count_records++;
367+
368+
/* Move to the next value in the stream */
369+
start += read_bytes;
370+
}
371+
372+
if (records) {
373+
*records = count_records;
374+
}
375+
if (consumed) {
376+
*consumed = (size_t) (start - insitu_buf);
377+
}
378+
379+
/* caller owns and must free with the same allocator */
380+
*buffer = sbuf.data;
381+
*size = sbuf.size;
382+
383+
flb_free(insitu_buf);
384+
return 0;
385+
}
386+
192387
/* Receive a tokenized JSON message and convert it to MsgPack */
193388
static char *tokens_to_msgpack(struct flb_pack_state *state,
194389
const char *js,
@@ -336,7 +531,6 @@ int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size,
336531
int *root_type, size_t *consumed)
337532
{
338533
int records;
339-
340534
return pack_json_to_msgpack(js, len, buffer, size, root_type, &records, consumed);
341535
}
342536

@@ -350,6 +544,21 @@ int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size,
350544
return pack_json_to_msgpack(js, len, buffer, size, root_type, out_records, consumed);
351545
}
352546

547+
/*
 * Pack a JSON message into msgpack using yyjson, for callers that do not
 * need the number of records converted.
 *
 * Every argument is forwarded unchanged to pack_json_to_msgpack_yyjson();
 * the record count it reports is discarded. Returns that function's result:
 * 0 on success, -1 on failure.
 */
int flb_pack_json_yyjson(const char *js, size_t len, char **buffer, size_t *size,
                         int *root_type, size_t *consumed)
{
    int ret;
    int n_records;    /* filled in by the packer, intentionally unused */

    ret = pack_json_to_msgpack_yyjson(js, len, buffer, size, root_type,
                                      &n_records, consumed);
    return ret;
}
555+
556+
/*
 * Pack a JSON message into msgpack using yyjson and report, through
 * 'out_records', how many JSON values were converted.
 *
 * Every argument is forwarded unchanged to pack_json_to_msgpack_yyjson().
 * Returns that function's result: 0 on success, -1 on failure.
 */
int flb_pack_json_recs_yyjson(const char *js, size_t len, char **buffer, size_t *size,
                              int *root_type, int *out_records, size_t *consumed)
{
    int ret;

    ret = pack_json_to_msgpack_yyjson(js, len, buffer, size, root_type,
                                      out_records, consumed);
    return ret;
}
561+
353562
/* Initialize a JSON packer state */
354563
int flb_pack_state_init(struct flb_pack_state *s)
355564
{

0 commit comments

Comments
 (0)