diff --git a/CMakeLists.txt b/CMakeLists.txt index b3d43b937..77222011e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,14 +12,16 @@ include(cmake/installation.cmake) set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake/modules) +option(ENABLE_INPUT_SOCK "Enable build of input SOCK plugin" OFF) option(ENABLE_INPUT_PCAP "Enable build of input PCAP plugin" OFF) option(ENABLE_INPUT_DPDK "Enable build of input DPDK plugin" OFF) option(ENABLE_INPUT_NFB "Enable build of input NFB plugin" OFF) option(ENABLE_OUTPUT_UNIREC "Enable build of output UNIREC plugin" OFF) +option(ENABLE_OUTPUT_LZ4 "Enable Build of output LZ4 compression" OFF) option(ENABLE_PROCESS_EXPERIMENTAL "Enable build of experimental process plugins" OFF) option(ENABLE_MILLISECONDS_TIMESTAMP "Compile ipfixprobe with miliseconds timestamp precesion" OFF) option(ENABLE_NEMEA "Enable build of NEMEA plugins" OFF) - +option(ENABLE_UNWIND "Enable Build with Lib unwind" OFF) option(ENABLE_RPMBUILD "Enable build of RPM package" ON) option(ENABLE_TESTS "Build tests (make test)" OFF) diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index 2459ed45c..048d8be76 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -3,8 +3,13 @@ find_package(PkgConfig REQUIRED) find_package(Threads REQUIRED) find_package(Atomic REQUIRED) -find_package(Unwind REQUIRED) -find_package(LZ4 REQUIRED) +if (ENABLE_UNWIND) + find_package(Unwind REQUIRED) +endif() + +if (ENABLE_OUTPUT_LZ4) + find_package(LZ4 REQUIRED) +endif() find_package(OpenSSL REQUIRED) if (ENABLE_INPUT_PCAP) diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index 4919df27f..cbe9b21d6 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -253,9 +253,12 @@ struct Flow : public Record { uint32_t dst_packets; uint8_t src_tcp_flags; uint8_t dst_tcp_flags; + uint32_t drop_packets; + uint32_t ing_phy_interface; uint8_t ip_version; + uint8_t ip_tos; uint8_t ip_proto; uint16_t src_port; uint16_t dst_port; diff --git a/include/ipfixprobe/ipfix-elements.hpp b/include/ipfixprobe/ipfix-elements.hpp index 0ed972a5f..35cd80b8b 100644 --- a/include/ipfixprobe/ipfix-elements.hpp +++ b/include/ipfixprobe/ipfix-elements.hpp @@ -89,6 +89,8 @@ namespace ipxp { #define OUTPUT_INTERFACE(F) F(0, 14, 2, nullptr) #define FLOW_END_REASON(F) F(0, 136, 1, &flow.end_reason) #define FLOW_ID(F) F(0, 148, 8, &flow.flow_hash) +#define DROPS(F) F(0, 133, 4, &flow.drop_packets) +#define ING_PHY_INTERFACE(F) F(0, 252, 4, &flow.ing_phy_interface) #define ETHERTYPE(F) F(0, 256, 2, nullptr) @@ -100,9 +102,10 @@ namespace ipxp { #define L3_PROTO(F) F(0, 60, 1, &flow.ip_version) #define L3_IPV4_ADDR_SRC(F) F(0, 8, 4, &flow.src_ip.v4) #define L3_IPV4_ADDR_DST(F) F(0, 12, 4, &flow.dst_ip.v4) -#define L3_IPV4_TOS(F) F(0, 5, 1, nullptr) +#define L3_IPV4_TOS(F) F(0, 5, 1, &flow.ip_tos) #define L3_IPV6_ADDR_SRC(F) F(0, 27, 16, &flow.src_ip.v6) #define L3_IPV6_ADDR_DST(F) F(0, 28, 16, &flow.dst_ip.v6) +#define L3_IPV6_TOS(F) F(0, 5, 1, &flow.ip_tos) #define L3_IPV4_IDENTIFICATION(F) F(0, 54, 2, nullptr) #define L3_IPV4_FRAGMENT(F) F(0, 88, 2, nullptr) #define L3_IPV4_TTL(F) F(0, 192, 1, nullptr) @@ -343,7 +346,8 @@ namespace ipxp { F(L3_IPV4_ADDR_SRC) \ F(L3_IPV4_ADDR_DST) \ F(L2_SRC_MAC) \ - F(L2_DST_MAC) + F(L2_DST_MAC) \ + F(L3_IPV4_TOS) #define BASIC_TMPLT_V6(F) \ F(FLOW_END_REASON) \ @@ -363,7 +367,8 @@ namespace ipxp { F(L3_IPV6_ADDR_SRC) \ F(L3_IPV6_ADDR_DST) \ F(L2_SRC_MAC) \ - F(L2_DST_MAC) + F(L2_DST_MAC) \ + F(L3_IPV6_TOS) #define IPFIX_HTTP_TEMPLATE(F) \ F(HTTP_USERAGENT) \ @@ -583,6 +588,10 @@ namespace ipxp { #define IPFIX_MPLS_TEMPLATE(F) F(MPLS_TOP_LABEL_STACK_SECTION) +#define IPFIX_SOCKPKTINFO_TEMPLATE(F) \ + F(ING_PHY_INTERFACE) \ + F(DROPS) + /** * List of all known templated. * @@ -616,7 +625,8 @@ namespace ipxp { IPFIX_ICMP_TEMPLATE(F) \ IPFIX_VLAN_TEMPLATE(F) \ IPFIX_NETTISA_TEMPLATE(F) \ - IPFIX_FLOW_HASH_TEMPLATE(F) + IPFIX_FLOW_HASH_TEMPLATE(F) \ + IPFIX_SOCKPKTINFO_TEMPLATE(F) /** * Helper macro, convert FIELD into its name as a C literal. diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index c84baa534..70a38b1ab 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -100,6 +100,12 @@ struct Packet : public Record { uint16_t buffer_size; /**< Size of buffer */ bool source_pkt; /**< Direction of packet from flow point of view */ + uint32_t pkt_cnt; /**< Number of packets on a flow - input plugin 'sock' */ + uint32_t drop_cnt; /**< Number of dropped packets on a flow - input plugin 'sock' */ + uint64_t byte_cnt; /**< Bytes received on a flow - input plugin 'sock' */ + uint32_t source_interface; /**< source interface for the flow - input plugin 'sock' */ + uint8_t end_reason; /**< Flow end reason - input plugin 'sock' */ + struct timeval end_ts; /**< Flow end time - input plugin 'sock' */ /** * \brief Constructor. @@ -142,6 +148,12 @@ struct Packet : public Record { , buffer(nullptr) , buffer_size(0) , source_pkt(true) + , pkt_cnt(0) + , drop_cnt(0) + , byte_cnt(0) + , source_interface(0) + , end_reason (0) + , end_ts({0, 0}) { } }; diff --git a/init/config2args.py b/init/config2args.py index eb2ad60ff..e0ca963f8 100755 --- a/init/config2args.py +++ b/init/config2args.py @@ -39,6 +39,8 @@ def process_input_plugin(config): return process_input_dpdk_plugin(settings) if plugin == "dpdk_ring": return process_input_dpdk_ring_plugin(settings) + if plugin == "sock": + return process_input_sock_plugin(settings) if plugin == "raw": return process_input_raw_plugin(settings) if plugin == "ndp": @@ -292,6 +294,19 @@ def process_input_raw_plugin(settings): return " ".join(params) +def process_input_sock_plugin(settings): + params = ['-i "sock'] + + if settings is None: + raise ValueError("Settings for sock plugin cannot be empty.") + + sock = settings.get("sock") + if sock is None: + raise ValueError("sock must be specified in the sock plugin configuration.") + + params.append(f"sock={sock}") + + return " ".join(params) def process_process_plugins(config): process_plugins = config.get("process_plugins", []) diff --git a/init/link0.conf.example b/init/link0.conf.example index 8e27a8e85..2056cde77 100644 --- a/init/link0.conf.example +++ b/init/link0.conf.example @@ -1,7 +1,7 @@ # Input plugin configuration (input_plugin) input_plugin: # IMPORTANT: Only one input plugin can be specified. Choose one of the following options: - # raw, pcap_file, pcap_live, ndp, dpdk_ring, or dpdk. + # raw, pcap_file, pcap_live, ndp, dpdk_ring, or dpdk, sock. raw: interface: eth0 # Network interface name to capture traffic from [required] @@ -35,6 +35,9 @@ input_plugin: eal_opts: null # EAL options (null = default options) mtu: null # Maximum Transmission Unit (defaults to RTE_ETHER_MAX_LEN) + sock: + sock: Unix domain socket path [required] + # Storage configuration (storage) storage: cache: diff --git a/init/schema.json b/init/schema.json index 3d2f9e9f7..a05864816 100644 --- a/init/schema.json +++ b/init/schema.json @@ -32,6 +32,26 @@ "raw" ] }, + { + "type": "object", + "properties": { + "sock": { + "type": "object", + "properties": { + "path": { + "type": "string" + } + }, + "required": [ + "path" + ], + "additionalProperties": false + } + }, + "required": [ + "sock" + ] + }, { "type": "object", "properties": { diff --git a/pkg/rpm/CMakeLists.txt b/pkg/rpm/CMakeLists.txt index cbdd25f48..5da5f9661 100644 --- a/pkg/rpm/CMakeLists.txt +++ b/pkg/rpm/CMakeLists.txt @@ -22,6 +22,10 @@ if (ENABLE_INPUT_PCAP) list(APPEND RPMBUILD_ARGS "--with" "input_pcap") endif() +if (ENABLE_INPUT_SOCK) + list(APPEND RPMBUILD_ARGS "--with" "input_sock") +endif() + if (ENABLE_INPUT_DPDK) list(APPEND RPMBUILD_ARGS "--with" "input_dpdk") endif() diff --git a/pkg/rpm/ipfixprobe-msec.spec.in b/pkg/rpm/ipfixprobe-msec.spec.in index 85d41da55..38f12873b 100644 --- a/pkg/rpm/ipfixprobe-msec.spec.in +++ b/pkg/rpm/ipfixprobe-msec.spec.in @@ -29,18 +29,25 @@ BuildRequires: gcc-c++ >= 10 BuildRequires: make BuildRequires: cmake >= 3.12 +%if %{with unwind} BuildRequires: libunwind-devel +%endif + %if 0%{?rhel} <= 9 BuildRequires: gcc-toolset-14-libatomic-devel %endif BuildRequires: pkgconfig +%if %{with lz4} BuildRequires: lz4-devel +%endif BuildRequires: openssl-devel BuildRequires: git Requires: libatomic Requires: fuse3 +%if %{with lz4} Requires: lz4 +%endif Requires: openssl Requires: python3 Requires: python3-pyyaml diff --git a/pkg/rpm/ipfixprobe-nemea.spec.in b/pkg/rpm/ipfixprobe-nemea.spec.in index 9d097ad73..621763228 100644 --- a/pkg/rpm/ipfixprobe-nemea.spec.in +++ b/pkg/rpm/ipfixprobe-nemea.spec.in @@ -32,19 +32,26 @@ BuildRequires: gcc-c++ >= 10 BuildRequires: make BuildRequires: cmake >= 3.12 +%if %{with unwind} BuildRequires: libunwind-devel +%endif + %if 0%{?rhel} <= 9 BuildRequires: gcc-toolset-14-libatomic-devel %endif BuildRequires: pkgconfig +%if %{with lz4} BuildRequires: lz4-devel +%endif BuildRequires: openssl-devel BuildRequires: nemea-framework-devel BuildRequires: git Requires: libatomic Requires: fuse3 +%if %{with lz4} Requires: lz4 +%endif Requires: openssl Requires: python3 Requires: python3-pyyaml diff --git a/pkg/rpm/ipfixprobe.spec.in b/pkg/rpm/ipfixprobe.spec.in index 7beca85c0..acf724828 100644 --- a/pkg/rpm/ipfixprobe.spec.in +++ b/pkg/rpm/ipfixprobe.spec.in @@ -2,6 +2,7 @@ %bcond_with input_dpdk %bcond_with input_nfb %bcond_with process_experimental +%bcond_with input_sock %global _unitdir %{_prefix}/lib/systemd/system @@ -34,18 +35,25 @@ BuildRequires: gcc-c++ >= 10 BuildRequires: make BuildRequires: cmake >= 3.12 +%if %{with unwind} BuildRequires: libunwind-devel +%endif + %if 0%{?rhel} <= 9 BuildRequires: gcc-toolset-14-libatomic-devel %endif BuildRequires: pkgconfig +%if %{with lz4} BuildRequires: lz4-devel +%endif BuildRequires: openssl-devel BuildRequires: git Requires: libatomic Requires: fuse3 +%if %{with lz4} Requires: lz4 +%endif Requires: openssl Requires: python3 Requires: python3-pyyaml @@ -64,6 +72,16 @@ BuildRequires: libpcap-devel Input plugin for libpcap. %endif +%if %{with input_sock} +%package input-sock +Summary: Input plugin to read flow records from a unix domain socket using libsock. +Requires: libsock +BuildRequires: libsock-devel + +%description input-sock +Input plugin for libsock. +%endif + %if %{with input_dpdk} %package input-dpdk Summary: Input plugin to read packets from interfaces using dpdk. @@ -104,7 +122,7 @@ Experimental process plugins. %if 0%{?rhel} == 8 source /opt/rh/gcc-toolset-14/enable %endif -%cmake -DCMAKE_BUILD_TYPE=Release %{?with_input_pcap:-DENABLE_INPUT_PCAP=ON} %{?with_input_dpdk:-DENABLE_INPUT_DPDK=ON} %{?with_input_nfb:-DENABLE_INPUT_NFB=ON} %{?with_process_experimental: -DENABLE_PROCESS_EXPERIMENTAL=ON} +%cmake -DCMAKE_BUILD_TYPE=Release %{?with_unwind:-DENABLE_UNWIND=ON} %{?with_output_lz4:-DENABLE_OUTPUT_LZ4=ON} %{?with_input_pcap:-DENABLE_INPUT_PCAP=ON} %{?with_input_sock:-DENABLE_INPUT_SOCK=ON} %{?with_input_dpdk:-DENABLE_INPUT_DPDK=ON} %{?with_input_nfb:-DENABLE_INPUT_NFB=ON} %{?with_process_experimental: -DENABLE_PROCESS_EXPERIMENTAL=ON} %cmake_build %install @@ -157,6 +175,12 @@ source /opt/rh/gcc-toolset-14/enable %{_libdir}/ipfixprobe/input/libipfixprobe-input-pcap.so %endif +%if %{with input_sock} +%files input-sock +%{_libdir}/ipfixprobe/input/libipfixprobe-input-sock.so +%{_libdir}/ipfixprobe/process/libipfixprobe-process-sockpktinfo.so +%endif + %if %{with input_nfb} %files input-nfb %{_libdir}/ipfixprobe/input/libipfixprobe-input-nfb.so diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 340f69d25..3f463ead5 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -30,10 +30,17 @@ target_link_libraries(ipfixprobe-core telemetry::telemetry telemetry::appFs atomic::atomic - unwind::unwind ${CMAKE_DL_LIBS} ) +if (ENABLE_UNWIND) +target_link_libraries(ipfixprobe-core unwind::unwind) + +target_compile_definitions(ipfixprobe-core PRIVATE + WITH_UNWIND +) +endif() + add_executable(ipfixprobe main.cpp) target_link_libraries(ipfixprobe ${CORE_LIB}) target_link_options(ipfixprobe PRIVATE -Wl,--export-dynamic) diff --git a/src/core/ipfixprobe.cpp b/src/core/ipfixprobe.cpp index b7dceecea..a1c95e797 100644 --- a/src/core/ipfixprobe.cpp +++ b/src/core/ipfixprobe.cpp @@ -65,7 +65,9 @@ void signal_handler(int sig) { (void) sig; if (sig == SIGSEGV || sig == SIGABRT) { +#ifdef WITH_UNWIND st_dump(STDERR_FILENO, sig); +#endif exit(EXIT_FAILURE); } stop = 1; @@ -77,9 +79,7 @@ void register_handlers() signal(SIGINT, signal_handler); signal(SIGSEGV, signal_handler); signal(SIGABRT, signal_handler); -#ifdef WITH_NEMEA signal(SIGPIPE, SIG_IGN); -#endif } void error(std::string msg) diff --git a/src/core/stacktrace.cpp b/src/core/stacktrace.cpp index 9654a7cd7..79109ee4b 100644 --- a/src/core/stacktrace.cpp +++ b/src/core/stacktrace.cpp @@ -36,6 +36,7 @@ #define UNW_LOCAL_ONLY #include "stacktrace.hpp" +#ifdef WITH_UNWIND #include namespace ipxp { @@ -192,3 +193,4 @@ void st_dump(int fd, int sig) } } // namespace ipxp +#endif diff --git a/src/plugins/input/CMakeLists.txt b/src/plugins/input/CMakeLists.txt index de07ed619..7fd9d0547 100644 --- a/src/plugins/input/CMakeLists.txt +++ b/src/plugins/input/CMakeLists.txt @@ -1,5 +1,9 @@ add_subdirectory(raw) +if (ENABLE_INPUT_SOCK) + add_subdirectory(sock) +endif() + if (ENABLE_INPUT_PCAP) add_subdirectory(pcap) endif() diff --git a/src/plugins/input/sock/CMakeLists.txt b/src/plugins/input/sock/CMakeLists.txt new file mode 100644 index 000000000..63d59151d --- /dev/null +++ b/src/plugins/input/sock/CMakeLists.txt @@ -0,0 +1,31 @@ +project(ipfixprobe-input-sock VERSION 1.0.0 DESCRIPTION "ipfixprobe-input-sock plugin") + +add_library(ipfixprobe-input-sock MODULE + src/sock.cpp + src/sock.hpp + ../parser/parser.cpp + ../parser/parser.hpp +) + +set_target_properties(ipfixprobe-input-sock PROPERTIES + CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN YES +) +target_include_directories(ipfixprobe-input-sock PRIVATE + ${SOCK_INCLUDE_DIRS} + ${CMAKE_SOURCE_DIR}/include/ + ${CMAKE_SOURCE_DIR}/src/plugins/input/parser +) + +target_compile_definitions(ipfixprobe-input-sock PRIVATE + WITH_SOCK +) + +target_link_libraries(ipfixprobe-input-sock PRIVATE + ${SOCK_LIBRARIES} + telemetry::telemetry +) + +install(TARGETS ipfixprobe-input-sock + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixprobe/input/" +) diff --git a/src/plugins/input/sock/README.md b/src/plugins/input/sock/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/src/plugins/input/sock/src/sock.cpp b/src/plugins/input/sock/src/sock.cpp new file mode 100644 index 000000000..2dd5f0585 --- /dev/null +++ b/src/plugins/input/sock/src/sock.cpp @@ -0,0 +1,278 @@ +/** + * @file + * @brief Switch records reader from unix domain sockets. + * This is useful plugin for devices with IPFIX support in silicon. + * A switch record identified by the device can be sent to + * this input plugin via a unix domain socket for processing + * exporting to a collector. + * @author Lokesh dhoundiyal + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "sock.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace ipxp { + +// Print debug message if debugging is allowed. +#define DEBUG_MSG(format, ...) fprintf(stderr, format, ##__VA_ARGS__) +// Process code if debugging is allowed. +#define DEBUG_CODE(code) code + + +static const PluginManifest sockPluginManifest = { + .name = "sock", + .description = "sock input plugin for reading ipfix flow records from a unix domain socket.", + .pluginVersion = "1.0.0", + .apiVersion = "1.0.0", + .usage = + []() { + SockOptParser parser; + parser.usage(std::cout); + }, +}; + +SockReader::SockReader(const std::string& params) + : sock(-1) +{ + init(params.c_str()); +} + +SockReader::~SockReader() +{ + close(); +} + +void SockReader::init(const char* params) +{ + SockOptParser parser; + try { + parser.parse(params); + } catch (ParserError& e) { + throw PluginError(e.what()); + } + + if (parser.m_sock.empty()) { + throw PluginError("specify socket path"); + } + + open_sock(parser.m_sock); +} + +void SockReader::close() +{ + if (sock >= 0) { + ::close(sock); + sock = -1; + } +} + +void SockReader::open_sock(const std::string& m_sock) +{ + int server_sock, len, rc; + struct sockaddr_un server_sockaddr; + + server_sock = socket(AF_UNIX, SOCK_DGRAM, 0); + if (server_sock == -1) { + throw PluginError( + std::string("could not create AF_UNIX socket: ") + strerror(errno)); + } + + server_sockaddr.sun_family = AF_UNIX; + strcpy(server_sockaddr.sun_path, m_sock.c_str()); + len = sizeof(server_sockaddr); + unlink(m_sock.c_str()); + rc = bind(server_sock, (struct sockaddr*)&server_sockaddr, len); + if (rc == -1) { + ::close(server_sock); + throw PluginError( + std::string("bind failed: ") + strerror(errno)); + } + sock = server_sock; +} + +void SockReader::set_packet(Packet* pkt, struct SwitchRecordData* recordData) +{ + char dst_str[INET6_ADDRSTRLEN]; + char src_str[INET6_ADDRSTRLEN]; + + DEBUG_CODE(char timestamp[32]; time_t time = recordData->start_time.tv_sec; + strftime(timestamp, sizeof(timestamp), "%FT%T", localtime(&time));); + DEBUG_MSG("Time:\t\t\t%s.%06lu\n", timestamp, recordData->start_time.tv_usec); + DEBUG_MSG("Source interface:\t%u\n", recordData->src_if); + + pkt->ts = recordData->start_time; + pkt->end_ts = recordData->end_time; + pkt->end_reason = recordData->end_reason; + pkt->ip_version = recordData->ip_version; + pkt->source_interface = recordData->src_if; + pkt->src_port = 0; + pkt->dst_port = 0; + pkt->ip_proto = 0; + pkt->ip_ttl = 0; + pkt->ip_flags = 0; + pkt->ip_payload_len = 0; + pkt->tcp_flags = 0; + pkt->tcp_window = 0; + pkt->tcp_options = 0; + pkt->tcp_mss = 0; + memcpy(pkt->dst_mac, recordData->dst_mac, sizeof(recordData->dst_mac)); + memcpy(pkt->src_mac, recordData->src_mac, sizeof(recordData->src_mac)); + pkt->ethertype = recordData->eth_type; + pkt->vlan_id = recordData->vlan_id; + pkt->ip_tos = recordData->tos; + + DEBUG_CODE( + char src_mac[18]; // ether_ntoa missing on some platforms + char dst_mac[18]; + uint8_t *p = (uint8_t *) pkt->src_mac; + snprintf(src_mac, sizeof(src_mac), "%02x:%02x:%02x:%02x:%02x:%02x", p[0], p[1], p[2], p[3], p[4], p[5]); + p = (uint8_t *) pkt->dst_mac; + snprintf(dst_mac, sizeof(dst_mac), "%02x:%02x:%02x:%02x:%02x:%02x", p[0], p[1], p[2], p[3], p[4], p[5]); + ); + DEBUG_MSG("\tDest mac:\t%s\n", dst_mac); + DEBUG_MSG("\tSrc mac:\t%s\n", src_mac); + DEBUG_MSG("\tEthertype:\t%#06x\n", pkt->ethertype); + DEBUG_MSG("\tVLAN:\t%u\n", pkt->vlan_id); + + if (pkt->ip_version == 4) { + pkt->src_ip.v4 = recordData->src_ip.s_addr; + pkt->dst_ip.v4 = recordData->dst_ip.s_addr; + inet_ntop(AF_INET, &recordData->src_ip, src_str, 16); + inet_ntop(AF_INET, &recordData->dst_ip, dst_str, 16); + DEBUG_MSG("IPv4 header:\n"); + } + else if (pkt->ip_version == 6) { + memcpy(pkt->src_ip.v6, recordData->src_ip6.s6_addr, 16); + memcpy(pkt->dst_ip.v6, recordData->dst_ip6.s6_addr, 16); + inet_ntop(AF_INET6, &recordData->src_ip6, src_str, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &recordData->dst_ip6, dst_str, INET6_ADDRSTRLEN); + DEBUG_MSG("IPv6 header:\n"); + } + + pkt->ip_proto = recordData->ip_proto; + pkt->ip_len = recordData->ip_length; + pkt->ip_ttl = recordData->ip_ttl; + pkt->ip_flags = recordData->ip_flags; + pkt->ip_payload_len = recordData->ip_payload_len; + + DEBUG_MSG("\tHDR version:\t%u\n", pkt->ip_version); + DEBUG_MSG("\tHDR length:\t%u\n", pkt->ip_payload_len); + DEBUG_MSG("\tTotal length:\t%u\n", pkt->ip_len); + DEBUG_MSG("\tTOS:\t\t%u\n", pkt->ip_tos); + DEBUG_MSG("\tProtocol:\t%u\n", pkt->ip_proto); + DEBUG_MSG("\tSrc addr:\t%s\n", src_str); + DEBUG_MSG("\tDest addr:\t%s\n", dst_str); + DEBUG_MSG("\tFlags:\t\t%#x\n", pkt->ip_flags); + DEBUG_MSG("\tTTL:\t\t%u\n", pkt->ip_ttl); + + pkt->src_port = recordData->src_port; + pkt->dst_port = recordData->dst_port; + if (pkt->ip_proto == IPPROTO_TCP) { + pkt->tcp_flags = recordData->tcp_control_bits; + pkt->tcp_window = recordData->tcp_window; + pkt->tcp_seq = recordData->tcp_seq; + pkt->tcp_ack = recordData->tcp_ack; + DEBUG_MSG("TCP header:\n"); + DEBUG_MSG("\tSrc port:\t%u\n", pkt->src_port); + DEBUG_MSG("\tDest port:\t%u\n", pkt->dst_port); + DEBUG_MSG("\tFlags:\t%u\n", pkt->tcp_flags); + DEBUG_MSG("\tSEQ:\t\t%#x\n", pkt->tcp_seq); + DEBUG_MSG("\tACK SEQ:\t%#x\n", pkt->tcp_ack); + DEBUG_MSG("\tWindow:\t\t%u\n", pkt->tcp_window); + } + if (pkt->ip_proto == IPPROTO_UDP) { + DEBUG_MSG("UDP header:\n"); + DEBUG_MSG("\tSrc port:\t%u\n", pkt->src_port); + DEBUG_MSG("\tDest port:\t%u\n", pkt->dst_port); + } + + pkt->pkt_cnt = recordData->pkt_cnt; + pkt->byte_cnt = recordData->byte_cnt; + DEBUG_MSG("Packet count %u byte count: %lu\n", pkt->pkt_cnt, pkt->byte_cnt); +} + +InputPlugin::Result SockReader::get(PacketBlock& pblock) +{ + int bytes_rec = -1; + struct sockaddr_un peer_sock; + int len; + Packet* pkt; + struct SwitchRecordData* recordData; + struct SwitchRecordHdr* recordHdr; + uint8_t recordHdrBuffer[sizeof(struct SwitchRecordHdr)]; + int recordBuffer_size; + uint8_t* recordBuffer = NULL; + + bytes_rec = recvfrom(sock, recordHdrBuffer, sizeof(struct SwitchRecordHdr), MSG_PEEK | MSG_DONTWAIT, + (struct sockaddr*) &peer_sock, (socklen_t*) &len); + if (bytes_rec == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return Result::TIMEOUT; + } else { + ::close(sock); + throw PluginError(std::string("recvfrom failed: ") + strerror(errno)); + } + } else { + recordHdr = (struct SwitchRecordHdr*) recordHdrBuffer; + DEBUG_MSG("Switch record version: %u num_records %u bytes_rec %d\n", + recordHdr->version, recordHdr->num_records, bytes_rec); + + if (recordHdr->version == SWITCH_RECORD_VERSION_V1) { + recordBuffer_size = sizeof(struct SwitchRecordHdr) + + (sizeof(struct SwitchRecordData) * recordHdr->num_records); + + recordBuffer = (uint8_t*) malloc(recordBuffer_size); + if (!recordBuffer) { + ::close(sock); + throw PluginError("not enough memory"); + } else { + bytes_rec = recvfrom(sock, recordBuffer, recordBuffer_size, 0, + (struct sockaddr*) &peer_sock, (socklen_t*) &len); + if (bytes_rec == -1) { + ::close(sock); + throw PluginError(std::string("recvfrom failed: ") + strerror(errno)); + } else { + pblock.cnt = 0; + DEBUG_MSG("bytes_rec :%d \n", bytes_rec); + recordData = (struct SwitchRecordData*) (recordBuffer + sizeof(struct SwitchRecordHdr)); + for (int i = 0; i < recordHdr->num_records; i++, recordData++) { + pkt = &pblock.pkts[pblock.cnt]; + if (recordData) { + DEBUG_MSG("Record count: %d\n", i); + set_packet(pkt, recordData); + pblock.cnt++; + pblock.bytes += pkt->ip_len; + m_seen += recordData->pkt_cnt; + m_parsed += recordData->pkt_cnt; + } + } + } + } + } + } + free(recordBuffer); + return pblock.cnt ? Result::PARSED : Result::NOT_PARSED; +} + +static const PluginRegistrar sockRegistrar(sockPluginManifest); +} // namespace ipxp diff --git a/src/plugins/input/sock/src/sock.hpp b/src/plugins/input/sock/src/sock.hpp new file mode 100644 index 000000000..e0f08bcdf --- /dev/null +++ b/src/plugins/input/sock/src/sock.hpp @@ -0,0 +1,141 @@ +/** + * @file + * @brief Switch records reader using unix domain sockets + * @author Lokesh dhoundiyal + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include +#include +#include + + +namespace ipxp { + +#define SWITCH_RECORD_VERSION_V1 1 + +class SockOptParser : public OptionsParser { + public: + std::string m_sock; + + SockOptParser() + : OptionsParser( + "sock", + "Input plugin for reading records from a unix domain socket") + , m_sock("") + { + register_option( + "s", + "sock", + "PATH", + "Unix domain socket path", + [this](const char* arg) { + m_sock = arg; + return true; + }, + OptionFlags::RequiredArgument); + } +}; + +class SockReader : public InputPlugin { + public: + SockReader(const std::string& params); + ~SockReader(); + void init(const char* params); + void close(); + OptionsParser* get_parser() const { return new SockOptParser(); } + std::string get_name() const { return "sock"; } + InputPlugin::Result get(PacketBlock& packets); + + private: + int sock; + void open_sock(const std::string& m_sock); + void set_packet(Packet* pkt, struct SwitchRecordData* recordData); +}; + +struct __attribute__((packed)) SwitchRecordData { + struct timeval start_time; + struct timeval end_time; + uint8_t end_reason; + uint8_t unused; + + uint32_t pkt_cnt; + uint32_t drop_cnt; + uint64_t byte_cnt; + uint32_t src_if; + + uint8_t dst_mac[6]; + uint8_t src_mac[6]; + uint16_t eth_type; + uint32_t vlan_id; + + uint8_t ip_version; + uint8_t ip_proto; + uint8_t tos; + uint8_t ip_ttl; + uint8_t ip_flags; + uint16_t ip_length; /* Length of IP header + its payload */ + uint16_t ip_payload_len; /* Length of IP payload */ + + uint16_t src_port; + uint16_t dst_port; + struct in_addr src_ip; + struct in_addr dst_ip; + struct in6_addr src_ip6; + struct in6_addr dst_ip6; + + uint8_t tcp_control_bits; + uint16_t tcp_window; + uint32_t tcp_seq; + uint32_t tcp_ack; + + /** + * \brief Constructor. + */ + SwitchRecordData() + : start_time({0, 0}) + , end_time({0, 0}) + , end_reason(0) + , pkt_cnt(0) + , drop_cnt(0) + , byte_cnt(0) + , src_if(0) + , dst_mac() + , src_mac() + , eth_type(0) + , vlan_id(0) + , ip_version(0) + , ip_proto(0) + , tos(0) + , ip_ttl(0) + , ip_flags(0) + , ip_length(0) + , ip_payload_len(0) + , src_port(0) + , dst_port(0) + , src_ip({0}) + , dst_ip({0}) + , src_ip6({0}) + , dst_ip6({0}) + , tcp_control_bits(0) + , tcp_window(0) + , tcp_seq(0) + , tcp_ack(0) + { + } +}; + + struct __attribute__((packed)) SwitchRecordHdr { + uint8_t version; + uint8_t unused; + uint16_t num_records; + }; +} // namespace ipxp + diff --git a/src/plugins/output/ipfix/CMakeLists.txt b/src/plugins/output/ipfix/CMakeLists.txt index 41ad2d80d..2ab3db20e 100644 --- a/src/plugins/output/ipfix/CMakeLists.txt +++ b/src/plugins/output/ipfix/CMakeLists.txt @@ -15,10 +15,16 @@ target_include_directories(ipfixprobe-output-ipfix PRIVATE ${CMAKE_SOURCE_DIR}/include/ ) -target_link_libraries(ipfixprobe-output-ipfix PRIVATE - lz4::lz4 +if (ENABLE_OUTPUT_LZ4) + target_link_libraries(ipfixprobe-output-ipfix PRIVATE + lz4::lz4 ) +target_compile_definitions(ipfixprobe-output-ipfix PRIVATE + WITH_LZ4 +) +endif() + install( TARGETS ipfixprobe-output-ipfix LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixprobe/output/" diff --git a/src/plugins/output/ipfix/src/ipfix.cpp b/src/plugins/output/ipfix/src/ipfix.cpp index 2095d28ed..c219edf64 100644 --- a/src/plugins/output/ipfix/src/ipfix.cpp +++ b/src/plugins/output/ipfix/src/ipfix.cpp @@ -19,7 +19,9 @@ #include #include #include +#ifdef WITH_LZ4 #include +#endif #include #include #include @@ -170,14 +172,16 @@ void IPFIXExporter::init(const char* params) dir_bit_field = parser.m_dir; templateRefreshTime = parser.m_template_refresh_time; - int res; + int res = -1; // check if compression is enabled if (parser.m_lz4_compression) { +#ifdef WITH_LZ4 res = packetDataBuffer.init( true, LZ4_COMPRESSBOUND(mtu) + CompressBuffer::C_ADD_SIZE, // mtu * 3 is arbitrary value, it should be more than mtu * 2 std::max(parser.m_lz4_buffer_size, mtu * 3)); +#endif } else { res = packetDataBuffer.init(false, 0, mtu); } @@ -879,7 +883,7 @@ int IPFIXExporter::send_packet(ipfix_packet_t* packet) auto data = packetDataBuffer.getCompressed(); /* sendto() does not guarantee that everything will be send in one piece */ - while (sent < dataLen) { + while (fd != -1 && sent < dataLen) { /* Send data to collector (TCP and SCTP ignores last two arguments) */ ret = sendto( fd, @@ -1189,7 +1193,9 @@ CompressBuffer::CompressBuffer() , readSize(0) , lastReadIndex(0) , lastReadSize(0) +#ifdef WITH_LZ4 , lz4Stream(nullptr) +#endif { } @@ -1216,12 +1222,12 @@ int CompressBuffer::init(bool compress, size_t compressSize, size_t writeSize) return -1; } compressedSize = compressSize; - +#ifdef WITH_LZ4 lz4Stream = LZ4_createStream(); if (!lz4Stream) { return -1; } - +#endif shouldResetConnection = true; return 0; @@ -1297,7 +1303,7 @@ int CompressBuffer::compress() readSize = 0; return compressedSize; } - +#ifdef WITH_LZ4 // resize the buffer if it may not be large enough if (compressedSize < LZ4_COMPRESSBOUND(readSize) + C_ADD_SIZE) { auto newSize = LZ4_COMPRESSBOUND(readSize); @@ -1366,6 +1372,9 @@ int CompressBuffer::compress() readSize = 0; return res + (com - compressed); +#else + return 0; +#endif } const uint8_t* CompressBuffer::getCompressed() const @@ -1425,12 +1434,12 @@ void CompressBuffer::close() compressed = nullptr; compressedSize = 0; } - +#ifdef WITH_LZ4 if (lz4Stream) { LZ4_freeStream(lz4Stream); lz4Stream = nullptr; } - +#endif shouldResetConnection = false; shouldCompress = false; readIndex = 0; diff --git a/src/plugins/output/ipfix/src/ipfix.hpp b/src/plugins/output/ipfix/src/ipfix.hpp index d2f7e9f74..6f2481b2f 100644 --- a/src/plugins/output/ipfix/src/ipfix.hpp +++ b/src/plugins/output/ipfix/src/ipfix.hpp @@ -24,7 +24,9 @@ #include #include #include +#ifdef WITH_LZ4 #include +#endif #define COUNT_IPFIX_TEMPLATES(T) +1 @@ -185,6 +187,7 @@ class IpfixOptParser : public OptionsParser { return true; }, OptionFlags::NoArgument); +#ifdef WITH_LZ4 register_option( "c", "lz4-compression", @@ -196,6 +199,7 @@ class IpfixOptParser : public OptionsParser { return true; }, OptionFlags::NoArgument); +#endif register_option( "s", "lz4-buffer-size", @@ -539,9 +543,10 @@ class CompressBuffer { // last compressed data position size_t lastReadIndex; size_t lastReadSize; - +#ifdef WITH_LZ4 // compression stream used by lz4 LZ4_stream_t* lz4Stream; +#endif }; class IPFIXExporter : public OutputPlugin { diff --git a/src/plugins/process/CMakeLists.txt b/src/plugins/process/CMakeLists.txt index a47322075..d8e9d4319 100644 --- a/src/plugins/process/CMakeLists.txt +++ b/src/plugins/process/CMakeLists.txt @@ -29,3 +29,7 @@ if (ENABLE_PROCESS_EXPERIMENTAL) add_subdirectory(ntp) add_subdirectory(nettisa) endif() + +if (ENABLE_INPUT_SOCK) + add_subdirectory(sockpktinfo) +endif() diff --git a/src/plugins/process/sockpktinfo/CMakeLists.txt b/src/plugins/process/sockpktinfo/CMakeLists.txt new file mode 100644 index 000000000..e2977bb91 --- /dev/null +++ b/src/plugins/process/sockpktinfo/CMakeLists.txt @@ -0,0 +1,27 @@ +project(ipfixprobe-process-sockpktinfo VERSION 1.0.0 DESCRIPTION "ipfixprobe-process-sockpktinfo plugin") + +add_library(ipfixprobe-process-sockpktinfo MODULE + src/sockpktinfo.cpp + src/sockpktinfo.hpp +) + +set_target_properties(ipfixprobe-process-sockpktinfo PROPERTIES + CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN YES +) + +target_include_directories(ipfixprobe-process-sockpktinfo PRIVATE + ${CMAKE_SOURCE_DIR}/include/ +) + +if(ENABLE_NEMEA) + target_link_libraries(ipfixprobe-process-sockpktinfo PRIVATE + -Wl,--whole-archive ipfixprobe-nemea-fields -Wl,--no-whole-archive + unirec::unirec + trap::trap + ) +endif() + +install(TARGETS ipfixprobe-process-sockpktinfo + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixprobe/process/" +) diff --git a/src/plugins/process/sockpktinfo/README.md b/src/plugins/process/sockpktinfo/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/src/plugins/process/sockpktinfo/src/sockpktinfo.cpp b/src/plugins/process/sockpktinfo/src/sockpktinfo.cpp new file mode 100644 index 000000000..275c872a0 --- /dev/null +++ b/src/plugins/process/sockpktinfo/src/sockpktinfo.cpp @@ -0,0 +1,62 @@ +/** + * @file + * @brief Plugin for parsing packet info arriving via the "sock" input plugin. + * @author Lokesh Dhoundiyal + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "sockpktinfo.hpp" + +#include + +#include +#include + +namespace ipxp { + +static const PluginManifest sockpktinfoPluginManifest = { + .name = "sockpktinfo", + .description = "Sock input plugin packet information process plugin.", + .pluginVersion = "1.0.0", + .apiVersion = "1.0.0", + .usage = + []() { + OptionsParser parser( + "sockpktinfo", + "Process additional information coming in via the sock input plugin use"); + parser.usage(std::cout); + }, +}; + +SOCKPKTINFOPlugin::SOCKPKTINFOPlugin(const std::string& params, int pluginID) + : ProcessPlugin(pluginID) +{ + init(params.c_str()); +} + +ProcessPlugin* SOCKPKTINFOPlugin::copy() +{ + return new SOCKPKTINFOPlugin(*this); +} + +int SOCKPKTINFOPlugin::post_create(Flow& rec, const Packet& pkt) +{ + auto ext = new RecordExtSOCKPKTINFO(m_pluginID); + ext->ing_phy_interface = pkt.source_interface; + ext->drop_packets = pkt.drop_cnt; + + /* Update packet count and byte count received from sock input plugin */ + rec.src_packets = pkt.pkt_cnt; + rec.src_bytes = pkt.byte_cnt; + rec.add_extension(ext); + return FLOW_FLUSH; +} + +static const PluginRegistrar + sockpktinfoRegistrar(sockpktinfoPluginManifest); + +} // namespace ipxp diff --git a/src/plugins/process/sockpktinfo/src/sockpktinfo.hpp b/src/plugins/process/sockpktinfo/src/sockpktinfo.hpp new file mode 100644 index 000000000..0bf847724 --- /dev/null +++ b/src/plugins/process/sockpktinfo/src/sockpktinfo.hpp @@ -0,0 +1,120 @@ +/** + * @file + * @brief Plugin for parsing packet info arriving via the "sock" input plugin. + * @author Lokesh Dhoundiyal + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include + +#ifdef WITH_NEMEA +#include "fields.h" +#endif + +#include +#include +#include + +//#include +#include +#include +#include +#include + +namespace ipxp { + +#define SOCKPKTINFO_UNIREC_TEMPLATE "ING_PHY_INTERFACE,DROPS" + +UR_FIELDS( + uint32 ING_PHY_INTERFACE, + uint64 DROPS) + +/** + * \brief Flow record extension header for storing parsed SOCKPKTINFO data. + */ +struct RecordExtSOCKPKTINFO : public RecordExt { + uint32_t ing_phy_interface; + uint32_t drop_packets; + + RecordExtSOCKPKTINFO(int pluginID) + : RecordExt(pluginID) + , ing_phy_interface(0) + , drop_packets(0) + { + } + + #ifdef WITH_NEMEA + virtual void fill_unirec(ur_template_t* tmplt, void* record) + { + ur_set(tmplt, record, F_ING_PHY_INTERFACE, ing_phy_interface); + ur_set(tmplt, record, F_DROPS, drop_packets); + } + + const char* get_unirec_tmplt() const + { + return SOCKPKTINFO_UNIREC_TEMPLATE; + } + #endif + + int fill_ipfix(uint8_t* buffer, int size) override + { + const int LEN = sizeof(ing_phy_interface) + sizeof(drop_packets); + if (size < LEN) { + return -1; + } + *(uint32_t*)buffer = ntohl(ing_phy_interface); + *(uint32_t*)(buffer + 4) = ntohl(drop_packets); + + return LEN; + } + + const char** get_ipfix_tmplt() const + { + static const char* ipfix_template[] = { + IPFIX_SOCKPKTINFO_TEMPLATE(IPFIX_FIELD_NAMES) + NULL + }; + return ipfix_template; + } + + std::string get_text() const + { + std::ostringstream out; + out << "ing_phy_interface=\"" << ing_phy_interface << '"' << ",drop_packets=\"" << drop_packets << '"'; + return out.str(); + } +}; + +/** + * \brief Process plugin for parsing SOCKPKTINFO packets. + */ +class SOCKPKTINFOPlugin : public ProcessPlugin { + public: + SOCKPKTINFOPlugin(const std::string& params, int pluginID); + OptionsParser* get_parser() const + { + return new OptionsParser( + "sockpktinfo", + "Parse SOCKPKTINFO traffic"); + } + std::string get_name() const + { + return "sockpktinfo"; + } + RecordExt* get_ext() const + { + return new RecordExtSOCKPKTINFO(m_pluginID); + } + ProcessPlugin* copy(); + + int post_create(Flow& rec, const Packet& pkt); +}; + +} // namespace ipxp + diff --git a/src/plugins/storage/cache/src/cache.cpp b/src/plugins/storage/cache/src/cache.cpp index bda8bb4ac..0b85f42f5 100644 --- a/src/plugins/storage/cache/src/cache.cpp +++ b/src/plugins/storage/cache/src/cache.cpp @@ -68,6 +68,8 @@ void FlowRecord::erase() m_flow.dst_bytes = 0; m_flow.src_tcp_flags = 0; m_flow.dst_tcp_flags = 0; + m_flow.end_reason = 0; + m_flow.ip_tos = 0; } void FlowRecord::reuse() { @@ -99,6 +101,13 @@ void FlowRecord::create(const Packet& pkt, uint64_t hash) m_flow.time_first = pkt.ts; m_flow.time_last = pkt.ts; + if (pkt.end_ts.tv_sec || pkt.end_ts.tv_usec) { + m_flow.time_last = pkt.end_ts; + } + + if (pkt.end_reason) { + m_flow.end_reason = pkt.end_reason; + } m_flow.flow_hash = hash; memcpy(m_flow.src_mac, pkt.src_mac, 6); @@ -107,12 +116,14 @@ void FlowRecord::create(const Packet& pkt, uint64_t hash) if (pkt.ip_version == IP::v4) { m_flow.ip_version = pkt.ip_version; m_flow.ip_proto = pkt.ip_proto; + m_flow.ip_tos = pkt.ip_tos; m_flow.src_ip.v4 = pkt.src_ip.v4; m_flow.dst_ip.v4 = pkt.dst_ip.v4; m_flow.src_bytes = pkt.ip_len; } else if (pkt.ip_version == IP::v6) { m_flow.ip_version = pkt.ip_version; m_flow.ip_proto = pkt.ip_proto; + m_flow.ip_tos = pkt.ip_tos; memcpy(m_flow.src_ip.v6, pkt.src_ip.v6, 16); memcpy(m_flow.dst_ip.v6, pkt.dst_ip.v6, 16); m_flow.src_bytes = pkt.ip_len;