Skip to content

Commit e20abee

Browse files
authored
Merge pull request #287 from CESNET/ndp-dual-device-read
NfbPlugin: add support for reading from multiple NFB devices
2 parents 0f3d743 + 7720b65 commit e20abee

File tree

6 files changed

+149
-22
lines changed

6 files changed

+149
-22
lines changed

init/config2args.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,14 +214,20 @@ def process_input_ndp_plugin(settings):
214214
if device is None:
215215
raise ValueError("device must be specified in the ndp plugin configuration.")
216216

217+
process_plugins = settings.get("device", [])
218+
if not isinstance(process_plugins, list):
219+
raise ValueError("Invalid process plugins configuration format.")
220+
221+
res = ','.join(process_plugins)
222+
217223
queues = settings.get("queues")
218224
if queues is None:
219225
raise ValueError("queues must be specified in the ndp plugin configuration.")
220226

221227
# Parse the queues
222228
parsed_queues = parse_ndp_queues(queues)
223229

224-
params = [f'-i "ndp;dev={device}:{queue_id}"' for queue_id in parsed_queues]
230+
params = [f'-i "ndp;dev={res}:{queue_id}"' for queue_id in parsed_queues]
225231
return " ".join(params)
226232

227233
def process_input_pcap_file_plugin(settings):

