Skip to content

Commit 7b0070f

Browse files
committed
TCP input - Fix DecodeBuffer.
I'm not sure what the issue was, but after this rewrite it properly decodes IPFIX messages and the code structure is simmilar to `IpfixDecoder`.
1 parent c75373c commit 7b0070f

File tree

2 files changed

+62
-34
lines changed

2 files changed

+62
-34
lines changed

src/plugins/input/tcp/src/DecodeBuffer.cpp

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,16 @@ void DecodeBuffer::read_from(const uint8_t *data, size_t size) {
2323
// read until there is something to read
2424
while (size) {
2525
// get the message size
26-
if (m_part_decoded.size() < sizeof(struct fds_ipfix_msg_hdr)) {
27-
auto new_data = read_header(data, size);
28-
size -= new_data - data;
29-
data = new_data;
26+
if (!read_header(&data, &size)) {
27+
// There is not enough data to read the whole header.
28+
break;
3029
}
3130

32-
if (size == 0) {
33-
return;
31+
// Read the body of the message.
32+
if (!read_body(&data, &size)) {
33+
// There is not enough data to read the whole body of the message.
34+
break;
3435
}
35-
36-
auto remaining = m_decoded_size - m_part_decoded.size();
37-
38-
if (read_min(data, size, remaining) != remaining) {
39-
// all data is readed
40-
return;
41-
}
42-
43-
data += remaining;
44-
size -= remaining;
45-
46-
add(std::move(m_part_decoded));
47-
m_part_decoded = ByteVector();
4836
}
4937
}
5038

@@ -75,26 +63,49 @@ void DecodeBuffer::signal_eof() {
7563
m_eof_reached = true;
7664
}
7765

78-
const uint8_t *DecodeBuffer::read_header(const uint8_t *data, size_t size) {
79-
constexpr size_t HDR_SIZE = sizeof(fds_ipfix_msg_hdr);
66+
bool DecodeBuffer::read_header(const uint8_t **data, size_t *size) {
67+
if (m_decoded_size != 0) {
68+
// The header is already read, but the message body is incomplete.
69+
return true;
70+
}
8071

81-
auto filled = m_part_decoded.size();
82-
auto remaining = 0;
83-
if (filled < HDR_SIZE) {
84-
remaining = HDR_SIZE - filled;
85-
if (read_min(data, size, remaining) == size) {
86-
// not enough data for the whole header
87-
return data + size;
88-
}
72+
// Read the header.
73+
if (!read_until_n(sizeof(fds_ipfix_msg_hdr), data, size)) {
74+
// There is not enough data to read the whole header.
75+
return false;
8976
}
9077

78+
// The header has been successfully read. Load the size of the message.
9179
auto hdr = reinterpret_cast<const fds_ipfix_msg_hdr *>(m_part_decoded.data());
9280
m_decoded_size = ntohs(hdr->length);
9381

94-
m_part_decoded.reserve(m_decoded_size);
95-
return data + remaining;
82+
if (m_decoded_size < sizeof(fds_ipfix_msg_hdr)) {
83+
throw std::runtime_error("Invalid IPFIX message header size.");
84+
}
85+
86+
return true;
9687
}
9788

89+
bool DecodeBuffer::read_body(const uint8_t **data, size_t *size) {
90+
// Read the body
91+
if (!read_until_n(m_decoded_size, data, size)) {
92+
// There is not enough data to read the whole body.
93+
return false;
94+
}
95+
96+
m_decoded.push_back(std::move(m_part_decoded));
97+
m_part_decoded = ByteVector();
98+
m_decoded_size = 0;
99+
return true;
100+
}
101+
102+
bool DecodeBuffer::read_until_n(size_t n, const uint8_t **data, size_t *data_len) {
103+
m_part_decoded.reserve(n);
104+
auto cnt = read_min(*data, n, *data_len);
105+
*data_len -= cnt;
106+
*data += cnt;
107+
return m_part_decoded.size() == n;
108+
}
98109

99110
size_t DecodeBuffer::read_min(const uint8_t *data, size_t size1, size_t size2) {
100111
auto size = std::min(size1, size2);

src/plugins/input/tcp/src/DecodeBuffer.hpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,27 @@ class DecodeBuffer {
103103

104104
private:
105105
/**
106-
* @brief reads the length from ipfix header to `m_decoded_size`
107-
* @return Pointer to the first unreaded byte from `data`
106+
* @brief Reads the length from ipfix header to `m_decoded_size`.
107+
* @param[in,out] data Pointer to data. It is modified by the amount of read data.
108+
* @param[in,out] size Size of the data. It is modified by the amount of read data.
109+
* @return `true` if the whole header could be read.
108110
*/
109-
const uint8_t *read_header(const uint8_t *data, size_t size);
111+
bool read_header(const uint8_t **data, size_t *size);
112+
/**
113+
* @brief Reads the body of ipfix message.
114+
* @param data Pointer to data. It is modified by the amount of read data.
115+
* @param size Size of the data. It is modified by the amount of read data.
116+
* @return `true` if the whole header could be read.
117+
*/
118+
bool read_body(const uint8_t **data, size_t *size);
119+
/**
120+
* @brief Reads to `m_part_decoded` until it reaches length of `n`.
121+
* @param n Target length for `m_part_decoded`.
122+
* @param[in,out] data Data to read from. It is modified by the read amount.
123+
* @param[in,out] data_len Length of data to read. It is modified by the read amount.
124+
* @return `true` if `m_part_decoded` has reached length of `n`.
125+
*/
126+
bool read_until_n(size_t n, const uint8_t **data, size_t *data_len);
110127
/**
111128
* @brief Adds N elements from `src` to `m_part_decoded` where N is the smaller of the two
112129
* sizes.

0 commit comments

Comments
 (0)