Skip to content

Commit c93f1ca

Browse files
committed
TCP input TLS - Optimize TlsDecoder.
This also brings new api for `DecodeBuffer` and simplification of IpfixDecoder.
1 parent 010e1cf commit c93f1ca

17 files changed

+312
-291
lines changed

src/plugins/input/tcp/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ add_library(tcp-input MODULE
1313
src/Acceptor.cpp
1414
src/Plugin.cpp
1515
src/IpxPlugin.cpp
16+
src/TcpReader.cpp
17+
src/RingBufferReader.cpp
1618
src/tls/Ssl.cpp
1719
src/tls/TlsDecoder.cpp
1820
src/tls/DecoderFactory.cpp

src/plugins/input/tcp/src/DecodeBuffer.cpp

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,58 +19,34 @@
1919

2020
namespace tcp_in {
2121

22-
void DecodeBuffer::read_from(const uint8_t *data, size_t size) {
23-
// read until there is something to read
24-
while (size) {
25-
// get the message size
26-
if (!read_header(&data, &size)) {
27-
// There is not enough data to read the whole header.
22+
void DecodeBuffer::read_from(Reader &reader, std::size_t consume) {
23+
consume += m_total_bytes_decoded;
24+
while (!enough_data() || consume < m_total_bytes_decoded) {
25+
if (!read_header(reader)) {
2826
break;
2927
}
3028

31-
// Read the body of the message.
32-
if (!read_body(&data, &size)) {
33-
// There is not enough data to read the whole body of the message.
29+
if (!read_body(reader)) {
3430
break;
3531
}
3632
}
3733
}
3834

39-
void DecodeBuffer::read_from(
40-
const uint8_t *data,
41-
size_t buffer_size,
42-
size_t data_size,
43-
size_t position
44-
) {
45-
// read until wrap
46-
auto block = data + position;
47-
auto block_size = std::min(data_size, buffer_size - position);
48-
read_from(block, block_size);
49-
50-
if (block_size == data_size) {
51-
// there was no need to wrap
52-
return;
53-
}
54-
55-
// read the second block (after wrap)
56-
read_from(data, data_size - block_size);
57-
}
58-
5935
void DecodeBuffer::signal_eof() {
6036
if (m_part_decoded.size() != 0) {
6137
throw std::runtime_error("Received incomplete message.");
6238
}
6339
m_eof_reached = true;
6440
}
6541

66-
bool DecodeBuffer::read_header(const uint8_t **data, size_t *size) {
42+
bool DecodeBuffer::read_header(Reader &reader) {
6743
if (m_decoded_size != 0) {
6844
// The header is already read, but the message body is incomplete.
6945
return true;
7046
}
7147

7248
// Read the header.
73-
if (!read_until_n(sizeof(fds_ipfix_msg_hdr), data, size)) {
49+
if (!read_until_n(sizeof(fds_ipfix_msg_hdr), reader)) {
7450
// There is not enough data to read the whole header.
7551
return false;
7652
}
@@ -86,9 +62,9 @@ bool DecodeBuffer::read_header(const uint8_t **data, size_t *size) {
8662
return true;
8763
}
8864

89-
bool DecodeBuffer::read_body(const uint8_t **data, size_t *size) {
65+
bool DecodeBuffer::read_body(Reader &reader) {
9066
// Read the body
91-
if (!read_until_n(m_decoded_size, data, size)) {
67+
if (!read_until_n(m_decoded_size, reader)) {
9268
// There is not enough data to read the whole body.
9369
return false;
9470
}
@@ -99,22 +75,10 @@ bool DecodeBuffer::read_body(const uint8_t **data, size_t *size) {
9975
return true;
10076
}
10177

102-
bool DecodeBuffer::read_until_n(size_t n, const uint8_t **data, size_t *data_len) {
103-
m_part_decoded.reserve(n);
104-
auto cnt = read_min(*data, n, *data_len);
105-
*data_len -= cnt;
106-
*data += cnt;
78+
bool DecodeBuffer::read_until_n(size_t n, Reader &reader) {
79+
m_total_bytes_decoded += reader.read_until_n(n, m_part_decoded, *this);
10780
return m_part_decoded.size() == n;
10881
}
10982

110-
size_t DecodeBuffer::read_min(const uint8_t *data, size_t size1, size_t size2) {
111-
auto size = std::min(size1, size2);
112-
auto filled = m_part_decoded.size();
113-
m_part_decoded.resize(filled + size);
114-
std::copy_n(data, size, m_part_decoded.data() + filled);
115-
m_total_bytes_decoded += size;
116-
return size;
117-
}
118-
11983
} // namespace tcp_in
12084

src/plugins/input/tcp/src/DecodeBuffer.hpp

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010

1111
#pragma once
1212

13-
#include <vector> // std::vector
1413
#include <cstdint> // uint8_t, UINT16_MAX
1514
#include <cstddef> // size_t
15+
#include <vector> // std::vector
1616

1717
#include "ByteVector.hpp" // ByteVector
18+
#include "Reader.hpp"
1819

1920
namespace tcp_in {
2021

@@ -50,28 +51,11 @@ class DecodeBuffer {
5051
}
5152

5253
/**
53-
* @brief Copies IPFIX data from buffer.
54-
*
55-
* The data may be any part of message (possibly incomplete or even multiple messages) but
56-
* multiple calls to this method must be with the message data in correct order so that it can
57-
* be reconstructed.
58-
* @param data data with the message
59-
* @param size size of the data in `data`
54+
* @brief Read data using generic reader function.
55+
* @param reader Generic reader function.
56+
* @param consume Minimum number of bytes that should be read from reader if possible.
6057
*/
61-
void read_from(const uint8_t *data, size_t size);
62-
63-
/**
64-
* @brief Copies IPFIX data from circular buffer.
65-
*
66-
* The data may be any part of message (possibly incomplete or even multiple messages) but
67-
* multiple calls to this metod must be with the message data in correct order so that it can be
68-
* reconstructed.
69-
* @param data data of the circullar buffer
70-
* @param buffer_size size of the circullar buffer (allocated space)
71-
* @param data_size size of data to copy from the buffer
72-
* @param position start position of the data in the buffer.
73-
*/
74-
void read_from(const uint8_t *data, size_t buffer_size, size_t data_size, size_t position);
58+
void read_from(Reader &reader, std::size_t consume = 0);
7559

7660
/**
7761
* @brief Checks whether enough data has been read since last time. When this returns true,
@@ -104,32 +88,23 @@ class DecodeBuffer {
10488
private:
10589
/**
10690
* @brief Reads the length from ipfix header to `m_decoded_size`.
107-
* @param[in,out] data Pointer to data. It is modified by the amount of read data.
108-
* @param[in,out] size Size of the data. It is modified by the amount of read data.
91+
* @param reader Reader from which to read.
10992
* @return `true` if the whole header could be read.
11093
*/
111-
bool read_header(const uint8_t **data, size_t *size);
94+
bool read_header(Reader &reader);
11295
/**
11396
* @brief Reads the body of ipfix message.
114-
* @param data Pointer to data. It is modified by the amount of read data.
115-
* @param size Size of the data. It is modified by the amount of read data.
97+
* @param reader Reader from which to read.
11698
* @return `true` if the whole header could be read.
11799
*/
118-
bool read_body(const uint8_t **data, size_t *size);
100+
bool read_body(Reader &reader);
119101
/**
120102
* @brief Reads to `m_part_decoded` until it reaches length of `n`.
121103
* @param n Target length for `m_part_decoded`.
122-
* @param[in,out] data Data to read from. It is modified by the read amount.
123-
* @param[in,out] data_len Length of data to read. It is modified by the read amount.
104+
* @param reader Generic reader from which to read.
124105
* @return `true` if `m_part_decoded` has reached length of `n`.
125106
*/
126-
bool read_until_n(size_t n, const uint8_t **data, size_t *data_len);
127-
/**
128-
* @brief Adds N elements from `src` to `m_part_decoded` where N is the smaller of the two
129-
* sizes.
130-
* @return The smaller of the two sizes.
131-
*/
132-
size_t read_min(const uint8_t *data, size_t size1, size_t size2);
107+
bool read_until_n(size_t n, Reader &reader);
133108

134109
/** number of bytes readed since last call to `get_decoded` */
135110
size_t m_total_bytes_decoded;

src/plugins/input/tcp/src/IpfixDecoder.cpp

Lines changed: 2 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -10,74 +10,13 @@
1010

1111
#include "IpfixDecoder.hpp"
1212

13-
#include <stdexcept> // runtime_error
14-
#include <string> // string
15-
#include <errno.h> // errno, EWOULDBLOCK, EAGAIN
16-
#include <stddef.h> // size_t
17-
18-
#include <unistd.h> // read
19-
20-
#include <ipfixcol2.h> // fds_ipfix_msg_hdr, ipx_strerror
21-
2213
#include "DecodeBuffer.hpp" // DecodeBuffer
23-
#include "read_until_n.hpp" // read_until_n
14+
#include "Reader.hpp"
2415

2516
namespace tcp_in {
2617

2718
void IpfixDecoder::progress() {
28-
while (!m_decoded.enough_data()) {
29-
// Read the header of the message.
30-
if (!read_header()) {
31-
// There is not enough data to read the whole header.
32-
break;
33-
}
34-
35-
// Read the body of the message.
36-
if (!read_body()) {
37-
// There is not enough data to read the whole body of the message.
38-
break;
39-
}
40-
}
41-
42-
if (m_decoded.is_eof_reached() && m_part_readed.size() != 0) {
43-
throw std::runtime_error("Received incomplete message.");
44-
}
45-
}
46-
47-
bool IpfixDecoder::read_header() {
48-
if (m_msg_size != 0) {
49-
// The header is already read, but the message body is incomplete.
50-
return true;
51-
}
52-
53-
// Read the header
54-
if (!read_until_n(sizeof(fds_ipfix_msg_hdr))) {
55-
// There is not enough data to read the whole header.
56-
return false;
57-
}
58-
59-
// The header has been read, load the size of the message.
60-
auto hdr = reinterpret_cast<fds_ipfix_msg_hdr *>(m_part_readed.data());
61-
m_msg_size = ntohs(hdr->length);
62-
return true;
63-
}
64-
65-
bool IpfixDecoder::read_body() {
66-
// Read the body.
67-
if (!read_until_n(m_msg_size)) {
68-
// There is not enough data to read the whole body
69-
return false;
70-
}
71-
72-
// Whole message has been read. Add it to the decode buffer.
73-
m_decoded.add(std::move(m_part_readed));
74-
m_part_readed = ByteVector();
75-
m_msg_size = 0;
76-
return true;
77-
}
78-
79-
bool IpfixDecoder::read_until_n(size_t n) {
80-
return ::read_until_n(n, m_fd, m_part_readed, m_decoded);
19+
m_decoded.read_from(m_reader);
8120
}
8221

8322
} // namespace tcp_in

src/plugins/input/tcp/src/IpfixDecoder.hpp

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
#include "Decoder.hpp" // Decoder
1717
#include "DecodeBuffer.hpp" // DecodeBuffer
18-
#include "ByteVector.hpp" // ByteVector
18+
#include "TcpReader.hpp"
1919

2020
namespace tcp_in {
2121

@@ -29,7 +29,7 @@ class IpfixDecoder : public Decoder {
2929
* @brief Creates ipfix decoder.
3030
* @param fd TCP connection file descriptor.
3131
*/
32-
IpfixDecoder(int fd) : m_fd(fd), m_decoded(), m_part_readed(), m_msg_size(0) {}
32+
IpfixDecoder(int fd) : m_reader(fd), m_decoded() {}
3333

3434
virtual void progress() override;
3535

@@ -40,19 +40,8 @@ class IpfixDecoder : public Decoder {
4040
}
4141

4242
private:
43-
/** returns true if there was enough data to read the header or if the header is already read */
44-
bool read_header();
45-
/** returns true if there was enough data to read the body */
46-
bool read_body();
47-
/** returns true if there was enough data to read to the given amount */
48-
bool read_until_n(size_t n);
49-
50-
int m_fd;
43+
TcpReader m_reader;
5144
DecodeBuffer m_decoded;
52-
53-
ByteVector m_part_readed;
54-
/** Final size of the whole IPFIX message. When 0 IPIFX header is not fully read. */
55-
size_t m_msg_size;
5645
};
5746

5847
} // namespace tcp_in

src/plugins/input/tcp/src/Lz4Decoder.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <stdexcept> // runtime_error
1515
#include <cstddef> // size_t
1616
#include <cerrno> // errno, EWOULDBLOCK, EAGAIN
17+
#include <cstring>
1718
#include <string> // string
1819

1920
#include <netinet/in.h> // ntohl, ntohs
@@ -23,7 +24,7 @@
2324
#include <ipfixcol2.h> // ipx_strerror
2425

2526
#include "DecodeBuffer.hpp" // DecodeBuffer
26-
#include "read_until_n.hpp" // read_until_n
27+
#include "RingBufferReader.hpp"
2728

2829
namespace tcp_in {
2930

@@ -38,7 +39,7 @@ struct __attribute__((__packed__)) ipfix_start_compress_header {
3839
};
3940

4041
Lz4Decoder::Lz4Decoder(int fd) :
41-
m_fd(fd),
42+
m_reader(fd),
4243
m_decoded(),
4344
m_decoder(LZ4_createStreamDecode()),
4445
m_decompressed(),
@@ -158,12 +159,13 @@ void Lz4Decoder::decompress() {
158159
}
159160

160161
// Copy the decompressed data into the decode buffer
161-
m_decoded.read_from(
162+
auto reader = RingBufferReader(
162163
m_decompressed.data(),
163164
m_decompressed.size(),
164165
m_decompressed_size,
165166
m_decompressed_pos
166167
);
168+
m_decoded.read_from(reader, m_decompressed_size);
167169

168170
m_decompressed_pos += m_decompressed_size;
169171
if (m_decompressed_pos >= m_decompressed.size()) {
@@ -175,7 +177,8 @@ void Lz4Decoder::decompress() {
175177
}
176178

177179
bool Lz4Decoder::read_until_n(size_t n) {
178-
return ::read_until_n(n, m_fd, m_compressed, m_decoded);
180+
m_reader.read_until_n(n, m_compressed, m_decoded);
181+
return m_compressed.size() == n;
179182
}
180183

181184
void Lz4Decoder::reset_stream(size_t buffer_size) {

src/plugins/input/tcp/src/Lz4Decoder.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "Decoder.hpp" // Decoder
2121
#include "DecodeBuffer.hpp" // DecodeBuffer
22+
#include "TcpReader.hpp"
2223

2324
namespace tcp_in {
2425

@@ -92,7 +93,7 @@ class Lz4Decoder : public Decoder {
9293
*/
9394
void reset_stream(size_t buffer_size);
9495

95-
int m_fd;
96+
TcpReader m_reader;
9697
DecodeBuffer m_decoded;
9798

9899
std::unique_ptr<LZ4_streamDecode_t, Lz4DecodeDestructor> m_decoder;

0 commit comments

Comments
 (0)