Skip to content

Commit 659a4db

Browse files
authored
in_syslog: Provide appending source address parameter (#7651)
* in_syslog: Append source_address into records if needed
1 parent f50a02e commit 659a4db

File tree

6 files changed

+276
-63
lines changed

6 files changed

+276
-63
lines changed

plugins/in_syslog/syslog.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,11 @@ static struct flb_config_map config_map[] = {
239239
0, FLB_TRUE, offsetof(struct flb_syslog, raw_message_key),
240240
"Key where the raw message will be preserved"
241241
},
242+
{
243+
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
244+
0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key),
245+
"Key where the source address will be injected"
246+
},
242247

243248

244249
/* EOF */

plugins/in_syslog/syslog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ struct flb_syslog {
6565
flb_sds_t parser_name;
6666
struct flb_parser *parser;
6767
flb_sds_t raw_message_key;
68+
flb_sds_t source_address_key;
6869

6970
int dgram_mode_flag;
7071
int collector_id;

plugins/in_syslog/syslog_conn.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,11 @@ int syslog_dgram_conn_event(void *data)
128128
struct flb_connection *connection;
129129
int bytes;
130130
struct syslog_conn *conn;
131-
struct flb_syslog *ctx;
132131

133132
connection = (struct flb_connection *) data;
134133

135134
conn = connection->user_data;
136135

137-
ctx = conn->ctx;
138-
139136
bytes = flb_io_net_read(connection,
140137
(void *) &conn->buf_data[conn->buf_len],
141138
conn->buf_size - 1);
@@ -144,7 +141,7 @@ int syslog_dgram_conn_event(void *data)
144141
conn->buf_data[bytes] = '\0';
145142
conn->buf_len = bytes;
146143

147-
syslog_prot_process_udp(conn->buf_data, conn->buf_len, ctx);
144+
syslog_prot_process_udp(conn);
148145
}
149146
else {
150147
flb_errno();

plugins/in_syslog/syslog_prot.c

Lines changed: 117 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
#include <fluent-bit/flb_input_plugin.h>
2121
#include <fluent-bit/flb_parser.h>
2222
#include <fluent-bit/flb_time.h>
23+
#include <fluent-bit/flb_pack.h>
2324

2425
#include "syslog.h"
2526
#include "syslog_conn.h"
27+
#include "syslog_prot.h"
2628

2729
#include <string.h>
2830

@@ -31,88 +33,127 @@ static inline void consume_bytes(char *buf, int bytes, int length)
3133
memmove(buf, buf + bytes, length - bytes);
3234
}
3335

