Skip to content

Commit a5e7812

Browse files
author
Pavel Siska
committed
WIP ndp: update ndp plugin
1 parent 42d6fec commit a5e7812

File tree

4 files changed

+91
-33
lines changed

4 files changed

+91
-33
lines changed

src/plugins/input/nfb/src/ndp.cpp

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
#include <cstdio>
1818
#include <cstring>
1919
#include <iostream>
20+
#include <memory>
21+
#include <algorithm>
22+
#include <span>
2023

2124
#include <ipfixprobe/pluginFactory/pluginManifest.hpp>
2225
#include <ipfixprobe/pluginFactory/pluginRegistrar.hpp>
@@ -47,7 +50,7 @@ static std::vector<std::string> parseDevices(const std::string& input)
4750
{
4851
std::vector<std::string> result;
4952

50-
size_t colon_pos = input.find(':');
53+
size_t colon_pos = input.find_last_of(':');
5154
std::string suffix;
5255
std::string devices;
5356

@@ -69,6 +72,7 @@ static std::vector<std::string> parseDevices(const std::string& input)
6972
}
7073

7174
NdpPacketReader::NdpPacketReader(const std::string& params)
75+
: ndp_packet_burst(new ndp_packet[64])
7276
{
7377
init(params.c_str());
7478
}
@@ -121,54 +125,57 @@ InputPlugin::Result NdpPacketReader::get(PacketBlock& packets)
121125
parser_opt_t opt = {&packets, false, false, 0};
122126
struct ndp_packet* ndp_packet;
123127
struct timeval timestamp;
124-
size_t read_pkts = 0;
125128
int ret = -1;
126129

