Skip to content

Commit 9297b1f

Browse files
BonnyAD9sedmicha
authored andcommitted
TCP - Add LZ4 decoder
1 parent 9fbe92a commit 9297b1f

File tree

3 files changed

+308
-0
lines changed

3 files changed

+308
-0
lines changed

src/plugins/input/tcp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ add_library(tcp-input MODULE
88
src/Epoll.cpp
99
src/ClientManager.cpp
1010
src/IpfixDecoder.cpp
11+
src/Lz4Decoder.cpp
1112
)
1213

1314
if (CMAKE_HOST_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_HOST_SYSTEM_NAME STREQUAL "OpenBSD")
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief IPFIX decoder for tcp plugin (source file)
5+
* \date 2024
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "Lz4Decoder.hpp"
12+
13+
#include <cstdint> // uint32_t, uint16_t
14+
#include <stdexcept> // runtime_error
15+
#include <cstddef> // size_t
16+
#include <cerrno> // errno, EWOULDBLOCK, EAGAIN
17+
#include <string> // string
18+
19+
#include <netinet/in.h> // ntohl, ntohs
20+
#include <lz4.h> // LZ4_decompress_safe_continue, LZ4_createStreamDecode, LZ4_setStreamDecode
21+
#include <unistd.h> // read
22+
23+
#include <ipfixcol2.h> // ipx_strerror
24+
25+
#include "DecodeBuffer.hpp" // DecodeBuffer
26+
#include "read_until_n.hpp" // read_until_n
27+
28+
namespace tcp_in {
29+
30+
struct __attribute__((__packed__)) ipfix_compress_header {
31+
uint16_t decompressed_size;
32+
uint16_t compressed_size;
33+
};
34+
35+
struct __attribute__((__packed__)) ipfix_start_compress_header {
36+
uint32_t magic;
37+
uint32_t buffer_size;
38+
};
39+
40+
Lz4Decoder::Lz4Decoder(int fd) :
41+
m_fd(fd),
42+
m_decoded(),
43+
m_decoder(LZ4_createStreamDecode()),
44+
m_decompressed(),
45+
m_decompressed_pos(0),
46+
m_compressed(),
47+
m_compressed_size(0),
48+
m_decompressed_size(0)
49+
{
50+
if (!m_decoder) {
51+
throw std::runtime_error("LZ4 Decoder: Failed to create stream decoder");
52+
}
53+
}
54+
55+
DecodeBuffer &Lz4Decoder::decode() {
56+
while (!m_decoded.enough_data()) {
57+
// Read the header.
58+
if (!read_header()) {
59+
// There is not enough data to read the whole header.
60+
break;
61+
}
62+
63+
// Read the body of the message.
64+
if (!read_body()) {
65+
// There is not enough data to read the whole body.
66+
break;
67+
}
68+
69+
// Decompress the readed message and add it to the decode buffer.
70+
decompress();
71+
}
72+
73+
if (m_decoded.is_eof_reached() && m_compressed.size() != 0) {
74+
throw std::runtime_error("Incomplete compressed message received");
75+
}
76+
77+
return m_decoded;
78+
}
79+
80+
bool Lz4Decoder::read_header() {
81+
/** compress header size */
82+
constexpr size_t CH_SIZE = sizeof(ipfix_compress_header);
83+
84+
if (m_compressed_size != 0) {
85+
// The header is already read, but the message body is incomplete.
86+
return true;
87+
}
88+
89+
if (m_decompressed.size() == 0) {
90+
// The size of the ring buffer is unknown; this is the first message the decoder decodes.
91+
// Read the recommended ring buffer size from the start header.
92+
if (!read_start_header()) {
93+
// There is not enough data to read the whole start header.
94+
return false;
95+
}
96+
}
97+
98+
// Read the header.
99+
if (!read_until_n(CH_SIZE)) {
100+
// There is not enough data to read the whole header.
101+
return false;
102+
}
103+
104+
// Get the message sizes from the header.
105+
auto hdr = reinterpret_cast<ipfix_compress_header *>(m_compressed.data());
106+
m_compressed_size = ntohs(hdr->compressed_size);
107+
m_decompressed_size = ntohs(hdr->decompressed_size);
108+
109+
// The header is not part of the message that should be decompressed.
110+
m_compressed.clear();
111+
return true;
112+
}
113+
114+
bool Lz4Decoder::read_start_header() {
115+
/** start compress header size */
116+
constexpr size_t SCH_SIZE = sizeof(ipfix_start_compress_header);
117+
118+
// Read the header.
119+
if (!read_until_n(SCH_SIZE)) {
120+
// There is not enough data to read the whole start header.
121+
return false;
122+
}
123+
124+
// Get the ring buffer size from the header.
125+
auto hdr = reinterpret_cast<ipfix_start_compress_header *>(m_compressed.data());
126+
auto new_buffer_size = ntohl(hdr->buffer_size);
127+
128+
// The start header is not part of the message that should be decompressed.
129+
m_compressed.clear();
130+
131+
reset_stream(new_buffer_size);
132+
return true;
133+
}
134+
135+
bool Lz4Decoder::read_body() {
136+
return read_until_n(m_compressed_size);
137+
}
138+
139+
void Lz4Decoder::decompress() {
140+
// The message is never on buffer boundaries.
141+
if (m_decompressed.size() - m_decompressed_pos < m_decompressed_size) {
142+
m_decompressed_pos = 0;
143+
}
144+
145+
int res = LZ4_decompress_safe_continue(
146+
m_decoder.get(),
147+
reinterpret_cast<char *>(m_compressed.data()),
148+
reinterpret_cast<char *>(m_decompressed.data()) + m_decompressed_pos,
149+
m_compressed_size,
150+
m_decompressed_size
151+
);
152+
153+
if (res < 0) {
154+
throw std::runtime_error("LZ4 Decoder: decompression failed");
155+
}
156+
157+
if (static_cast<size_t>(res) != m_decompressed_size) {
158+
// this shouldn't happen, but it is not error
159+
m_decompressed_size = res;
160+
}
161+
162+
// Copy the decompressed data into the decode buffer
163+
m_decoded.read_from(
164+
m_decompressed.data(),
165+
m_decompressed.size(),
166+
m_decompressed_size,
167+
m_decompressed_pos
168+
);
169+
170+
m_decompressed_pos += m_decompressed_size;
171+
if (m_decompressed_pos >= m_decompressed.size()) {
172+
m_decompressed_pos -= m_decompressed.size();
173+
}
174+
175+
m_compressed.clear();
176+
m_compressed_size = 0;
177+
}
178+
179+
bool Lz4Decoder::read_until_n(size_t n) {
180+
return ::read_until_n(n, m_fd, m_compressed, m_decoded);
181+
}
182+
183+
void Lz4Decoder::reset_stream(size_t buffer_size) {
184+
m_decompressed.resize(buffer_size);
185+
m_decompressed_pos = 0;
186+
187+
int res = LZ4_setStreamDecode(m_decoder.get(), nullptr, 0);
188+
if (res == 0) {
189+
throw std::runtime_error("LZ4 Decoder: Failed to reset stream decoder.");
190+
}
191+
}
192+
193+
} // namespace tcp_in
194+
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief IPFIX decoder for tcp plugin (header file)
5+
* \date 2024
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#pragma once
12+
13+
#include <memory> // std::unique_ptr
14+
#include <cstdint> // uint8_t, uint32_t
15+
#include <vector> // std::vector
16+
#include <cstddef> // size_t
17+
18+
#include <lz4.h> // LZ4_streamDecode_t, LZ4_freeStreamDecode
19+
20+
#include "Decoder.hpp" // Decoder
21+
#include "DecodeBuffer.hpp" // DecodeBuffer
22+
23+
namespace tcp_in {
24+
25+
/** Byte sequence at the start of lz4 stream. Used to determine what decoder should be used */
26+
constexpr uint32_t LZ4_MAGIC = 0x4c5a3463;
27+
28+
struct Lz4DecodeDestructor {
29+
inline void operator()(void *p) noexcept {
30+
LZ4_freeStreamDecode(reinterpret_cast<LZ4_streamDecode_t *>(p));
31+
}
32+
};
33+
34+
/** Decoder for LZ4 stream compression */
35+
class Lz4Decoder : public Decoder {
36+
public:
37+
/**
38+
* @brief Creates ipfix decoder.
39+
* @param fd TCP connection file descriptor.
40+
* @throws when fails to create stream decoder
41+
*/
42+
Lz4Decoder(int fd);
43+
44+
/**
45+
* @brief Decodes the next blocks of data until there is no data or when enough data is readed
46+
* @return decoded data
47+
* @throws when fails to read from file descriptor
48+
* @throws when fails to decompress the data
49+
*/
50+
virtual DecodeBuffer &decode() override;
51+
52+
virtual const char *get_name() const override {
53+
return "LZ4";
54+
}
55+
56+
private:
57+
/**
58+
* @brief reads header, returns true if whole header is readed
59+
* @return true if whole header is readed
60+
* @throws when fails to read from file descriptor
61+
*/
62+
bool read_header();
63+
/**
64+
* @brief reads the start header and resets stream, returns true if whole header is readed
65+
* @return true if whole header is readed
66+
* @throws when fails to read from file descriptor
67+
*/
68+
bool read_start_header();
69+
/**
70+
* @brief reads body, returns true if whole body is readed
71+
* @return true if whole body is readed
72+
* @throws when fails to read from file descriptor
73+
*/
74+
bool read_body();
75+
/**
76+
* @brief decompresses the next block of memory and write it to the decode buffer
77+
* @throws when decompression fails
78+
*/
79+
void decompress();
80+
/**
81+
* @brief read while `m_compressed.size() < n`
82+
* @param n number of bytes required to be in `m_compressed`
83+
* @return true if after call `m_compressed` has at least `n` bytes.
84+
* @throws when fails to read from file descriptor
85+
*/
86+
bool read_until_n(size_t n);
87+
/**
88+
* @brief resets the lz4 stream
89+
* @param buffer_size new decompressed buffer size
90+
* @throws when fails to reset LZ4 stream decoder
91+
*/
92+
void reset_stream(size_t buffer_size);
93+
94+
int m_fd;
95+
DecodeBuffer m_decoded;
96+
97+
std::unique_ptr<LZ4_streamDecode_t, Lz4DecodeDestructor> m_decoder;
98+
99+
/** internal circular buffer */
100+
std::vector<uint8_t> m_decompressed;
101+
/** current position in the circular buffer (to the first unused index) */
102+
size_t m_decompressed_pos;
103+
104+
/** When m_compressed_size == 0, this may contain incomplete header */
105+
std::vector<uint8_t> m_compressed;
106+
107+
/** size of full compressed block, 0 when unknown */
108+
size_t m_compressed_size;
109+
/** size of full decompressed block */
110+
size_t m_decompressed_size;
111+
};
112+
113+
} // namespace tcp_in

0 commit comments

Comments
 (0)