Skip to content

Commit fc153b0

Browse files
committed
multiline: new api to destroy all stream_id in context
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 80776bf commit fc153b0

File tree

3 files changed

+45
-3
lines changed

3 files changed

+45
-3
lines changed

include/fluent-bit/multiline/flb_ml.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,11 @@ int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id,
268268
struct flb_time *tm, msgpack_object *obj);
269269

270270
int flb_ml_parsers_init(struct flb_config *ctx);
271+
272+
void flb_ml_flush_parser_instance(struct flb_ml *ml,
273+
struct flb_ml_parser_ins *parser_i,
274+
uint64_t stream_id);
275+
271276
int flb_ml_auto_flush_init(struct flb_ml *ml);
272277

273278
int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
@@ -288,6 +293,8 @@ int flb_ml_stream_create(struct flb_ml *ml,
288293

289294
int flb_ml_stream_destroy(struct flb_ml_stream *mst);
290295

296+
void flb_ml_stream_id_destroy_all(struct flb_ml *ml, uint64_t stream_id);
297+
291298
struct flb_ml_stream *flb_ml_stream_get(struct flb_ml_parser_ins *parser,
292299
uint64_t stream_id);
293300

src/multiline/flb_ml.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ int flb_ml_type_lookup(char *str)
8080
return type;
8181
}
8282

83-
static void flb_ml_flush_parser_instance(struct flb_ml *ml,
84-
struct flb_ml_parser_ins *parser_i,
85-
uint64_t stream_id)
83+
void flb_ml_flush_parser_instance(struct flb_ml *ml,
84+
struct flb_ml_parser_ins *parser_i,
85+
uint64_t stream_id)
8686
{
8787
struct mk_list *head;
8888
struct mk_list *head_group;

src/multiline/flb_ml_stream.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,41 @@ struct flb_ml_stream *flb_ml_stream_get(struct flb_ml_parser_ins *parser,
280280
return NULL;
281281
}
282282

283+
void flb_ml_stream_id_destroy_all(struct flb_ml *ml, uint64_t stream_id)
284+
{
285+
struct mk_list *tmp;
286+
struct mk_list *head;
287+
struct mk_list *head_group;
288+
struct mk_list *head_stream;
289+
struct flb_ml_group *group;
290+
struct flb_ml_stream *mst;
291+
struct flb_ml_parser_ins *parser_i;
292+
293+
/* groups */
294+
mk_list_foreach(head, &ml->groups) {
295+
group = mk_list_entry(head, struct flb_ml_group, _head);
296+
297+
/* parser instances */
298+
mk_list_foreach(head_group, &group->parsers) {
299+
parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
300+
301+
/* streams */
302+
mk_list_foreach_safe(head_stream, tmp, &parser_i->streams) {
303+
mst = mk_list_entry(head_stream, struct flb_ml_stream, _head);
304+
if (mst->id != stream_id) {
305+
continue;
306+
}
307+
308+
/* flush any pending data */
309+
flb_ml_flush_parser_instance(ml, parser_i, stream_id);
310+
311+
/* destroy internal groups of the stream */
312+
flb_ml_stream_destroy(mst);
313+
}
314+
}
315+
}
316+
}
317+
283318
int flb_ml_stream_destroy(struct flb_ml_stream *mst)
284319
{
285320
mk_list_del(&mst->_head);

0 commit comments

Comments
 (0)