Skip to content

Commit 6d9b63e

Browse files
authored
Merge pull request #96 from CESNET/packet-zerocopy
Packet zerocopy
2 parents 1fd3bb2 + 85d707d commit 6d9b63e

File tree

9 files changed

+50
-81
lines changed

9 files changed

+50
-81
lines changed

include/ipfixprobe/packet.hpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,18 @@ struct Packet : public Record {
8484
uint32_t tcp_seq;
8585
uint32_t tcp_ack;
8686

87-
uint8_t *packet; /**< Pointer to begin of packet, if available */
87+
const uint8_t *packet; /**< Pointer to begin of packet, if available */
8888
uint16_t packet_len; /**< Length of data in packet buffer, packet_len <= packet_len_wire */
8989
uint16_t packet_len_wire; /**< Original packet length on wire */
9090

91-
uint8_t *payload; /**< Pointer to begin of payload, if available */
91+
const uint8_t *payload; /**< Pointer to begin of payload, if available */
9292
uint16_t payload_len; /**< Length of data in payload buffer, payload_len <= payload_len_wire */
9393
uint16_t payload_len_wire; /**< Original payload length computed from headers */
9494

9595
uint8_t *custom; /**< Pointer to begin of custom data, if available */
9696
uint16_t custom_len; /**< Length of data in custom buffer */
9797

98+
// TODO: remove the buffer and buffer_size members below once they are no longer used
9899
uint8_t *buffer; /**< Buffer for packet, payload and custom data */
99100
uint16_t buffer_size; /**< Size of buffer */
100101

@@ -125,9 +126,15 @@ struct PacketBlock {
125126
size_t bytes;
126127
size_t size;
127128

128-
PacketBlock() :
129-
pkts(nullptr), cnt(0), bytes(0), size(0)
129+
PacketBlock(size_t pkts_size) :
130+
cnt(0), bytes(0), size(pkts_size)
130131
{
132+
pkts = new Packet[pkts_size];
133+
}
134+
135+
~PacketBlock()
136+
{
137+
delete[] pkts;
131138
}
132139
};
133140

input/parser.cpp

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -697,14 +697,8 @@ void parse_packet(parser_opt_t *opt, struct timeval ts, const uint8_t *data, uin
697697
}
698698

699699
uint16_t pkt_len = caplen;
700-
if ((int) pkt_len > pkt->buffer_size - 1) {
701-
pkt_len = pkt->buffer_size - 1;
702-
DEBUG_MSG("Packet size too long, truncating to %u\n", pkt_len);
703-
}
704-
pkt->packet = pkt->buffer;
705-
memcpy(pkt->packet, data, pkt_len);
706-
pkt->packet[pkt_len] = 0;
707-
pkt->packet_len = pkt_len;
700+
pkt->packet = data;
701+
pkt->packet_len = caplen;
708702

709703
if (l4_hdr_offset != l3_hdr_offset) {
710704
if (l4_hdr_offset + pkt->ip_payload_len < 64) {

input/pcap.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353

5454
namespace ipxp {
5555

56+
// Maximum number of packets read into a packet block (currently one)
57+
constexpr size_t PCAP_PACKET_BLOCK_SIZE = 1;
58+
5659
__attribute__((constructor)) static void register_this_plugin()
5760
{
5861
static PluginRecord rec = PluginRecord("pcap", [](){return new PcapReader();});
@@ -266,7 +269,7 @@ InputPlugin::Result PcapReader::get(PacketBlock &packets)
266269
}
267270

268271
packets.cnt = 0;
269-
ret = pcap_dispatch(m_handle, packets.size, packet_handler, (u_char *) (&opt));
272+
ret = pcap_dispatch(m_handle, PCAP_PACKET_BLOCK_SIZE, packet_handler, (u_char *) (&opt));
270273
if (m_live) {
271274
if (ret == 0) {
272275
return Result::TIMEOUT;

input/raw.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ namespace ipxp {
6969
#error "raw plugin is supported with TPACKET3 only"
7070
#endif
7171

72+
// Maximum number of packets read into a packet block (currently one)
73+
constexpr size_t RAW_PACKET_BLOCK_SIZE = 1;
74+
7275
__attribute__((constructor)) static void register_this_plugin()
7376
{
7477
static PluginRecord rec = PluginRecord("raw", [](){return new RawReader();});
@@ -301,7 +304,7 @@ int RawReader::process_packets(struct tpacket_block_desc *pbd, PacketBlock &pack
301304
{
302305
parser_opt_t opt = {&packets, false, false, DLT_EN10MB};
303306
uint32_t num_pkts = pbd->hdr.bh1.num_pkts;
304-
uint32_t capacity = packets.size - packets.cnt;
307+
uint32_t capacity = RAW_PACKET_BLOCK_SIZE - packets.cnt;
305308
uint32_t to_read = 0;
306309
struct tpacket3_hdr *ppd;
307310

input/stem.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050

5151
namespace ipxp {
5252

53+
// Maximum number of packets read into a packet block (currently one)
54+
constexpr size_t STEM_PACKET_BLOCK_SIZE = 1;
5355

5456
__attribute__((constructor)) static void register_this_plugin()
5557
{
@@ -168,7 +170,7 @@ InputPlugin::Result StemPacketReader::get(PacketBlock &packets)
168170
{
169171
packets.cnt = 0;
170172
packets.bytes = 0;
171-
while (packets.cnt < packets.size) {
173+
while (packets.cnt < STEM_PACKET_BLOCK_SIZE) {
172174
try {
173175
auto pkt = m_reader->next_packet();
174176
if (!pkt.has_value()) {

ipfixprobe.cpp

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ volatile sig_atomic_t terminate_export = 0;
7373
volatile sig_atomic_t terminate_input = 0;
7474

7575
const uint32_t DEFAULT_IQUEUE_SIZE = 64;
76-
const uint32_t DEFAULT_IQUEUE_BLOCK = 32;
7776
const uint32_t DEFAULT_OQUEUE_SIZE = 16536;
7877
const uint32_t DEFAULT_FPS = 0; // unlimited
7978

@@ -159,29 +158,6 @@ void print_help(ipxp_conf_t &conf, const std::string &arg)
159158
}
160159
}
161160

162-
void init_packets(ipxp_conf_t &conf)
163-
{
164-
// Reserve +1 more block as a "working block"
165-
conf.blocks_cnt = static_cast<size_t>(conf.iqueue_size + 1U) * conf.worker_cnt;
166-
conf.pkts_cnt = conf.blocks_cnt * conf.iqueue_block;
167-
conf.pkt_data_cnt = conf.pkts_cnt * conf.pkt_bufsize;
168-
conf.blocks = new PacketBlock[conf.blocks_cnt];
169-
conf.pkts = new Packet[conf.pkts_cnt];
170-
conf.pkt_data = new uint8_t[conf.pkt_data_cnt];
171-
172-
for (unsigned i = 0; i < conf.blocks_cnt; i++) {
173-
size_t pkts_offset = static_cast<size_t>(i) * conf.iqueue_block; // offset in number of packets
174-
175-
conf.blocks[i].pkts = conf.pkts + pkts_offset;
176-
conf.blocks[i].cnt = 0;
177-
conf.blocks[i].size = conf.iqueue_block;
178-
for (unsigned j = 0; j < conf.iqueue_block; j++) {
179-
conf.blocks[i].pkts[j].buffer = static_cast<uint8_t *>(conf.pkt_data + conf.pkt_bufsize * (j + pkts_offset));
180-
conf.blocks[i].pkts[j].buffer_size = conf.pkt_bufsize;
181-
}
182-
}
183-
}
184-
185161
void process_plugin_argline(const std::string &args, std::string &plugin, std::string &params)
186162
{
187163
size_t delim;
@@ -356,17 +332,17 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
356332
conf.input_stats.push_back(input_stats);
357333

358334
WorkPipeline tmp = {
359-
{
360-
input_plugin,
361-
new std::thread(input_storage_worker, input_plugin, storage_plugin, &conf.blocks[pipeline_idx * (conf.iqueue_size + 1)],
362-
conf.iqueue_size + 1, conf.max_pkts, input_res, input_stats),
363-
input_res,
364-
input_stats
365-
},
366-
{
367-
storage_plugin,
368-
storage_process_plugins
369-
}
335+
{
336+
input_plugin,
337+
new std::thread(input_storage_worker, input_plugin, storage_plugin, conf.iqueue_size,
338+
conf.max_pkts, input_res, input_stats),
339+
input_res,
340+
input_stats
341+
},
342+
{
343+
storage_plugin,
344+
storage_process_plugins
345+
}
370346
};
371347
conf.pipelines.push_back(tmp);
372348
pipeline_idx++;
@@ -646,15 +622,13 @@ int run(int argc, char *argv[])
646622
}
647623

648624
conf.worker_cnt = parser.m_input.size();
649-
conf.iqueue_block = parser.m_iqueue_block;
650625
conf.iqueue_size = parser.m_iqueue;
651626
conf.oqueue_size = parser.m_oqueue;
652627
conf.fps = parser.m_fps;
653628
conf.pkt_bufsize = parser.m_pkt_bufsize;
654629
conf.max_pkts = parser.m_max_pkts;
655630

656631
try {
657-
init_packets(conf);
658632
if (process_plugin_args(conf, parser)) {
659633
goto EXIT;
660634
}

ipfixprobe.hpp

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
namespace ipxp {
6767

6868
extern const uint32_t DEFAULT_IQUEUE_SIZE;
69-
extern const uint32_t DEFAULT_IQUEUE_BLOCK;
7069
extern const uint32_t DEFAULT_OQUEUE_SIZE;
7170
extern const uint32_t DEFAULT_FPS;
7271

@@ -95,7 +94,6 @@ class IpfixprobeOptParser : public OptionsParser {
9594
std::string m_pid;
9695
bool m_daemon;
9796
uint32_t m_iqueue;
98-
uint32_t m_iqueue_block;
9997
uint32_t m_oqueue;
10098
uint32_t m_fps;
10199
uint32_t m_pkt_bufsize;
@@ -106,7 +104,7 @@ class IpfixprobeOptParser : public OptionsParser {
106104

107105
IpfixprobeOptParser() : OptionsParser("ipfixprobe", "flow exporter supporting various custom IPFIX elements"),
108106
m_pid(""), m_daemon(false),
109-
m_iqueue(DEFAULT_IQUEUE_SIZE), m_iqueue_block(DEFAULT_IQUEUE_BLOCK), m_oqueue(DEFAULT_OQUEUE_SIZE), m_fps(DEFAULT_FPS),
107+
m_iqueue(DEFAULT_IQUEUE_SIZE), m_oqueue(DEFAULT_OQUEUE_SIZE), m_fps(DEFAULT_FPS),
110108
m_pkt_bufsize(1600), m_max_pkts(0), m_help(false), m_help_str(""), m_version(false)
111109
{
112110
m_delim = ' ';
@@ -137,12 +135,6 @@ class IpfixprobeOptParser : public OptionsParser {
137135
std::invalid_argument &e) { return false; }
138136
return true;
139137
}, OptionFlags::RequiredArgument);
140-
register_option("-b", "--iqueueb", "SIZE", "Size of input queue packet block",
141-
[this](const char *arg) {
142-
try { m_iqueue_block = str2num<decltype(m_iqueue_block)>(arg); } catch (
143-
std::invalid_argument &e) { return false; }
144-
return true;
145-
}, OptionFlags::RequiredArgument);
146138
register_option("-Q", "--oqueue", "SIZE", "Size of queue between storage and output plugins",
147139
[this](const char *arg) {
148140
try { m_oqueue = str2num<decltype(m_oqueue)>(arg); } catch (
@@ -189,7 +181,6 @@ class IpfixprobeOptParser : public OptionsParser {
189181

190182
struct ipxp_conf_t {
191183
uint32_t iqueue_size;
192-
uint32_t iqueue_block;
193184
uint32_t oqueue_size;
194185
uint32_t worker_cnt;
195186
uint32_t fps;
@@ -222,7 +213,7 @@ struct ipxp_conf_t {
222213
Packet *pkts;
223214
uint8_t *pkt_data;
224215

225-
ipxp_conf_t() : iqueue_size(DEFAULT_IQUEUE_SIZE), iqueue_block(DEFAULT_IQUEUE_BLOCK),
216+
ipxp_conf_t() : iqueue_size(DEFAULT_IQUEUE_SIZE),
226217
oqueue_size(DEFAULT_OQUEUE_SIZE),
227218
worker_cnt(0), fps(0), max_pkts(0),
228219
pkt_bufsize(1600), blocks_cnt(0), pkts_cnt(0), pkt_data_cnt(0), blocks(nullptr), pkts(nullptr), pkt_data(nullptr)
@@ -268,10 +259,6 @@ struct ipxp_conf_t {
268259
for (auto &it : output_stats) {
269260
delete it;
270261
}
271-
272-
delete[] pkts;
273-
delete[] blocks;
274-
delete[] pkt_data;
275262
}
276263
};
277264

workers.cpp

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ namespace ipxp {
5151

5252
#define MICRO_SEC 1000000L
5353

54-
void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit,
54+
void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, size_t queue_size, uint64_t pkt_limit,
5555
std::promise<WorkerResult> *out, std::atomic<InputStats> *out_stats)
5656
{
5757
struct timespec start_cache;
@@ -60,30 +60,30 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock
6060
struct timespec end = {0, 0};
6161
struct timeval ts = {0, 0};
6262
bool timeout = false;
63-
size_t i = 0;
6463
InputPlugin::Result ret;
6564
InputStats stats = {0, 0, 0, 0, 0};
6665
WorkerResult res = {false, ""};
6766

67+
PacketBlock block(queue_size);
68+
6869
#ifdef __linux__
6970
const clockid_t clk_id = CLOCK_MONOTONIC_COARSE;
7071
#else
7172
const clockid_t clk_id = CLOCK_MONOTONIC;
7273
#endif
7374

7475
while (!terminate_input) {
75-
PacketBlock *block = &pkts[i];
76-
block->cnt = 0;
77-
block->bytes = 0;
76+
block.cnt = 0;
77+
block.bytes = 0;
7878

79-
if (pkt_limit && plugin->m_parsed + block->size >= pkt_limit) {
79+
if (pkt_limit && plugin->m_parsed + block.size >= pkt_limit) {
8080
if (plugin->m_parsed >= pkt_limit) {
8181
break;
8282
}
83-
block->size = pkt_limit - plugin->m_parsed;
83+
block.size = pkt_limit - plugin->m_parsed;
8484
}
8585
try {
86-
ret = plugin->get(*block);
86+
ret = plugin->get(block);
8787
} catch (PluginError &e) {
8888
res.error = true;
8989
res.msg = e.what();
@@ -107,13 +107,13 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock
107107
stats.packets = plugin->m_seen;
108108
stats.parsed = plugin->m_parsed;
109109
stats.dropped = plugin->m_dropped;
110-
stats.bytes += block->bytes;
110+
stats.bytes += block.bytes;
111111
clock_gettime(clk_id, &start_cache);
112112
try {
113-
for (unsigned i = 0; i < block->cnt; i++) {
114-
cache->put_pkt(block->pkts[i]);
113+
for (unsigned i = 0; i < block.cnt; i++) {
114+
cache->put_pkt(block.pkts[i]);
115115
}
116-
ts = block->pkts[block->cnt - 1].ts;
116+
ts = block.pkts[block.cnt - 1].ts;
117117
} catch (PluginError &e) {
118118
res.error = true;
119119
res.msg = e.what();
@@ -127,7 +127,6 @@ void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock
127127
time += 1000000000;
128128
}
129129
stats.qtime += time;
130-
i = (i + 1) % block_cnt;
131130

132131
out_stats->store(stats);
133132
} else if (ret == InputPlugin::Result::ERROR) {

workers.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ struct OutputWorker {
8686
ipx_ring_t *queue;
8787
};
8888

89-
void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, PacketBlock *pkts, size_t block_cnt, uint64_t pkt_limit,
89+
void input_storage_worker(InputPlugin *plugin, StoragePlugin *cache, size_t queue_size, uint64_t pkt_limit,
9090
std::promise<WorkerResult> *out, std::atomic<InputStats> *out_stats);
9191
void output_worker(OutputPlugin *exp, ipx_ring_t *queue, std::promise<WorkerResult> *out, std::atomic<OutputStats> *out_stats,
9292
uint32_t fps);

0 commit comments

Comments
 (0)