127130
packets.cnt = 0;
131+
constexpr size_t maxBurstSize = 64;
132+
size_t burstSize = std::min(packets.size, maxBurstSize);
133+
std::span<struct ndp_packet> packetSpan(ndp_packet_burst.get(), burstSize);
134+
std::span<timeval> timestampSpan(timestamps);
135+
136+
size_t reader_index = (m_reader_idx++) & (m_readers_count - 1);
137+
NdpReader& reader = ndpReader[reader_index];
138+
int received = reader.get_packets(packetSpan, timestampSpan);
139+
140+
if (received < 32) {
141+
std::span<struct ndp_packet> packetSpan(ndp_packet_burst.get() + received, burstSize - received);
142+
std::span<timeval> timestampSpan(timestamps.data() + received, burstSize - received);
143+
144+
size_t reader_index = (m_reader_idx++) & (m_readers_count - 1);
145+
NdpReader& reader = ndpReader[reader_index];
146+
received += reader.get_packets(packetSpan, timestampSpan);
147+
}
148+
149+
for (unsigned i = 0; i < received; ++i) {
150+
ndp_packet = &ndp_packet_burst[i];
151+
timestamp = timestamps[i];
128152

129-
bool any_timeout = false;
130-
131-
for (unsigned r = 0; r < m_readers_count; ++r) {
132-
NdpReader& reader = ndpReader[(m_reader_idx++) % m_readers_count];
133-
134-
for (unsigned i = 0; i < packets.size; ++i) {
135-
ret = reader.get_pkt(&ndp_packet, &timestamp);
136-
if (ret == 0) {
137-
any_timeout = true;
138-
break;
139-
} else if (ret < 0) {
140-
// Error occured.
141-
throw PluginError(reader.error_msg);
142-
}
143-
144-
++read_pkts;
145-
parse_packet(
146-
&opt,
147-
m_parser_stats,
148-
timestamp,
149-
ndp_packet->data,
150-
ndp_packet->data_length,
151-
ndp_packet->data_length);
152-
153-
if (opt.pblock->cnt >= packets.size) {
154-
break;
155-
}
153+
if (ndp_packet->data_length == 0) {
154+
continue; // Skip empty packets
156155
}
157156

158-
if (opt.pblock->cnt) {
157+
parse_packet(
158+
&opt,
159+
m_parser_stats,
160+
timestamp,
161+
ndp_packet->data,
162+
ndp_packet->data_length,
163+
ndp_packet->data_length);
164+
165+
if (opt.pblock->cnt >= packets.size) {
159166
break;
160167
}
161168
}
162169

163-
m_seen += read_pkts;
170+
m_seen += received;
164171
m_parsed += opt.pblock->cnt;
165172

166-
m_stats.receivedPackets += read_pkts;
173+
m_stats.receivedPackets += received;
167174
m_stats.receivedBytes += packets.bytes;
168175

169176
if (opt.pblock->cnt) {
170177
return Result::PARSED;
171-
} else if (any_timeout) {
178+
} else if (received == 0) {
172179
return Result::TIMEOUT;
173180
} else {
174181
return Result::NOT_PARSED;

src/plugins/input/nfb/src/ndp.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
#include <ipfixprobe/packet.hpp>
2020
#include <ipfixprobe/utils.hpp>
2121

22+
#include <memory>
23+
#include <span>
24+
2225
namespace ipxp {
2326

2427
class NdpOptParser : public OptionsParser {
@@ -86,6 +89,9 @@ class NdpPacketReader : public InputPlugin {
8689
uint64_t m_reader_idx = 0;
8790
RxStats m_stats = {};
8891

92+
std::unique_ptr<struct ndp_packet[]> ndp_packet_burst;
93+
std::array<timeval, 64> timestamps;
94+
8995
void init_ifc(const std::string& dev);
9096
};
9197

src/plugins/input/nfb/src/ndpReader.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,46 @@ void NdpReader::set_sw_timestamp(struct timeval* tv)
198198
tv->tv_usec = micros;
199199
}
200200

201+
int NdpReader::get_packets(std::span<struct ndp_packet> packets, std::span<timeval> timestamps)
202+
{
203+
if (blocked_packets > 128) {
204+
ndp_rx_burst_put(rx_handle);
205+
blocked_packets = 0;
206+
}
207+
208+
const unsigned received = ndp_rx_burst_get(rx_handle, packets.data(), packets.size());
209+
for (unsigned i = 0; i < received; i++) {
210+
struct ndp_packet* ndp_packet = &packets[i];
211+
if (fw_type == NdpFwType::NDP_FW_HANIC) {
212+
uint64_t* fw_ts = &((NdpHeader*) (ndp_packet->header))->timestamp;
213+
if (*fw_ts == 0) {
214+
set_sw_timestamp((struct timeval*) &timestamps[i]);
215+
} else {
216+
convert_fw_ts_to_timeval(fw_ts, (struct timeval*) &timestamps[i]);
217+
}
218+
} else {
219+
uint8_t header_id = ndp_packet_flag_header_id_get(ndp_packet);
220+
if (header_id >= ndk_timestamp_offsets.size()) {
221+
set_sw_timestamp((struct timeval*) &timestamps[i]);
222+
} else if (ndk_timestamp_offsets[header_id] == std::numeric_limits<uint32_t>::max()) {
223+
set_sw_timestamp((struct timeval*) &timestamps[i]);
224+
} else {
225+
uint64_t* fw_ts
226+
= (uint64_t*) ((uint8_t*) ndp_packet->header + ndk_timestamp_offsets[header_id]);
227+
if (*fw_ts == std::numeric_limits<uint64_t>::max()) {
228+
set_sw_timestamp((struct timeval*) &timestamps[i]);
229+
} else {
230+
convert_fw_ts_to_timeval(fw_ts, (struct timeval*) &timestamps[i]);
231+
}
232+
}
233+
}
234+
}
235+
236+
blocked_packets += received;
237+
238+
return received;
239+
}
240+
201241
int NdpReader::get_pkt(struct ndp_packet** ndp_packet_out, struct timeval* timestamp)
202242
{
203243
if (ndp_packet_buffer_processed >= ndp_packet_buffer_packets) {

src/plugins/input/nfb/src/ndpReader.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <stdint.h>
1010
#include <sys/time.h>
1111
#include <unistd.h>
12+
#include <span>
1213

1314
extern "C" {
1415
#include <nfb/ndp.h>
@@ -49,6 +50,8 @@ class NdpReader {
4950
int get_pkt(struct ndp_packet** ndp_packet, struct timeval* timestamp);
5051
std::string error_msg;
5152

53+
int get_packets(std::span<struct ndp_packet> packets, std::span<timeval> timestamps);
54+
5255
private:
5356
void set_booted_fw();
5457
void convert_fw_ts_to_timeval(const uint64_t* fw_ts, struct timeval* tv);
@@ -60,6 +63,8 @@ class NdpReader {
6063
uint16_t packet_bufferSize;
6164
uint64_t timeout;
6265

66+
uint64_t blocked_packets = 0;
67+
6368
NdpFwType fw_type;
6469
std::vector<uint32_t> ndk_timestamp_offsets;
6570

0 commit comments

Comments
 (0)