34-
static int append_raw_message_to_record_data(char **result_buffer,
35-
size_t *result_size,
36-
flb_sds_t raw_message_key_name,
37-
char *base_object_buffer,
38-
size_t base_object_size,
39-
char *raw_message_buffer,
40-
size_t raw_message_size)
36+
static int append_message_to_record_data(char **result_buffer,
37+
size_t *result_size,
38+
flb_sds_t message_key_name,
39+
char *base_object_buffer,
40+
size_t base_object_size,
41+
char *message_buffer,
42+
size_t message_size,
43+
int message_type)
4144
{
42-
int i;
43-
int result;
44-
size_t unpacker_offset;
45-
msgpack_sbuffer mp_sbuf;
46-
msgpack_packer mp_pck;
47-
msgpack_unpacked unpacked_buffer;
45+
int result = FLB_MAP_NOT_MODIFIED;
46+
char *modified_data_buffer;
47+
int modified_data_size;
48+
msgpack_object_kv *new_map_entries[1];
49+
msgpack_object_kv message_entry;
4850
*result_buffer = NULL;
4951
*result_size = 0;
52+
modified_data_buffer = NULL;
5053

51-
unpacker_offset = 0;
52-
msgpack_unpacked_init(&unpacked_buffer);
53-
result = msgpack_unpack_next(&unpacked_buffer,
54-
base_object_buffer,
55-
base_object_size,
56-
&unpacker_offset);
54+
if (message_key_name != NULL) {
55+
new_map_entries[0] = &message_entry;
5756

58-
if (result != MSGPACK_UNPACK_SUCCESS) {
59-
return -1;
60-
}
57+
message_entry.key.type = MSGPACK_OBJECT_STR;
58+
message_entry.key.via.str.size = flb_sds_len(message_key_name);
59+
message_entry.key.via.str.ptr = message_key_name;
6160

62-
if (unpacker_offset != base_object_size) {
63-
msgpack_unpacked_destroy(&unpacked_buffer);
64-
return -2;
65-
}
61+
if (message_type == MSGPACK_OBJECT_BIN) {
62+
message_entry.val.type = MSGPACK_OBJECT_BIN;
63+
message_entry.val.via.bin.size = message_size;
64+
message_entry.val.via.bin.ptr = message_buffer;
65+
}
66+
else if (message_type == MSGPACK_OBJECT_STR) {
67+
message_entry.val.type = MSGPACK_OBJECT_STR;
68+
message_entry.val.via.str.size = message_size;
69+
message_entry.val.via.str.ptr = message_buffer;
70+
}
71+
else {
72+
result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
73+
}
6674

67-
if (unpacked_buffer.data.type != MSGPACK_OBJECT_MAP) {
68-
msgpack_unpacked_destroy(&unpacked_buffer);
69-
return -3;
75+
if (result == FLB_MAP_NOT_MODIFIED) {
76+
result = flb_msgpack_expand_map(base_object_buffer,
77+
base_object_size,
78+
new_map_entries, 1,
79+
&modified_data_buffer,
80+
&modified_data_size);
81+
if (result == 0) {
82+
result = FLB_MAP_EXPAND_SUCCESS;
83+
}
84+
else {
85+
result = FLB_MAP_EXPANSION_ERROR;
86+
}
87+
}
7088
}
7189

72-
msgpack_sbuffer_init(&mp_sbuf);
73-
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
74-
75-
msgpack_pack_map(&mp_pck, unpacked_buffer.data.via.map.size + 1);
76-
77-
for (i = 0; i < unpacked_buffer.data.via.map.size; i++) {
78-
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].key);
79-
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].val);
90+
if (result == FLB_MAP_EXPAND_SUCCESS) {
91+
*result_buffer = modified_data_buffer;
92+
*result_size = modified_data_size;
8093
}
8194

82-
msgpack_pack_str(&mp_pck, flb_sds_len(raw_message_key_name));
83-
msgpack_pack_str_body(&mp_pck, raw_message_key_name, flb_sds_len(raw_message_key_name));
84-
msgpack_pack_str(&mp_pck, raw_message_size);
85-
msgpack_pack_str_body(&mp_pck, raw_message_buffer, raw_message_size);
86-
87-
*result_buffer = mp_sbuf.data;
88-
*result_size = mp_sbuf.size;
89-
90-
msgpack_unpacked_destroy(&unpacked_buffer);
9195
return result;
9296
}
9397

