Skip to content

Commit 0760f79

Browse files
committed
TCP input TLS - Optimize TlsDecoder.
This also brings new api for `DecodeBuffer` and simplification of IpfixDecoder.
1 parent 90e7aad commit 0760f79

File tree

11 files changed

+135
-171
lines changed

11 files changed

+135
-171
lines changed

src/plugins/input/tcp/src/DecodeBuffer.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include <ipfixcol2.h> // fds_ipfix_msg_hdr
1919

20+
#include "read_until_n.hpp"
21+
2022
namespace tcp_in {
2123

2224
void DecodeBuffer::read_from(const uint8_t *data, size_t size) {
@@ -36,6 +38,18 @@ void DecodeBuffer::read_from(const uint8_t *data, size_t size) {
3638
}
3739
}
3840

41+
void DecodeBuffer::read_from(Reader &reader) {
42+
while (!enough_data()) {
43+
if (!read_header(reader)) {
44+
break;
45+
}
46+
47+
if (!read_body(reader)) {
48+
break;
49+
}
50+
}
51+
}
52+
3953
void DecodeBuffer::read_from(
4054
const uint8_t *data,
4155
size_t buffer_size,
@@ -63,14 +77,15 @@ void DecodeBuffer::signal_eof() {
6377
m_eof_reached = true;
6478
}
6579

66-
bool DecodeBuffer::read_header(const uint8_t **data, size_t *size) {
80+
template<typename... State>
81+
bool DecodeBuffer::read_header(State &&...state) {
6782
if (m_decoded_size != 0) {
6883
// The header is already read, but the message body is incomplete.
6984
return true;
7085
}
7186

7287
// Read the header.
73-
if (!read_until_n(sizeof(fds_ipfix_msg_hdr), data, size)) {
88+
if (!read_until_n(sizeof(fds_ipfix_msg_hdr), std::forward<State>(state)...)) {
7489
// There is not enough data to read the whole header.
7590
return false;
7691
}
@@ -86,9 +101,10 @@ bool DecodeBuffer::read_header(const uint8_t **data, size_t *size) {
86101
return true;
87102
}
88103

89-
bool DecodeBuffer::read_body(const uint8_t **data, size_t *size) {
104+
template<typename... State>
105+
bool DecodeBuffer::read_body(State &&...state) {
90106
// Read the body
91-
if (!read_until_n(m_decoded_size, data, size)) {
107+
if (!read_until_n(m_decoded_size, std::forward<State>(state)...)) {
92108
// There is not enough data to read the whole body.
93109
return false;
94110
}
@@ -107,6 +123,10 @@ bool DecodeBuffer::read_until_n(size_t n, const uint8_t **data, size_t *data_len
107123
return m_part_decoded.size() == n;
108124
}
109125

126+
bool DecodeBuffer::read_until_n(size_t n, Reader &reader) {
127+
return ::read_until_n(n, reader, m_part_decoded, *this);
128+
}
129+
110130
size_t DecodeBuffer::read_min(const uint8_t *data, size_t size1, size_t size2) {
111131
auto size = std::min(size1, size2);
112132
auto filled = m_part_decoded.size();

src/plugins/input/tcp/src/DecodeBuffer.hpp

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010

1111
#pragma once
1212

13-
#include <vector> // std::vector
1413
#include <cstdint> // uint8_t, UINT16_MAX
1514
#include <cstddef> // size_t
15+
#include <vector> // std::vector
1616

1717
#include "ByteVector.hpp" // ByteVector
18+
#include "Reader.hpp"
1819

1920
namespace tcp_in {
2021

@@ -60,6 +61,12 @@ class DecodeBuffer {
6061
*/
6162
void read_from(const uint8_t *data, size_t size);
6263

64+
/**
65+
* @brief Read data using generic reader function.
66+
* @param reader generic reader function.
67+
*/
68+
void read_from(Reader &reader);
69+
6370
/**
6471
* @brief Copies IPFIX data from circular buffer.
6572
*
@@ -104,18 +111,18 @@ class DecodeBuffer {
104111
private:
105112
/**
106113
* @brief Reads the length from ipfix header to `m_decoded_size`.
107-
* @param[in,out] data Pointer to data. It is modified by the amount of read data.
108-
* @param[in,out] size Size of the data. It is modified by the amount of read data.
114+
* @param state state of the reader. This may be anything for which is read_unitl_n implemented.
109115
* @return `true` if the whole header could be read.
110116
*/
111-
bool read_header(const uint8_t **data, size_t *size);
117+
template<typename... State>
118+
bool read_header(State &&...state);
112119
/**
113120
* @brief Reads the body of ipfix message.
114-
* @param data Pointer to data. It is modified by the amount of read data.
115-
* @param size Size of the data. It is modified by the amount of read data.
121+
* @param state state of the reader. This may be anything for which is read_unitl_n implemented.
116122
* @return `true` if the whole header could be read.
117123
*/
118-
bool read_body(const uint8_t **data, size_t *size);
124+
template<typename... State>
125+
bool read_body(State &&...state);
119126
/**
120127
* @brief Reads to `m_part_decoded` until it reaches length of `n`.
121128
* @param n Target length for `m_part_decoded`.
@@ -124,6 +131,13 @@ class DecodeBuffer {
124131
* @return `true` if `m_part_decoded` has reached length of `n`.
125132
*/
126133
bool read_until_n(size_t n, const uint8_t **data, size_t *data_len);
134+
/**
135+
* @brief Reads to `m_part_decoded` until it reaches length of `n`.
136+
* @param n Target length for `m_part_decoded`.
137+
* @param reader Generic reader from which to read.
138+
* @return `true` if `m_part_decoded` has reached length of `n`.
139+
*/
140+
bool read_until_n(size_t n, Reader &reader);
127141
/**
128142
* @brief Adds N elements from `src` to `m_part_decoded` where N is the smaller of the two
129143
* sizes.

src/plugins/input/tcp/src/IpfixDecoder.cpp

Lines changed: 3 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -10,77 +10,16 @@
1010

1111
#include "IpfixDecoder.hpp"
1212

13-
#include <stdexcept> // runtime_error
14-
#include <string> // string
15-
#include <errno.h> // errno, EWOULDBLOCK, EAGAIN
16-
#include <stddef.h> // size_t
17-
18-
#include <unistd.h> // read
19-
20-
#include <ipfixcol2.h> // fds_ipfix_msg_hdr, ipx_strerror
21-
2213
#include "DecodeBuffer.hpp" // DecodeBuffer
23-
#include "read_until_n.hpp" // read_until_n
14+
#include "Reader.hpp"
2415

2516
namespace tcp_in {
2617

2718
DecodeBuffer &IpfixDecoder::decode() {
28-
while (!m_decoded.enough_data()) {
29-
// Read the header of the message.
30-
if (!read_header()) {
31-
// There is not enough data to read the whole header.
32-
break;
33-
}
34-
35-
// Read the body of the message.
36-
if (!read_body()) {
37-
// There is not enough data to read the whole body of the message.
38-
break;
39-
}
40-
}
41-
42-
if (m_decoded.is_eof_reached() && m_part_readed.size() != 0) {
43-
throw std::runtime_error("Received incomplete message.");
44-
}
45-
19+
auto reader = tcp_reader(m_fd);
20+
m_decoded.read_from(reader);
4621
return m_decoded;
4722
}
4823

49-
bool IpfixDecoder::read_header() {
50-
if (m_msg_size != 0) {
51-
// The header is already read, but the message body is incomplete.
52-
return true;
53-
}
54-
55-
// Read the header
56-
if (!read_until_n(sizeof(fds_ipfix_msg_hdr))) {
57-
// There is not enough data to read the whole header.
58-
return false;
59-
}
60-
61-
// The header has been read, load the size of the message.
62-
auto hdr = reinterpret_cast<fds_ipfix_msg_hdr *>(m_part_readed.data());
63-
m_msg_size = ntohs(hdr->length);
64-
return true;
65-
}
66-
67-
bool IpfixDecoder::read_body() {
68-
// Read the body.
69-
if (!read_until_n(m_msg_size)) {
70-
// There is not enough data to read the whole body
71-
return false;
72-
}
73-
74-
// Whole message has been read. Add it to the decode buffer.
75-
m_decoded.add(std::move(m_part_readed));
76-
m_part_readed = ByteVector();
77-
m_msg_size = 0;
78-
return true;
79-
}
80-
81-
bool IpfixDecoder::read_until_n(size_t n) {
82-
return ::read_until_n(n, m_fd, m_part_readed, m_decoded);
83-
}
84-
8524
} // namespace tcp_in
8625

src/plugins/input/tcp/src/IpfixDecoder.hpp

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
#include "Decoder.hpp" // Decoder
1717
#include "DecodeBuffer.hpp" // DecodeBuffer
18-
#include "ByteVector.hpp" // ByteVector
1918

2019
namespace tcp_in {
2120

@@ -29,7 +28,7 @@ class IpfixDecoder : public Decoder {
2928
* @brief Creates ipfix decoder.
3029
* @param fd TCP connection file descriptor.
3130
*/
32-
IpfixDecoder(int fd) : m_fd(fd), m_decoded(), m_part_readed(), m_msg_size(0) {}
31+
IpfixDecoder(int fd) : m_fd(fd), m_decoded() {}
3332

3433
virtual DecodeBuffer &decode() override;
3534

@@ -38,19 +37,8 @@ class IpfixDecoder : public Decoder {
3837
}
3938

4039
private:
41-
/** returns true if there was enough data to read the header or if the header is already read */
42-
bool read_header();
43-
/** returns true if there was enough data to read the body */
44-
bool read_body();
45-
/** returns true if there was enough data to read to the given amount */
46-
bool read_until_n(size_t n);
47-
4840
int m_fd;
4941
DecodeBuffer m_decoded;
50-
51-
ByteVector m_part_readed;
52-
/** Final size of the whole IPFIX message. When 0 IPIFX header is not fully read. */
53-
size_t m_msg_size;
5442
};
5543

5644
} // namespace tcp_in

src/plugins/input/tcp/src/Lz4Decoder.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ void Lz4Decoder::decompress() {
177177
}
178178

179179
bool Lz4Decoder::read_until_n(size_t n) {
180-
return ::read_until_n(n, m_fd, m_compressed, m_decoded);
180+
auto reader = tcp_reader(m_fd);
181+
return ::read_until_n(n, reader, m_compressed, m_decoded);
181182
}
182183

183184
void Lz4Decoder::reset_stream(size_t buffer_size) {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief Generic reader function. (header file)
5+
* \date 2025
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#pragma once
12+
13+
#include <cerrno>
14+
#include <cstdint>
15+
#include <functional>
16+
#include <stdexcept>
17+
18+
#include <unistd.h>
19+
#include <sys/socket.h>
20+
21+
#include <ipfixcol2.h>
22+
23+
/** Describes result of read operation. */
24+
enum class ReadResult {
25+
/** Successufully read some data. */
26+
READ,
27+
/** Non blocking socket needs to wait for more data. */
28+
WAIT,
29+
/** Connection has ended. */
30+
END,
31+
};
32+
33+
using Reader = std::function<ReadResult(std::uint8_t *data, std::size_t &length)>;
34+
35+
static inline Reader tcp_reader(int fd) {
36+
return [=](std::uint8_t *data, std::size_t &length) -> ReadResult {
37+
int res = recv(fd, data, length, 0);
38+
if (res == -1) {
39+
length = 0;
40+
int err = errno;
41+
if (err == EWOULDBLOCK || err == EAGAIN) {
42+
return ReadResult::WAIT;
43+
}
44+
const char *err_str;
45+
ipx_strerror(err, err_str);
46+
throw std::runtime_error("Failed to read from descruptor " + std::string(err_str));
47+
}
48+
49+
length = std::size_t(res);
50+
return length == 0 ? ReadResult::END : ReadResult::READ;
51+
};
52+
}

src/plugins/input/tcp/src/read_until_n.hpp

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,59 +10,44 @@
1010

1111
#pragma once
1212

13-
#include <cstddef> // size_t
1413
#include <cerrno> // errno, EWOULDBLOCK, EAGAIN
15-
#include <string> // std::string
14+
#include <cstddef> // size_t
1615
#include <stdexcept> // std::runtime_error
17-
18-
#include <sys/socket.h> // recv
16+
#include <string> // std::string
1917

2018
#include <ipfixcol2.h> // ipx_strerror
2119

20+
#include "Reader.hpp"
21+
2222
/**
2323
* @brief Reads to vector from file descriptor until the vector has the desired number of bytes
2424
*
2525
* @tparam Vec vector like object
2626
* @tparam EofSig `EofSig::signal_eof()` is called when eof is reached
2727
* @param n number of bytes that should be in the vector
28-
* @param fd file descriptor to read from
28+
* @param reader generic read function
2929
* @param result where to read the bytes to
3030
* @param eofSignaler has method to signal eof
3131
* @return true if the vector has at least `n` bytes after the call
3232
* @throws when fails to read from the file descriptor
3333
*/
3434
template<typename Vec, typename EofSig>
35-
bool read_until_n(size_t n, int fd, Vec &result, EofSig &eofSignaler) {
35+
bool read_until_n(size_t n, Reader &reader, Vec &result, EofSig &eofSignaler) {
3636
auto filled = result.size();
3737
if (filled >= n) {
3838
return true;
3939
}
4040

41-
auto remaining = n - filled;
4241
result.resize(n);
4342

44-
int res = recv(fd, result.data() + filled, remaining, 0);
45-
if (res == -1) {
46-
result.resize(filled);
47-
int err = errno;
48-
if (err == EWOULDBLOCK || err == EAGAIN) {
49-
return false;
50-
}
51-
52-
const char *err_str;
53-
ipx_strerror(err, err_str);
54-
throw std::runtime_error("Failed to read from descriptor: " + std::string(err_str));
55-
}
43+
auto remaining = n - filled;
44+
auto res = reader(result.data() + filled, remaining);
5645

57-
result.resize(filled + res);
46+
result.resize(filled + remaining);
5847

59-
if (res == 0) {
48+
if (res == ReadResult::END) {
6049
eofSignaler.signal_eof();
6150
}
6251

63-
if (static_cast<size_t>(res) != remaining) {
64-
return false;
65-
}
66-
67-
return true;
52+
return result.size() == n;
6853
}

0 commit comments

Comments
 (0)