Skip to content

Commit 0b6080a

Browse files
committed
JSON output: add an exporter IP address if detailedInfo is enabled
Based on a pull request from norrisjeremy (not merged due to collision with another pull request)
1 parent cfffb01 commit 0b6080a

File tree

2 files changed

+106
-49
lines changed

2 files changed

+106
-49
lines changed

src/plugins/output/json/src/Storage.cpp

Lines changed: 93 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,53 @@ Storage::output_add(Output *output)
146146
m_outputs.push_back(output);
147147
}
148148

149+
/**
150+
* \brief Get IP address from Transport Session
151+
*
152+
* \note Not all Transport Session contains an IPv4/IPv6 address (for example, file type)
153+
* \param[in] ipx_desc Transport Session description
154+
* \param[out] src_addr Conversion buffer for IPv4/IPv6 address converted to string
155+
* \param[int] size Size of the conversion buffer
156+
* \return On success returns a pointer to the buffer. Otherwise returns nullptr.
157+
*/
158+
const char *
159+
Storage::session_src_addr(const struct ipx_session *ipx_desc, char *src_addr, socklen_t size)
160+
{
161+
const struct ipx_session_net *net_desc;
162+
switch (ipx_desc->type) {
163+
case FDS_SESSION_UDP:
164+
net_desc = &ipx_desc->udp.net;
165+
break;
166+
case FDS_SESSION_TCP:
167+
net_desc = &ipx_desc->tcp.net;
168+
break;
169+
case FDS_SESSION_SCTP:
170+
net_desc = &ipx_desc->sctp.net;
171+
break;
172+
default:
173+
return nullptr;
174+
}
175+
176+
const char *ret;
177+
if (net_desc->l3_proto == AF_INET) {
178+
ret = inet_ntop(AF_INET, &net_desc->addr_src.ipv4, src_addr, size);
179+
} else {
180+
ret = inet_ntop(AF_INET6, &net_desc->addr_src.ipv6, src_addr, size);
181+
}
182+
183+
return ret;
184+
}
185+
149186
/**
150187
* \brief Convert template record to JSON string
151188
*
152-
* \param[in] tset_iter (Options) Template set structure to convert
153-
* \param[in] set_id Id of the set
154-
* \param[in] hdr Message header of IPFIX record
189+
* \param[in] tset_iter (Options) Template Set structure to convert
190+
* \param[in] set_id Id of the Template Set
191+
* \param[in] hdr Message header of IPFIX record
155192
* \throw runtime_error If template parser failed
156193
*/
157194
void
158-
Storage::convert_tmplt_rec(struct fds_tset_iter *tset_iter, uint16_t set_id, fds_ipfix_msg_hdr* hdr)
195+
Storage::convert_tmplt_rec(struct fds_tset_iter *tset_iter, uint16_t set_id, const struct fds_ipfix_msg_hdr *hdr)
159196
{
160197
enum fds_template_type type;
161198
void *ptr;
@@ -222,70 +259,77 @@ Storage::convert_tmplt_rec(struct fds_tset_iter *tset_iter, uint16_t set_id, fds
222259
* From all sets in the Message, try to convert just Template and Options template sets.
223260
* \param[in] set All sets in the Message
224261
* \param[in] hdr Message header of IPFIX record
262+
* \return #IPX_OK on success
263+
* \return #IPX_ERR_DENIED if an output fails to store any record
225264
*/
226-
void
227-
Storage::convert_set(struct ipx_ipfix_set *set, fds_ipfix_msg_hdr* hdr)
265+
int
266+
Storage::convert_tset(struct ipx_ipfix_set *set, const struct fds_ipfix_msg_hdr *hdr)
228267
{
229-
230-
bool flush = false;
231-
int ret = IPX_OK;
232268
uint16_t set_id = ntohs(set->ptr->flowset_id);
233-
if (set_id == FDS_IPFIX_SET_TMPLT || set_id == FDS_IPFIX_SET_OPTS_TMPLT) {
234-
235-
// Template set
236-
struct fds_tset_iter tset_iter;
237-
fds_tset_iter_init(&tset_iter, set->ptr);
238-
239-
// Iteration through all templates in the set
240-
while (fds_tset_iter_next(&tset_iter) == FDS_OK) {
241-
flush = true;
269+
assert(set_id == FDS_IPFIX_SET_TMPLT || set_id == FDS_IPFIX_SET_OPTS_TMPLT);
242270

243-
// Read and print single template
244-
convert_tmplt_rec(&tset_iter, set_id, hdr);
271+
// Template set
272+
struct fds_tset_iter tset_iter;
273+
fds_tset_iter_init(&tset_iter, set->ptr);
245274

246-
// Store it
247-
for (Output *output : m_outputs) {
248-
if (output->process(m_record.buffer, m_record.size_used) != IPX_OK) {
249-
ret = IPX_ERR_DENIED;
250-
goto endloop;
251-
}
252-
}
275+
// Iteration through all (Options) Templates in the Set
276+
while (fds_tset_iter_next(&tset_iter) == FDS_OK) {
277+
// Read and print single template
278+
convert_tmplt_rec(&tset_iter, set_id, hdr);
253279

254-
// Buffer is empty
255-
m_record.size_used = 0;
256-
}
257-
}
258-
endloop:
259-
if (flush) {
280+
// Store it
260281
for (Output *output : m_outputs) {
261-
output->flush();
282+
if (output->process(m_record.buffer, m_record.size_used) != IPX_OK) {
283+
return IPX_ERR_DENIED;
284+
}
262285
}
286+
287+
// Buffer is empty
288+
m_record.size_used = 0;
263289
}
290+
291+
return IPX_OK;
264292
}
265293

266294
int
267295
Storage::records_store(ipx_msg_ipfix_t *msg, const fds_iemgr_t *iemgr)
268296
{
269-
// Message header
270-
auto hdr = (fds_ipfix_msg_hdr*) ipx_msg_ipfix_get_packet(msg);
297+
const auto hdr = (fds_ipfix_msg_hdr*) ipx_msg_ipfix_get_packet(msg);
298+
const uint32_t rec_cnt = ipx_msg_ipfix_get_drec_cnt(msg);
299+
bool flush = false;
300+
int ret = IPX_OK;
301+
302+
// Extract IPv4/IPv6 address of the exporter, if required
303+
m_src_addr = nullptr;
304+
char src_addr[INET6_ADDRSTRLEN];
305+
if (m_format.detailed_info) {
306+
const struct ipx_msg_ctx *msg_ctx = ipx_msg_ipfix_get_ctx(msg);
307+
m_src_addr = session_src_addr(msg_ctx->session, src_addr, INET6_ADDRSTRLEN);
308+
}
271309

272-
// Process template records if enabled
310+
// Process (Options) Template records if enabled
273311
if (m_format.template_info) {
274312
struct ipx_ipfix_set *sets;
275313
size_t set_cnt;
276314
ipx_msg_ipfix_get_sets(msg, &sets, &set_cnt);
277315

278316
// Iteration through all sets
279317
for (uint32_t i = 0; i < set_cnt; i++) {
280-
convert_set(&sets[i], hdr);
318+
uint16_t set_id = ntohs(sets[i].ptr->flowset_id);
319+
if (set_id != FDS_IPFIX_SET_TMPLT && set_id != FDS_IPFIX_SET_OPTS_TMPLT) {
320+
// Skip non-template sets
321+
continue;
322+
}
323+
324+
flush = true;
325+
if (convert_tset(&sets[i], hdr) != IPX_OK) {
326+
ret = IPX_ERR_DENIED;
327+
goto endloop;
328+
}
281329
}
282330
}
283331

284332
// Process all data records
285-
const uint32_t rec_cnt = ipx_msg_ipfix_get_drec_cnt(msg);
286-
bool flush = false;
287-
int ret = IPX_OK;
288-
289333
for (uint32_t i = 0; i < rec_cnt; ++i) {
290334
ipx_ipfix_record *ipfix_rec = ipx_msg_ipfix_get_drec(msg, i);
291335

@@ -342,7 +386,7 @@ Storage::records_store(ipx_msg_ipfix_t *msg, const fds_iemgr_t *iemgr)
342386
* @param[in] hdr Message header of IPFIX record
343387
*/
344388
void
345-
Storage::addDetailedInfo(fds_ipfix_msg_hdr *hdr)
389+
Storage::addDetailedInfo(const struct fds_ipfix_msg_hdr *hdr)
346390
{
347391
// Array for formatting detailed info fields
348392
char field[LOCAL_BSIZE];
@@ -357,6 +401,12 @@ Storage::addDetailedInfo(fds_ipfix_msg_hdr *hdr)
357401

358402
snprintf(field, LOCAL_BSIZE, ",\"ipfix:msgLength\":%" PRIu16, ntohs(hdr->length));
359403
buffer_append(field);
404+
405+
if (m_src_addr) {
406+
buffer_append(",\"ipfix:srcAddr\":\"");
407+
buffer_append(m_src_addr);
408+
buffer_append("\"");
409+
}
360410
}
361411

362412
/**

src/plugins/output/json/src/Storage.hpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
#include <string>
4646
#include <vector>
47+
#include <arpa/inet.h>
4748
#include <ipfixcol2.h>
4849
#include "Config.hpp"
4950

@@ -75,6 +76,9 @@ class Output {
7576
virtual int
7677
process(const char *str, size_t len) = 0;
7778

79+
/**
80+
* \brief Flush buffered records
81+
*/
7882
virtual void
7983
flush() {};
8084
};
@@ -90,6 +94,8 @@ class Storage {
9094
struct cfg_format m_format;
9195
/** Conversion flags for libfds converter */
9296
uint32_t m_flags;
97+
/** IPv4/IPv6 exporter address of the current message (can be nullptr) */
98+
const char *m_src_addr = nullptr;
9399

94100
struct {
95101
char *buffer;
@@ -111,12 +117,13 @@ class Storage {
111117
// Reserve memory for a JSON string
112118
void buffer_reserve(size_t n);
113119
// Convert set to JSON string
114-
void convert_set(struct ipx_ipfix_set *set, fds_ipfix_msg_hdr* hdr);
120+
int convert_tset(struct ipx_ipfix_set *set, const struct fds_ipfix_msg_hdr *hdr);
115121
// Convert template record to a JSON string
116-
void convert_tmplt_rec(struct fds_tset_iter *tset_iter, uint16_t set_id, fds_ipfix_msg_hdr* hdr);
122+
void convert_tmplt_rec(struct fds_tset_iter *tset_iter, uint16_t set_id, const struct fds_ipfix_msg_hdr *hdr);
117123
// Add detailed info (templateId, ODID, seqNum, exportTime) to JSON string
118-
void addDetailedInfo(fds_ipfix_msg_hdr *hdr);
119-
124+
void addDetailedInfo(const struct fds_ipfix_msg_hdr *hdr);
125+
// Get src_addr from IPFIX session
126+
static const char *session_src_addr(const struct ipx_session *ipx_desc, char *src_addr, socklen_t size);
120127
public:
121128
/**
122129
* \brief Constructor
@@ -142,8 +149,8 @@ class Storage {
142149
* \brief Process IPFIX Message records
143150
*
144151
* For each record perform conversion to JSON and pass it to all output instances.
145-
* \param[in] msg IPFIX Message to convert
146-
* \param[in] mgr Information Element manager (can be NULL)
152+
* \param[in] msg IPFIX Message to convert
153+
* \param[in] iemgr Information Element manager (can be NULL)
147154
* \return #IPX_OK on success
148155
* \return #IPX_ERR_DENIED if a fatal error has occurred and the storage cannot continue to
149156
* work properly!

0 commit comments

Comments
 (0)