Skip to content

Commit 7be7826

Browse files
committed
WIP
Signed-off-by: Eduardo Silva <[email protected]>
1 parent a4ace4e commit 7be7826

File tree

8 files changed

+512
-63
lines changed

8 files changed

+512
-63
lines changed

include/fluent-bit/flb_mp_chunk.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ struct flb_mp_chunk_cobj {
6060

6161

6262
struct flb_mp_chunk_record *flb_mp_chunk_record_create(struct flb_mp_chunk_cobj *chunk_cobj);
63+
struct flb_mp_chunk_record *flb_mp_chunk_record_dup(struct flb_mp_chunk_record *src);
6364

6465
int flb_mp_chunk_cobj_record_destroy(struct flb_mp_chunk_cobj *chunk_cobj,
6566
struct flb_mp_chunk_record *record);

include/fluent-bit/flb_router.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@
3030
#include <cfl/cfl.h>
3131
#include <monkey/mk_core.h>
3232

33+
struct flb_mp_chunk_cobj;
34+
struct flb_log_event_encoder;
35+
struct flb_log_event_decoder;
36+
37+
struct flb_router_chunk_context {
38+
struct flb_mp_chunk_cobj *chunk_cobj;
39+
struct flb_log_event_encoder *log_encoder;
40+
struct flb_log_event_decoder *log_decoder;
41+
};
42+
3343
struct flb_router_path {
3444
struct flb_output_instance *ins;
3545
struct flb_route *route;
@@ -110,6 +120,7 @@ struct flb_route_processor {
110120
struct flb_route {
111121
flb_sds_t name;
112122
uint32_t signals;
123+
int per_record_routing; /* Enable per-record routing for this route */
113124
struct flb_route_condition *condition;
114125
struct cfl_list outputs;
115126
struct cfl_list processors;
@@ -135,15 +146,39 @@ void flb_router_exit(struct flb_config *config);
135146

136147
uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk);
137148

149+
int flb_router_chunk_context_init(struct flb_router_chunk_context *context);
150+
void flb_router_chunk_context_reset(struct flb_router_chunk_context *context);
151+
void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context);
152+
int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context,
153+
struct flb_event_chunk *chunk);
154+
138155
int flb_route_condition_eval(struct flb_event_chunk *chunk,
156+
struct flb_router_chunk_context *context,
139157
struct flb_route *route);
140158
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
159+
struct flb_router_chunk_context *context,
141160
struct flb_route *route);
161+
int flb_condition_eval_logs_per_record(struct flb_event_chunk *chunk,
162+
struct flb_router_chunk_context *context,
163+
struct flb_route *route,
164+
struct cfl_list *matching_records);
165+
int flb_router_route_per_record(struct flb_event_chunk *chunk,
166+
struct flb_router_chunk_context *context,
167+
struct flb_route *route,
168+
struct flb_event_chunk **out_chunk);
169+
int flb_router_create_chunk_from_records(struct cfl_list *records,
170+
const char *tag,
171+
int event_type,
172+
char **out_buf,
173+
size_t *out_size);
142174
int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
175+
struct flb_router_chunk_context *context,
143176
struct flb_route *route);
144177
int flb_condition_eval_traces(struct flb_event_chunk *chunk,
178+
struct flb_router_chunk_context *context,
145179
struct flb_route *route);
146180
int flb_router_path_should_route(struct flb_event_chunk *chunk,
181+
struct flb_router_chunk_context *context,
147182
struct flb_router_path *path);
148183

149184
struct flb_cf;

src/flb_event.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
#include <fluent-bit/flb_event.h>
2525
#include <fluent-bit/flb_sds.h>
26-
2726
struct flb_event_chunk *flb_event_chunk_create(int type,
2827
int total_events,
2928
char *tag_buf, int tag_len,

src/flb_mp.c

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1434,6 +1434,82 @@ int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj,
14341434
return ret;
14351435
}
14361436

1437+
/* Duplicate a chunk record for per-record routing */
1438+
struct flb_mp_chunk_record *flb_mp_chunk_record_dup(struct flb_mp_chunk_record *src)
1439+
{
1440+
struct flb_mp_chunk_record *dup;
1441+
1442+
if (!src) {
1443+
return NULL;
1444+
}
1445+
1446+
dup = flb_calloc(1, sizeof(struct flb_mp_chunk_record));
1447+
if (!dup) {
1448+
return NULL;
1449+
}
1450+
1451+
/* Copy basic fields */
1452+
dup->event = src->event;
1453+
dup->owns_group_metadata = FLB_FALSE;
1454+
dup->owns_group_attributes = FLB_FALSE;
1455+
1456+
/* Duplicate metadata if present */
1457+
if (src->cobj_metadata) {
1458+
dup->cobj_metadata = cfl_object_dup(src->cobj_metadata);
1459+
if (!dup->cobj_metadata) {
1460+
flb_free(dup);
1461+
return NULL;
1462+
}
1463+
}
1464+
1465+
/* Duplicate record body if present */
1466+
if (src->cobj_record) {
1467+
dup->cobj_record = cfl_object_dup(src->cobj_record);
1468+
if (!dup->cobj_record) {
1469+
if (dup->cobj_metadata) {
1470+
cfl_object_destroy(dup->cobj_metadata);
1471+
}
1472+
flb_free(dup);
1473+
return NULL;
1474+
}
1475+
}
1476+
1477+
/* Duplicate group metadata if present */
1478+
if (src->cobj_group_metadata) {
1479+
dup->cobj_group_metadata = cfl_object_dup(src->cobj_group_metadata);
1480+
if (!dup->cobj_group_metadata) {
1481+
if (dup->cobj_metadata) {
1482+
cfl_object_destroy(dup->cobj_metadata);
1483+
}
1484+
if (dup->cobj_record) {
1485+
cfl_object_destroy(dup->cobj_record);
1486+
}
1487+
flb_free(dup);
1488+
return NULL;
1489+
}
1490+
}
1491+
1492+
/* Duplicate group attributes if present */
1493+
if (src->cobj_group_attributes) {
1494+
dup->cobj_group_attributes = cfl_object_dup(src->cobj_group_attributes);
1495+
if (!dup->cobj_group_attributes) {
1496+
if (dup->cobj_metadata) {
1497+
cfl_object_destroy(dup->cobj_metadata);
1498+
}
1499+
if (dup->cobj_record) {
1500+
cfl_object_destroy(dup->cobj_record);
1501+
}
1502+
if (dup->cobj_group_metadata) {
1503+
cfl_object_destroy(dup->cobj_group_metadata);
1504+
}
1505+
flb_free(dup);
1506+
return NULL;
1507+
}
1508+
}
1509+
1510+
return dup;
1511+
}
1512+
14371513
int flb_mp_chunk_cobj_record_destroy(struct flb_mp_chunk_cobj *chunk_cobj,
14381514
struct flb_mp_chunk_record *record)
14391515
{

0 commit comments

Comments
 (0)