init/schema.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,11 @@
9393
"ndp": {
9494
"type": "object",
9595
"properties": {
96-
"device": {
97-
"type": "string"
96+
"device": {
97+
"type": "array",
98+
"items": {
99+
"type": "string"
100+
}
98101
},
99102
"queues": {
100103
"type": "string"

src/plugins/input/nfb/src/ndp.cpp

Lines changed: 83 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414

1515
#include "parser.hpp"
1616

17+
#include <algorithm>
1718
#include <cstdio>
1819
#include <cstring>
1920
#include <iostream>
21+
#include <memory>
22+
#include <span>
2023

2124
#include <ipfixprobe/pluginFactory/pluginManifest.hpp>
2225
#include <ipfixprobe/pluginFactory/pluginRegistrar.hpp>
@@ -43,7 +46,33 @@ static const PluginManifest ndpPluginManifest = {
4346
},
4447
};
4548

49+
static std::vector<std::string> parseDevices(const std::string& input)
50+
{
51+
std::vector<std::string> result;
52+
53+
size_t colon_pos = input.find_last_of(':');
54+
std::string suffix;
55+
std::string devices;
56+
57+
if (colon_pos != std::string::npos) {
58+
devices = input.substr(0, colon_pos);
59+
suffix = input.substr(colon_pos);
60+
} else {
61+
devices = input;
62+
suffix = "";
63+
}
64+
65+
std::stringstream ss(devices);
66+
std::string dev;
67+
while (std::getline(ss, dev, ',')) {
68+
result.push_back(dev + suffix);
69+
}
70+
71+
return result;
72+
}
73+
4674
NdpPacketReader::NdpPacketReader(const std::string& params)
75+
: ndp_packet_burst(new ndp_packet[64])
4776
{
4877
init(params.c_str());
4978
}
@@ -65,18 +94,29 @@ void NdpPacketReader::init(const char* params)
6594
if (parser.m_dev.empty()) {
6695
throw PluginError("specify device path");
6796
}
97+
6898
init_ifc(parser.m_dev);
6999
}
70100

71101
void NdpPacketReader::close()
72102
{
73-
ndpReader.close();
103+
for (size_t i = 0; i < m_readers_count; i++) {
104+
ndpReader[i].close();
105+
}
74106
}
75107

76108
void NdpPacketReader::init_ifc(const std::string& dev)
77109
{
78-
if (ndpReader.init_interface(dev) != 0) {
79-
throw PluginError(ndpReader.error_msg);
110+
const std::vector<std::string> devs = parseDevices(dev);
111+
m_readers_count = devs.size();
112+
if (m_readers_count > 2) {
113+
throw PluginError("too many devices specified");
114+
}
115+
116+
for (size_t i = 0; i < m_readers_count; i++) {
117+
if (ndpReader[i].init_interface(devs[i]) != 0) {
118+
throw PluginError(ndpReader[i].error_msg);
119+
}
80120
}
81121
}
82122

@@ -85,38 +125,63 @@ InputPlugin::Result NdpPacketReader::get(PacketBlock& packets)
85125
parser_opt_t opt = {&packets, false, false, 0};
86126
struct ndp_packet* ndp_packet;
87127
struct timeval timestamp;
88-
size_t read_pkts = 0;
89128
int ret = -1;
90129

91130
packets.cnt = 0;
92-
for (unsigned i = 0; i < packets.size; i++) {
93-
ret = ndpReader.get_pkt(&ndp_packet, &timestamp);
94-
if (ret == 0) {
95-
if (opt.pblock->cnt) {
96-
break;
97-
}
98-
return Result::TIMEOUT;
99-
} else if (ret < 0) {
100-
// Error occured.
101-
throw PluginError(ndpReader.error_msg);
131+
constexpr size_t maxBurstSize = 64;
132+
size_t burstSize = std::min(packets.size, maxBurstSize);
133+
std::span<struct ndp_packet> packetSpan(ndp_packet_burst.get(), burstSize);
134+
std::span<timeval> timestampSpan(timestamps);
135+
136+
size_t reader_index = (m_reader_idx++) & (m_readers_count - 1);
137+
NdpReader& reader = ndpReader[reader_index];
138+
int received = reader.get_packets(packetSpan, timestampSpan);
139+
140+
if (received < 32) {
141+
std::span<struct ndp_packet> packetSpan(
142+
ndp_packet_burst.get() + received,
143+
burstSize - received);
144+
std::span<timeval> timestampSpan(timestamps.data() + received, burstSize - received);
145+
146+
size_t reader_index = (m_reader_idx++) & (m_readers_count - 1);
147+
NdpReader& reader = ndpReader[reader_index];
148+
received += reader.get_packets(packetSpan, timestampSpan);
149+
}
150+
151+
for (unsigned i = 0; i < received; ++i) {
152+
ndp_packet = &ndp_packet_burst[i];
153+
timestamp = timestamps[i];
154+
155+
if (ndp_packet->data_length == 0) {
156+
continue; // Skip empty packets
102157
}
103-
read_pkts++;
158+
104159
parse_packet(
105160
&opt,
106161
m_parser_stats,
107162
timestamp,
108163
ndp_packet->data,
109164
ndp_packet->data_length,
110165
ndp_packet->data_length);
166+
167+
if (opt.pblock->cnt >= packets.size) {
168+
break;
169+
}
111170
}
112171

113-
m_seen += read_pkts;
172+
m_seen += received;
114173
m_parsed += opt.pblock->cnt;
115174

116-
m_stats.receivedPackets += read_pkts;
175+
m_stats.receivedPackets += received;
117176
m_stats.receivedBytes += packets.bytes;
118177

119-
return opt.pblock->cnt ? Result::PARSED : Result::NOT_PARSED;
178+
if (opt.pblock->cnt) {
179+
return Result::PARSED;
180+
} else if (received == 0) {
181+
return Result::TIMEOUT;
182+
} else {
183+
return Result::NOT_PARSED;
184+
}
120185
}
121186

122187
void NdpPacketReader::configure_telemetry_dirs(

src/plugins/input/nfb/src/ndp.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
#include "ndpReader.hpp"
1616

17+
#include <memory>
18+
#include <span>
19+
1720
#include <ipfixprobe/inputPlugin.hpp>
1821
#include <ipfixprobe/options.hpp>
1922
#include <ipfixprobe/packet.hpp>
@@ -81,9 +84,14 @@ class NdpPacketReader : public InputPlugin {
8184

8285
telemetry::Content get_queue_telemetry();
8386

84-
NdpReader ndpReader;
87+
NdpReader ndpReader[2];
88+
std::size_t m_readers_count;
89+
uint64_t m_reader_idx = 0;
8590
RxStats m_stats = {};
8691

92+
std::unique_ptr<struct ndp_packet[]> ndp_packet_burst;
93+
std::array<timeval, 64> timestamps;
94+
8795
void init_ifc(const std::string& dev);
8896
};
8997

src/plugins/input/nfb/src/ndpReader.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,46 @@ void NdpReader::set_sw_timestamp(struct timeval* tv)
198198
tv->tv_usec = micros;
199199
}
200200

201+
int NdpReader::get_packets(std::span<struct ndp_packet> packets, std::span<timeval> timestamps)
202+
{
203+
if (blocked_packets > 128) {
204+
ndp_rx_burst_put(rx_handle);
205+
blocked_packets = 0;
206+
}
207+
208+
const unsigned received = ndp_rx_burst_get(rx_handle, packets.data(), packets.size());
209+
for (unsigned i = 0; i < received; i++) {
210+
struct ndp_packet* ndp_packet = &packets[i];
211+
if (fw_type == NdpFwType::NDP_FW_HANIC) {
212+
uint64_t* fw_ts = &((NdpHeader*) (ndp_packet->header))->timestamp;
213+
if (*fw_ts == 0) {
214+
set_sw_timestamp((struct timeval*) &timestamps[i]);
215+
} else {
216+
convert_fw_ts_to_timeval(fw_ts, (struct timeval*) &timestamps[i]);
217+
}
218+
} else {
219+
uint8_t header_id = ndp_packet_flag_header_id_get(ndp_packet);
220+
if (header_id >= ndk_timestamp_offsets.size()) {
221+
set_sw_timestamp((struct timeval*) &timestamps[i]);
222+
} else if (ndk_timestamp_offsets[header_id] == std::numeric_limits<uint32_t>::max()) {
223+
set_sw_timestamp((struct timeval*) &timestamps[i]);
224+
} else {
225+
uint64_t* fw_ts = (uint64_t*) ((uint8_t*) ndp_packet->header
226+
+ ndk_timestamp_offsets[header_id]);
227+
if (*fw_ts == std::numeric_limits<uint64_t>::max()) {
228+
set_sw_timestamp((struct timeval*) &timestamps[i]);
229+
} else {
230+
convert_fw_ts_to_timeval(fw_ts, (struct timeval*) &timestamps[i]);
231+
}
232+
}
233+
}
234+
}
235+
236+
blocked_packets += received;
237+
238+
return received;
239+
}
240+
201241
int NdpReader::get_pkt(struct ndp_packet** ndp_packet_out, struct timeval* timestamp)
202242
{
203243
if (ndp_packet_buffer_processed >= ndp_packet_buffer_packets) {

src/plugins/input/nfb/src/ndpReader.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "ndpHeader.hpp"
44

5+
#include <span>
56
#include <string>
67
#include <vector>
78

@@ -49,6 +50,8 @@ class NdpReader {
4950
int get_pkt(struct ndp_packet** ndp_packet, struct timeval* timestamp);
5051
std::string error_msg;
5152

53+
int get_packets(std::span<struct ndp_packet> packets, std::span<timeval> timestamps);
54+
5255
private:
5356
void set_booted_fw();
5457
void convert_fw_ts_to_timeval(const uint64_t* fw_ts, struct timeval* tv);
@@ -60,6 +63,8 @@ class NdpReader {
6063
uint16_t packet_bufferSize;
6164
uint64_t timeout;
6265

66+
uint64_t blocked_packets = 0;
67+
6368
NdpFwType fw_type;
6469
std::vector<uint32_t> ndk_timestamp_offsets;
6570

0 commit comments

Comments
 (0)