9498
static inline int pack_line(struct flb_syslog *ctx,
9599
struct flb_time *time,
100+
struct flb_connection *connection,
96101
char *data, size_t data_size,
97102
char *raw_data, size_t raw_data_size)
98103
{
99104
char *modified_data_buffer;
100105
size_t modified_data_size;
106+
char *appended_address_buffer;
107+
size_t appended_address_size;
101108
int result;
109+
char *source_address;
102110

111+
source_address = NULL;
103112
modified_data_buffer = NULL;
113+
appended_address_buffer = NULL;
104114

105115
if (ctx->raw_message_key != NULL) {
106-
result = append_raw_message_to_record_data(&modified_data_buffer,
107-
&modified_data_size,
108-
ctx->raw_message_key,
109-
data,
110-
data_size,
111-
raw_data,
112-
raw_data_size);
113-
114-
if (result != 0) {
115-
flb_plg_debug(ctx->ins, "error appending raw message : %d", result);
116+
result = append_message_to_record_data(&modified_data_buffer,
117+
&modified_data_size,
118+
ctx->raw_message_key,
119+
data,
120+
data_size,
121+
raw_data,
122+
raw_data_size,
123+
MSGPACK_OBJECT_BIN);
124+
125+
if (result == FLB_MAP_EXPANSION_ERROR) {
126+
flb_plg_debug(ctx->ins, "error expanding raw message : %d", result);
127+
}
128+
}
129+
130+
if (ctx->source_address_key != NULL) {
131+
source_address = flb_connection_get_remote_address(connection);
132+
if (source_address != NULL) {
133+
if (modified_data_buffer != NULL) {
134+
result = append_message_to_record_data(&appended_address_buffer,
135+
&appended_address_size,
136+
ctx->source_address_key,
137+
modified_data_buffer,
138+
modified_data_size,
139+
source_address,
140+
strlen(source_address),
141+
MSGPACK_OBJECT_STR);
142+
}
143+
else {
144+
result = append_message_to_record_data(&appended_address_buffer,
145+
&appended_address_size,
146+
ctx->source_address_key,
147+
data,
148+
data_size,
149+
source_address,
150+
strlen(source_address),
151+
MSGPACK_OBJECT_STR);
152+
}
153+
154+
if (result == FLB_MAP_EXPANSION_ERROR) {
155+
flb_plg_debug(ctx->ins, "error expanding source_address : %d", result);
156+
}
116157
}
117158
}
118159

@@ -123,7 +164,11 @@ static inline int pack_line(struct flb_syslog *ctx,
123164
}
124165

125166
if (result == FLB_EVENT_ENCODER_SUCCESS) {
126-
if (modified_data_buffer != NULL) {
167+
if (appended_address_buffer != NULL) {
168+
result = flb_log_event_encoder_set_body_from_raw_msgpack(
169+
ctx->log_encoder, appended_address_buffer, appended_address_size);
170+
}
171+
else if (modified_data_buffer != NULL) {
127172
result = flb_log_event_encoder_set_body_from_raw_msgpack(
128173
ctx->log_encoder, modified_data_buffer, modified_data_size);
129174
}
@@ -154,6 +199,9 @@ static inline int pack_line(struct flb_syslog *ctx,
154199
if (modified_data_buffer != NULL) {
155200
flb_free(modified_data_buffer);
156201
}
202+
if (appended_address_buffer != NULL) {
203+
flb_free(appended_address_buffer);
204+
}
157205

158206
return result;
159207
}
@@ -210,6 +258,7 @@ int syslog_prot_process(struct syslog_conn *conn)
210258
flb_time_get(&out_time);
211259
}
212260
pack_line(ctx, &out_time,
261+
conn->connection,
213262
out_buf, out_size,
214263
p, len);
215264
flb_free(out_buf);
@@ -235,12 +284,21 @@ int syslog_prot_process(struct syslog_conn *conn)
235284
return 0;
236285
}
237286

238-
int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
287+
int syslog_prot_process_udp(struct syslog_conn *conn)
239288
{
240289
int ret;
241290
void *out_buf;
242291
size_t out_size;
243292
struct flb_time out_time = {0};
293+
char *buf;
294+
size_t size;
295+
struct flb_syslog *ctx;
296+
struct flb_connection *connection;
297+
298+
buf = conn->buf_data;
299+
size = conn->buf_len;
300+
ctx = conn->ctx;
301+
connection = conn->connection;
244302

245303
ret = flb_parser_do(ctx->parser, buf, size,
246304
&out_buf, &out_size, &out_time);
@@ -249,6 +307,7 @@ int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
249307
flb_time_get(&out_time);
250308
}
251309
pack_line(ctx, &out_time,
310+
connection,
252311
out_buf, out_size,
253312
buf, size);
254313
flb_free(out_buf);

plugins/in_syslog/syslog_prot.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424

2525
#include "syslog.h"
2626

27+
#define FLB_MAP_EXPAND_SUCCESS 0
28+
#define FLB_MAP_NOT_MODIFIED -1
29+
#define FLB_MAP_EXPANSION_ERROR -2
30+
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3
31+
2732
int syslog_prot_process(struct syslog_conn *conn);
28-
int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx);
33+
int syslog_prot_process_udp(struct syslog_conn *conn);
2934

3035
#endif

0 commit comments

Comments
 (0)