diff --git a/Pcap++/header/XdpDevice.h b/Pcap++/header/XdpDevice.h index effb4b07a7..d822d7133f 100644 --- a/Pcap++/header/XdpDevice.h +++ b/Pcap++/header/XdpDevice.h @@ -6,20 +6,31 @@ #include #include #include +#include /// @namespace pcpp /// @ namespace pcpp { + + #define PCPP_MAX_XDP_NUMBER_QUEUES 64 + + /// Forward Declaration of XdpSocket + class XdpSocket; + /// @class XdpDevice /// A class wrapping the main functionality of using AF_XDP (XSK) sockets /// which are optimized for high performance packet processing. /// - /// It provides methods for configuring and initializing an AF_XDP socket, and then send and receive packets through - /// it. It also provides a method for gathering statistics from the socket. + /// It provides methods for configuring and initializing multipler AF_XDP socket, and then send and receive packets through + /// these. It also provides a method for gathering statistics from the socket. class XdpDevice : public IDevice { + + friend class XdpSocket; + public: + /// @typedef OnPacketsArrive /// The callback that is called whenever packets are received on the socket /// @param[in] packets An array of the raw packets received @@ -48,6 +59,10 @@ namespace pcpp /// AF_XDP operation mode AttachMode attachMode; + /// number of queues. Should be less than or equal to the number of hardware queues supported by the device + // the queue ids are inferred as consecutive starting at zero + uint32_t numQueues; + /// UMEM is a region of virtual contiguous memory, divided into equal-sized frames. /// This parameter determines the number of frames that will be allocated as pert of the UMEM. uint16_t umemNumFrames; @@ -87,10 +102,11 @@ namespace pcpp /// @param[in] txSize The size of the TX ring used by the AF_XDP socket. The default value is 2048 /// @param[in] rxTxBatchSize The max number of packets to be received or sent in one batch. The default /// value is 64 + /// @param[in] numQueues The number of queues, and therefore sockets, to configure with device. The default is 1 explicit XdpDeviceConfiguration(AttachMode attachMode = AutoMode, uint16_t umemNumFrames = 0, uint16_t umemFrameSize = 0, uint32_t fillRingSize = 0, uint32_t completionRingSize = 0, uint32_t rxSize = 0, uint32_t txSize = 0, - uint16_t rxTxBatchSize = 0) + uint16_t rxTxBatchSize = 0, uint32_t numQueues = 0) { this->attachMode = attachMode; this->umemNumFrames = umemNumFrames; @@ -100,6 +116,7 @@ namespace pcpp this->rxSize = rxSize; this->txSize = txSize; this->rxTxBatchSize = rxTxBatchSize; + this->numQueues = numQueues; } }; @@ -155,29 +172,29 @@ namespace pcpp uint64_t umemFreeFrames; }; - /// A c'tor for this class. Please note that calling this c'tor doesn't initialize the AF_XDP socket. In order - /// to set up the socket call open(). - /// @param[in] interfaceName The interface name to open the AF_XDP socket on + /// A c'tor for this class. Please note that calling this c'tor doesn't initialize the AF_XDP sockets. In order + /// to set up the sockets call open(). + /// @param[in] interfaceName The interface name to open the AF_XDP sockets on explicit XdpDevice(std::string interfaceName); - /// A d'tor for this class. It closes the device if it's open. + /// A d'tor for this class. It closes the device if it's open and frees up the sockets ~XdpDevice() override; /// Open the device with default configuration. Call getConfig() after opening the device to get the /// current configuration. - /// This method initializes the UMEM, and then creates and configures the AF_XDP socket. If it succeeds the - /// socket is ready to receive and send packets. + /// This method creates and configures the AF_XDP sockets per queue. If it succeeds the + /// sockets are ready to receive and send packets. /// @return True if device was opened successfully, false otherwise bool open() override; /// Open the device with custom configuration set by the user. - /// This method initializes the UMEM, and then creates and configures the AF_XDP socket. If it succeeds the - /// socket is ready to receive and send packets. + /// This method initializes the UMEM, and then creates and configures the AF_XDP sockets. If it succeeds the + /// sockets are ready to receive and send packets. /// @param[in] config The configuration to use for opening the device /// @return True if device was opened successfully, false otherwise bool open(const XdpDeviceConfiguration& config); - /// Close the device. This method closes the AF_XDP socket and frees the UMEM that was allocated for it. + /// Close the device. This method closes the AF_XDP sockets and frees the associated socket resources. void close() override; bool isOpened() const override @@ -185,7 +202,7 @@ namespace pcpp return m_DeviceOpened; } - /// Start receiving packets. In order to use this method the device should be open. Note that this method is + /// Start receiving packets on queue id = 0. In order to use this method the device should be open. Note that this method is /// blocking and will return if: /// - stopReceivePackets() was called from within the user callback /// - timeoutMS passed without receiving any packets @@ -203,7 +220,7 @@ namespace pcpp /// want to stop receiving packets. void stopReceivePackets(); - /// Send a vector of packet pointers. + /// Send a vector of packet pointers on queue id zero /// @param[in] packets A vector of packet pointers to send /// @param[in] waitForTxCompletion Wait for confirmation from the kernel that packets were sent. If set to true /// this method will wait until the number of packets in the completion ring is equal or greater to the number @@ -215,7 +232,7 @@ namespace pcpp bool sendPackets(const RawPacketVector& packets, bool waitForTxCompletion = false, int waitForTxCompletionTimeoutMS = 5000); - /// Send an array of packets. + /// Send an array of packets on queue id zero /// @param[in] packets An array of raw packets to send /// @param[in] packetCount The length of the packet array /// @param[in] waitForTxCompletion Wait for confirmation from the kernel that packets were sent. If set to true @@ -232,13 +249,144 @@ namespace pcpp XdpDeviceConfiguration* getConfig() const { // TODO: Return a copy or const ref to avoid user modifying config? - return m_Config.get(); + return m_Config; } - /// @return Current device statistics + /// @return Current device statistics for queue id zero XdpDeviceStats getStatistics(); + /// Get the XdpSocket object for the specified queue id + /// @param[in] queueid the queueid of the related socket + /// @return the pointer to the XdpSocket object if queueid valid otherwise nullptr + XdpSocket *getSocket(uint32_t queueid = 0) + { + if(queueid < m_Socket.size()) + { + return m_Socket[queueid].get(); + } + + return nullptr; + } + + private: + + struct XdpPrevDeviceStats + { + timespec timestamp; + uint64_t rxPackets; + uint64_t rxBytes; + uint64_t txSentPackets; + uint64_t txSentBytes; + uint64_t txCompletedPackets; + }; + + bool m_DeviceOpened = false; + + std::string m_InterfaceName; + XdpDeviceConfiguration* m_Config; + std::vector> m_Socket; + + bool sendPackets(const std::function& getPacketAt, + const std::function& getPacketCount, bool waitForTxCompletion = false, + int waitForTxCompletionTimeoutMS = 5000); + bool initConfig(); + + // for backward compatibility + OnPacketsArrive OnPacketsArriveCB_; + static void onPacketsArriveSocketZero(RawPacket packets[], uint32_t packetCount, XdpSocket* socket, void* userCookie); + + }; + + /// @class XdpSocket + /// A class wrapping the main functionality of a single AF_XDP (XSK) socket + /// which are optimized for high performance packet processing. + /// + /// It provides methods for configuring and initializing AF_XDP socket, and then send and receive packets through + /// it. It also provides a method for gathering statistics from the socket. + class XdpSocket + { + + friend class XdpDevice; + + public: + + /// A c'tor for this class. Please note that calling this c'tor doesn't initialize the AF_XDP socket. In order + /// to set up the socket call configureSocket(). + /// @param[in] device The custodial XdpDevice + /// @param[in] queueid The queue id for the socket + explicit XdpSocket(XdpDevice *device, uint32_t queueid); + + /// A d'tor for this class. It deletes the underlying socket resource and frees allocated UMEM + ~XdpSocket(); + + /// Configure the socket with device configuration. + /// This method initializes the UMEM, and then creates and configures the AF_XDP socket for a queue. If it succeeds the + /// sockets are ready to receive and send packets. + /// @return True if socket was configured successfully, false otherwise + bool configureSocket(); + + /// A socket is associated with a device + XdpDevice *getDevice() { return m_Device; } + + /// A socket is associated with one of the device queues. + uint32_t getQueueId() { return m_Queueid; } + + /// @typedef OnPacketsArriveSocket + /// The callback that is called whenever packets are received on the socket + /// @param[in] packets An array of the raw packets received + /// @param[in] packetCount The number of packets received + /// @param[in] socket The XdpSocket packets are received from (represents the AF_XDP socket) + /// @param[in] userCookie A pointer to an object set by the user when receivePackets() started + typedef void (*OnPacketsArriveSocket)(RawPacket packets[], uint32_t packetCount, XdpSocket* socket, void* userCookie); + + /// Start receiving packets on socket. In order to use this method the device should be open. Note that this method is + /// blocking and will return if: + /// - stopReceivePackets() was called from within the user callback + /// - timeoutMS passed without receiving any packets + /// - Some error occurred (an error log will be printed) + /// @param[in] onPacketsArriveSocket A callback to be called when packets are received on particular socket + /// @param[in] onPacketsArriveSocketUserCookie The callback is invoked with this cookie as a parameter. It can be used + /// to pass information from the user application to the callback + /// @param[in] timeoutMS Timeout in milliseconds to stop if no packets are received. The default value is 5000 + /// ms + /// @return True if stopped receiving packets because stopReceivePackets() was called or because timeoutMS + /// passed, or false if an error occurred. + bool receivePackets(OnPacketsArriveSocket onPacketsArriveSocket, void* onPacketsArriveSocketUserCookie, int timeoutMS = 5000); + + /// Stop receiving packets. Call this method from within the callback passed to receivePackets() whenever you + /// want to stop receiving packets on the socket. + void stopReceivePackets(); + + /// Send a vector of packet pointers. + /// @param[in] packets A vector of packet pointers to send + /// @param[in] waitForTxCompletion Wait for confirmation from the kernel that packets were sent. If set to true + /// this method will wait until the number of packets in the completion ring is equal or greater to the number + /// of packets that were sent. The default value is false + /// @param[in] waitForTxCompletionTimeoutMS If waitForTxCompletion is set to true, poll the completion ring with + /// this timeout. The default value is 5000 ms + /// @return True if all packets were sent, or if waitForTxCompletion is true - all sent packets were confirmed. + /// Returns false if an error occurred or if poll timed out. + bool sendPackets(const RawPacketVector& packets, bool waitForTxCompletion = false, + int waitForTxCompletionTimeoutMS = 5000); + + /// Send an array of packets. + /// @param[in] packets An array of raw packets to send + /// @param[in] packetCount The length of the packet array + /// @param[in] waitForTxCompletion Wait for confirmation from the kernel that packets were sent. If set to true + /// this method will wait until the number of packets in the completion ring is equal or greater to the number + /// of packets sent. The default value is false + /// @param[in] waitForTxCompletionTimeoutMS If waitForTxCompletion is set to true, poll the completion ring with + /// this timeout. The default value is 5000 ms + /// @return True if all packets were sent, or if waitForTxCompletion is true - all sent packets were confirmed. + /// Returns false if an error occurred or if poll timed out. + bool sendPackets(RawPacket packets[], size_t packetCount, bool waitForTxCompletion = false, + int waitForTxCompletionTimeoutMS = 5000); + + /// @return Current device statistics + XdpDevice::XdpDeviceStats getStatistics(); + private: + class XdpUmem { public: @@ -282,35 +430,23 @@ namespace pcpp std::vector m_FreeFrames; }; - struct XdpPrevDeviceStats - { - timespec timestamp; - uint64_t rxPackets; - uint64_t rxBytes; - uint64_t txSentPackets; - uint64_t txSentBytes; - uint64_t txCompletedPackets; - }; + XdpDevice *m_Device; + uint32_t m_Queueid; - bool m_DeviceOpened = false; - - std::string m_InterfaceName; - std::unique_ptr m_Config; - bool m_ReceivingPackets; - XdpUmem* m_Umem; - void* m_SocketInfo; - XdpDeviceStats m_Stats; - XdpPrevDeviceStats m_PrevStats; + bool m_ReceivingPackets = false; + XdpUmem* m_Umem = nullptr; + void* m_SocketInfo = nullptr; + XdpDevice::XdpDeviceStats m_Stats; + XdpDevice::XdpPrevDeviceStats m_PrevStats; bool sendPackets(const std::function& getPacketAt, - const std::function& getPacketCount, bool waitForTxCompletion = false, - int waitForTxCompletionTimeoutMS = 5000); + const std::function& getPacketCount, bool waitForTxCompletion = false, + int waitForTxCompletionTimeoutMS = 5000); + bool populateFillRing(uint32_t count, uint32_t rxId = 0); bool populateFillRing(const std::vector& addresses, uint32_t rxId); uint32_t checkCompletionRing(); - bool configureSocket(); bool initUmem(); - bool populateConfigDefaults(XdpDeviceConfiguration& config) const; bool getSocketStats(); }; } // namespace pcpp diff --git a/Pcap++/src/XdpDevice.cpp b/Pcap++/src/XdpDevice.cpp index 1f17cc4eea..d7006f0cf0 100644 --- a/Pcap++/src/XdpDevice.cpp +++ b/Pcap++/src/XdpDevice.cpp @@ -11,12 +11,19 @@ #include #include #include +#include #include #include #include namespace pcpp { + struct xsk_socket_info + { + struct xsk_ring_cons rx; + struct xsk_ring_prod tx; + struct xsk_socket* xsk; + }; struct xsk_umem_info { @@ -25,20 +32,261 @@ namespace pcpp struct xsk_umem* umem; }; - struct xsk_socket_info - { - struct xsk_ring_cons rx; - struct xsk_ring_prod tx; - struct xsk_socket* xsk; - }; - #define DEFAULT_UMEM_NUM_FRAMES (XSK_RING_PROD__DEFAULT_NUM_DESCS * 2) #define DEFAULT_FILL_RING_SIZE (XSK_RING_PROD__DEFAULT_NUM_DESCS * 2) #define DEFAULT_COMPLETION_RING_SIZE XSK_RING_PROD__DEFAULT_NUM_DESCS #define DEFAULT_BATCH_SIZE 64 +#define DEFAULT_NUMBER_QUEUES 1 #define IS_POWER_OF_TWO(num) (num && ((num & (num - 1)) == 0)) - XdpDevice::XdpUmem::XdpUmem(uint16_t numFrames, uint16_t frameSize, uint32_t fillRingSize, + XdpDevice::XdpDevice(std::string interfaceName) + : m_InterfaceName(std::move(interfaceName)), m_Config(nullptr) + { + OnPacketsArriveCB_ = nullptr; + } + + XdpDevice::~XdpDevice() + { + close(); + } + + void XdpDevice::onPacketsArriveSocketZero(RawPacket packets[], uint32_t packetCount, XdpSocket* socket, void* userCookie) + { + auto device = socket->getDevice(); + + if (device && device->OnPacketsArriveCB_) + { + device->OnPacketsArriveCB_(packets, packetCount, device, userCookie); + } + } + + bool XdpDevice::receivePackets(OnPacketsArrive onPacketsArrive, void* onPacketsArriveUserCookie, int timeoutMS) + { + if (!m_DeviceOpened) + { + PCPP_LOG_ERROR("Device is not open"); + return false; + } + + if (m_Socket.empty()) + { + PCPP_LOG_ERROR("Device has no queues or sockets"); + return false; + } + + // we need to hold this + OnPacketsArriveCB_ = onPacketsArrive; + + // Backward Compatibility + // Supplant function to use socket type, pass in same cookie + auto res = m_Socket[0]->receivePackets(onPacketsArriveSocketZero, onPacketsArriveUserCookie, timeoutMS); + + return res; + } + + void XdpDevice::stopReceivePackets() + { + if (!m_Socket.empty()) + { + m_Socket[0]->m_ReceivingPackets = false; + } + } + + bool XdpDevice::sendPackets(const std::function& getPacketAt, + const std::function& getPacketCount, bool waitForTxCompletion, + int waitForTxCompletionTimeoutMS) + { + if (!m_DeviceOpened) + { + PCPP_LOG_ERROR("Device is not open"); + return false; + } + + if (m_Socket.empty()) + { + PCPP_LOG_ERROR("Device has no queues or sockets"); + return false; + } + + // Backward Compatibility + // Supplant function to use socket type, pass in same cookie + auto res = m_Socket[0]->sendPackets(getPacketAt, getPacketCount, waitForTxCompletion, waitForTxCompletionTimeoutMS); + return(res); + } + + bool XdpDevice::sendPackets(const RawPacketVector& packets, bool waitForTxCompletion, + int waitForTxCompletionTimeoutMS) + { + return sendPackets([&](uint32_t i) { return *packets.at(static_cast(i)); }, + [&]() { return packets.size(); }, waitForTxCompletion, waitForTxCompletionTimeoutMS); + } + + bool XdpDevice::sendPackets(RawPacket packets[], size_t packetCount, bool waitForTxCompletion, + int waitForTxCompletionTimeoutMS) + { + return sendPackets([&](uint32_t i) { return packets[i]; }, [&]() { return static_cast(packetCount); }, + waitForTxCompletion, waitForTxCompletionTimeoutMS); + } + + bool XdpDevice::initConfig() + { + if (!m_Config) + { + m_Config = new XdpDeviceConfiguration(); + } + + uint16_t numFrames = m_Config->umemNumFrames ? m_Config->umemNumFrames : DEFAULT_UMEM_NUM_FRAMES; + uint16_t frameSize = m_Config->umemFrameSize ? m_Config->umemFrameSize : getpagesize(); + uint32_t fillRingSize = m_Config->fillRingSize ? m_Config->fillRingSize : DEFAULT_FILL_RING_SIZE; + uint32_t completionRingSize = + m_Config->completionRingSize ? m_Config->completionRingSize : DEFAULT_COMPLETION_RING_SIZE; + uint32_t rxSize = m_Config->rxSize ? m_Config->rxSize : XSK_RING_CONS__DEFAULT_NUM_DESCS; + uint32_t txSize = m_Config->txSize ? m_Config->txSize : XSK_RING_PROD__DEFAULT_NUM_DESCS; + uint32_t batchSize = m_Config->rxTxBatchSize ? m_Config->rxTxBatchSize : DEFAULT_BATCH_SIZE; + uint32_t nQueues = m_Config->numQueues ? m_Config->numQueues : DEFAULT_NUMBER_QUEUES; + + if (frameSize != getpagesize()) + { + PCPP_LOG_ERROR("UMEM frame size must match the memory page size (" << getpagesize() << ")"); + return false; + } + + if (!(IS_POWER_OF_TWO(fillRingSize) && IS_POWER_OF_TWO(completionRingSize) && IS_POWER_OF_TWO(rxSize) && + IS_POWER_OF_TWO(txSize))) + { + PCPP_LOG_ERROR("All ring sizes (fill ring, completion ring, rx ring, tx ring) should be a power of two"); + return false; + } + + if (fillRingSize > numFrames) + { + PCPP_LOG_ERROR("Fill ring size (" << fillRingSize + << ") must be lower or equal to the total number of UMEM frames (" + << numFrames << ")"); + return false; + } + + if (completionRingSize > numFrames) + { + PCPP_LOG_ERROR("Completion ring size (" << completionRingSize + << ") must be lower or equal to the total number of UMEM frames (" + << numFrames << ")"); + return false; + } + + if (rxSize > numFrames) + { + PCPP_LOG_ERROR("RX size (" << rxSize << ") must be lower or equal to the total number of UMEM frames (" + << numFrames << ")"); + return false; + } + + if (txSize > numFrames) + { + PCPP_LOG_ERROR("TX size (" << txSize << ") must be lower or equal to the total number of UMEM frames (" + << numFrames << ")"); + return false; + } + + if (batchSize > rxSize || batchSize > txSize) + { + PCPP_LOG_ERROR("RX/TX batch size (" << batchSize << ") must be lower or equal to RX/TX ring size"); + return false; + } + + unsigned int ncores = std::thread::hardware_concurrency(); + if (nQueues > ncores || nQueues > PCPP_MAX_XDP_NUMBER_QUEUES) + { + // Limit queues to be no more than hardware cores and hardware queues, + // for now trust that the application knows this + PCPP_LOG_ERROR("Number of queues (" << nQueues << ") must be lower than " << ncores << " cores and the maximum allowed"); + return false; + } + + m_Config->umemNumFrames = numFrames; + m_Config->umemFrameSize = frameSize; + m_Config->fillRingSize = fillRingSize; + m_Config->completionRingSize = completionRingSize; + m_Config->rxSize = rxSize; + m_Config->txSize = txSize; + m_Config->rxTxBatchSize = batchSize; + m_Config->numQueues = nQueues; + + return true; + } + + bool XdpDevice::open() + { + if (m_DeviceOpened) + { + PCPP_LOG_ERROR("Device already opened"); + return false; + } + + if (initConfig()) + { + // construct and configure for each queue and socket + for(uint32_t i = 0; i < m_Config->numQueues; i++) + { + auto socket = std::make_unique(this, i); + if(socket->configureSocket() == false) + { + PCPP_LOG_ERROR("Device failed to configure"); + + // this should delete any XdpSocket within using unique_ptr + m_Socket.clear(); + return false; + } + + m_Socket.push_back(std::move(socket)); + } + } + else + { + if (m_Config) + { + delete m_Config; + m_Config = nullptr; + } + return false; + } + + m_DeviceOpened = true; + return m_DeviceOpened; + } + + bool XdpDevice::open(const XdpDeviceConfiguration& config) + { + m_Config = new XdpDeviceConfiguration(config); + return open(); + } + + void XdpDevice::close() + { + if (isOpened()) + { + // this should free up all the sockets using unique_ptr + m_Socket.clear(); + + m_DeviceOpened = false; + delete m_Config; + m_Config = nullptr; + } + } + + XdpDevice::XdpDeviceStats XdpDevice::getStatistics() + { + if (!m_Socket.empty()) + { + return(m_Socket[0]->getStatistics()); + } + + XdpDeviceStats nullstats; + memset(&nullstats, 0, sizeof(XdpDeviceStats)); + return nullstats; + } + + XdpSocket::XdpUmem::XdpUmem(uint16_t numFrames, uint16_t frameSize, uint32_t fillRingSize, uint32_t completionRingSize) { size_t bufferSize = numFrames * frameSize; @@ -74,24 +322,24 @@ namespace pcpp m_FrameCount = numFrames; } - XdpDevice::XdpUmem::~XdpUmem() + XdpSocket::XdpUmem::~XdpUmem() { xsk_umem__delete(static_cast(m_UmemInfo)->umem); free(m_Buffer); } - const uint8_t* XdpDevice::XdpUmem::getDataPtr(uint64_t addr) const + const uint8_t* XdpSocket::XdpUmem::getDataPtr(uint64_t addr) const { return static_cast(xsk_umem__get_data(m_Buffer, addr)); } - void XdpDevice::XdpUmem::setData(uint64_t addr, const uint8_t* data, size_t dataLen) + void XdpSocket::XdpUmem::setData(uint64_t addr, const uint8_t* data, size_t dataLen) { auto dataPtr = static_cast(xsk_umem__get_data(m_Buffer, addr)); memcpy(dataPtr, data, dataLen); } - std::pair> XdpDevice::XdpUmem::allocateFrames(uint32_t count) + std::pair> XdpSocket::XdpUmem::allocateFrames(uint32_t count) { if (m_FreeFrames.size() < count) { @@ -110,28 +358,81 @@ namespace pcpp return { true, result }; } - void XdpDevice::XdpUmem::freeFrame(uint64_t addr) + void XdpSocket::XdpUmem::freeFrame(uint64_t addr) { auto frame = (uint64_t)((addr / m_FrameSize) * m_FrameSize); m_FreeFrames.push_back(frame); } - XdpDevice::XdpDevice(std::string interfaceName) - : m_InterfaceName(std::move(interfaceName)), m_Config(nullptr), m_ReceivingPackets(false), m_Umem(nullptr), - m_SocketInfo(nullptr) + bool XdpSocket::getSocketStats() { - memset(&m_Stats, 0, sizeof(m_Stats)); - memset(&m_PrevStats, 0, sizeof(m_PrevStats)); + auto socketInfo = static_cast(m_SocketInfo); + int fd = xsk_socket__fd(socketInfo->xsk); + + struct xdp_statistics socketStats; + socklen_t optlen = sizeof(socketStats); + + int err = getsockopt(fd, SOL_XDP, XDP_STATISTICS, &socketStats, &optlen); + if (err) + { + PCPP_LOG_ERROR("Error getting stats from socket, return error: " << err); + return false; + } + + if (optlen != sizeof(struct xdp_statistics)) + { + PCPP_LOG_ERROR("Error getting stats from socket: optlen (" << optlen << ") != expected size (" + << sizeof(struct xdp_statistics) << ")"); + return false; + } + + m_Stats.rxDroppedInvalidPackets = socketStats.rx_invalid_descs; + m_Stats.rxDroppedRxRingFullPackets = socketStats.rx_ring_full; + m_Stats.rxDroppedFillRingPackets = socketStats.rx_fill_ring_empty_descs; + m_Stats.rxDroppedTotalPackets = m_Stats.rxDroppedFillRingPackets + m_Stats.rxDroppedRxRingFullPackets + + m_Stats.rxDroppedInvalidPackets + socketStats.rx_dropped; + m_Stats.txDroppedInvalidPackets = socketStats.tx_invalid_descs; + + return true; } + - XdpDevice::~XdpDevice() + // XdpSocket implementation + + XdpSocket::XdpSocket(XdpDevice *device, uint32_t qid) { - close(); + m_Device = device; + m_Queueid = qid; + + m_ReceivingPackets = false; + m_Umem = nullptr; + m_SocketInfo = nullptr; + memset(&m_Stats, 0, sizeof(XdpDevice::XdpDeviceStats)); + memset(&m_PrevStats, 0, sizeof(XdpDevice::XdpPrevDeviceStats)); } - bool XdpDevice::receivePackets(OnPacketsArrive onPacketsArrive, void* onPacketsArriveUserCookie, int timeoutMS) + XdpSocket::~XdpSocket() { - if (!m_DeviceOpened) + // This is not a custodial pointer and should not be deleted + m_Device = nullptr; + + if(m_SocketInfo) + { + auto socketInfo = static_cast(m_SocketInfo); + xsk_socket__delete(socketInfo->xsk); + m_SocketInfo = nullptr; + } + + if(m_Umem) + { + delete m_Umem; + m_Umem = nullptr; + } + } + + bool XdpSocket::receivePackets(OnPacketsArriveSocket onPacketsArriveSocket, void* onPacketsArriveSocketUserCookie, int timeoutMS) + { + if (!m_Device->isOpened()) { PCPP_LOG_ERROR("Device is not open"); return false; @@ -169,6 +470,8 @@ namespace pcpp return true; } + XdpDevice::XdpDeviceConfiguration* m_Config = m_Device->getConfig(); + uint32_t receivedPacketsCount = xsk_ring_cons__peek(&socketInfo->rx, m_Config->rxTxBatchSize, &rxId); if (receivedPacketsCount == 0) @@ -198,7 +501,7 @@ namespace pcpp m_Umem->freeFrame(addr); } - onPacketsArrive(receiveBuffer.data(), receiveBuffer.size(), this, onPacketsArriveUserCookie); + onPacketsArriveSocket(receiveBuffer.data(), receiveBuffer.size(), this, onPacketsArriveSocketUserCookie); xsk_ring_cons__release(&socketInfo->rx, receivedPacketsCount); m_Stats.rxRingId = rxId + receivedPacketsCount; @@ -215,21 +518,22 @@ namespace pcpp return true; } - void XdpDevice::stopReceivePackets() + void XdpSocket::stopReceivePackets() { m_ReceivingPackets = false; } - bool XdpDevice::sendPackets(const std::function& getPacketAt, + bool XdpSocket::sendPackets(const std::function& getPacketAt, const std::function& getPacketCount, bool waitForTxCompletion, int waitForTxCompletionTimeoutMS) { - if (!m_DeviceOpened) + if (!m_Device->isOpened()) { PCPP_LOG_ERROR("Device is not open"); return false; } + auto socketInfo = static_cast(m_SocketInfo); checkCompletionRing(); @@ -310,21 +614,21 @@ namespace pcpp return true; } - bool XdpDevice::sendPackets(const RawPacketVector& packets, bool waitForTxCompletion, + bool XdpSocket::sendPackets(const RawPacketVector& packets, bool waitForTxCompletion, int waitForTxCompletionTimeoutMS) { return sendPackets([&](uint32_t i) { return *packets.at(static_cast(i)); }, [&]() { return packets.size(); }, waitForTxCompletion, waitForTxCompletionTimeoutMS); } - bool XdpDevice::sendPackets(RawPacket packets[], size_t packetCount, bool waitForTxCompletion, + bool XdpSocket::sendPackets(RawPacket packets[], size_t packetCount, bool waitForTxCompletion, int waitForTxCompletionTimeoutMS) { return sendPackets([&](uint32_t i) { return packets[i]; }, [&]() { return static_cast(packetCount); }, waitForTxCompletion, waitForTxCompletionTimeoutMS); } - bool XdpDevice::populateFillRing(uint32_t count, uint32_t rxId) + bool XdpSocket::populateFillRing(uint32_t count, uint32_t rxId) { auto frameResponse = m_Umem->allocateFrames(count); if (!frameResponse.first) @@ -344,7 +648,7 @@ namespace pcpp return result; } - bool XdpDevice::populateFillRing(const std::vector& addresses, uint32_t rxId) + bool XdpSocket::populateFillRing(const std::vector& addresses, uint32_t rxId) { auto umem = static_cast(m_Umem->getInfo()); auto count = static_cast(addresses.size()); @@ -367,8 +671,10 @@ namespace pcpp return true; } - uint32_t XdpDevice::checkCompletionRing() + uint32_t XdpSocket::checkCompletionRing() { + XdpDevice::XdpDeviceConfiguration* m_Config = m_Device->getConfig(); + uint32_t cqId = 0; auto umemInfo = static_cast(m_Umem->getInfo()); @@ -396,8 +702,21 @@ namespace pcpp return completedCount; } - bool XdpDevice::configureSocket() + bool XdpSocket::configureSocket() { + XdpDevice::XdpDeviceConfiguration* m_Config = m_Device->getConfig(); + + if (!(initUmem() && populateFillRing(std::min(m_Config->fillRingSize, static_cast(m_Config->umemNumFrames / 2))))) + { + if(m_Umem) + { + delete m_Umem; + m_Umem = nullptr; + } + + return false; + } + auto socketInfo = new xsk_socket_info(); auto umemInfo = static_cast(m_Umem->getInfo()); @@ -408,18 +727,18 @@ namespace pcpp xskConfig.libbpf_flags = 0; xskConfig.xdp_flags = 0; xskConfig.bind_flags = 0; - if (m_Config->attachMode == XdpDeviceConfiguration::SkbMode) + if (m_Config->attachMode == XdpDevice::XdpDeviceConfiguration::SkbMode) { xskConfig.xdp_flags = XDP_FLAGS_SKB_MODE; xskConfig.bind_flags &= ~XDP_ZEROCOPY; xskConfig.bind_flags |= XDP_COPY; } - else if (m_Config->attachMode == XdpDeviceConfiguration::DriverMode) + else if (m_Config->attachMode == XdpDevice::XdpDeviceConfiguration::DriverMode) { xskConfig.xdp_flags = XDP_FLAGS_DRV_MODE; } - int ret = xsk_socket__create(&socketInfo->xsk, m_InterfaceName.c_str(), 0, umemInfo->umem, &socketInfo->rx, + int ret = xsk_socket__create(&socketInfo->xsk, m_Device->m_InterfaceName.c_str(), m_Queueid, umemInfo->umem, &socketInfo->rx, &socketInfo->tx, &xskConfig); if (ret) { @@ -432,179 +751,25 @@ namespace pcpp return true; } - bool XdpDevice::initUmem() + bool XdpSocket::initUmem() { + XdpDevice::XdpDeviceConfiguration* m_Config = m_Device->getConfig(); + m_Umem = new XdpUmem(m_Config->umemNumFrames, m_Config->umemFrameSize, m_Config->fillRingSize, m_Config->completionRingSize); return true; } - bool XdpDevice::populateConfigDefaults(XdpDeviceConfiguration& config) const - { - uint16_t numFrames = config.umemNumFrames ? config.umemNumFrames : DEFAULT_UMEM_NUM_FRAMES; - uint16_t frameSize = config.umemFrameSize ? config.umemFrameSize : getpagesize(); - uint32_t fillRingSize = config.fillRingSize ? config.fillRingSize : DEFAULT_FILL_RING_SIZE; - uint32_t completionRingSize = - config.completionRingSize ? config.completionRingSize : DEFAULT_COMPLETION_RING_SIZE; - uint32_t rxSize = config.rxSize ? config.rxSize : XSK_RING_CONS__DEFAULT_NUM_DESCS; - uint32_t txSize = config.txSize ? config.txSize : XSK_RING_PROD__DEFAULT_NUM_DESCS; - uint32_t batchSize = config.rxTxBatchSize ? config.rxTxBatchSize : DEFAULT_BATCH_SIZE; - - if (frameSize != getpagesize()) - { - PCPP_LOG_ERROR("UMEM frame size must match the memory page size (" << getpagesize() << ")"); - return false; - } - - if (!(IS_POWER_OF_TWO(fillRingSize) && IS_POWER_OF_TWO(completionRingSize) && IS_POWER_OF_TWO(rxSize) && - IS_POWER_OF_TWO(txSize))) - { - PCPP_LOG_ERROR("All ring sizes (fill ring, completion ring, rx ring, tx ring) should be a power of two"); - return false; - } - - if (fillRingSize > numFrames) - { - PCPP_LOG_ERROR("Fill ring size (" << fillRingSize - << ") must be lower or equal to the total number of UMEM frames (" - << numFrames << ")"); - return false; - } - - if (completionRingSize > numFrames) - { - PCPP_LOG_ERROR("Completion ring size (" << completionRingSize - << ") must be lower or equal to the total number of UMEM frames (" - << numFrames << ")"); - return false; - } - - if (rxSize > numFrames) - { - PCPP_LOG_ERROR("RX size (" << rxSize << ") must be lower or equal to the total number of UMEM frames (" - << numFrames << ")"); - return false; - } - - if (txSize > numFrames) - { - PCPP_LOG_ERROR("TX size (" << txSize << ") must be lower or equal to the total number of UMEM frames (" - << numFrames << ")"); - return false; - } - - if (batchSize > rxSize || batchSize > txSize) - { - PCPP_LOG_ERROR("RX/TX batch size (" << batchSize << ") must be lower or equal to RX/TX ring size"); - return false; - } - - config.umemNumFrames = numFrames; - config.umemFrameSize = frameSize; - config.fillRingSize = fillRingSize; - config.completionRingSize = completionRingSize; - config.rxSize = rxSize; - config.txSize = txSize; - config.rxTxBatchSize = batchSize; - - return true; - } - - bool XdpDevice::open() - { - return open(XdpDeviceConfiguration{}); - } - - bool XdpDevice::open(const XdpDeviceConfiguration& config) - { - if (m_DeviceOpened) - { - PCPP_LOG_ERROR("Device already opened"); - return false; - } - - auto configCopy = std::make_unique(config); - if (!populateConfigDefaults(*configCopy)) - { - return false; - } - m_Config = std::move(configCopy); - - if (!(initUmem() && - populateFillRing(std::min(m_Config->fillRingSize, static_cast(m_Config->umemNumFrames / 2))) && - configureSocket())) - { - if (m_Umem) - { - delete m_Umem; - m_Umem = nullptr; - } - m_Config.reset(); - return false; - } - - memset(&m_Stats, 0, sizeof(m_Stats)); - memset(&m_PrevStats, 0, sizeof(m_PrevStats)); - - m_DeviceOpened = true; - return m_DeviceOpened; - } - - void XdpDevice::close() - { - if (m_DeviceOpened) - { - auto socketInfo = static_cast(m_SocketInfo); - xsk_socket__delete(socketInfo->xsk); - m_DeviceOpened = false; - delete m_Umem; - m_Config = nullptr; - m_Umem = nullptr; - } - } - - bool XdpDevice::getSocketStats() - { - auto socketInfo = static_cast(m_SocketInfo); - int fd = xsk_socket__fd(socketInfo->xsk); - - struct xdp_statistics socketStats; - socklen_t optlen = sizeof(socketStats); - - int err = getsockopt(fd, SOL_XDP, XDP_STATISTICS, &socketStats, &optlen); - if (err) - { - PCPP_LOG_ERROR("Error getting stats from socket, return error: " << err); - return false; - } - - if (optlen != sizeof(struct xdp_statistics)) - { - PCPP_LOG_ERROR("Error getting stats from socket: optlen (" << optlen << ") != expected size (" - << sizeof(struct xdp_statistics) << ")"); - return false; - } - - m_Stats.rxDroppedInvalidPackets = socketStats.rx_invalid_descs; - m_Stats.rxDroppedRxRingFullPackets = socketStats.rx_ring_full; - m_Stats.rxDroppedFillRingPackets = socketStats.rx_fill_ring_empty_descs; - m_Stats.rxDroppedTotalPackets = m_Stats.rxDroppedFillRingPackets + m_Stats.rxDroppedRxRingFullPackets + - m_Stats.rxDroppedInvalidPackets + socketStats.rx_dropped; - m_Stats.txDroppedInvalidPackets = socketStats.tx_invalid_descs; - - return true; - } - #define nanosec_gap(begin, end) ((end.tv_sec - begin.tv_sec) * 1'000'000'000.0 + (end.tv_nsec - begin.tv_nsec)) - XdpDevice::XdpDeviceStats XdpDevice::getStatistics() + XdpDevice::XdpDeviceStats XdpSocket::getStatistics() { timespec timestamp; clock_gettime(CLOCK_MONOTONIC, ×tamp); m_Stats.timestamp = timestamp; - if (m_DeviceOpened) + if (m_Device->isOpened()) { getSocketStats(); m_Stats.umemFreeFrames = m_Umem->getFreeFrameCount(); @@ -635,5 +800,4 @@ namespace pcpp return m_Stats; } - } // namespace pcpp